quangvnm opened a new issue, #41359:
URL: https://github.com/apache/airflow/issues/41359

   ### Apache Airflow version
   
   2.9.3
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   Hello, 
   
   This is a copy-paste of an issue we also posted on celery/kombu and redis/redis-py. We include an MCVE at the end of this post.
   
   TL;DR: The `with timeout(seconds=OPERATION_TIMEOUT):` in `airflow.executors.celery_executor.send_task_to_executor` can leave `redis` in a broken, partially imported state, and may affect other packages that we haven't discovered yet. This is a race condition and was very hard to debug at first. To reproduce this bug, all of the following must hold:
   
   - a Celery operation timeout (we kept the default of 1.0 second)
   - the timeout must fire during an import
   - a long import (we aren't sure the import of redis is actually long; this bug happens mostly with the redis package, but that may be confirmation bias)
   
   Related:
   
   - [**apache/airflow** discussion #36097 _CeleryExecutor is failing to launch 
tasks with redis error_](https://github.com/apache/airflow/discussions/36097)
   - [**apache/airflow** issue #33744 _Celery Executor is not working with 
redis-py 5.0.0_](https://github.com/apache/airflow/issues/33744)
   - [**celery/kombu** issue #1815 _import exception raised in transport/redis 
"module 'redis' has no attribute 'client' 
"_](https://github.com/celery/kombu/issues/1815)
   
   Our environment:
   
   ```
   airflow: this happens with both 2.6 and latest 2.9.3 version
   helm chart: 1.9, 1.14 or 1.15
   python: 3.11.9
   redis: 4.6.0 (airflow 2.6) and 5.0.7 (airflow 2.9)
   kombu: 5.3.1 (airflow 2.6) and 5.3.7 (airflow 2.9)
   ```
   
   We have been seeing this issue for at least several months in our Airflow deployment using the official Helm chart, with the same error as in the related issues/discussion:
   
   ```
   Aug 8 08:29:02 airflow-XXX-scheduler-XXX-zhtb8 scheduler ERROR {timeout.py:68} ERROR - Process timed out, PID: 7
   Aug 8 08:29:02 airflow-XXX-scheduler-XXX-zhtb8 scheduler {celery_executor.py:279} INFO - [Try 1 of 3] Task Timeout Error for Task: (TaskInstanceKey(dag_id='XXX', task_id='XXX', run_id='manual__2024-06-19T14:06:39+00:00', try_number=51, map_index=-1)).
   Aug 8 08:29:03 airflow-XXX-scheduler-XXX-zhtb8 scheduler {celery_executor.py:290} ERROR - Error sending Celery task: module 'redis' has no attribute 'client'
   Aug 8 08:29:03 airflow-XXX-scheduler-XXX-zhtb8 scheduler Celery Task ID: TaskInstanceKey(dag_id='XXX', task_id='XXX', run_id='manual__2024-06-19T14:06:39+00:00', try_number=51, map_index=-1)
   Aug 8 08:29:03 airflow-XXX-scheduler-XXX-zhtb8 scheduler Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/celery/executors/celery_executor_utils.py", line 220, in send_task_to_executor
       result = task_to_run.apply_async(args=[command], queue=queue)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/celery/app/task.py", line 594, in apply_async
       return app.send_task(
              ^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/celery/app/base.py", line 797, in send_task
       with self.producer_or_acquire(producer) as P:
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/celery/app/base.py", line 932, in producer_or_acquire
       producer, self.producer_pool.acquire, block=True,
                 ^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/celery/app/base.py", line 1354, in producer_pool
       return self.amqp.producer_pool
              ^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/celery/app/amqp.py", line 591, in producer_pool
       self.app.connection_for_write()]
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/celery/app/base.py", line 829, in connection_for_write
       return self._connection(url or self.conf.broker_write_url, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/celery/app/base.py", line 880, in _connection
       return self.amqp.Connection(
              ^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/kombu/connection.py", line 201, in __init__
       if not get_transport_cls(transport).can_parse_url:
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/kombu/transport/__init__.py", line 90, in get_transport_cls
       _transport_cache[transport] = resolve_transport(transport)
                                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/kombu/transport/__init__.py", line 75, in resolve_transport
       return symbol_by_name(transport)
              ^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.11/site-packages/kombu/utils/imports.py", line 59, in symbol_by_name
       module = imp(module_name, package=package, **kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.11/importlib/__init__.py", line 126, in import_module
       return _bootstrap._gcd_import(name[level:], package, level)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "<frozen importlib._XXX>", line 1204, in _gcd_import
     File "<frozen importlib._XXX>", line 1176, in _find_and_load
     File "<frozen importlib._XXX>", line 1147, in _find_and_load_unlocked
     File "<frozen importlib._XXX>", line 690, in _load_unlocked
     File "<frozen importlib._XXX_external>", line 940, in exec_module
     File "<frozen importlib._XXX>", line 241, in _call_with_frames_removed
     File "/home/airflow/.local/lib/python3.11/site-packages/kombu/transport/redis.py", line 285, in <module>
       class PrefixedRedisPipeline(GlobalKeyPrefixMixin, redis.client.Pipeline):
                                                         ^^^^^^^^^^^^
   Aug 8 08:29:03 airflow-XXX-scheduler-XXX-zhtb8 scheduler AttributeError: module 'redis' has no attribute 'client'
   ```
   
   We verified that our own code contains neither a `redis` folder nor a `redis.py` file. This is a very sporadic error: most of the time everything works, then it stops working for no apparent reason, and once it happens the scheduler is broken and can't schedule anything (same error message) until we restart the scheduler process (restart the pod).
   
   This happens quite randomly (roughly once every few dozen Helm chart deployments, up to about one in fifty), and we couldn't reliably reproduce it for debugging purposes.
   
   Once this happens, the bug persists until we restart (kill) the scheduler pod.
   We could reproduce it intermittently in a test Airflow with these steps:
   
   - `kubectl delete po -l release=airflow-XXX,component=scheduler --force --grace-period 0`
   - Clear a DAG task and hope that the bug happens; it should show up immediately, and if it doesn't, repeat the whole process
   
   At first, we suspected a race condition when importing the redis package. We injected debug code before the line `class PrefixedRedisPipeline(GlobalKeyPrefixMixin, redis.client.Pipeline):` (`print(sys.path)`, `print(redis)`, `print(redis.__version__)`, ...) and everything looked fine, except that `print(dir(redis))` gave a different result:
   
   ```
   sys.path=['/home/airflow/.local/bin', '/usr/local/lib/python311.zip', 
'/usr/local/lib/python3.11', '/usr/local/lib/python3.11/lib-dynload', 
'/home/airflow/.local/lib/python3.11/site-packages', '/opt/airflow/dags/repo/', 
'/opt/airflow/config', '/opt/airflow/plugins']
   
   redis=<module 'redis' from '/home/airflow/.local/lib/python3.11/site-packages/redis/__init__.py'>
   
   redis.__version__=5.0.7
   
   dir(redis)=['AuthenticationError', 'AuthenticationWrongNumberOfArgsError', 
'BlockingConnectionPool', 'BusyLoadingError', 'ChildDeadlockedError', 
'Connection', 'ConnectionError', 'ConnectionPool', 'CredentialProvider', 
'DataError', 'InvalidResponse', 'OutOfMemoryError', 'PubSubError', 
'ReadOnlyError', 'Redis', 'RedisCluster', 'RedisError', 'ResponseError', 
'SSLConnection', 'Sentinel', 'SentinelConnectionPool', 
'SentinelManagedConnection', 'SentinelManagedSSLConnection', 'StrictRedis', 
'TimeoutError', 'UnixDomainSocketConnection', 
'UsernamePasswordCredentialProvider', 'VERSION', 'WatchError', '__all__', 
'__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', 
'__package__', '__path__', '__spec__', '__version__', 'asyncio', 'cluster', 
'default_backoff', 'from_url', 'int_or_str', 'metadata', 'sentinel', 'sys']
   ```
   
   compared to a python shell session inside the same container:
   
   ```
   Python 3.11.9 (main, Jul 23 2024, 07:22:56) [GCC 12.2.0] on linux
   Type "help", "copyright", "credits" or "license" for more information.
   >>> import redis
   >>> redis
   <module 'redis' from '/home/airflow/.local/lib/python3.11/site-packages/redis/__init__.py'>
   >>> redis.__version__
   '5.0.7'
   >>> dir(redis)
   ['AuthenticationError', 'AuthenticationWrongNumberOfArgsError', 
'BlockingConnectionPool', 'BusyLoadingError', 'ChildDeadlockedError', 
'Connection', 'ConnectionError', 'ConnectionPool', 'CredentialProvider', 
'DataError', 'InvalidResponse', 'OutOfMemoryError', 'PubSubError', 
'ReadOnlyError', 'Redis', 'RedisCluster', 'RedisError', 'ResponseError', 
'SSLConnection', 'Sentinel', 'SentinelConnectionPool', 
'SentinelManagedConnection', 'SentinelManagedSSLConnection', 'StrictRedis', 
'TimeoutError', 'UnixDomainSocketConnection', 
'UsernamePasswordCredentialProvider', 'VERSION', 'WatchError', '__all__', 
'__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', 
'__package__', '__path__', '__spec__', '__version__', '_parsers', 'asyncio', 
'backoff', 'client', 'cluster', 'commands', 'compat', 'connection', 'crc', 
'credentials', 'default_backoff', 'exceptions', 'from_url', 'int_or_str', 
'lock', 'metadata', 'retry', 'sentinel', 'sys', 'typing', 'utils']
   ```
   
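   We noted that `dir(redis)` inside the troublesome scheduler lacks several submodule attributes (for example `client`, `connection`, `exceptions` and `utils`), notably `client`, which is exactly what `redis.client` needs.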
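   To spot this state without comparing `dir()` output by eye, a small diagnostic helper can list submodules that are cached in `sys.modules` but are not bound as attributes on their parent package. This is a hypothetical sketch (`find_detached_submodules` is not part of Airflow, Celery, kombu or redis-py):

   ```python
   import sys


   def find_detached_submodules(package_name: str) -> list[str]:
       """List direct submodules of `package_name` that are cached in
       sys.modules but are not bound as attributes of the cached package."""
       parent = sys.modules.get(package_name)
       if parent is None:
           return []  # package not imported (or its import failed and was rolled back)
       prefix = package_name + "."
       detached = []
       for name, module in list(sys.modules.items()):
           child = name[len(prefix):]
           if not name.startswith(prefix) or "." in child:
               continue  # skip unrelated modules and nested sub-submodules
           # Caveat: a package that deliberately rebinds a submodule name
           # (e.g. `from .foo import foo`) is reported here as well.
           if getattr(parent, child, None) is not module:
               detached.append(name)
       return sorted(detached)
   ```

   In a scheduler in the broken state we would expect `find_detached_submodules("redis")` to include `'redis.client'`; in a healthy session it should not.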
   
   Another thing we discovered is that in every case there is a timeout (as you can see in the log above). Sure enough, we later found out that the bug always happens when the import of `redis` is interrupted by the timeout (we printed line numbers in `redis/__init__.py`, and the import never ran to the end). In very rare cases, `airflow.utils.timeout` doesn't work as intended: the timeout error is printed in the middle of `import redis`, but `import redis` still runs to the end, and in that case the bug doesn't happen. Most of the time, however, the timeout interrupts the import.
   
   Following this idea, we injected a `sleep` at the end of `redis/__init__.py` and, sure enough, we could reproduce this bug every time.
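   The same idea can be sketched with a throwaway package instead of the real `redis`. Assumptions: a SIGALRM timer stands in for the timeout (we believe `airflow.utils.timeout` works this way on POSIX, but this is not Airflow's code), and the names `slow_pkg_demo` and `ImportTimedOut` are hypothetical. It is POSIX-only and must run in the main thread because of `signal.signal`:

   ```python
   import importlib
   import signal
   import sys
   import tempfile
   import textwrap
   from pathlib import Path

   PKG = "slow_pkg_demo"  # hypothetical stand-in for `redis`


   class ImportTimedOut(Exception):
       """Stand-in for the exception raised by a SIGALRM-based timeout."""


   def _raise_timeout(signum, frame):
       raise ImportTimedOut("operation timed out during import")


   def reproduce(timeout: float = 0.2, delay: float = 1.0) -> tuple[bool, bool, bool]:
       """Interrupt the first import of PKG with SIGALRM, then import it again.

       Returns (parent cached after the timeout, submodule cached after the
       timeout, `client` attribute present after the retry).
       """
       with tempfile.TemporaryDirectory() as tmp:
           pkg_dir = Path(tmp) / PKG
           pkg_dir.mkdir()
           (pkg_dir / "client.py").write_text("class Pipeline:\n    pass\n")
           # Like the injected delay: sleep at the *end* of __init__.py,
           # after the submodule has already been imported.
           (pkg_dir / "__init__.py").write_text(textwrap.dedent(f"""\
               import time
               from {PKG}.client import Pipeline
               time.sleep({delay})
           """))
           sys.path.insert(0, tmp)
           importlib.invalidate_caches()
           previous = signal.signal(signal.SIGALRM, _raise_timeout)
           try:
               signal.setitimer(signal.ITIMER_REAL, timeout)
               try:
                   importlib.import_module(PKG)
               except ImportTimedOut:
                   pass
               finally:
                   signal.setitimer(signal.ITIMER_REAL, 0)  # disarm the timer
               parent_cached = PKG in sys.modules
               child_cached = f"{PKG}.client" in sys.modules
               # Retry without a timeout, as a later attempt would.
               module = importlib.import_module(PKG)
               return parent_cached, child_cached, hasattr(module, "client")
           finally:
               signal.signal(signal.SIGALRM, previous)
               sys.path.remove(tmp)
               for name in [n for n in sys.modules if n.split(".")[0] == PKG]:
                   del sys.modules[name]
   ```

   In our understanding, `reproduce()` should return `(False, True, False)`: the interrupted package is dropped from `sys.modules`, its already-imported submodule stays cached, and the retried import succeeds without the `client` attribute.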
   
   So an interrupted import leaves a different module state than a normal `import`: the re-imported package seems to be missing the submodules it never binds explicitly. In this case `Redis` and `StrictRedis` are bound explicitly, but `redis.client` is only set implicitly, as a side effect of importing the submodule:
   
   ```python
   # redis/__init__.py
   from redis.client import Redis, StrictRedis
   ```
   
   I found this comment on discuss.python.org:
   
   > ```python
   > import asyncio
   > original = id(asyncio)
   > from asyncio.base_events import BaseEventLoop
   > assert id(asyncio) == original
   > assert asyncio.base_events.BaseEventLoop is BaseEventLoop
   > ```
   >
   > From which it should be clear that asyncio.base_events is indeed 
guaranteed to be set after the from-import.
   >
   > -- 
<cite>[discuss.python.org](https://discuss.python.org/t/why-do-relative-from-imports-also-add-the-submodule-itself-to-the-namespace/24440/2)</cite>
   
   In our case this no longer holds once an import has been interrupted: when we re-import the package, the submodule attribute isn't set at all. We didn't dig further into Python internals to find out why this happens.
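   One plausible explanation comes from Python's import system rather than from anything we traced inside the scheduler. The language reference ("The import system") states that when a loader fails, only the failing module is removed from `sys.modules`, while modules loaded as a side effect stay cached; and the binding of a submodule on its parent is made when the submodule is loaded. A cached submodule is returned as-is, so the freshly re-executed parent never gets the attribute back. The toy model below is a deliberate simplification (not the real `importlib` code, and `ToyImporter` is a hypothetical name) that encodes only these two rules:

   ```python
   from types import SimpleNamespace


   class ToyImporter:
       """Toy model of two import-system rules; NOT the real importlib code.

       Rule 1: if a module body raises, only that module is dropped from the
               cache; submodules it already loaded stay cached.
       Rule 2: a submodule is bound on its parent only when freshly loaded,
               never when it is served from the cache.
       """

       def __init__(self, bodies):
           self.bodies = bodies  # module name -> function(importer, module)
           self.modules = {}     # plays the role of sys.modules

       def import_(self, name):
           if name in self.modules:
               return self.modules[name]  # rule 2: cache hit, parent untouched
           parent_name, _, child_name = name.rpartition(".")
           if parent_name and parent_name not in self.modules:
               self.import_(parent_name)
           module = SimpleNamespace(__name__=name)
           self.modules[name] = module
           try:
               self.bodies[name](self, module)
           except BaseException:
               del self.modules[name]  # rule 1: only the failing module is dropped
               raise
           if parent_name:
               setattr(self.modules[parent_name], child_name, module)
           return module


   def run_scenario():
       state = {"fail": True}

       def client_body(importer, module):
           module.Pipeline = type("Pipeline", (), {})

       def redis_body(importer, module):
           client = importer.import_("redis.client")  # like `from redis.client import ...`
           module.Pipeline = client.Pipeline
           if state["fail"]:
               raise TimeoutError("simulated timeout at the end of redis/__init__.py")

       importer = ToyImporter({"redis": redis_body, "redis.client": client_body})
       try:
           importer.import_("redis")
       except TimeoutError:
           pass
       cached_after_failure = sorted(importer.modules)
       state["fail"] = False
       redis = importer.import_("redis")  # the retry succeeds...
       return {"cached_after_failure": cached_after_failure,
               "has_client": hasattr(redis, "client")}  # ...without `client`
   ```

   `run_scenario()` returns `{'cached_after_failure': ['redis.client'], 'has_client': False}`, which mirrors the MCVE's `'redis.client' in sys.modules: True` together with `'client' in dir(redis): False`.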
   
   We see at least three options to fix this bug:
   
   - Increase Celery's `operation_timeout` (by config or env var). This isn't error-proof, but it at least drastically reduces how often this bug occurs
   - add `from redis import client` to `redis/__init__.py`
   - patch `kombu/transport/redis.py` with
     ```python
     from redis import client
     ```
     and replace every `redis.client` with `client`
   
   For now, we use the second option in our dev environment.
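   The third option also sidesteps the broken state: since Python 3.7, `from package import name` falls back to `sys.modules["package.name"]` when the attribute is missing. This matches the MCVE output, where `from redis import client` succeeds while `'client' in dir(redis)` stays False. A minimal sketch that fakes the broken state with in-memory module objects (the package name `demo_broken_pkg` is hypothetical):

   ```python
   import sys
   import types

   PKG = "demo_broken_pkg"  # hypothetical stand-in for `redis`; must match the import below


   def make_broken_state() -> types.ModuleType:
       """Recreate the broken state: a parent package without a `client`
       attribute, while `<parent>.client` is still cached in sys.modules."""
       parent = types.ModuleType(PKG)
       parent.__path__ = []  # mark it as a package
       child = types.ModuleType(f"{PKG}.client")
       child.Pipeline = type("Pipeline", (), {})
       sys.modules[PKG] = parent
       sys.modules[f"{PKG}.client"] = child
       return parent


   def demo() -> tuple[bool, bool, bool]:
       parent = make_broken_state()
       try:
           # What kombu's transport relies on today: attribute access.
           attribute_access_ok = hasattr(parent, "client")
           # What the kombu patch does: the from-import falls back to
           # sys.modules["demo_broken_pkg.client"] when the attribute is missing.
           from demo_broken_pkg import client
           from_import_ok = client.Pipeline is sys.modules[f"{PKG}.client"].Pipeline
           # The fallback only binds a local name; the parent is still not patched.
           still_detached = not hasattr(parent, "client")
           return attribute_access_ok, from_import_ok, still_detached
       finally:
           sys.modules.pop(PKG, None)
           sys.modules.pop(f"{PKG}.client", None)
   ```

   `demo()` is expected to return `(False, True, True)`: attribute access fails, the from-import succeeds, and the parent package still lacks the attribute afterwards, as in the MCVE.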
   
   We aren't sure this bug happens often enough to be addressed upstream, but at least other developers won't lose days of debugging as we did ^^
   
   This raises another question: could the timeout, or another mechanism, break an import and introduce this bug in other packages, or cause other hard-to-catch race conditions?
   
   ### What you think should happen instead?
   
   The Airflow timeout should not leave a broken import behind. We are not sure whether this should instead be addressed in redis/redis-py; we posted the same issue to celery/kombu and redis/redis-py.
   
   ### How to reproduce
   
   This race condition is hard to reproduce on a local machine, so we introduced a delay into the very first import of redis to reproduce the bug reliably.
   
   Please find below an archive containing three files:
   - mock_kombu_transport_redis.py
   - mock_airflow.py
   - and redis.patch/__init__.py: its content must be appended to the end of the `redis-py` package's `__init__.py` to add the delay to the import
   [mcve.zip](https://github.com/user-attachments/files/16563453/mcve.zip)
   
   `python -m mock_airflow` gives the result:
   
   ```
   FunctionTimedOut
   
   After failed import redis
   'redis' in sys.modules: True
   'redis.client' in sys.modules: True
   
   Reimport mock_kombu_transport_redis
   After reimport redis
   'redis' in sys.modules: True
   'redis.client' in sys.modules: True
   'client' in dir(redis): False
   getattr(redis, 'client', None)=None
   
   After reimport redis.client
   'client' in dir(redis): False
   getattr(redis, 'client', None)=None
   
   After reimport: from redis import client
   client=<module 'redis.client' from '***/site-packages/redis/client.py'>
   'client' in dir(redis): False
   getattr(redis, 'client', None)=None
   client.Pipeline=<class 'redis.client.Pipeline'>
   ```
   
   The important line is `'client' in dir(redis): False`, even after re-importing the failed import. If we uncomment the `print(redis.client)` line, it raises:
   ```
   AttributeError: module 'redis' has no attribute 'client'
   ```
   
   If we add `from redis import client` to `redis/__init__.py`, the MCVE gives a different output:
   
   ```
   ...
   After reimport redis
   'redis' in sys.modules: True
   'redis.client' in sys.modules: True
   'client' in dir(redis): True
   getattr(redis, 'client', None)=<module 'redis.client' from 'XXX/site-packages/redis/client.py'>
   ...
   ```
   (Changes: `'client' in dir(redis): True`, and `getattr(redis, 'client', None)` now returns the `redis.client` module.)
   
   ### Operating System
   
   Debian GNU/Linux 12 (bookworm)
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-celery==3.7.2
   apache-airflow-providers-redis==3.7.1
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   Helm: 3.11.3
   Helm chart: apache/airflow:1.15.0
   k8s: 1.28
   
   ### Anything else?
   
   This race condition happens very rarely at scheduler startup, but since we deploy to dev/staging a lot each day, it happens several times a day. Once it happens, it won't go away until the scheduler service is restarted. Conditions:
   - a Celery operation timeout (we kept the default of 1.0 second)
   - the timeout must fire during an import
   - a long import (we aren't sure the import of redis is actually long; this bug happens mostly with the redis package, but that may be confirmation bias)
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

