This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e2393ee6dd Fix `sentinel_kwargs` load from ENV (#36318)
e2393ee6dd is described below
commit e2393ee6dd3a927e753e6375621af07aa0c734dc
Author: Valentine <[email protected]>
AuthorDate: Wed Dec 20 23:38:07 2023 +0200
Fix `sentinel_kwargs` load from ENV (#36318)
* fix(providers/centry): sentinel_kwargs fron env
* test: add sentinel kwargs test
---
airflow/providers/celery/executors/default_celery.py | 10 +++++-----
tests/providers/celery/executors/test_celery_executor.py | 11 +++++++++++
2 files changed, 16 insertions(+), 5 deletions(-)
diff --git a/airflow/providers/celery/executors/default_celery.py
b/airflow/providers/celery/executors/default_celery.py
index 633d090ada..eaaf0fd0b4 100644
--- a/airflow/providers/celery/executors/default_celery.py
+++ b/airflow/providers/celery/executors/default_celery.py
@@ -18,6 +18,7 @@
"""Default celery configuration."""
from __future__ import annotations
+import json
import logging
import ssl
@@ -47,18 +48,17 @@ log = logging.getLogger(__name__)
broker_url = conf.get("celery", "BROKER_URL", fallback="redis://redis:6379/0")
-broker_transport_options = conf.getsection("celery_broker_transport_options")
or {}
+broker_transport_options: dict =
conf.getsection("celery_broker_transport_options") or {}
if "visibility_timeout" not in broker_transport_options:
if _broker_supports_visibility_timeout(broker_url):
broker_transport_options["visibility_timeout"] = 21600
-broker_transport_options_for_celery: dict = broker_transport_options.copy()
if "sentinel_kwargs" in broker_transport_options:
try:
- sentinel_kwargs = broker_transport_options.get("sentinel_kwargs")
+ sentinel_kwargs =
json.loads(broker_transport_options["sentinel_kwargs"])
if not isinstance(sentinel_kwargs, dict):
raise ValueError
- broker_transport_options_for_celery["sentinel_kwargs"] =
sentinel_kwargs
+ broker_transport_options["sentinel_kwargs"] = sentinel_kwargs
except Exception:
raise AirflowException("sentinel_kwargs should be written in the
correct dictionary format.")
@@ -77,7 +77,7 @@ DEFAULT_CELERY_CONFIG = {
"task_default_exchange": conf.get("operators", "DEFAULT_QUEUE"),
"task_track_started": conf.getboolean("celery", "task_track_started",
fallback=True),
"broker_url": broker_url,
- "broker_transport_options": broker_transport_options_for_celery,
+ "broker_transport_options": broker_transport_options,
"result_backend": result_backend,
"database_engine_options": conf.getjson(
"celery", "result_backend_sqlalchemy_engine_options", fallback={}
diff --git a/tests/providers/celery/executors/test_celery_executor.py
b/tests/providers/celery/executors/test_celery_executor.py
index 0130bccc4c..e6446c0f6f 100644
--- a/tests/providers/celery/executors/test_celery_executor.py
+++ b/tests/providers/celery/executors/test_celery_executor.py
@@ -347,3 +347,14 @@ def
test_celery_executor_with_no_recommended_result_backend(caplog):
"You have configured a result_backend using the protocol `rediss`,"
" it is highly recommended to use an alternative result_backend
(i.e. a database)."
) in caplog.text
+
+
+@conf_vars({("celery_broker_transport_options", "sentinel_kwargs"):
'{"service_name": "mymaster"}'})
+def test_sentinel_kwargs_loaded_from_string():
+ import importlib
+
+ # reload celery conf to apply the new config
+ importlib.reload(default_celery)
+ assert
default_celery.DEFAULT_CELERY_CONFIG["broker_transport_options"]["sentinel_kwargs"]
== {
+ "service_name": "mymaster"
+ }