vghar-bh opened a new issue, #32484:
URL: https://github.com/apache/airflow/issues/32484
### Apache Airflow version
Other Airflow 2 version (please specify below)
### What happened
The worker appears to be stuck in a catatonic state where queued task instance messages are not consumed from Redis.
Redis restarted while the worker kept running. The worker logged that the broker connection was lost, and it was able to reconnect after Redis came back online, but it did not resume consuming.
**worker logs:**
```
[2023-07-09 01:07:58,950: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 336, in start
    blueprint.start(self)
  File "/home/airflow/.local/lib/python3.8/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/home/airflow/.local/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 726, in start
    c.loop(*c.loop_args())
  File "/home/airflow/.local/lib/python3.8/site-packages/celery/worker/loops.py", line 97, in asynloop
    next(loop)
  File "/home/airflow/.local/lib/python3.8/site-packages/kombu/asynchronous/hub.py", line 373, in create_loop
    cb(*cbargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/kombu/transport/redis.py", line 1336, in on_readable
    self.cycle.on_readable(fileno)
  File "/home/airflow/.local/lib/python3.8/site-packages/kombu/transport/redis.py", line 566, in on_readable
    chan.handlers[type]()
  File "/home/airflow/.local/lib/python3.8/site-packages/kombu/transport/redis.py", line 910, in _receive
    ret.append(self._receive_one(c))
  File "/home/airflow/.local/lib/python3.8/site-packages/kombu/transport/redis.py", line 920, in _receive_one
    response = c.parse_response()
  File "/home/airflow/.local/lib/python3.8/site-packages/redis/client.py", line 1542, in parse_response
    response = self._execute(conn, try_read)
  File "/home/airflow/.local/lib/python3.8/site-packages/redis/client.py", line 1518, in _execute
    return conn.retry.call_with_retry(
  File "/home/airflow/.local/lib/python3.8/site-packages/redis/retry.py", line 49, in call_with_retry
    fail(error)
  File "/home/airflow/.local/lib/python3.8/site-packages/redis/client.py", line 1520, in <lambda>
    lambda error: self._disconnect_raise_connect(conn, error),
  File "/home/airflow/.local/lib/python3.8/site-packages/redis/client.py", line 1507, in _disconnect_raise_connect
    raise error
  File "/home/airflow/.local/lib/python3.8/site-packages/redis/retry.py", line 46, in call_with_retry
    return do()
  File "/home/airflow/.local/lib/python3.8/site-packages/redis/client.py", line 1519, in <lambda>
    lambda: command(*args, **kwargs),
  File "/home/airflow/.local/lib/python3.8/site-packages/redis/client.py", line 1540, in try_read
    return conn.read_response(disconnect_on_error=False)
  File "/home/airflow/.local/lib/python3.8/site-packages/redis/connection.py", line 874, in read_response
    response = self._parser.read_response(disable_decoding=disable_decoding)
  File "/home/airflow/.local/lib/python3.8/site-packages/redis/connection.py", line 347, in read_response
    result = self._read_response(disable_decoding=disable_decoding)
  File "/home/airflow/.local/lib/python3.8/site-packages/redis/connection.py", line 357, in _read_response
    raw = self._buffer.readline()
  File "/home/airflow/.local/lib/python3.8/site-packages/redis/connection.py", line 260, in readline
    self._read_from_socket()
  File "/home/airflow/.local/lib/python3.8/site-packages/redis/connection.py", line 213, in _read_from_socket
    raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
redis.exceptions.ConnectionError: Connection closed by server.
[2023-07-09 01:07:58,958: WARNING/MainProcess] /home/airflow/.local/lib/python3.8/site-packages/celery/worker/consumer/consumer.py:386: CPendingDeprecationWarning:
In Celery 5.1 we introduced an optional breaking change which
on connection loss cancels all currently executed tasks with late
acknowledgement enabled.
These tasks cannot be acknowledged as the connection is gone, and the tasks
are automatically redelivered back to the queue. You can enable this behavior using the
worker_cancel_long_running_tasks_on_connection_loss setting.
In Celery 5.1 it is set to False by default. The setting will be set to True by default in Celery 6.0.
  warnings.warn(CANCEL_TASKS_BY_DEFAULT, CPendingDeprecationWarning)
[2023-07-09 01:07:58,978: WARNING/MainProcess] /home/airflow/.local/lib/python3.8/site-packages/celery/worker/consumer/consumer.py:498: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
whether broker connection retries are made during startup in Celery 6.0 and above.
If you wish to retain the existing behavior for retrying connections on startup,
you should set broker_connection_retry_on_startup to True.
  warnings.warn(
[2023-07-09 01:07:58,985: ERROR/MainProcess] consumer: Cannot connect to redis://:**@test-airflow-redis-master.test-idp.svc.cluster.local:6379/1: Error 111 connecting to test-airflow-redis-master.test-idp.svc.cluster.local:6379. Connection refused..
Trying again in 2.00 seconds... (1/100)
[2023-07-09 01:14:09,530: ERROR/MainProcess] consumer: Cannot connect to redis://:**@test-airflow-redis-master.test-idp.svc.cluster.local:6379/1: Error 110 connecting to test-airflow-redis-master.test-idp.svc.cluster.local:6379. Connection timed out..
Trying again in 32.00 seconds... (16/100)
[2023-07-09 01:14:41,652: INFO/MainProcess] Connected to redis://:**@test-airflow-redis-master.test-idp.svc.cluster.local:6379/1
[2023-07-09 01:14:41,653: WARNING/MainProcess] /home/airflow/.local/lib/python3.8/site-packages/celery/worker/consumer/consumer.py:498: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
whether broker connection retries are made during startup in Celery 6.0 and above.
If you wish to retain the existing behavior for retrying connections on startup,
you should set broker_connection_retry_on_startup to True.
  warnings.warn(
[2023-07-09 01:14:41,666: INFO/MainProcess] mingle: searching for neighbors
[2023-07-09 01:14:42,691: INFO/MainProcess] mingle: all alone
```
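The deprecation warnings in the log name two Celery settings that govern behavior around broker connection loss. A minimal sketch of a custom Celery config dict enabling both follows; the module name is an assumption, and this is untested as a fix for the stuck consumer here, not a confirmed workaround:

```python
# custom_celery_config.py -- hedged sketch; the module name is an assumption.
# The two keys below are the settings named in the worker's own warnings:
# the first cancels late-ack tasks when the connection is lost so they are
# redelivered to the queue; the second keeps retrying the broker connection
# on startup (the behavior Celery 6.0 will gate on this flag).
CELERY_CONFIG = {
    "worker_cancel_long_running_tasks_on_connection_loss": True,
    "broker_connection_retry_on_startup": True,
}
```

If this approach fits your deployment, these keys would typically be merged over `airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG` and the `[celery] celery_config_options` setting pointed at the resulting dict, so the worker picks them up on restart.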
### What you think should happen instead
After Redis comes back online and the worker has reconnected, the worker should consume the pending messages and execute the queued task instances.
### How to reproduce
- Delete the existing Redis pod; the worker becomes unable to connect to Redis.
- Redis restarts and the worker reconnects as expected.
- The worker does not consume new messages (queued task instances).
### Operating System
N/A
### Versions of Apache Airflow Providers
apache-airflow-providers-amazon 5.1.0
apache-airflow-providers-apache-livy 2.2.3
apache-airflow-providers-celery 3.0.0
apache-airflow-providers-cncf-kubernetes 4.4.0
apache-airflow-providers-common-sql 1.2.0
apache-airflow-providers-docker 3.2.0
apache-airflow-providers-elasticsearch 4.2.1
apache-airflow-providers-ftp 3.1.0
apache-airflow-providers-google 8.3.0
apache-airflow-providers-grpc 3.0.0
apache-airflow-providers-hashicorp 3.1.0
apache-airflow-providers-http 4.0.0
apache-airflow-providers-imap 3.0.0
apache-airflow-providers-microsoft-azure 4.3.0
apache-airflow-providers-mysql 3.2.1
apache-airflow-providers-odbc 3.1.2
apache-airflow-providers-postgres 5.2.2
apache-airflow-providers-redis 3.0.0
apache-airflow-providers-sendgrid 3.0.0
apache-airflow-providers-sftp 4.1.0
apache-airflow-providers-slack 5.1.0
apache-airflow-providers-sqlite 3.2.1
apache-airflow-providers-ssh 3.2.0
### Deployment
Official Apache Airflow Helm Chart
### Deployment details
Airflow version 2.4.1
Currently working with the Airflow Helm Chart (User Community): https://artifacthub.io/packages/helm/airflow-helm/airflow/8.6.1
Using Argo CD as the GitOps CD tool for Kubernetes.
### Anything else
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)