[ https://issues.apache.org/jira/browse/AIRFLOW-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15869870#comment-15869870 ]
Erik Cederstrand edited comment on AIRFLOW-342 at 2/16/17 1:28 PM:
-------------------------------------------------------------------
Hmm. Executor heartbeats still throw {{ConnectionResetError}}, but much less frequently. Previously, the scheduler would die every ~10 seconds; now it survives for some minutes. Here's a modified patch that simply ignores {{ConnectionResetError}}, which has been running for ~1 hour now:

{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py.orig	2017-02-16 11:58:55.057991344 +0000
+++ /usr/local/lib/python3.4/dist-packages/airflow/jobs.py	2017-02-16 11:57:07.060060262 +0000
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()

@@ -1436,9 +1438,14 @@
             self._execute_task_instances(simple_dag_bag,
                                          (State.SCHEDULED,))

-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                try: self.executor.heartbeat()
+                except ConnectionResetError: pass  # RabbitMQ sometimes resets the socket connection
+                last_executor_heartbeat_time = datetime.now()

             # Process events from the executor
             self._process_executor_events()
{code}

As a comment on the justifiability of this patch: our scheduler in production often dies so early in the scheduling process that jobs never progress. Thus, wrapping the scheduler in a {{while True}} loop, as suggested elsewhere, does nothing for us.
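For readers who want to see the idea outside the diff context, here is a minimal standalone sketch. It is an illustration only: {{run_scheduler_loop}}, {{executor}} and {{heartrate}} are stand-ins for the scheduler's internals, not Airflow API. Note also that {{ConnectionResetError}} exists only on Python 3; on Python 2 you would catch {{socket.error}} and check for {{errno.ECONNRESET}}.

{code}
import logging
import time
from datetime import datetime

logger = logging.getLogger(__name__)


def run_scheduler_loop(executor, heartrate):
    """Illustrative loop: heartbeat the executor at most once every
    `heartrate` seconds, and survive the broker resetting the socket."""
    last_executor_heartbeat_time = datetime.now()
    while True:
        # ... scheduling work (DAG parsing, task queueing) would happen here ...

        elapsed = (datetime.now() - last_executor_heartbeat_time).total_seconds()
        if elapsed > heartrate:
            logger.info("Heartbeating the executor")
            try:
                executor.heartbeat()
            except ConnectionResetError:
                # The broker reset the socket; skip this beat and retry on
                # the next pass instead of killing the whole scheduler.
                pass
            last_executor_heartbeat_time = datetime.now()

        time.sleep(1)
{code}

The two changes are complementary: rate-limiting the heartbeat reduces how often the flaky AMQP result backend is polled, and swallowing the reset keeps a single dropped connection from being fatal to the loop.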
> exception in 'airflow scheduler' : Connection reset by peer
> ------------------------------------------------------------
>
>                 Key: AIRFLOW-342
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-342
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: celery, scheduler
>    Affects Versions: Airflow 1.7.1.3
>         Environment: OS: Red Hat Enterprise Linux Server 7.2 (Maipo)
> Python: 2.7.5
> Airflow: 1.7.1.3
>            Reporter: Hila Visan
>
> 'airflow scheduler' command throws an exception when running it.
> Despite the exception, the workers run the tasks from the queues as expected.
>
> Error details:
>
> [2016-06-30 19:00:10,130] {jobs.py:758} ERROR - [Errno 104] Connection reset by peer
> Traceback (most recent call last):
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 755, in _execute
>     executor.heartbeat()
>   File "/usr/lib/python2.7/site-packages/airflow/executors/base_executor.py", line 107, in heartbeat
>     self.sync()
>   File "/usr/lib/python2.7/site-packages/airflow/executors/celery_executor.py", line 74, in sync
>     state = async.state
>   File "/usr/lib/python2.7/site-packages/celery/result.py", line 394, in state
>     return self._get_task_meta()['status']
>   File "/usr/lib/python2.7/site-packages/celery/result.py", line 339, in _get_task_meta
>     return self._maybe_set_cache(self.backend.get_task_meta(self.id))
>   File "/usr/lib/python2.7/site-packages/celery/backends/amqp.py", line 163, in get_task_meta
>     binding.declare()
>   File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 521, in declare
>     self.exchange.declare(nowait)
>   File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 174, in declare
>     nowait=nowait, passive=passive,
>   File "/usr/lib/python2.7/site-packages/amqp/channel.py", line 615, in exchange_declare
>     self._send_method((40, 10), args)
>   File "/usr/lib/python2.7/site-packages/amqp/abstract_channel.py", line 56, in _send_method
>     self.channel_id, method_sig, args, content,
>   File "/usr/lib/python2.7/site-packages/amqp/method_framing.py", line 221, in write_method
>     write_frame(1, channel, payload)
>   File "/usr/lib/python2.7/site-packages/amqp/transport.py", line 182, in write_frame
>     frame_type, channel, size, payload, 0xce,
>   File "/usr/lib64/python2.7/socket.py", line 224, in meth
>     return getattr(self._sock,name)(*args)
> error: [Errno 104] Connection reset by peer
> [2016-06-30 19:00:10,131] {jobs.py:759} ERROR - Tachycardia!

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)