This is an automated email from the ASF dual-hosted git repository.

husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6c72223f86 Add `task_acks_late` configuration to Celery Executor (#37066)
6c72223f86 is described below

commit 6c72223f8653abf421fa4443b337c0ffb33af29b
Author: Stefan Blumentrath <[email protected]>
AuthorDate: Tue Feb 6 00:07:51 2024 +0100

    Add `task_acks_late` configuration to Celery Executor (#37066)
    
    * get task_acks_late from config
    
    * add task_acks_late to config
    
    * Update airflow/providers/celery/provider.yaml
    
    Co-authored-by: Hussein Awala <[email protected]>
    
    * Update airflow/providers/celery/executors/default_celery.py
    
    Co-authored-by: Hussein Awala <[email protected]>
    
    * add basic test
    
    * test acks_late False
    
    * linting
    
    * Update airflow/providers/celery/provider.yaml
    
    Co-authored-by: Niko Oliveira <[email protected]>
    
    * Update airflow/providers/celery/provider.yaml
    
    Co-authored-by: Niko Oliveira <[email protected]>
    
    * linting
    
    * Update spelling_wordlist.txt
    
    * alphabetic order
    
    * define version added
    
    * Update airflow/providers/celery/provider.yaml
    
    * Update airflow/providers/celery/provider.yaml
    
    Co-authored-by: Jarek Potiuk <[email protected]>
    
    ---------
    
    Co-authored-by: Hussein Awala <[email protected]>
    Co-authored-by: Niko Oliveira <[email protected]>
    Co-authored-by: Jarek Potiuk <[email protected]>
---
 airflow/providers/celery/executors/default_celery.py     |  2 +-
 airflow/providers/celery/provider.yaml                   | 15 +++++++++++++++
 docs/spelling_wordlist.txt                               |  1 +
 tests/providers/celery/executors/test_celery_executor.py |  9 +++++++++
 4 files changed, 26 insertions(+), 1 deletion(-)

diff --git a/airflow/providers/celery/executors/default_celery.py b/airflow/providers/celery/executors/default_celery.py
index eaaf0fd0b4..01b13b1c5a 100644
--- a/airflow/providers/celery/executors/default_celery.py
+++ b/airflow/providers/celery/executors/default_celery.py
@@ -72,7 +72,7 @@ DEFAULT_CELERY_CONFIG = {
     "accept_content": ["json"],
     "event_serializer": "json",
     "worker_prefetch_multiplier": conf.getint("celery", "worker_prefetch_multiplier", fallback=1),
-    "task_acks_late": True,
+    "task_acks_late": conf.getboolean("celery", "task_acks_late", fallback=True),
     "task_default_queue": conf.get("operators", "DEFAULT_QUEUE"),
     "task_default_exchange": conf.get("operators", "DEFAULT_QUEUE"),
     "task_track_started": conf.getboolean("celery", "task_track_started", fallback=True),
diff --git a/airflow/providers/celery/provider.yaml b/airflow/providers/celery/provider.yaml
index 947dfcb6b1..185b688ce9 100644
--- a/airflow/providers/celery/provider.yaml
+++ b/airflow/providers/celery/provider.yaml
@@ -276,6 +276,21 @@ config:
         type: float
         example: ~
         default: "1.0"
+      task_acks_late:
+        description: |
+          If an Airflow task's execution time exceeds the visibility_timeout, Celery will re-assign the
+          task to a Celery worker, even if the original task is still running successfully. The new task
+          instance then runs concurrently with the original task and the Airflow UI and logs only show an
+          error message:
+          'Task Instance Not Running' FAILED: Task is in the running state'
+          Setting task_acks_late to True will force Celery to wait until a task is finished before a
+          new task instance is assigned. This effectively overrides the visibility timeout.
+          See also:
+          https://docs.celeryq.dev/en/stable/reference/celery.app.task.html#celery.app.task.Task.acks_late
+        version_added: 3.6.0
+        type: boolean
+        example: "True"
+        default: "True"
       task_track_started:
         description: |
           Celery task will report its status as 'started' when the task is executed by a worker.
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 08d2817eae..8457f8403c 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -7,6 +7,7 @@ Ack
 ack
 ackIds
 acknowledgement
+acks
 actionCard
 Acyclic
 acyclic
diff --git a/tests/providers/celery/executors/test_celery_executor.py b/tests/providers/celery/executors/test_celery_executor.py
index e6446c0f6f..450312bd63 100644
--- a/tests/providers/celery/executors/test_celery_executor.py
+++ b/tests/providers/celery/executors/test_celery_executor.py
@@ -358,3 +358,12 @@ def test_sentinel_kwargs_loaded_from_string():
     assert default_celery.DEFAULT_CELERY_CONFIG["broker_transport_options"]["sentinel_kwargs"] == {
         "service_name": "mymaster"
     }
+
+
+@conf_vars({("celery", "task_acks_late"): "False"})
+def test_celery_task_acks_late_loaded_from_string():
+    import importlib
+
+    # reload celery conf to apply the new config
+    importlib.reload(default_celery)
+    assert default_celery.DEFAULT_CELERY_CONFIG["task_acks_late"] is False

Reply via email to