This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 40d54eac1a Move CeleryExecutor to the celery provider (#32526)
40d54eac1a is described below

commit 40d54eac1a2f35167bdd179fda3fd018fe32d116
Author: Jarek Potiuk <[email protected]>
AuthorDate: Wed Jul 12 09:22:09 2023 +0200

    Move CeleryExecutor to the celery provider (#32526)
    
    Originally, the Celery Executor and the Celery Kubernetes Executor
    were part of the Airflow core. However, with AIP-51, which
    decoupled some of the executor internals from the core, we can
    now move the Celery (and, as a next step, Kubernetes) executors
    to the providers.
    
    This will allow fixes to the decoupled executors to be released
    independently of releases of the Airflow core version.
    
    Co-authored-by: Jed Cunningham 
<[email protected]>
    Co-authored-by: Niko Oliveira <[email protected]>
---
 .github/boring-cyborg.yml                          |  2 +-
 airflow/cli/cli_config.py                          |  6 +-
 airflow/cli/commands/celery_command.py             |  2 +-
 airflow/config_templates/config.yml                |  2 +-
 airflow/config_templates/default_airflow.cfg       |  2 +-
 airflow/executors/__init__.py                      | 16 +++++
 airflow/executors/executor_loader.py               |  5 +-
 airflow/providers/celery/CHANGELOG.rst             |  8 +++
 airflow/providers/celery/executors/__init__.py     | 36 ++++++++++
 .../celery}/executors/celery_executor.py           | 12 ++--
 .../celery}/executors/celery_executor_utils.py     | 15 +++-
 .../executors/celery_kubernetes_executor.py        |  2 +-
 airflow/providers/celery/provider.yaml             |  1 +
 chart/templates/workers/worker-deployment.yaml     |  2 +-
 .../logging-monitoring/check-health.rst            |  4 +-
 .../core-concepts/executor/celery_kubernetes.rst   |  2 +-
 docs/apache-airflow/extra-packages-ref.rst         | 82 +++++++++++-----------
 .../howto/docker-compose/docker-compose.yaml       |  3 +-
 newsfragments/32526.significant.rst                |  5 ++
 .../pre_commit_check_setup_extra_packages_ref.py   |  2 +-
 scripts/in_container/_in_container_utils.sh        |  9 +++
 setup.py                                           | 16 ++++-
 tests/cli/conftest.py                              |  2 +-
 .../integration/executors/test_celery_executor.py  | 26 ++++---
 .../providers/celery}/executors/__init__.py        |  2 +-
 .../celery}/executors/test_celery_executor.py      | 26 ++++---
 .../executors/test_celery_kubernetes_executor.py   |  4 +-
 tests/sensors/test_base.py                         |  4 +-
 tests/www/views/test_views_tasks.py                |  2 +-
 29 files changed, 208 insertions(+), 92 deletions(-)

diff --git a/.github/boring-cyborg.yml b/.github/boring-cyborg.yml
index 5aa495022b..989bb055df 100644
--- a/.github/boring-cyborg.yml
+++ b/.github/boring-cyborg.yml
@@ -167,7 +167,7 @@ labelPRBasedOnFilePath:
     - airflow/kubernetes/**/*
     - airflow/kubernetes_executor_templates/**/*
     - airflow/executors/kubernetes_executor.py
-    - airflow/executors/celery_kubernetes_executor.py
+    - airflow/providers/celery/executors/celery_kubernetes_executor.py
     - docs/apache-airflow/core-concepts/executor/kubernetes.rst
     - docs/apache-airflow/core-concepts/executor/celery_kubernetes.rst
     - docs/apache-airflow-providers-cncf-kubernetes/**/*
diff --git a/airflow/cli/cli_config.py b/airflow/cli/cli_config.py
index 587934769e..046cbaad14 100644
--- a/airflow/cli/cli_config.py
+++ b/airflow/cli/cli_config.py
@@ -67,7 +67,7 @@ class DefaultHelpParser(argparse.ArgumentParser):
                 executor_cls, _ = ExecutorLoader.import_executor_cls(executor)
                 classes = ()
                 try:
-                    from airflow.executors.celery_executor import 
CeleryExecutor
+                    from airflow.providers.celery.executors.celery_executor 
import CeleryExecutor
 
                     classes += (CeleryExecutor,)
                 except ImportError:
@@ -77,7 +77,9 @@ class DefaultHelpParser(argparse.ArgumentParser):
                     )
                     raise ArgumentError(action, message)
                 try:
-                    from airflow.executors.celery_kubernetes_executor import 
CeleryKubernetesExecutor
+                    from 
airflow.providers.celery.executors.celery_kubernetes_executor import (
+                        CeleryKubernetesExecutor,
+                    )
 
                     classes += (CeleryKubernetesExecutor,)
                 except ImportError:
diff --git a/airflow/cli/commands/celery_command.py 
b/airflow/cli/commands/celery_command.py
index 2ac976d10c..6d3dbe64bb 100644
--- a/airflow/cli/commands/celery_command.py
+++ b/airflow/cli/commands/celery_command.py
@@ -34,7 +34,7 @@ from lockfile.pidlockfile import read_pid_from_pidfile, 
remove_existing_pidfile
 
 from airflow import settings
 from airflow.configuration import conf
-from airflow.executors.celery_executor import app as celery_app
+from airflow.providers.celery.executors.celery_executor import app as 
celery_app
 from airflow.utils import cli as cli_utils
 from airflow.utils.cli import setup_locations, setup_logging
 from airflow.utils.serve_logs import serve_logs
diff --git a/airflow/config_templates/config.yml 
b/airflow/config_templates/config.yml
index 5336a88f4f..015da426c4 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -2023,7 +2023,7 @@ celery:
       version_added: ~
       type: string
       example: ~
