This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1ae9279346 Init providers before importing Celery (#34782)
1ae9279346 is described below
commit 1ae9279346315d99e7f7c546fbcd335aa5a871cd
Author: Jonny Fuller <[email protected]>
AuthorDate: Tue Oct 17 13:58:52 2023 -0400
Init providers before importing Celery (#34782)
Fixes _SECRET and _CMD broker configurations not working after 2.7 release
Co-authored-by: JonnyFuller <[email protected]>
---
.../celery/executors/celery_executor_utils.py | 49 ++++++++++++++--------
1 file changed, 31 insertions(+), 18 deletions(-)
diff --git a/airflow/providers/celery/executors/celery_executor_utils.py
b/airflow/providers/celery/executors/celery_executor_utils.py
index 01be860b92..6e056409d0 100644
--- a/airflow/providers/celery/executors/celery_executor_utils.py
+++ b/airflow/providers/celery/executors/celery_executor_utils.py
@@ -43,11 +43,11 @@ import airflow.settings as settings
from airflow.configuration import conf
from airflow.exceptions import AirflowException, RemovedInAirflow3Warning
from airflow.executors.base_executor import BaseExecutor
-from airflow.providers.celery.executors.default_celery import
DEFAULT_CELERY_CONFIG
from airflow.stats import Stats
from airflow.utils.dag_parsing_context import _airflow_parsing_context_manager
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.net import get_hostname
+from airflow.utils.providers_configuration_loader import
providers_configuration_loaded
from airflow.utils.timeout import timeout
log = logging.getLogger(__name__)
@@ -65,23 +65,36 @@ OPERATION_TIMEOUT = conf.getfloat("celery",
"operation_timeout")
# Make it constant for unit test.
CELERY_FETCH_ERR_MSG_HEADER = "Error fetching Celery task state"
-if conf.has_option("celery", "celery_config_options"):
- celery_configuration = conf.getimport("celery", "celery_config_options")
-
-else:
- celery_configuration = DEFAULT_CELERY_CONFIG
-
-celery_app_name = conf.get("celery", "CELERY_APP_NAME")
-if celery_app_name == "airflow.executors.celery_executor":
- warnings.warn(
- "The celery.CELERY_APP_NAME configuration uses deprecated package
name: "
- "'airflow.executors.celery_executor'. "
- "Change it to `airflow.providers.celery.executors.celery_executor`,
and "
- "update the `-app` flag in your Celery Health Checks "
- "to use `airflow.providers.celery.executors.celery_executor.app`.",
- RemovedInAirflow3Warning,
- )
-app = Celery(celery_app_name, config_source=celery_configuration)
+celery_configuration = None
+
+
+@providers_configuration_loaded
+def _get_celery_app() -> Celery:
+ """Init providers before importing the configuration, so the _SECRET and
_CMD options work."""
+ global celery_configuration
+
+ if conf.has_option("celery", "celery_config_options"):
+ celery_configuration = conf.getimport("celery",
"celery_config_options")
+ else:
+ from airflow.providers.celery.executors.default_celery import
DEFAULT_CELERY_CONFIG
+
+ celery_configuration = DEFAULT_CELERY_CONFIG
+
+ celery_app_name = conf.get("celery", "CELERY_APP_NAME")
+ if celery_app_name == "airflow.executors.celery_executor":
+ warnings.warn(
+ "The celery.CELERY_APP_NAME configuration uses deprecated package
name: "
+ "'airflow.executors.celery_executor'. "
+ "Change it to
`airflow.providers.celery.executors.celery_executor`, and "
+ "update the `-app` flag in your Celery Health Checks "
+ "to use `airflow.providers.celery.executors.celery_executor.app`.",
+ RemovedInAirflow3Warning,
+ )
+
+ return Celery(celery_app_name, config_source=celery_configuration)
+
+
+app = _get_celery_app()
@celery_import_modules.connect