amoghrajesh opened a new pull request, #59745:
URL: https://github.com/apache/airflow/pull/59745
Resolves a circular import that affects Airflow 2.x deployments when
CeleryExecutor is configured and Sentry is enabled. The circular import caused
Celery worker readiness probes to fail and prevented the scheduler from
starting, with the error "cannot import name 'AirflowTaskTimeout' from
partially initialized module".

The root cause is the import chain
`executor → version_compat → BaseOperator → taskinstance → sentry → executor`.
When Sentry is enabled, Sentry initialization tries to load the executor while
the executor's own imports are still in progress. The executor module then
imports `AirflowTaskTimeout` from `airflow.providers.common.compat.sdk`, which
is only partially initialized at that point.
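To illustrate the failure mode in isolation, here is a minimal sketch that reproduces a "partially initialized module" error with two throwaway modules, then shows a deferred import avoiding it. The module names `compat_mod` and `loader_mod`, the `TIMEOUT_NAME` constant, and the `try_import` helper are hypothetical stand-ins, not Airflow code, and deferring the import is only one possible way to break such a cycle.

```python
import importlib
import sys
import tempfile
import textwrap
from pathlib import Path

# Hypothetical stand-ins: compat_mod plays the module that is still being
# imported, loader_mod plays the code that imports from it too early.
EAGER = {
    "compat_mod.py": """
        import loader_mod  # runs before TIMEOUT_NAME is defined below
        TIMEOUT_NAME = "AirflowTaskTimeout"
    """,
    "loader_mod.py": """
        from compat_mod import TIMEOUT_NAME  # compat_mod is only half-loaded here
    """,
}

# Same modules, but loader_mod defers its import until the function is called.
DEFERRED = {
    "compat_mod.py": EAGER["compat_mod.py"],
    "loader_mod.py": """
        def timeout_name():
            from compat_mod import TIMEOUT_NAME  # resolved at call time
            return TIMEOUT_NAME
    """,
}


def try_import(files: dict[str, str], entry: str) -> str:
    """Write files to a temp dir, import entry, return "ok" or the error text."""
    with tempfile.TemporaryDirectory() as tmp:
        for name, source in files.items():
            Path(tmp, name).write_text(textwrap.dedent(source))
        sys.path.insert(0, tmp)
        try:
            importlib.invalidate_caches()
            importlib.import_module(entry)
            return "ok"
        except ImportError as exc:
            return str(exc)
        finally:
            # Clean up so the next call starts from a fresh import state.
            sys.path.remove(tmp)
            for name in files:
                sys.modules.pop(Path(name).stem, None)


print(try_import(EAGER, "compat_mod"))     # error text mentioning a circular import
print(try_import(DEFERRED, "compat_mod"))  # import succeeds
```

The eager variant fails because `loader_mod` asks for a name that `compat_mod` has not defined yet; the deferred variant postpones that lookup until both modules have finished loading.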
How I repro'd it:
```shell
(apache-airflow) ➜ airflow git:(main-ci) ✗ docker run -it --rm --entrypoint bash apache/airflow:2.11.0
airflow@535e59b009c4:/opt/airflow$ pip install -q sentry-sdk apache-airflow-providers-celery==3.14.0 apache-airflow-providers-common-compat==1.10.1
[notice] A new release of pip is available: 25.1.1 -> 25.3
[notice] To update, run: pip install --upgrade pip
airflow@535e59b009c4:/opt/airflow$ export AIRFLOW__CORE__EXECUTOR=CeleryExecutor
export AIRFLOW__SENTRY__SENTRY_ON=True
export AIRFLOW__SENTRY__SENTRY_DSN=https://[email protected]/fake
export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=sqlite:////tmp/test.db
airflow@535e59b009c4:/opt/airflow$ celery -A airflow.executors.celery_executor.app inspect ping
/home/airflow/.local/lib/python3.12/site-packages/airflow/metrics/base_stats_logger.py:22 RemovedInAirflow3Warning: Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable timer_unit_consistency to publish all the timer and timing metrics in milliseconds.
Usage: celery [OPTIONS] COMMAND [ARGS]...
Try 'celery --help' for help.
Error:
Unable to load celery application.
While trying to load the module airflow.executors.celery_executor.app the following error occurred:
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/deprecation_tools.py", line 56, in getattr_with_deprecation
    return getattr(importlib.import_module(new_module), new_class_name)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/importlib/__init__.py", line 90, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<frozen importlib._bootstrap>", line 1387, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1360, in _find_and_load
  File "<frozen importlib._bootstrap>", line 1331, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 935, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 999, in exec_module
  File "<frozen importlib._bootstrap>", line 488, in _call_with_frames_removed
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/celery/executors/celery_executor_utils.py", line 47, in <module>
    from airflow.providers.common.compat.sdk import AirflowTaskTimeout, timeout
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/common/compat/sdk.py", line 28, in <module>
    from airflow.providers.common.compat.version_compat import AIRFLOW_V_3_0_PLUS
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/common/compat/version_compat.py", line 41, in <module>
    from airflow.models import BaseOperator
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/__init__.py", line 79, in __getattr__
    val = import_string(f"{path}.{name}")
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/module_loading.py", line 39, in import_string
    module = import_module(module_path)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/importlib/__init__.py", line 90, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 83, in <module>
    from airflow.models.mappedoperator import OperatorPartial, validate_mapping_kwargs
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/mappedoperator.py", line 54, in <module>
    from airflow.triggers.base import StartTriggerArgs
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/triggers/base.py", line 27, in <module>
    from airflow.models.taskinstance import SimpleTaskInstance
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 106, in <module>
    from airflow.sentry import Sentry
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sentry.py", line 196, in <module>
    Sentry = ConfiguredSentry()
             ^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sentry.py", line 88, in __init__
    executor_class, _ = ExecutorLoader.import_default_executor_cls(validate=False)
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/executors/executor_loader.py", line 314, in import_default_executor_cls
    executor, source = cls.import_executor_cls(executor_name, validate=validate)
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/executors/executor_loader.py", line 302, in import_executor_cls
    return _import_and_validate(executor_name.module_path), executor_name.connector_source
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/executors/executor_loader.py", line 286, in _import_and_validate
    executor = import_string(path)
               ^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/module_loading.py", line 39, in import_string
    module = import_module(module_path)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/importlib/__init__.py", line 90, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/celery/executors/celery_executor.py", line 57, in <module>
    from airflow.providers.common.compat.sdk import AirflowTaskTimeout
ImportError: cannot import name 'AirflowTaskTimeout' from partially initialized module 'airflow.providers.common.compat.sdk' (most likely due to a circular import) (/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/common/compat/sdk.py)
The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.12/site-packages/celery/bin/celery.py", line 141, in celery
    app = find_app(app)
          ^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/celery/app/utils.py", line 383, in find_app
    sym = symbol_by_name(app, imp=imp)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/kombu/utils/imports.py", line 64, in symbol_by_name
    return getattr(module, cls_name) if cls_name else module
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/deprecation_tools.py", line 63, in getattr_with_deprecation
    raise ImportError(error_message) from e
ImportError: Could not import `airflow.providers.celery.executors.celery_executor_utils.app` while trying to import `airflow.executors.celery_executor.app`. For Celery executors, the `celery` provider should be >= 3.3.0. For Kubernetes executors, the `cncf.kubernetes` provider should be >= 7.4.0 for that. For Dask executors, any version of `daskexecutor` provider is needed..
```
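The traceback shows the failure surfacing when the dotted path `airflow.executors.celery_executor.app` is resolved. As a rough way to check the same thing without the `celery` CLI, the sketch below imports the module part of a dotted path and looks up the final attribute, returning the error text if that fails. The helper name `load_error` is hypothetical, and this is not how Celery itself resolves the app; it only mimics the import-then-getattr step.

```python
from __future__ import annotations

import importlib


def load_error(dotted_path: str) -> str | None:
    """Return None if dotted_path resolves, else a short error description."""
    module_path, _, attr = dotted_path.rpartition(".")
    if not module_path:
        raise ValueError(f"expected 'module.attribute', got {dotted_path!r}")
    try:
        getattr(importlib.import_module(module_path), attr)
    except (ImportError, AttributeError) as exc:
        return f"{type(exc).__name__}: {exc}"
    return None


# Standard-library paths keep the sketch runnable anywhere.
print(load_error("json.dumps"))
print(load_error("json.no_such_name"))
```

Inside the container used for the repro, calling `load_error("airflow.executors.celery_executor.app")` would be one way to see whether the import still fails, without needing a broker.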
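Since the 2.11.0 image has no editors, I applied the change with `sed` and
recorded the session. After the change, the same `celery ... inspect ping`
command gets past the import and fails only when connecting to Redis, because
the container has no Redis broker to connect to: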
```shell
airflow@535e59b009c4:~/.local/lib/python3.12/site-packages/airflow/providers/common/compat$ sed -i '/^if AIRFLOW_V_3_0_PLUS:/,/^else:/d' version_compat.py
sed -i '/^    from airflow.models import BaseOperator/d' version_compat.py
airflow@535e59b009c4:~/.local/lib/python3.12/site-packages/airflow/providers/common/compat$ cat version_compat.py
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# NOTE! THIS FILE IS COPIED MANUALLY IN OTHER PROVIDERS DELIBERATELY TO AVOID ADDING UNNECESSARY
# DEPENDENCIES BETWEEN PROVIDERS. IF YOU WANT TO ADD CONDITIONAL CODE IN YOUR PROVIDER THAT DEPENDS
# ON AIRFLOW VERSION, PLEASE COPY THIS FILE TO THE ROOT PACKAGE OF YOUR PROVIDER AND IMPORT
# THOSE CONSTANTS FROM IT RATHER THAN IMPORTING THEM FROM ANOTHER PROVIDER OR TEST CODE
#
from __future__ import annotations


def get_base_airflow_version_tuple() -> tuple[int, int, int]:
    from packaging.version import Version

    from airflow import __version__

    airflow_version = Version(__version__)
    return airflow_version.major, airflow_version.minor, airflow_version.micro


AIRFLOW_V_3_0_PLUS: bool = get_base_airflow_version_tuple() >= (3, 0, 0)
AIRFLOW_V_3_1_PLUS: bool = get_base_airflow_version_tuple() >= (3, 1, 0)

__all__ = [
    "AIRFLOW_V_3_0_PLUS",
    "AIRFLOW_V_3_1_PLUS",
    "BaseOperator",
]
airflow@535e59b009c4:~/.local/lib/python3.12/site-packages/airflow/providers/common/compat$ sed -i '/"BaseOperator",/d' version_compat.py
airflow@535e59b009c4:~/.local/lib/python3.12/site-packages/airflow/providers/common/compat$ cd -
/opt/airflow
airflow@535e59b009c4:/opt/airflow$ ls
dags logs
airflow@535e59b009c4:/opt/airflow$ celery -A airflow.executors.celery_executor.app inspect ping
/home/airflow/.local/lib/python3.12/site-packages/airflow/metrics/base_stats_logger.py:22 RemovedInAirflow3Warning: Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable timer_unit_consistency to publish all the timer and timing metrics in milliseconds.
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.12/site-packages/kombu/transport/virtual/base.py", line 951, in create_channel
    return self._avail_channels.pop()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
IndexError: pop from empty list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.12/site-packages/redis/connection.py", line 357, in connect
    sock = self.retry.call_with_retry(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/redis/retry.py", line 62, in call_with_retry
    return do()
           ^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/redis/connection.py", line 358, in <lambda>
    lambda: self._connect(), lambda error: self.disconnect(error)
            ^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/redis/connection.py", line 698, in _connect
    for res in socket.getaddrinfo(
               ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/socket.py", line 978, in getaddrinfo
    for res in _socket.getaddrinfo(host, port, family, type, proto, flags):
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
socket.gaierror: [Errno -2] Name or service not known
During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.12/site-packages/kombu/connection.py", line 472, in _reraise_as_library_errors
    yield
  File "/home/airflow/.local/lib/python3.12/site-packages/kombu/connection.py", line 459, in _ensure_connection
    return retry_over_time(
           ^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/kombu/utils/functional.py", line 318, in retry_over_time
    return fun(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/kombu/connection.py", line 938, in _connection_factory
    self._connection = self._establish_connection()
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/kombu/connection.py", line 860, in _establish_connection
    conn = self.transport.establish_connection()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/kombu/transport/virtual/base.py", line 975, in establish_connection
    self._avail_channels.append(self.create_channel(self))
                                ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/kombu/transport/virtual/base.py", line 953, in create_channel
    channel = self.Channel(connection)
              ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/kombu/transport/redis.py", line 744, in __init__
    self.client.ping()
  File "/home/airflow/.local/lib/python3.12/site-packages/redis/commands/core.py", line 1212, in ping
    return self.execute_command("PING", **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/redis/client.py", line 559, in execute_command
    return self._execute_command(*args, **options)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/redis/client.py", line 565, in _execute_command
    conn = self.connection or pool.get_connection(command_name, **options)
                              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/redis/connection.py", line 1422, in get_connection
    connection.connect()
  File "/home/airflow/.local/lib/python3.12/site-packages/redis/connection.py", line 363, in connect
    raise ConnectionError(self._error_message(e))
redis.exceptions.ConnectionError: Error -2 connecting to redis:6379. Name or service not known.
The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/airflow/.local/bin/celery", line 8, in <module>
    sys.exit(main())
             ^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/celery/__main__.py", line 15, in main
    sys.exit(_main())
             ^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/celery/bin/celery.py", line 231, in main
    return celery(auto_envvar_prefix="CELERY")
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/click/core.py", line 1442, in __call__
    return self.main(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/click/core.py", line 1363, in main
    rv = self.invoke(ctx)
         ^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/click/core.py", line 1830, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/click/core.py", line 1226, in invoke
    return ctx.invoke(self.callback, **ctx.params)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/click/core.py", line 794, in invoke
    return callback(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/click/decorators.py", line 34, in new_func
    return f(get_current_context(), *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/celery/bin/base.py", line 135, in caller
    return f(ctx, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/celery/bin/control.py", line 186, in inspect
    replies = inspect._request(command, **arguments)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/celery/app/control.py", line 106, in _request
    return self._prepare(self.app.control.broadcast(
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/celery/app/control.py", line 777, in broadcast
    return self.mailbox(conn)._broadcast(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/kombu/pidbox.py", line 330, in _broadcast
    chan = channel or self.connection.default_channel
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/kombu/connection.py", line 957, in default_channel
    self._ensure_connection(**conn_opts)
  File "/home/airflow/.local/lib/python3.12/site-packages/kombu/connection.py", line 458, in _ensure_connection
    with ctx():
         ^^^^^
  File "/usr/local/lib/python3.12/contextlib.py", line 158, in __exit__
    self.gen.throw(value)
  File "/home/airflow/.local/lib/python3.12/site-packages/kombu/connection.py", line 476, in _reraise_as_library_errors
    raise ConnectionError(str(exc)) from exc
kombu.exceptions.OperationalError: Error -2 connecting to redis:6379. Name or service not known.
```
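The version flags in the `version_compat.py` file shown in the session depend only on the Airflow version string, not on `BaseOperator`. A minimal stand-alone sketch of that comparison follows. It takes the version string as a parameter and parses it with a regular expression rather than `packaging.version.Version`, so the names `base_version_tuple` and `is_v3_0_plus` and the parsing shortcut are assumptions made to keep the example dependency-free.

```python
import re


def base_version_tuple(version: str) -> tuple[int, int, int]:
    """Return (major, minor, micro), ignoring suffixes such as 'rc1' or '.dev0'."""
    match = re.match(r"(\d+)\.(\d+)(?:\.(\d+))?", version)
    if match is None:
        raise ValueError(f"unrecognized version string: {version!r}")
    # A missing micro component is treated as 0.
    major, minor, micro = match.groups(default="0")
    return int(major), int(minor), int(micro)


def is_v3_0_plus(version: str) -> bool:
    # Tuples compare lexicographically, so (2, 11, 0) sorts before (3, 0, 0).
    return base_version_tuple(version) >= (3, 0, 0)


print(is_v3_0_plus("2.11.0"), is_v3_0_plus("3.1.0"))
```

Nothing in this check touches Airflow's model or executor modules, so a module that only computes such flags has no need to import them.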