-      default: "airflow.executors.celery_executor"
+      default: "airflow.providers.celery.executors.celery_executor"
     worker_concurrency:
       description: |
         The concurrency that will be used when starting workers with the
diff --git a/airflow/config_templates/default_airflow.cfg 
b/airflow/config_templates/default_airflow.cfg
index 7cae11065b..cff56508df 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -1037,7 +1037,7 @@ kubernetes_queue = kubernetes
 # This section only applies if you are using the CeleryExecutor in
 # ``[core]`` section above
 # The app name that will be used by celery
-celery_app_name = airflow.executors.celery_executor
+celery_app_name = airflow.providers.celery.executors.celery_executor
 
 # The concurrency that will be used when starting workers with the
 # ``airflow celery worker`` command. This defines the number of task instances 
that
diff --git a/airflow/executors/__init__.py b/airflow/executors/__init__.py
index 21ee94b03b..2c342d45f1 100644
--- a/airflow/executors/__init__.py
+++ b/airflow/executors/__init__.py
@@ -15,3 +15,19 @@
 # specific language governing permissions and limitations
 # under the License.
 """Executors."""
+from __future__ import annotations
+
+from airflow.utils.deprecation_tools import add_deprecated_classes
+
+__deprecated_classes = {
+    "celery_executor": {
+        "app": "airflow.providers.celery.executors.celery_executor_utils.app",
+        "CeleryExecutor": 
"airflow.providers.celery.executors.celery_executor.CeleryExecutor",
+    },
+    "celery_kubernetes_executor": {
+        "CeleryKubernetesExecutor": "airflow.providers.celery.executors."
+        "celery_kubernetes_executor.CeleryKubernetesExecutor",
+    },
+}
+
+add_deprecated_classes(__deprecated_classes, __name__)
diff --git a/airflow/executors/executor_loader.py 
b/airflow/executors/executor_loader.py
index 3ac49d46ca..58cd9ae916 100644
--- a/airflow/executors/executor_loader.py
+++ b/airflow/executors/executor_loader.py
@@ -60,8 +60,9 @@ class ExecutorLoader:
         LOCAL_EXECUTOR: "airflow.executors.local_executor.LocalExecutor",
         LOCAL_KUBERNETES_EXECUTOR: 
"airflow.executors.local_kubernetes_executor.LocalKubernetesExecutor",
         SEQUENTIAL_EXECUTOR: 
"airflow.executors.sequential_executor.SequentialExecutor",
-        CELERY_EXECUTOR: "airflow.executors.celery_executor.CeleryExecutor",
-        CELERY_KUBERNETES_EXECUTOR: 
"airflow.executors.celery_kubernetes_executor.CeleryKubernetesExecutor",
+        CELERY_EXECUTOR: 
"airflow.providers.celery.executors.celery_executor.CeleryExecutor",
+        CELERY_KUBERNETES_EXECUTOR: "airflow.providers.celery."
+        "executors.celery_kubernetes_executor.CeleryKubernetesExecutor",
         DASK_EXECUTOR: "airflow.executors.dask_executor.DaskExecutor",
         KUBERNETES_EXECUTOR: 
"airflow.executors.kubernetes_executor.KubernetesExecutor",
         DEBUG_EXECUTOR: "airflow.executors.debug_executor.DebugExecutor",
diff --git a/airflow/providers/celery/CHANGELOG.rst 
b/airflow/providers/celery/CHANGELOG.rst
index 85ccbc5fc4..0d0038cc36 100644
--- a/airflow/providers/celery/CHANGELOG.rst
+++ b/airflow/providers/celery/CHANGELOG.rst
@@ -27,6 +27,14 @@
 Changelog
 ---------
 
+3.3.0
+.....
+
+.. note::
+  This provider release is the first release that has Celery Executor and
+  Celery Kubernetes Executor moved from the core ``apache-airflow`` package to 
a Celery
+  provider package.
+
 3.2.1
 .....
 
diff --git a/airflow/providers/celery/executors/__init__.py 
b/airflow/providers/celery/executors/__init__.py
new file mode 100644
index 0000000000..56c59e58e7
--- /dev/null
+++ b/airflow/providers/celery/executors/__init__.py
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import packaging.version
+
+from airflow.exceptions import AirflowOptionalProviderFeatureException
+
+try:
+    from airflow import __version__ as airflow_version
+except ImportError:
+    from airflow.version import version as airflow_version
+
+base_version = packaging.version.parse(airflow_version).base_version
+
+if packaging.version.parse(base_version) < packaging.version.parse("2.7.0"):
+    raise AirflowOptionalProviderFeatureException(
+        "Celery Executor from Celery Provider should only be used with Airflow 
2.7.0+.\n"
+        f"This is Airflow {airflow_version} and Celery and 
CeleryKubernetesExecutor are "
+        f"available in the 'airflow.executors' package. You should not use "
+        f"the provider's executors in this version of Airflow."
+    )
diff --git a/airflow/executors/celery_executor.py 
b/airflow/providers/celery/executors/celery_executor.py
similarity index 95%
rename from airflow/executors/celery_executor.py
rename to airflow/providers/celery/executors/celery_executor.py
index b3a011c815..0786c95538 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/providers/celery/executors/celery_executor.py
@@ -64,7 +64,7 @@ def __getattr__(name):
     # celery_executor module without the time cost of its import and
     # construction
     if name == "app":
-        from airflow.executors.celery_executor_utils import app
+        from airflow.providers.celery.executors.celery_executor_utils import 
app
 
         return app
     raise AttributeError(f"module '{__name__}' has no attribute '{name}'")
@@ -99,7 +99,7 @@ class CeleryExecutor(BaseExecutor):
         self._sync_parallelism = conf.getint("celery", "SYNC_PARALLELISM")
         if self._sync_parallelism == 0:
             self._sync_parallelism = max(1, cpu_count() - 1)
