This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e2393ee6dd Fix `sentinel_kwargs` load from ENV (#36318)
e2393ee6dd is described below

commit e2393ee6dd3a927e753e6375621af07aa0c734dc
Author: Valentine <[email protected]>
AuthorDate: Wed Dec 20 23:38:07 2023 +0200

    Fix `sentinel_kwargs` load from ENV (#36318)
    
    * fix(providers/celery): sentinel_kwargs from env
    
    * test: add sentinel kwargs test
---
 airflow/providers/celery/executors/default_celery.py     | 10 +++++-----
 tests/providers/celery/executors/test_celery_executor.py | 11 +++++++++++
 2 files changed, 16 insertions(+), 5 deletions(-)

diff --git a/airflow/providers/celery/executors/default_celery.py b/airflow/providers/celery/executors/default_celery.py
index 633d090ada..eaaf0fd0b4 100644
--- a/airflow/providers/celery/executors/default_celery.py
+++ b/airflow/providers/celery/executors/default_celery.py
@@ -18,6 +18,7 @@
 """Default celery configuration."""
 from __future__ import annotations
 
+import json
 import logging
 import ssl
 
@@ -47,18 +48,17 @@ log = logging.getLogger(__name__)
 
 broker_url = conf.get("celery", "BROKER_URL", fallback="redis://redis:6379/0")
 
-broker_transport_options = conf.getsection("celery_broker_transport_options") or {}
+broker_transport_options: dict = conf.getsection("celery_broker_transport_options") or {}
 if "visibility_timeout" not in broker_transport_options:
     if _broker_supports_visibility_timeout(broker_url):
         broker_transport_options["visibility_timeout"] = 21600
 
-broker_transport_options_for_celery: dict = broker_transport_options.copy()
 if "sentinel_kwargs" in broker_transport_options:
     try:
-        sentinel_kwargs = broker_transport_options.get("sentinel_kwargs")
+        sentinel_kwargs = json.loads(broker_transport_options["sentinel_kwargs"])
         if not isinstance(sentinel_kwargs, dict):
             raise ValueError
-        broker_transport_options_for_celery["sentinel_kwargs"] = sentinel_kwargs
+        broker_transport_options["sentinel_kwargs"] = sentinel_kwargs
     except Exception:
         raise AirflowException("sentinel_kwargs should be written in the correct dictionary format.")
 
@@ -77,7 +77,7 @@ DEFAULT_CELERY_CONFIG = {
     "task_default_exchange": conf.get("operators", "DEFAULT_QUEUE"),
     "task_track_started": conf.getboolean("celery", "task_track_started", fallback=True),
     "broker_url": broker_url,
-    "broker_transport_options": broker_transport_options_for_celery,
+    "broker_transport_options": broker_transport_options,
     "result_backend": result_backend,
     "database_engine_options": conf.getjson(
         "celery", "result_backend_sqlalchemy_engine_options", fallback={}
diff --git a/tests/providers/celery/executors/test_celery_executor.py b/tests/providers/celery/executors/test_celery_executor.py
index 0130bccc4c..e6446c0f6f 100644
--- a/tests/providers/celery/executors/test_celery_executor.py
+++ b/tests/providers/celery/executors/test_celery_executor.py
@@ -347,3 +347,14 @@ def test_celery_executor_with_no_recommended_result_backend(caplog):
             "You have configured a result_backend using the protocol `rediss`,"
             " it is highly recommended to use an alternative result_backend (i.e. a database)."
         ) in caplog.text
+
+
+@conf_vars({("celery_broker_transport_options", "sentinel_kwargs"): '{"service_name": "mymaster"}'})
+def test_sentinel_kwargs_loaded_from_string():
+    import importlib
+
+    # reload celery conf to apply the new config
+    importlib.reload(default_celery)
+    assert default_celery.DEFAULT_CELERY_CONFIG["broker_transport_options"]["sentinel_kwargs"] == {
+        "service_name": "mymaster"
+    }

Reply via email to