Kevin Reilly created AIRFLOW-2011:
-------------------------------------
Summary: Airflow AMQP pool maintains dead connections
Key: AIRFLOW-2011
URL: https://issues.apache.org/jira/browse/AIRFLOW-2011
Project: Apache Airflow
Issue Type: Bug
Components: celery, scheduler
Affects Versions: 1.9.1
Environment: OS: Ubuntu 16.04 LTS (debian)
Python: 3.6.3
Airflow: 1.9.1rc1
Reporter: Kevin Reilly
Airflow scheduler deadlocks on queue-up for tasks
[2018-01-08 07:01:09,315] {celery_executor.py:101} ERROR - Error syncing the celery executor, ignoring it:
[2018-01-08 07:01:09,315] {celery_executor.py:102} ERROR - [Errno 104] Connection reset by peer
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/airflow/executors/celery_executor.py", line 83, in
    state = async.state
  File "/usr/local/lib/python3.6/dist-packages/celery/result.py", line 436, in state
    return self._get_task_meta()['status']
  File "/usr/local/lib/python3.6/dist-packages/celery/result.py", line 375, in _get_task_meta
    return self._maybe_set_cache(self.backend.get_task_meta(self.id))
  File "/usr/local/lib/python3.6/dist-packages/celery/backends/rpc.py", line 244, in get_task_meta
    for acc in self._slurp_from_queue(task_id, self.accept, backlog_limit):
  File "/usr/local/lib/python3.6/dist-packages/celery/backends/rpc.py", line 278, in
    binding.declare()
  File "/usr/local/lib/python3.6/dist-packages/kombu/entity.py", line 605, in declare
    self._create_queue(nowait=nowait, channel=channel)
  File "/usr/local/lib/python3.6/dist-packages/kombu/entity.py", line 614, in _create_queue
    self.queue_declare(nowait=nowait, passive=False, channel=channel)
  File "/usr/local/lib/python3.6/dist-packages/kombu/entity.py", line 649, in queue_declare
    nowait=nowait,
  File "/usr/local/lib/python3.6/dist-packages/amqp/channel.py", line 1147, in queue_declare
    nowait, arguments),
  File "/usr/local/lib/python3.6/dist-packages/amqp/abstract_channel.py", line 50, in send_method
    conn.frame_writer(1, self.channel_id, sig, args, content)
  File "/usr/local/lib/python3.6/dist-packages/amqp/method_framing.py", line 166, in write_frame
    write(view[:offset])
  File "/usr/local/lib/python3.6/dist-packages/amqp/transport.py", line 258, in write
    self._write(s)
ConnectionResetError: [Errno 104] Connection reset by peer
Editing the Celery settings file, default_celery.py, and adding
"broker_pool_limit": None,
between lines 37 and 38 solves the issue. Setting broker_pool_limit to None
disables the broker connection pool, so Celery opens a new AMQP connection each
time it needs one. This avoids the case where the RabbitMQ server drops a pooled
connection without the client noticing, which leaves broken sockets in the pool
to be handed out for reuse.
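The workaround above can be sketched as a small override of the Celery settings dict. This is only an illustration: BASE_CELERY_CONFIG, its values, and with_pool_disabled are hypothetical stand-ins, not the actual contents of default_celery.py. The one setting taken from this report is broker_pool_limit.

```python
import copy

# Hypothetical stand-in for the settings dict in default_celery.py.
# The keys and values here are illustrative assumptions only.
BASE_CELERY_CONFIG = {
    "broker_url": "amqp://guest:guest@localhost:5672//",
    "result_backend": "rpc://",
    "worker_prefetch_multiplier": 1,
}


def with_pool_disabled(config):
    """Return a copy of config with the broker connection pool disabled.

    With broker_pool_limit set to None, Celery does not keep pooled
    broker connections; it opens and closes one for each use.
    """
    patched = copy.deepcopy(config)
    patched["broker_pool_limit"] = None
    return patched


# The dict that would be handed to Celery in place of the default one.
CELERY_CONFIG = with_pool_disabled(BASE_CELERY_CONFIG)
```

Copying the dict instead of editing it in place leaves the shipped defaults untouched, so the override can be dropped once the pool handles server-side disconnects. The trade-off is the cost of a fresh connection on every broker operation.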