This is an automated email from the ASF dual-hosted git repository.
husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 6c72223f86 Add `task_acks_late` configuration to Celery Executor (#37066)
6c72223f86 is described below
commit 6c72223f8653abf421fa4443b337c0ffb33af29b
Author: Stefan Blumentrath <[email protected]>
AuthorDate: Tue Feb 6 00:07:51 2024 +0100
Add `task_acks_late` configuration to Celery Executor (#37066)
* get task_acks_late from config
* add task_acks_late to config
* Update airflow/providers/celery/provider.yaml
Co-authored-by: Hussein Awala <[email protected]>
* Update airflow/providers/celery/executors/default_celery.py
Co-authored-by: Hussein Awala <[email protected]>
* add basic test
* test acks_late False
* linting
* Update airflow/providers/celery/provider.yaml
Co-authored-by: Niko Oliveira <[email protected]>
* Update airflow/providers/celery/provider.yaml
Co-authored-by: Niko Oliveira <[email protected]>
* linting
* Update spelling_wordlist.txt
* alphabetic order
* define version added
* Update airflow/providers/celery/provider.yaml
* Update airflow/providers/celery/provider.yaml
Co-authored-by: Jarek Potiuk <[email protected]>
---------
Co-authored-by: Hussein Awala <[email protected]>
Co-authored-by: Niko Oliveira <[email protected]>
Co-authored-by: Jarek Potiuk <[email protected]>
---
airflow/providers/celery/executors/default_celery.py | 2 +-
airflow/providers/celery/provider.yaml | 15 +++++++++++++++
docs/spelling_wordlist.txt | 1 +
tests/providers/celery/executors/test_celery_executor.py | 9 +++++++++
4 files changed, 26 insertions(+), 1 deletion(-)
diff --git a/airflow/providers/celery/executors/default_celery.py b/airflow/providers/celery/executors/default_celery.py
index eaaf0fd0b4..01b13b1c5a 100644
--- a/airflow/providers/celery/executors/default_celery.py
+++ b/airflow/providers/celery/executors/default_celery.py
@@ -72,7 +72,7 @@ DEFAULT_CELERY_CONFIG = {
"accept_content": ["json"],
"event_serializer": "json",
"worker_prefetch_multiplier": conf.getint("celery", "worker_prefetch_multiplier", fallback=1),
- "task_acks_late": True,
+ "task_acks_late": conf.getboolean("celery", "task_acks_late", fallback=True),
"task_default_queue": conf.get("operators", "DEFAULT_QUEUE"),
"task_default_exchange": conf.get("operators", "DEFAULT_QUEUE"),
"task_track_started": conf.getboolean("celery", "task_track_started", fallback=True),
diff --git a/airflow/providers/celery/provider.yaml b/airflow/providers/celery/provider.yaml
index 947dfcb6b1..185b688ce9 100644
--- a/airflow/providers/celery/provider.yaml
+++ b/airflow/providers/celery/provider.yaml
@@ -276,6 +276,21 @@ config:
type: float
example: ~
default: "1.0"
+ task_acks_late:
+ description: |
+ If an Airflow task's execution time exceeds the visibility_timeout, Celery will re-assign the
+ task to a Celery worker, even if the original task is still running successfully. The new task
+ instance then runs concurrently with the original task and the Airflow UI and logs only show an
+ error message:
+ 'Task Instance Not Running' FAILED: Task is in the running state'
+ Setting task_acks_late to True will force Celery to wait until a task is finished before a
+ new task instance is assigned. This effectively overrides the visibility timeout.
+ See also:
+ https://docs.celeryq.dev/en/stable/reference/celery.app.task.html#celery.app.task.Task.acks_late
+ version_added: 3.6.0
+ type: boolean
+ example: "True"
+ default: "True"
task_track_started:
description: |
Celery task will report its status as 'started' when the task is executed by a worker.
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 08d2817eae..8457f8403c 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -7,6 +7,7 @@ Ack
ack
ackIds
acknowledgement
+acks
actionCard
Acyclic
acyclic
diff --git a/tests/providers/celery/executors/test_celery_executor.py b/tests/providers/celery/executors/test_celery_executor.py
index e6446c0f6f..450312bd63 100644
--- a/tests/providers/celery/executors/test_celery_executor.py
+++ b/tests/providers/celery/executors/test_celery_executor.py
@@ -358,3 +358,12 @@ def test_sentinel_kwargs_loaded_from_string():
assert default_celery.DEFAULT_CELERY_CONFIG["broker_transport_options"]["sentinel_kwargs"] == {
"service_name": "mymaster"
}
+
+
+@conf_vars({("celery", "task_acks_late"): "False"})
+def test_celery_task_acks_late_loaded_from_string():
+ import importlib
+
+ # reload celery conf to apply the new config
+ importlib.reload(default_celery)
+ assert default_celery.DEFAULT_CELERY_CONFIG["task_acks_late"] is False