This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1ae9279346 Init providers before importing Celery (#34782)
1ae9279346 is described below

commit 1ae9279346315d99e7f7c546fbcd335aa5a871cd
Author: Jonny Fuller <[email protected]>
AuthorDate: Tue Oct 17 13:58:52 2023 -0400

    Init providers before importing Celery (#34782)
    
    Fixes _SECRET and _CMD broker configurations not working after 2.7 release
    
    Co-authored-by: JonnyFuller <[email protected]>
---
 .../celery/executors/celery_executor_utils.py      | 49 ++++++++++++++--------
 1 file changed, 31 insertions(+), 18 deletions(-)

diff --git a/airflow/providers/celery/executors/celery_executor_utils.py b/airflow/providers/celery/executors/celery_executor_utils.py
index 01be860b92..6e056409d0 100644
--- a/airflow/providers/celery/executors/celery_executor_utils.py
+++ b/airflow/providers/celery/executors/celery_executor_utils.py
@@ -43,11 +43,11 @@ import airflow.settings as settings
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException, RemovedInAirflow3Warning
 from airflow.executors.base_executor import BaseExecutor
-from airflow.providers.celery.executors.default_celery import DEFAULT_CELERY_CONFIG
 from airflow.stats import Stats
 from airflow.utils.dag_parsing_context import _airflow_parsing_context_manager
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.net import get_hostname
+from airflow.utils.providers_configuration_loader import providers_configuration_loaded
 from airflow.utils.timeout import timeout
 
 log = logging.getLogger(__name__)
@@ -65,23 +65,36 @@ OPERATION_TIMEOUT = conf.getfloat("celery", "operation_timeout")
 # Make it constant for unit test.
 CELERY_FETCH_ERR_MSG_HEADER = "Error fetching Celery task state"
 
-if conf.has_option("celery", "celery_config_options"):
-    celery_configuration = conf.getimport("celery", "celery_config_options")
-
-else:
-    celery_configuration = DEFAULT_CELERY_CONFIG
-
-celery_app_name = conf.get("celery", "CELERY_APP_NAME")
-if celery_app_name == "airflow.executors.celery_executor":
-    warnings.warn(
-        "The celery.CELERY_APP_NAME configuration uses deprecated package name: "
-        "'airflow.executors.celery_executor'. "
-        "Change it to `airflow.providers.celery.executors.celery_executor`, and "
-        "update the `-app` flag in your Celery Health Checks "
-        "to use `airflow.providers.celery.executors.celery_executor.app`.",
-        RemovedInAirflow3Warning,
-    )
-app = Celery(celery_app_name, config_source=celery_configuration)
+celery_configuration = None
+
+
+@providers_configuration_loaded
+def _get_celery_app() -> Celery:
+    """Init providers before importing the configuration, so the _SECRET and _CMD options work."""
+    global celery_configuration
+
+    if conf.has_option("celery", "celery_config_options"):
+        celery_configuration = conf.getimport("celery", "celery_config_options")
+    else:
+        from airflow.providers.celery.executors.default_celery import DEFAULT_CELERY_CONFIG
+
+        celery_configuration = DEFAULT_CELERY_CONFIG
+
+    celery_app_name = conf.get("celery", "CELERY_APP_NAME")
+    if celery_app_name == "airflow.executors.celery_executor":
+        warnings.warn(
+            "The celery.CELERY_APP_NAME configuration uses deprecated package name: "
+            "'airflow.executors.celery_executor'. "
+            "Change it to `airflow.providers.celery.executors.celery_executor`, and "
+            "update the `-app` flag in your Celery Health Checks "
+            "to use `airflow.providers.celery.executors.celery_executor.app`.",
+            RemovedInAirflow3Warning,
+        )
+
+    return Celery(celery_app_name, config_source=celery_configuration)
+
+
+app = _get_celery_app()
 
 
 @celery_import_modules.connect

Reply via email to