This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b6472b26a44 Fix amqps:// SSL config and celery_config_options bypass
(#64392)
b6472b26a44 is described below
commit b6472b26a44ce4834a563befea72dea7deb46da1
Author: Daniel Seo <[email protected]>
AuthorDate: Fri Apr 3 12:44:45 2026 -0400
Fix amqps:// SSL config and celery_config_options bypass (#64392)
* Fix broker_use_ssl not applied for amqps:// broker URLs
* addressed copilot comments
Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
* fix import sort order
---------
Co-authored-by: hseo36 <[email protected]>
Co-authored-by: Claude Opus 4.6 (1M context) <[email protected]>
---
.../celery/executors/celery_executor_utils.py | 15 ++-
.../providers/celery/executors/default_celery.py | 2 +-
.../unit/celery/executors/test_celery_executor.py | 147 +++++++++++++++++++++
3 files changed, 162 insertions(+), 2 deletions(-)
diff --git
a/providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py
b/providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py
index 6ac9ce19029..699052b470e 100644
---
a/providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py
+++
b/providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py
@@ -33,6 +33,7 @@ import traceback
from collections.abc import Collection, Mapping, MutableMapping, Sequence
from concurrent.futures import ProcessPoolExecutor
from functools import cache
+from importlib import import_module
from typing import TYPE_CHECKING, Any
from celery import Celery, states as celery_states
@@ -124,7 +125,10 @@ def create_celery_app(team_conf: ExecutorConf |
AirflowConfigParser) -> Celery:
:param team_conf: ExecutorConf instance with team-specific configuration,
or global conf
:return: Celery app instance
"""
- from airflow.providers.celery.executors.default_celery import
get_default_celery_config
+ from airflow.providers.celery.executors.default_celery import (
+ DEFAULT_CELERY_CONFIG,
+ get_default_celery_config,
+ )
celery_app_name = team_conf.get("celery", "CELERY_APP_NAME")
@@ -138,6 +142,15 @@ def create_celery_app(team_conf: ExecutorConf |
AirflowConfigParser) -> Celery:
config = get_default_celery_config(team_conf)
+ # Apply user-provided celery_config_options on top of team config.
+ # Skip if it resolves to DEFAULT_CELERY_CONFIG (built from global conf,
not team-aware).
+ configured_path = team_conf.get("celery", "celery_config_options",
fallback=None)
+ if configured_path:
+ module_path, _, attr_name = configured_path.rpartition(".")
+ user_config = getattr(import_module(module_path), attr_name)
+ if user_config is not DEFAULT_CELERY_CONFIG and
isinstance(user_config, dict):
+ config.update(user_config)
+
celery_app = Celery(celery_app_name, config_source=config)
# Register tasks with this app
diff --git
a/providers/celery/src/airflow/providers/celery/executors/default_celery.py
b/providers/celery/src/airflow/providers/celery/executors/default_celery.py
index 52ef77a15ac..f0ef8185d1e 100644
--- a/providers/celery/src/airflow/providers/celery/executors/default_celery.py
+++ b/providers/celery/src/airflow/providers/celery/executors/default_celery.py
@@ -141,7 +141,7 @@ def get_default_celery_config(team_conf) -> dict[str, Any]:
try:
if celery_ssl_active:
- if broker_url and "amqp://" in broker_url:
+ if broker_url and re.search(r"amqps?://", broker_url):
broker_use_ssl = {
"keyfile": team_conf.get("celery", "SSL_KEY"),
"certfile": team_conf.get("celery", "SSL_CERT"),
diff --git
a/providers/celery/tests/unit/celery/executors/test_celery_executor.py
b/providers/celery/tests/unit/celery/executors/test_celery_executor.py
index ff2c146f828..bc668bef508 100644
--- a/providers/celery/tests/unit/celery/executors/test_celery_executor.py
+++ b/providers/celery/tests/unit/celery/executors/test_celery_executor.py
@@ -54,6 +54,8 @@ from tests_common.test_utils.version_compat import (
if AIRFLOW_V_3_0_PLUS:
from airflow.models.dag_version import DagVersion
+if AIRFLOW_V_3_2_PLUS:
+ from airflow.executors.base_executor import ExecutorConf
if AIRFLOW_V_3_1_PLUS:
from airflow.sdk import BaseOperator, timezone
else:
@@ -814,3 +816,148 @@ def test_execute_workload_ignores_already_running_task():
"""
with pytest.raises(Ignore):
execute_workload_unwrapped(workload_json)
+
+
+class TestAmqpsSslConfig:
+ """Tests for amqps:// broker URL SSL configuration (Fix for substring
match bug)."""
+
+ @conf_vars(
+ {
+ ("celery", "BROKER_URL"): "amqps://guest:guest@rabbitmq:5671//",
+ ("celery", "SSL_ACTIVE"): "True",
+ ("celery", "SSL_KEY"): "/path/to/key.pem",
+ ("celery", "SSL_CERT"): "/path/to/cert.pem",
+ ("celery", "SSL_CACERT"): "/path/to/ca.pem",
+ }
+ )
+ def test_amqps_broker_url_builds_ssl_config(self):
+ """Test that amqps:// broker URLs correctly build broker_use_ssl with
AMQP param names."""
+ import importlib
+ import ssl
+
+ importlib.reload(default_celery)
+
+ config = default_celery.DEFAULT_CELERY_CONFIG
+ assert "broker_use_ssl" in config, "broker_use_ssl should be set for
amqps:// URLs"
+ broker_ssl = config["broker_use_ssl"]
+ assert broker_ssl["keyfile"] == "/path/to/key.pem"
+ assert broker_ssl["certfile"] == "/path/to/cert.pem"
+ assert broker_ssl["ca_certs"] == "/path/to/ca.pem"
+ assert broker_ssl["cert_reqs"] == ssl.CERT_REQUIRED
+ # Must NOT have ssl_ prefixed keys (those are for Redis)
+ assert "ssl_keyfile" not in broker_ssl
+ assert "ssl_certfile" not in broker_ssl
+
+ @conf_vars(
+ {
+ ("celery", "BROKER_URL"): "amqp://guest:guest@rabbitmq:5672//",
+ ("celery", "SSL_ACTIVE"): "True",
+ ("celery", "SSL_KEY"): "/path/to/key.pem",
+ ("celery", "SSL_CERT"): "/path/to/cert.pem",
+ ("celery", "SSL_CACERT"): "/path/to/ca.pem",
+ }
+ )
+ def test_amqp_broker_url_still_builds_ssl_config(self):
+ """Test that amqp:// (non-TLS) broker URLs still build SSL config
correctly (no regression)."""
+ import importlib
+ import ssl
+
+ importlib.reload(default_celery)
+
+ config = default_celery.DEFAULT_CELERY_CONFIG
+ assert "broker_use_ssl" in config
+ broker_ssl = config["broker_use_ssl"]
+ assert broker_ssl["keyfile"] == "/path/to/key.pem"
+ assert broker_ssl["cert_reqs"] == ssl.CERT_REQUIRED
+
+ @conf_vars(
+ {
+ ("celery", "BROKER_URL"): "amqps://guest:guest@rabbitmq:5671//",
+ ("celery", "SSL_ACTIVE"): "False",
+ }
+ )
+ def test_amqps_broker_url_no_ssl_when_inactive(self):
+ """Test that amqps:// broker URLs don't get SSL config when SSL_ACTIVE
is False."""
+ import importlib
+
+ importlib.reload(default_celery)
+
+ config = default_celery.DEFAULT_CELERY_CONFIG
+ assert "broker_use_ssl" not in config
+
+
+class TestCreateCeleryAppTeamIsolation:
+ """Tests for create_celery_app() multi-team config isolation."""
+
+ @pytest.mark.skipif(not AIRFLOW_V_3_2_PLUS, reason="ExecutorConf requires
Airflow 3.2+")
+ def test_custom_celery_config_options_applied(self):
+ """User-provided celery_config_options (non-default) should be merged
into team config."""
+ custom_config = {"worker_concurrency": 42, "broker_url":
"redis://custom:6379/0"}
+ custom_path = "my_custom_module.CELERY_CONFIG"
+
+ team_conf = ExecutorConf(team_name="team_alpha")
+ original_get = team_conf.get
+
+ def mock_get(section, key, **kwargs):
+ if section == "celery" and key == "celery_config_options":
+ return custom_path
+ return original_get(section, key, **kwargs)
+
+ mock_module = mock.MagicMock()
+ mock_module.CELERY_CONFIG = custom_config
+
+ with (
+ mock.patch.object(team_conf, "get", side_effect=mock_get),
+ mock.patch.object(celery_executor_utils, "import_module",
return_value=mock_module),
+ ):
+ celery_app = celery_executor_utils.create_celery_app(team_conf)
+ assert celery_app.conf.worker_concurrency == 42
+ assert celery_app.conf.broker_url == "redis://custom:6379/0"
+
+ def test_default_celery_config_options_skipped_via_identity_check(self):
+ """When celery_config_options resolves to DEFAULT_CELERY_CONFIG (same
object),
+ it must be skipped — re-applying it would overwrite team-specific
config
+ since DEFAULT_CELERY_CONFIG is built from global conf."""
+ original_get = conf.get
+ # Path just needs a dot for rpartition and attr name matching
DEFAULT_CELERY_CONFIG.
+ # import_module is mocked to return default_celery module regardless
of path.
+ celery_config_path = "any.module.DEFAULT_CELERY_CONFIG"
+
+ def mock_get(section, key, **kwargs):
+ if section == "celery" and key == "celery_config_options":
+ return celery_config_path
+ return original_get(section, key, **kwargs)
+
+ with (
+ mock.patch.object(conf, "get", side_effect=mock_get),
+ mock.patch.object(celery_executor_utils, "import_module") as
mock_import,
+ ):
+ mock_import.return_value = default_celery
+ celery_app = celery_executor_utils.create_celery_app(conf)
+ # import_module called (path is non-None), but override skipped
(same object)
+ mock_import.assert_called_once()
+ default_config = default_celery.get_default_celery_config(conf)
+ assert celery_app.conf.broker_url == default_config["broker_url"]
+
+ @pytest.mark.skipif(not AIRFLOW_V_3_2_PLUS, reason="ExecutorConf requires
Airflow 3.2+")
+ def test_team_specific_broker_not_overwritten(self):
+ """Team-specific BROKER_URL set via ExecutorConf must survive
create_celery_app()."""
+ team_conf = ExecutorConf(team_name="team_alpha")
+
+ original_get = team_conf.get
+
+ def mock_team_get(section, key, **kwargs):
+ if section == "celery" and key == "BROKER_URL":
+ return "amqps://team-alpha-rabbit:5671//"
+ return original_get(section, key, **kwargs)
+
+ with mock.patch.object(team_conf, "get", side_effect=mock_team_get):
+ celery_app = celery_executor_utils.create_celery_app(team_conf)
+ assert celery_app.conf.broker_url ==
"amqps://team-alpha-rabbit:5671//"
+
+ @pytest.mark.skipif(not AIRFLOW_V_3_2_PLUS, reason="ExecutorConf requires
Airflow 3.2+")
+ def test_team_app_name_includes_team_name(self):
+ """Each team gets a unique Celery app name for broker isolation."""
+ team_conf = ExecutorConf(team_name="team_beta")
+ celery_app = celery_executor_utils.create_celery_app(team_conf)
+ assert "team_beta" in celery_app.main