[ https://issues.apache.org/jira/browse/AIRFLOW-2011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16562316#comment-16562316 ]
James Davidheiser commented on AIRFLOW-2011:
--------------------------------------------

Confirming that I am also running into this error. Can this configuration change be made in the [celery] section of airflow.cfg?

> Airflow ampq pool maintains dead connections
> --------------------------------------------
>
>                 Key: AIRFLOW-2011
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2011
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: celery, scheduler
>    Affects Versions: 1.9.1
>         Environment: OS: Ubuntu 16.04 LTS (debian)
> Python: 3.6.3
> Airflow: 1.9.1rc1
>            Reporter: Kevin Reilly
>            Priority: Minor
>   Original Estimate: 0.5h
>  Remaining Estimate: 0.5h
>
> Airflow scheduler deadlocks on queue-up for tasks
> [2018-01-08 07:01:09,315] {{celery_executor.py:101}} ERROR - Error syncing the celery executor, ignoring it:
> [2018-01-08 07:01:09,315] {{celery_executor.py:102}} ERROR - [Errno 104] Connection reset by peer
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.6/dist-packages/airflow/executors/celery_executor.py", line 83, in
>     state = async.state
>   File "/usr/local/lib/python3.6/dist-packages/celery/result.py", line 436, in state
>     return self._get_task_meta()['status']
>   File "/usr/local/lib/python3.6/dist-packages/celery/result.py", line 375, in _get_task_meta
>     return self._maybe_set_cache(self.backend.get_task_meta(self.id))
>   File "/usr/local/lib/python3.6/dist-packages/celery/backends/rpc.py", line 244, in get_task_meta
>     for acc in self._slurp_from_queue(task_id, self.accept, backlog_limit):
>   File "/usr/local/lib/python3.6/dist-packages/celery/backends/rpc.py", line 278, in
>     binding.declare()
>   File "/usr/local/lib/python3.6/dist-packages/kombu/entity.py", line 605, in declare
>     self._create_queue(nowait=nowait, channel=channel)
>   File "/usr/local/lib/python3.6/dist-packages/kombu/entity.py", line 614, in _create_queue
>     self.queue_declare(nowait=nowait, passive=False, channel=channel)
>   File "/usr/local/lib/python3.6/dist-packages/kombu/entity.py", line 649, in queue_declare
>     nowait=nowait,
>   File "/usr/local/lib/python3.6/dist-packages/amqp/channel.py", line 1147, in queue_declare
>     nowait, arguments),
>   File "/usr/local/lib/python3.6/dist-packages/amqp/abstract_channel.py", line 50, in send_method
>     conn.frame_writer(1, self.channel_id, sig, args, content)
>   File "/usr/local/lib/python3.6/dist-packages/amqp/method_framing.py", line 166, in write_frame
>     write(view[:offset])
>   File "/usr/local/lib/python3.6/dist-packages/amqp/transport.py", line 258, in write
>     self._write(s)
> ConnectionResetError: [Errno 104] Connection reset by peer
>
> Editing the Celery settings file to set broker_pool_limit=None, i.e. adding the line
>     "broker_pool_limit": None,
> to default_celery.py between lines 37 and 38, would solve the issue. This setting makes Celery create a new AMQP connection each time it needs one, so the client never reuses a pooled connection that the RabbitMQ server has already closed without the client noticing, leaving a broken socket open for use.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
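On the question of airflow.cfg: Airflow 1.9 appears to read an optional celery_config_options key in the [celery] section that names an importable dict to use in place of DEFAULT_CELERY_CONFIG. Treat that as an assumption and confirm it in your version's celery_executor.py. Under that assumption, a minimal sketch is a small module that copies the defaults and adds broker_pool_limit=None, so default_celery.py itself does not have to be edited. The module name my_celery_config, the variable CELERY_CONFIG, and the fallback dict are hypothetical.

```python
# my_celery_config.py (hypothetical module name)
# Sketch: reuse Airflow's default Celery settings, but disable the broker
# connection pool so a fresh AMQP connection is opened for every use.
try:
    # Assumed location of the defaults referred to in the issue (default_celery.py).
    from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG
except ImportError:
    # Hypothetical stand-in so this file runs without Airflow installed.
    DEFAULT_CELERY_CONFIG = {
        "broker_url": "amqp://guest:guest@localhost:5672//",
        "task_acks_late": True,
    }

# Shallow copy, so the shared defaults dict is not mutated.
CELERY_CONFIG = dict(DEFAULT_CELERY_CONFIG)

# Celery documents None (or 0) as "connection pool disabled": connections are
# established and closed for every use instead of being kept for reuse.
CELERY_CONFIG["broker_pool_limit"] = None
```

Assuming the key exists in your version, the module would then be referenced under [celery] as `celery_config_options = my_celery_config.CELERY_CONFIG`, with the module importable by both the scheduler and the workers.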
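The reasoning in the description (a pooled connection that the broker has silently closed is handed out again, and the first write on it fails with ECONNRESET) can be illustrated with a toy model. This is not kombu's pool implementation; ToyPool, FakeConnection, and publish_twice are hypothetical names for illustration only. A limit of None mirrors the documented meaning of broker_pool_limit=None: no pooling, a new connection per use.

```python
class FakeConnection:
    """Stand-in for a broker socket that the server may close without telling us."""

    def __init__(self):
        self.dropped_by_server = False

    def write(self, data):
        if self.dropped_by_server:
            # Same error the scheduler log shows for the stale socket.
            raise ConnectionResetError(104, "Connection reset by peer")
        return len(data)


class ToyPool:
    """Keeps released connections for reuse when limit is an int; None disables pooling."""

    def __init__(self, limit):
        self.limit = limit
        self.idle = []

    def acquire(self):
        if self.limit is not None and self.idle:
            return self.idle.pop()  # may be a socket the server already closed
        return FakeConnection()

    def release(self, conn):
        if self.limit is not None and len(self.idle) < self.limit:
            self.idle.append(conn)
        # With limit=None the connection is simply discarded.


def publish_twice(limit):
    """Publish, let the server drop the connection while idle, then publish again."""
    pool = ToyPool(limit)
    conn = pool.acquire()
    conn.write(b"first")
    pool.release(conn)
    # The broker closes the socket while it sits idle; the client is not notified.
    conn.dropped_by_server = True
    try:
        pool.acquire().write(b"second")
        return "ok"
    except ConnectionResetError as exc:
        return f"error: {exc}"


if __name__ == "__main__":
    print(publish_twice(10))    # error: [Errno 104] Connection reset by peer
    print(publish_twice(None))  # ok
```

The trade-off of disabling the pool is that every broker operation pays for a new TCP and AMQP handshake, which may be noticeable at high task volumes.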