This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b48104e7ae6 fix: warn about hardcoded 24h visibility_timeout that kills long-running Celery tasks (#62869)
b48104e7ae6 is described below
commit b48104e7ae680c8f7a489cea7a99ad29134d8260
Author: Yoann <[email protected]>
AuthorDate: Tue Mar 10 13:08:14 2026 -0700
fix: warn about hardcoded 24h visibility_timeout that kills long-running Celery tasks (#62869)
* fix: warn about hardcoded 24h visibility_timeout that kills long-running Celery tasks
Add a warning log when the default visibility_timeout of 86400 seconds (24 hours) is
applied for Redis/SQS brokers, so users with long-running tasks know to increase it.
Fix misleading task_acks_late documentation that incorrectly claimed it overrides
visibility_timeout (it does not for Redis/SQS brokers).
Fixes apache/airflow#62218
* fix: fix test assertion for visibility_timeout type (int vs string)
* fix: apply ruff format to test file
* fix: update get_provider_info.py to match provider.yaml description
* ci: retrigger CI (unrelated infra failures)
---
providers/celery/provider.yaml | 12 +++--
.../providers/celery/executors/default_celery.py | 7 +++
.../airflow/providers/celery/get_provider_info.py | 4 +-
.../unit/celery/executors/test_celery_executor.py | 51 ++++++++++++++++++++++
4 files changed, 69 insertions(+), 5 deletions(-)
diff --git a/providers/celery/provider.yaml b/providers/celery/provider.yaml
index 448d0fc3690..836a31b72a4 100644
--- a/providers/celery/provider.yaml
+++ b/providers/celery/provider.yaml
@@ -310,10 +310,14 @@ config:
instance then runs concurrently with the original task and the
Airflow UI and logs only show an
error message:
'Task Instance Not Running' FAILED: Task is in the running state'
- Setting task_acks_late to True will force Celery to wait until a task is finished before a
- new task instance is assigned. This effectively overrides the visibility timeout.
+ Setting task_acks_late to True acknowledges the task only after it completes.
+ Note: for Redis and SQS brokers, task_acks_late does NOT override visibility_timeout.
+ The broker will still redeliver tasks that exceed visibility_timeout regardless of this setting.
+ For long-running tasks, you must also increase [celery_broker_transport_options] visibility_timeout.
+ The default visibility_timeout is 86400 seconds (24 hours).
See also:
https://docs.celeryq.dev/en/stable/reference/celery.app.task.html#celery.app.task.Task.acks_late
+ https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/redis.html#visibility-timeout
version_added: 3.6.0
type: boolean
example: "True"
@@ -368,8 +372,10 @@ config:
description: |
The visibility timeout defines the number of seconds to wait for the worker
to acknowledge the task before the message is redelivered to another worker.
+ If not set, Airflow defaults to 86400 seconds (24 hours) for Redis and SQS brokers.
+ Tasks running longer than this value will be terminated and redelivered.
Make sure to increase the visibility timeout to match the time of the longest
- ETA you're planning to use.
+ task you're planning to run.
visibility_timeout is only supported for Redis and SQS celery brokers.
See:
https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/redis.html#visibility-timeout
diff --git a/providers/celery/src/airflow/providers/celery/executors/default_celery.py b/providers/celery/src/airflow/providers/celery/executors/default_celery.py
index 50b6c785127..c11b7e91948 100644
--- a/providers/celery/src/airflow/providers/celery/executors/default_celery.py
+++ b/providers/celery/src/airflow/providers/celery/executors/default_celery.py
@@ -58,6 +58,13 @@ def get_default_celery_config(team_conf) -> dict[str, Any]:
if "visibility_timeout" not in broker_transport_options:
if _broker_supports_visibility_timeout(broker_url):
broker_transport_options["visibility_timeout"] = 86400
+ log.warning(
+ "No visibility_timeout configured in [celery_broker_transport_options]. "
+ "Using default of 86400 seconds (24 hours). Celery tasks running longer than this "
+ "will be redelivered by the broker, which terminates the original task. "
+ "If you have long-running tasks, increase this value in your Airflow configuration: "
+ "[celery_broker_transport_options] visibility_timeout = <seconds>"
+ )
if "sentinel_kwargs" in broker_transport_options:
try:
diff --git a/providers/celery/src/airflow/providers/celery/get_provider_info.py b/providers/celery/src/airflow/providers/celery/get_provider_info.py
index 48097457f5d..537344c7a4e 100644
--- a/providers/celery/src/airflow/providers/celery/get_provider_info.py
+++ b/providers/celery/src/airflow/providers/celery/get_provider_info.py
@@ -205,7 +205,7 @@ def get_provider_info():
"default": "1.0",
},
"task_acks_late": {
- "description": "If an Airflow task's execution time
exceeds the visibility_timeout, Celery will re-assign the\ntask to a Celery
worker, even if the original task is still running successfully. The new
task\ninstance then runs concurrently with the original task and the Airflow UI
and logs only show an\nerror message:\n'Task Instance Not Running' FAILED: Task
is in the running state'\nSetting task_acks_late to True will force Celery to
wait until a task is finished [...]
+ "description": "If an Airflow task's execution time
exceeds the visibility_timeout, Celery will re-assign the\ntask to a Celery
worker, even if the original task is still running successfully. The new
task\ninstance then runs concurrently with the original task and the Airflow UI
and logs only show an\nerror message:\n'Task Instance Not Running' FAILED: Task
is in the running state'\nSetting task_acks_late to True acknowledges the task
only after it completes.\nNo [...]
"version_added": "3.6.0",
"type": "boolean",
"example": "True",
@@ -245,7 +245,7 @@ def get_provider_info():
"description": "This section is for specifying options which
can be passed to the\nunderlying celery broker transport.
See:\nhttps://docs.celeryq.dev/en/latest/userguide/configuration.html#std:setting-broker_transport_options\n",
"options": {
"visibility_timeout": {
- "description": "The visibility timeout defines the
number of seconds to wait for the worker\nto acknowledge the task before the
message is redelivered to another worker.\nMake sure to increase the visibility
timeout to match the time of the longest\nETA you're planning to
use.\nvisibility_timeout is only supported for Redis and SQS celery
brokers.\nSee:\nhttps://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/redis.html#visibility-timeout\n",
+ "description": "The visibility timeout defines the
number of seconds to wait for the worker\nto acknowledge the task before the
message is redelivered to another worker.\nIf not set, Airflow defaults to
86400 seconds (24 hours) for Redis and SQS brokers.\nTasks running longer than
this value will be terminated and redelivered.\nMake sure to increase the
visibility timeout to match the time of the longest\ntask you're planning to
run.\nvisibility_timeout is only su [...]
"version_added": None,
"type": "string",
"example": "21600",
diff --git a/providers/celery/tests/unit/celery/executors/test_celery_executor.py b/providers/celery/tests/unit/celery/executors/test_celery_executor.py
index 1a7bef2523e..f5d34fb2916 100644
--- a/providers/celery/tests/unit/celery/executors/test_celery_executor.py
+++ b/providers/celery/tests/unit/celery/executors/test_celery_executor.py
@@ -467,6 +467,57 @@ def test_celery_task_acks_late_loaded_from_string():
assert default_celery.DEFAULT_CELERY_CONFIG["task_acks_late"] is False
+@conf_vars({("celery", "BROKER_URL"): "redis://localhost:6379/0"})
+def test_visibility_timeout_default_warns_when_not_configured(caplog):
+ """Test that a warning is logged when visibility_timeout defaults to 86400 (24h)."""
+ import importlib
+
+ from airflow.providers.celery.executors.default_celery import log
+
+ with caplog.at_level(logging.WARNING, logger=log.name):
+ importlib.reload(default_celery)
+ assert default_celery.DEFAULT_CELERY_CONFIG["broker_transport_options"]["visibility_timeout"] == 86400
+ assert "No visibility_timeout configured" in caplog.text
+ assert "86400" in caplog.text
+ assert "long-running tasks" in caplog.text
+
+
+@conf_vars(
+ {
+ ("celery", "BROKER_URL"): "redis://localhost:6379/0",
+ ("celery_broker_transport_options", "visibility_timeout"): "172800",
+ }
+)
+def test_visibility_timeout_no_warning_when_configured(caplog):
+ """Test that no warning is logged when visibility_timeout is explicitly configured."""
+ import importlib
+
+ from airflow.providers.celery.executors.default_celery import log
+
+ with caplog.at_level(logging.WARNING, logger=log.name):
+ importlib.reload(default_celery)
+ assert (
+ int(default_celery.DEFAULT_CELERY_CONFIG["broker_transport_options"]["visibility_timeout"])
+ == 172800
+ )
+ assert "No visibility_timeout configured" not in caplog.text
+
+
+@conf_vars({("celery", "BROKER_URL"): "amqp://guest:guest@localhost:5672//"})
+def test_visibility_timeout_not_set_for_unsupported_broker(caplog):
+ """Test that visibility_timeout is not set for brokers that don't support it (e.g. RabbitMQ)."""
+ import importlib
+
+ from airflow.providers.celery.executors.default_celery import log
+
+ with caplog.at_level(logging.WARNING, logger=log.name):
+ importlib.reload(default_celery)
+ assert "visibility_timeout" not in default_celery.DEFAULT_CELERY_CONFIG.get(
+ "broker_transport_options", {}
+ )
+ assert "No visibility_timeout configured" not in caplog.text
+
+
@conf_vars({("celery", "extra_celery_config"): '{"worker_max_tasks_per_child": 10}'})
def test_celery_extra_celery_config_loaded_from_string():
import importlib