-        from airflow.executors.celery_executor_utils import BulkStateFetcher
+        from airflow.providers.celery.executors.celery_executor_utils import 
BulkStateFetcher
 
         self.bulk_state_fetcher = BulkStateFetcher(self._sync_parallelism)
         self.tasks = {}
@@ -118,7 +118,7 @@ class CeleryExecutor(BaseExecutor):
         return max(1, int(math.ceil(1.0 * to_send_count / 
self._sync_parallelism)))
 
     def _process_tasks(self, task_tuples: list[TaskTuple]) -> None:
-        from airflow.executors.celery_executor_utils import execute_command
+        from airflow.providers.celery.executors.celery_executor_utils import 
execute_command
 
         task_tuples_to_send = [task_tuple[:3] + (execute_command,) for 
task_tuple in task_tuples]
         first_task = next(t[3] for t in task_tuples_to_send)
@@ -129,7 +129,7 @@ class CeleryExecutor(BaseExecutor):
 
         key_and_async_results = self._send_tasks_to_celery(task_tuples_to_send)
         self.log.debug("Sent all tasks.")
-        from airflow.executors.celery_executor_utils import 
ExceptionWithTraceback
+        from airflow.providers.celery.executors.celery_executor_utils import 
ExceptionWithTraceback
 
         for key, _, result in key_and_async_results:
             if isinstance(result, ExceptionWithTraceback) and isinstance(
@@ -165,7 +165,7 @@ class CeleryExecutor(BaseExecutor):
                 self.update_task_state(key, result.state, getattr(result, 
"info", None))
 
     def _send_tasks_to_celery(self, task_tuples_to_send: 
list[TaskInstanceInCelery]):
-        from airflow.executors.celery_executor_utils import 
send_task_to_executor
+        from airflow.providers.celery.executors.celery_executor_utils import 
send_task_to_executor
 
         if len(task_tuples_to_send) == 1 or self._sync_parallelism == 1:
             # One tuple, or max one process -> send it in the main thread.
@@ -304,7 +304,7 @@ class CeleryExecutor(BaseExecutor):
         :return: List of readable task instances for a warning message
         """
         readable_tis = []
-        from airflow.executors.celery_executor_utils import app
+        from airflow.providers.celery.executors.celery_executor_utils import 
app
 
         for ti in tis:
             readable_tis.append(repr(ti))
diff --git a/airflow/executors/celery_executor_utils.py 
b/airflow/providers/celery/executors/celery_executor_utils.py
similarity index 94%
rename from airflow/executors/celery_executor_utils.py
rename to airflow/providers/celery/executors/celery_executor_utils.py
index 49112bba6a..c659d8c377 100644
--- a/airflow/executors/celery_executor_utils.py
+++ b/airflow/providers/celery/executors/celery_executor_utils.py
@@ -27,6 +27,7 @@ import math
 import os
 import subprocess
 import traceback
+import warnings
 from concurrent.futures import ProcessPoolExecutor
 from typing import TYPE_CHECKING, Any, Mapping, MutableMapping, Optional, Tuple
 
@@ -40,7 +41,7 @@ from setproctitle import setproctitle
 import airflow.settings as settings
 from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG
 from airflow.configuration import conf
-from airflow.exceptions import AirflowException
+from airflow.exceptions import AirflowException, RemovedInAirflow3Warning
 from airflow.executors.base_executor import BaseExecutor
 from airflow.models.taskinstance import TaskInstanceKey
 from airflow.stats import Stats
@@ -66,7 +67,17 @@ if conf.has_option("celery", "celery_config_options"):
 else:
     celery_configuration = DEFAULT_CELERY_CONFIG
 
-app = Celery(conf.get("celery", "CELERY_APP_NAME"), 
config_source=celery_configuration)
+celery_app_name = conf.get("celery", "CELERY_APP_NAME")
+if celery_app_name == "airflow.executors.celery_executor":
+    warnings.warn(
+        "The celery.CELERY_APP_NAME configuration uses deprecated package 
name: "
+        "'airflow.executors.celery_executor'. "
+        "Change it to `airflow.providers.celery.executors.celery_executor`, 
and "
+        "update the `-app` flag in your Celery Health Checks "
+        "to use `airflow.providers.celery.executors.celery_executor.app`.",
+        RemovedInAirflow3Warning,
+    )
+app = Celery(celery_app_name, config_source=celery_configuration)
 
 
 @celery_import_modules.connect
diff --git a/airflow/executors/celery_kubernetes_executor.py 
b/airflow/providers/celery/executors/celery_kubernetes_executor.py
similarity index 99%
rename from airflow/executors/celery_kubernetes_executor.py
rename to airflow/providers/celery/executors/celery_kubernetes_executor.py
index d528c43fe5..0522b12f41 100644
--- a/airflow/executors/celery_kubernetes_executor.py
+++ b/airflow/providers/celery/executors/celery_kubernetes_executor.py
@@ -22,8 +22,8 @@ from typing import TYPE_CHECKING, Sequence
 from airflow.callbacks.base_callback_sink import BaseCallbackSink
 from airflow.callbacks.callback_requests import CallbackRequest
 from airflow.configuration import conf
-from airflow.executors.celery_executor import CeleryExecutor
 from airflow.executors.kubernetes_executor import KubernetesExecutor
+from airflow.providers.celery.executors.celery_executor import CeleryExecutor
 from airflow.utils.log.logging_mixin import LoggingMixin
 
 if TYPE_CHECKING:
diff --git a/airflow/providers/celery/provider.yaml 
b/airflow/providers/celery/provider.yaml
index 83dc66ded0..b6b1e7e4e5 100644
--- a/airflow/providers/celery/provider.yaml
+++ b/airflow/providers/celery/provider.yaml
@@ -23,6 +23,7 @@ description: |
 
 suspended: false
 versions:
+  - 3.3.0
   - 3.2.1
   - 3.2.0
   - 3.1.0
diff --git a/chart/templates/workers/worker-deployment.yaml 
b/chart/templates/workers/worker-deployment.yaml
index 75b2b1b60a..4e306745d7 100644
--- a/chart/templates/workers/worker-deployment.yaml
+++ b/chart/templates/workers/worker-deployment.yaml
@@ -205,7 +205,7 @@ spec:
                 {{- else }}
                 - sh
                 - -c
-                - CONNECTION_CHECK_MAX_COUNT=0 exec /entrypoint python -m 
celery --app airflow.executors.celery_executor.app inspect ping -d 
celery@$(hostname)
+                - CONNECTION_CHECK_MAX_COUNT=0 exec /entrypoint python -m 
celery --app airflow.providers.celery.executors.celery_executor.app inspect 
ping -d celery@$(hostname)
                 {{- end }}
           {{- end }}
           ports:
diff --git 
a/docs/apache-airflow/administration-and-deployment/logging-monitoring/check-health.rst
 
b/docs/apache-airflow/administration-and-deployment/logging-monitoring/check-health.rst
index 58e8834078..78cf0f3725 100644
--- 
a/docs/apache-airflow/administration-and-deployment/logging-monitoring/check-health.rst
+++ 
b/docs/apache-airflow/administration-and-deployment/logging-monitoring/check-health.rst
@@ -147,12 +147,12 @@ To check if the worker running on the local host is 
working correctly, run:
 
 .. code-block:: bash
 
-    celery --app airflow.executors.celery_executor.app inspect ping -d 
celery@${HOSTNAME}
+    celery --app airflow.providers.celery.executors.celery_executor.app 
inspect ping -d celery@${HOSTNAME}
 
 To check if the all workers in the cluster running is working correctly, run:
 
 .. code-block:: bash
 
-    celery --app airflow.executors.celery_executor.app inspect ping
+    celery --app airflow.providers.celery.executors.celery_executor.app 
inspect ping
 
 For more information, see: `Management Command-line Utilities 
(inspect/control) 
<https://docs.celeryproject.org/en/stable/userguide/monitoring.html#monitoring-control>`__
 and `Workers Guide 
<https://docs.celeryproject.org/en/stable/userguide/workers.html>`__ in the 
Celery documentation.
diff --git a/docs/apache-airflow/core-concepts/executor/celery_kubernetes.rst 
b/docs/apache-airflow/core-concepts/executor/celery_kubernetes.rst
index 6e2471b14d..720fe7e743 100644
--- a/docs/apache-airflow/core-concepts/executor/celery_kubernetes.rst
+++ b/docs/apache-airflow/core-concepts/executor/celery_kubernetes.rst
@@ -21,7 +21,7 @@
 CeleryKubernetes Executor
 =========================
 
-The 
:class:`~airflow.executors.celery_kubernetes_executor.CeleryKubernetesExecutor` 
allows users
+The 
:class:`~airflow.providers.celery.executors.celery_kubernetes_executor.CeleryKubernetesExecutor`
 allows users
 to run simultaneously a ``CeleryExecutor`` and a ``KubernetesExecutor``.
 An executor is chosen to run a task based on the task's queue.
 
diff --git a/docs/apache-airflow/extra-packages-ref.rst 
b/docs/apache-airflow/extra-packages-ref.rst
index c8cfe87237..027fbb657e 100644
--- a/docs/apache-airflow/extra-packages-ref.rst
+++ b/docs/apache-airflow/extra-packages-ref.rst
@@ -37,47 +37,47 @@ These are core airflow extras that extend capabilities of 
core Airflow. They usu
 packages (with the exception of ``celery`` and ``cncf.kubernetes`` extras), 
they just install necessary
 python dependencies for the provided package.
 
-+---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
-| extra               | install command                                     | 
enables                                                                    |
-+=====================+=====================================================+============================================================================+
-| aiobotocore         | ``pip install 'apache-airflow[aiobotocore]'``       | 
Support for asynchronous (deferrable) operators for Amazon integration     |
-+---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
-| async               | ``pip install 'apache-airflow[async]'``             | 
Async worker classes for Gunicorn                                          |
-+---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
-| celery              | ``pip install 'apache-airflow[celery]'``            | 
CeleryExecutor (also installs the celery provider package!)                |
-+---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
-| cgroups             | ``pip install 'apache-airflow[cgroups]'``           | 
Needed To use CgroupTaskRunner                                             |
-+---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
-| cncf.kubernetes     | ``pip install 'apache-airflow[cncf.kubernetes]'``   | 
Kubernetes Executor (also installs the Kubernetes provider package)        |
-+---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
-| dask                | ``pip install 'apache-airflow[dask]'``              | 
DaskExecutor                                                               |
-+---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
-| deprecated_api      | ``pip install 'apache-airflow[deprecated_api]'``    | 
Deprecated, experimental API that is replaced with the new REST API        |
-+---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
-| github_enterprise   | ``pip install 'apache-airflow[github_enterprise]'`` | 
GitHub Enterprise auth backend                                             |
-+---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
-| google_auth         | ``pip install 'apache-airflow[google_auth]'``       | 
Google auth backend                                                        |
-+---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
-| kerberos            | ``pip install 'apache-airflow[kerberos]'``          | 
Kerberos integration for Kerberized services (Hadoop, Presto, Trino)       |
-+---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
-| ldap                | ``pip install 'apache-airflow[ldap]'``              | 
LDAP authentication for users                                              |
-+---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
-| leveldb             | ``pip install 'apache-airflow[leveldb]'``           | 
Required for use leveldb extra in google provider                          |
-+---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
-| otel                | ``pip install 'apache-airflow[otel]'``              | 
Required for OpenTelemetry metrics                                         |
-+---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
-| pandas              | ``pip install 'apache-airflow[pandas]'``            | 
Install Pandas library compatible with Airflow                             |
-+---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
-| password            | ``pip install 'apache-airflow[password]'``          | 
Password authentication for users                                          |
-+---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
-| rabbitmq            | ``pip install 'apache-airflow[rabbitmq]'``          | 
RabbitMQ support as a Celery backend                                       |
-+---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
-| sentry              | ``pip install 'apache-airflow[sentry]'``            | 
Sentry service for application logging and monitoring                      |
-+---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
-| statsd              | ``pip install 'apache-airflow[statsd]'``            | 
Needed by StatsD metrics                                                   |
-+---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
-| virtualenv          | ``pip install 'apache-airflow[virtualenv]'``        | 
Running python tasks in local virtualenv                                   |
-+---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
++---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+--------------+
+| extra               | install command                                     | 
enables                                                                    | 
Preinstalled |
++=====================+=====================================================+============================================================================+==============+
+| aiobotocore         | ``pip install 'apache-airflow[aiobotocore]'``       | 
Support for asynchronous (deferrable) operators for Amazon integration     |    
          |
++---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+--------------+
+| async               | ``pip install 'apache-airflow[async]'``             | 
Async worker classes for Gunicorn                                          |    
          |
++---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+--------------+
+| celery              | ``pip install 'apache-airflow[celery]'``            | 
CeleryExecutor (also installs the celery provider package!)                |    
  *       |
++---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+--------------+
+| cgroups             | ``pip install 'apache-airflow[cgroups]'``           | 
Needed To use CgroupTaskRunner                                             |    
          |
++---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+--------------+
+| cncf.kubernetes     | ``pip install 'apache-airflow[cncf.kubernetes]'``   | 
Kubernetes Executor (also installs the Kubernetes provider package)        |    
          |
++---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+--------------+
+| dask                | ``pip install 'apache-airflow[dask]'``              | 
DaskExecutor                                                               |    
          |
++---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+--------------+
+| deprecated_api      | ``pip install 'apache-airflow[deprecated_api]'``    | 
Deprecated, experimental API that is replaced with the new REST API        |    
          |
++---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+--------------+
+| github_enterprise   | ``pip install 'apache-airflow[github_enterprise]'`` | 
GitHub Enterprise auth backend                                             |    
          |
++---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+--------------+
+| google_auth         | ``pip install 'apache-airflow[google_auth]'``       | 
Google auth backend                                                        |    
          |
++---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+--------------+
+| kerberos            | ``pip install 'apache-airflow[kerberos]'``          | 
Kerberos integration for Kerberized services (Hadoop, Presto, Trino)       |    
          |
++---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+--------------+
+| ldap                | ``pip install 'apache-airflow[ldap]'``              | 
LDAP authentication for users                                              |    
          |
++---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+--------------+
+| leveldb             | ``pip install 'apache-airflow[leveldb]'``           | 
Required for use leveldb extra in google provider                          |    
          |
++---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+--------------+
+| otel                | ``pip install 'apache-airflow[otel]'``              | 
Required for OpenTelemetry metrics                                         |    
          |
++---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+--------------+
+| pandas              | ``pip install 'apache-airflow[pandas]'``            | 
Install Pandas library compatible with Airflow                             |    
          |
++---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+--------------+
+| password            | ``pip install 'apache-airflow[password]'``          | 
Password authentication for users                                          |    
          |
++---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+--------------+
+| rabbitmq            | ``pip install 'apache-airflow[rabbitmq]'``          | 
RabbitMQ support as a Celery backend                                       |    
          |
++---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+--------------+
+| sentry              | ``pip install 'apache-airflow[sentry]'``            | 
Sentry service for application logging and monitoring                      |    
          |
++---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+--------------+
+| statsd              | ``pip install 'apache-airflow[statsd]'``            | 
Needed by StatsD metrics                                                   |    
          |
++---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+--------------+
+| virtualenv          | ``pip install 'apache-airflow[virtualenv]'``        | 
Running python tasks in local virtualenv                                   |    
          |
++---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+--------------+
 
 
 Providers extras
diff --git a/docs/apache-airflow/howto/docker-compose/docker-compose.yaml 
b/docs/apache-airflow/howto/docker-compose/docker-compose.yaml
index 097b472f05..86b319b233 100644
--- a/docs/apache-airflow/howto/docker-compose/docker-compose.yaml
+++ b/docs/apache-airflow/howto/docker-compose/docker-compose.yaml
@@ -149,9 +149,10 @@ services:
     <<: *airflow-common
     command: celery worker
     healthcheck:
+      # yamllint disable rule:line-length
       test:
         - "CMD-SHELL"
-        - 'celery --app airflow.executors.celery_executor.app inspect ping -d 
"celery@$${HOSTNAME}"'
+        - 'celery --app airflow.providers.celery.executors.celery_executor.app 
inspect ping -d "celery@$${HOSTNAME}" || celery --app 
airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
       interval: 30s
       timeout: 10s
       retries: 5
diff --git a/newsfragments/32526.significant.rst 
b/newsfragments/32526.significant.rst
new file mode 100644
index 0000000000..8b3a71518e
--- /dev/null
+++ b/newsfragments/32526.significant.rst
@@ -0,0 +1,5 @@
+Default name of the Celery application changed from 
``airflow.executors.celery_executor`` to 
``airflow.providers.celery.executors.celery_executor``
+
+You should change both your configuration and Health check command to use the 
new name:
+ * in configuration (``celery_app_name`` configuration in ``celery`` section) 
use ``airflow.providers.celery.executors.celery_executor``
+ * in your Health check command use 
``airflow.providers.celery.executors.celery_executor.app``
diff --git a/scripts/ci/pre_commit/pre_commit_check_setup_extra_packages_ref.py 
b/scripts/ci/pre_commit/pre_commit_check_setup_extra_packages_ref.py
index a523ac62c0..04b707f30a 100755
--- a/scripts/ci/pre_commit/pre_commit_check_setup_extra_packages_ref.py
+++ b/scripts/ci/pre_commit/pre_commit_check_setup_extra_packages_ref.py
@@ -209,7 +209,7 @@ def check_preinstalled_extras(console: Console) -> bool:
     :return: True if all ok, False otherwise
     """
     preinstalled_providers_from_docs = get_preinstalled_providers_from_docs()
-    preinstalled_providers_from_setup = PREINSTALLED_PROVIDERS
+    preinstalled_providers_from_setup = [provider.split(">=")[0] for provider 
in PREINSTALLED_PROVIDERS]
 
     preinstalled_providers_table = Table()
     preinstalled_providers_table.add_column("PREINSTALLED_IN_SETUP", 
justify="right", style="cyan")
diff --git a/scripts/in_container/_in_container_utils.sh 
b/scripts/in_container/_in_container_utils.sh
index b368bb3aca..e01409bbe2 100644
--- a/scripts/in_container/_in_container_utils.sh
+++ b/scripts/in_container/_in_container_utils.sh
@@ -163,7 +163,16 @@ function install_airflow_from_wheel() {
         exit 4
     fi
     if [[ ${constraints_reference} == "none" ]]; then
+        set +e
         pip install "${airflow_package}${extras}"
+        res=$?
+        set -e
+        if [[ ${res} != "0" ]]; then
+            >&2 echo
+            >&2 echo "WARNING! Could not install airflow without constraints, 
trying to install it without dependencies in case some of the required 
providers are not yet released"
+            >&2 echo
+            pip install "${airflow_package}${extras}" --no-deps
+        fi
     else
         set +e
         pip install "${airflow_package}${extras}" --constraint \
diff --git a/setup.py b/setup.py
index 21c0a7300c..756e3c8ac8 100644
--- a/setup.py
+++ b/setup.py
@@ -729,6 +729,9 @@ EXTRAS_DEPENDENCIES = sort_extras_dependencies()
 # Those providers do not have dependency on airflow2.0 because that would lead 
to circular dependencies.
 # This is not a problem for PIP but some tools (pipdeptree) show those as a 
warning.
 PREINSTALLED_PROVIDERS = [
+    # TODO: When we release 3.3.0 version of celery provider we should change 
it to "celery>=3.3.0" here
+    #       In order to make sure executors are available in the celery 
provider
+    "celery",
     "common.sql",
     "ftp",
     "http",
@@ -744,8 +747,17 @@ def get_provider_package_name_from_package_id(package_id: 
str) -> str:
     :param package_id: id of the package (like amazon or microsoft.azure)
     :return: full name of package in PyPI
     """
-    package_suffix = package_id.replace(".", "-")
-    return f"apache-airflow-providers-{package_suffix}"
+    version_spec = ""
+    if ">=" in package_id:
+        package, version = package_id.split(">=")
+        version_spec = f">={version}"
+        version_suffix = os.environ.get("VERSION_SUFFIX_FOR_PYPI")
+        if version_suffix:
+            version_spec += version_suffix
+    else:
+        package = package_id
+    package_suffix = package.replace(".", "-")
+    return f"apache-airflow-providers-{package_suffix}{version_spec}"
 
 
 def get_excluded_providers() -> list[str]:
diff --git a/tests/cli/conftest.py b/tests/cli/conftest.py
index 0a0e2ec5cb..3069726a05 100644
--- a/tests/cli/conftest.py
+++ b/tests/cli/conftest.py
@@ -23,7 +23,7 @@ import pytest
 
 from airflow import models
 from airflow.cli import cli_parser
-from airflow.executors import celery_executor, celery_kubernetes_executor
+from airflow.providers.celery.executors import celery_executor, 
celery_kubernetes_executor
 from tests.test_utils.config import conf_vars
 
 # Create custom executors here because conftest is imported first
diff --git a/tests/integration/executors/test_celery_executor.py 
b/tests/integration/executors/test_celery_executor.py
index a89034d2af..c4c3e5d6f0 100644
--- a/tests/integration/executors/test_celery_executor.py
+++ b/tests/integration/executors/test_celery_executor.py
@@ -36,10 +36,10 @@ from kombu.asynchronous import set_event_loop
 
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException, AirflowTaskTimeout
-from airflow.executors import celery_executor, celery_executor_utils
 from airflow.models.dag import DAG
 from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance
 from airflow.operators.bash import BashOperator
+from airflow.providers.celery.executors import celery_executor, 
celery_executor_utils
 from airflow.utils.state import State
 from tests.test_utils import db
 
@@ -68,8 +68,10 @@ def _prepare_app(broker_url=None, execute=None):
     test_config.update({"broker_url": broker_url})
     test_app = Celery(broker_url, config_source=test_config)
     test_execute = test_app.task(execute)
-    patch_app = mock.patch("airflow.executors.celery_executor.app", test_app)
-    patch_execute = 
mock.patch("airflow.executors.celery_executor_utils.execute_command", 
test_execute)
+    patch_app = 
mock.patch("airflow.providers.celery.executors.celery_executor.app", test_app)
+    patch_execute = mock.patch(
+        
"airflow.providers.celery.executors.celery_executor_utils.execute_command", 
test_execute
+    )
 
     backend = test_app.backend
 
@@ -259,7 +261,7 @@ class ClassWithCustomAttributes:
 @pytest.mark.integration("celery")
 @pytest.mark.backend("mysql", "postgres")
 class TestBulkStateFetcher:
-    bulk_state_fetcher_logger = 
"airflow.executors.celery_executor_utils.BulkStateFetcher"
+    bulk_state_fetcher_logger = 
"airflow.providers.celery.executors.celery_executor_utils.BulkStateFetcher"
 
     @mock.patch(
         "celery.backends.base.BaseKeyValueStoreBackend.mget",
@@ -269,7 +271,9 @@ class TestBulkStateFetcher:
         caplog.set_level(logging.DEBUG, logger=self.bulk_state_fetcher_logger)
         with _prepare_app():
             mock_backend = BaseKeyValueStoreBackend(app=celery_executor.app)
-            with 
mock.patch("airflow.executors.celery_executor_utils.Celery.backend", 
mock_backend):
+            with mock.patch(
+                
"airflow.providers.celery.executors.celery_executor_utils.Celery.backend", 
mock_backend
+            ):
                 caplog.clear()
                 fetcher = celery_executor_utils.BulkStateFetcher()
                 result = fetcher.get_many(
@@ -292,7 +296,9 @@ class TestBulkStateFetcher:
         caplog.set_level(logging.DEBUG, logger=self.bulk_state_fetcher_logger)
         with _prepare_app():
             mock_backend = DatabaseBackend(app=celery_executor.app, 
url="sqlite3://")
-            with 
mock.patch("airflow.executors.celery_executor_utils.Celery.backend", 
mock_backend):
+            with mock.patch(
+                
"airflow.providers.celery.executors.celery_executor_utils.Celery.backend", 
mock_backend
+            ):
                 caplog.clear()
                 mock_session = mock_backend.ResultSession.return_value
                 
mock_session.query.return_value.filter.return_value.all.return_value = [
@@ -317,7 +323,9 @@ class TestBulkStateFetcher:
 
         with _prepare_app():
             mock_backend = DatabaseBackend(app=celery_executor.app, 
url="sqlite3://")
-            with 
mock.patch("airflow.executors.celery_executor_utils.Celery.backend", 
mock_backend):
+            with mock.patch(
+                
"airflow.providers.celery.executors.celery_executor_utils.Celery.backend", 
mock_backend
+            ):
                 caplog.clear()
                 mock_session = mock_backend.ResultSession.return_value
                 mock_retry_db_result = 
mock_session.query.return_value.filter.return_value.all
@@ -348,7 +356,9 @@ class TestBulkStateFetcher:
         with _prepare_app():
             mock_backend = mock.MagicMock(autospec=BaseBackend)
 
-            with 
mock.patch("airflow.executors.celery_executor_utils.Celery.backend", 
mock_backend):
+            with mock.patch(
+                
"airflow.providers.celery.executors.celery_executor_utils.Celery.backend", 
mock_backend
+            ):
                 caplog.clear()
                 fetcher = celery_executor_utils.BulkStateFetcher(1)
                 result = fetcher.get_many(
diff --git a/airflow/executors/__init__.py 
b/tests/providers/celery/executors/__init__.py
similarity index 97%
copy from airflow/executors/__init__.py
copy to tests/providers/celery/executors/__init__.py
index 21ee94b03b..217e5db960 100644
--- a/airflow/executors/__init__.py
+++ b/tests/providers/celery/executors/__init__.py
@@ -1,3 +1,4 @@
+#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -14,4 +15,3 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""Executors."""
diff --git a/tests/executors/test_celery_executor.py 
b/tests/providers/celery/executors/test_celery_executor.py
similarity index 91%
rename from tests/executors/test_celery_executor.py
rename to tests/providers/celery/executors/test_celery_executor.py
index 11b461b76d..91e3312421 100644
--- a/tests/executors/test_celery_executor.py
+++ b/tests/providers/celery/executors/test_celery_executor.py
@@ -34,11 +34,11 @@ from celery.result import AsyncResult
 from kombu.asynchronous import set_event_loop
 
 from airflow.configuration import conf
-from airflow.executors import celery_executor, celery_executor_utils
-from airflow.executors.celery_executor import CeleryExecutor
 from airflow.models.baseoperator import BaseOperator
 from airflow.models.dag import DAG
 from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
+from airflow.providers.celery.executors import celery_executor, 
celery_executor_utils
+from airflow.providers.celery.executors.celery_executor import CeleryExecutor
 from airflow.utils import timezone
 from airflow.utils.state import State
 from tests.test_utils import db
@@ -71,8 +71,10 @@ def _prepare_app(broker_url=None, execute=None):
     test_config.update({"broker_url": broker_url})
     test_app = Celery(broker_url, config_source=test_config)
     test_execute = test_app.task(execute)
-    patch_app = mock.patch("airflow.executors.celery_executor.app", test_app)
-    patch_execute = 
mock.patch("airflow.executors.celery_executor_utils.execute_command", 
test_execute)
+    patch_app = 
mock.patch("airflow.providers.celery.executors.celery_executor.app", test_app)
+    patch_execute = mock.patch(
+        
"airflow.providers.celery.executors.celery_executor_utils.execute_command", 
test_execute
+    )
 
     backend = test_app.backend
 
@@ -110,7 +112,9 @@ class TestCeleryExecutor:
 
     @pytest.mark.backend("mysql", "postgres")
     def test_exception_propagation(self, caplog):
-        caplog.set_level(logging.ERROR, 
logger="airflow.executors.celery_executor_utils.BulkStateFetcher")
+        caplog.set_level(
+            logging.ERROR, 
logger="airflow.providers.celery.executors.celery_executor_utils.BulkStateFetcher"
+        )
         with _prepare_app():
             executor = celery_executor.CeleryExecutor()
             executor.tasks = {"key": FakeCeleryResult()}
@@ -118,8 +122,8 @@ class TestCeleryExecutor:
         assert celery_executor_utils.CELERY_FETCH_ERR_MSG_HEADER in 
caplog.text, caplog.record_tuples
         assert FAKE_EXCEPTION_MSG in caplog.text, caplog.record_tuples
 
-    @mock.patch("airflow.executors.celery_executor.CeleryExecutor.sync")
-    
@mock.patch("airflow.executors.celery_executor.CeleryExecutor.trigger_tasks")
+    
@mock.patch("airflow.providers.celery.executors.celery_executor.CeleryExecutor.sync")
+    
@mock.patch("airflow.providers.celery.executors.celery_executor.CeleryExecutor.trigger_tasks")
     @mock.patch("airflow.executors.base_executor.Stats.gauge")
     def test_gauge_executor_metrics(self, mock_stats_gauge, 
mock_trigger_tasks, mock_sync):
         executor = celery_executor.CeleryExecutor()
@@ -154,9 +158,9 @@ class TestCeleryExecutor:
             )
 
         with mock.patch(
-            "airflow.executors.celery_executor_utils._execute_in_subprocess"
+            
"airflow.providers.celery.executors.celery_executor_utils._execute_in_subprocess"
         ) as mock_subproc, mock.patch(
-            "airflow.executors.celery_executor_utils._execute_in_fork"
+            
"airflow.providers.celery.executors.celery_executor_utils._execute_in_fork"
         ) as mock_fork, mock.patch(
             "celery.app.task.Task.request"
         ) as mock_task:
@@ -224,7 +228,7 @@ class TestCeleryExecutor:
             yield app.control.revoke
 
     @pytest.mark.backend("mysql", "postgres")
-    @mock.patch("airflow.executors.celery_executor.CeleryExecutor.fail")
+    
@mock.patch("airflow.providers.celery.executors.celery_executor.CeleryExecutor.fail")
     def test_cleanup_stuck_queued_tasks(self, mock_fail):
         start_date = timezone.utcnow() - timedelta(days=2)
 
@@ -255,7 +259,7 @@ class TestCeleryExecutor:
         import importlib
 
         from airflow.config_templates import default_celery
-        from airflow.executors import celery_executor_utils
+        from airflow.providers.celery.executors import celery_executor_utils
 
         # reload celery conf to apply the new config
         importlib.reload(default_celery)
diff --git a/tests/executors/test_celery_kubernetes_executor.py 
b/tests/providers/celery/executors/test_celery_kubernetes_executor.py
similarity index 98%
rename from tests/executors/test_celery_kubernetes_executor.py
rename to tests/providers/celery/executors/test_celery_kubernetes_executor.py
index ec222e71f7..6302efea21 100644
--- a/tests/executors/test_celery_kubernetes_executor.py
+++ b/tests/providers/celery/executors/test_celery_kubernetes_executor.py
@@ -23,9 +23,9 @@ import pytest
 
 from airflow.callbacks.callback_requests import CallbackRequest
 from airflow.configuration import conf
-from airflow.executors.celery_executor import CeleryExecutor
-from airflow.executors.celery_kubernetes_executor import 
CeleryKubernetesExecutor
 from airflow.executors.kubernetes_executor import KubernetesExecutor
+from airflow.providers.celery.executors.celery_executor import CeleryExecutor
+from airflow.providers.celery.executors.celery_kubernetes_executor import 
CeleryKubernetesExecutor
 
 KUBERNETES_QUEUE = CeleryKubernetesExecutor.KUBERNETES_QUEUE
 
diff --git a/tests/sensors/test_base.py b/tests/sensors/test_base.py
index 8761f1de47..b4042945d3 100644
--- a/tests/sensors/test_base.py
+++ b/tests/sensors/test_base.py
@@ -24,8 +24,6 @@ import pytest
 import time_machine
 
 from airflow.exceptions import AirflowException, AirflowRescheduleException, 
AirflowSensorTimeout
-from airflow.executors.celery_executor import CeleryExecutor
-from airflow.executors.celery_kubernetes_executor import 
CeleryKubernetesExecutor
 from airflow.executors.debug_executor import DebugExecutor
 from airflow.executors.executor_constants import (
     CELERY_EXECUTOR,
@@ -44,6 +42,8 @@ from airflow.executors.sequential_executor import 
SequentialExecutor
 from airflow.models import TaskReschedule
 from airflow.models.xcom import XCom
 from airflow.operators.empty import EmptyOperator
+from airflow.providers.celery.executors.celery_executor import CeleryExecutor
+from airflow.providers.celery.executors.celery_kubernetes_executor import 
CeleryKubernetesExecutor
 from airflow.sensors.base import BaseSensorOperator, PokeReturnValue, 
poke_mode_only
 from airflow.ti_deps.deps.ready_to_reschedule import ReadyToRescheduleDep
 from airflow.utils import timezone
diff --git a/tests/www/views/test_views_tasks.py 
b/tests/www/views/test_views_tasks.py
index 9b45fc6622..43b2293250 100644
--- a/tests/www/views/test_views_tasks.py
+++ b/tests/www/views/test_views_tasks.py
@@ -29,10 +29,10 @@ import time_machine
 
 from airflow import settings
 from airflow.exceptions import AirflowException
-from airflow.executors.celery_executor import CeleryExecutor
 from airflow.models import DAG, DagBag, DagModel, TaskFail, TaskInstance, 
TaskReschedule
 from airflow.models.dagcode import DagCode
 from airflow.operators.bash import BashOperator
+from airflow.providers.celery.executors.celery_executor import CeleryExecutor
 from airflow.security import permissions
 from airflow.utils import timezone
 from airflow.utils.log.logging_mixin import ExternalLoggingMixin

Reply via email to