[ 
https://issues.apache.org/jira/browse/AIRFLOW-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15869793#comment-15869793
 ] 

Erik Cederstrand edited comment on AIRFLOW-342 at 2/16/17 12:52 PM:
--------------------------------------------------------------------

I think I found the culprit. The scheduler does not rate-limit heartbeats 
to the executor, and if they happen too often, RabbitMQ will close the 
connection. Here's a patch that fixes the exception for me:

{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py.orig 2017-02-16 11:58:55.057991344 +0000
+++ /usr/local/lib/python3.4/dist-packages/airflow/jobs.py      2017-02-16 11:57:07.060060262 +0000
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()
 
@@ -1436,9 +1438,13 @@
                 self._execute_task_instances(simple_dag_bag,
                                              (State.SCHEDULED,))
 
-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                self.executor.heartbeat()
+                last_executor_heartbeat_time = datetime.now()
 
             # Process events from the executor
             self._process_executor_events()
{code}

I still think the scheduler should survive {{ConnectionResetError}} exceptions 
from an executor heartbeat, as they could still occur, but I'll leave the patch 
as-is to show the minimal change required.
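
For illustration, here is a minimal sketch of what surviving the exception could look like. It assumes Python 3, where {{ConnectionResetError}} is a built-in; on Python 2.7 the same failure surfaces as {{socket.error}} with errno 104. The names {{safe_heartbeat}} and {{FlakyExecutor}} are hypothetical and not part of Airflow.

```python
import logging

log = logging.getLogger("scheduler_sketch")


def safe_heartbeat(executor, logger=log):
    """Call executor.heartbeat() and log, rather than raise, if the
    broker resets the connection. Returns True on success."""
    try:
        executor.heartbeat()
        return True
    except ConnectionResetError as exc:
        # Assumption: the next scheduler loop iteration simply tries again.
        logger.warning("Executor heartbeat failed, will retry: %s", exc)
        return False


class FlakyExecutor:
    """Hypothetical stand-in for an executor whose first heartbeat fails."""

    def __init__(self):
        self.calls = 0

    def heartbeat(self):
        self.calls += 1
        if self.calls == 1:
            raise ConnectionResetError(104, "Connection reset by peer")
```

With something like this, the scheduler loop would call {{safe_heartbeat(self.executor)}} in place of {{self.executor.heartbeat()}}, so a single reset connection would no longer abort the loop.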


was (Author: erikcederstrand):
I think I found the culprit. The scheduler does not rate-limit heartbeats 
to the executor, and if they happen too often, RabbitMQ will close the 
connection. Here's a patch that fixes the exception for me:

{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py.orig 2017-02-16 11:58:55.057991344 +0000
+++ /usr/local/lib/python3.4/dist-packages/airflow/jobs.py      2017-02-16 11:57:07.060060262 +0000
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()
 
@@ -1436,9 +1438,13 @@
                 self._execute_task_instances(simple_dag_bag,
                                              (State.SCHEDULED,))
 
-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                self.executor.heartbeat()
+                last_executor_heartbeat_time = datetime.now()
 
             # Process events from the executor
             self._process_executor_events()
{code}

I still think the scheduler should survive {{ConnectionClosed}} exceptions from 
an executor heartbeat, as they could still occur, but I'll leave the patch as-is 
to show the minimal change required.

>  exception in 'airflow scheduler' : Connection reset by peer
> ------------------------------------------------------------
>
>                 Key: AIRFLOW-342
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-342
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: celery, scheduler
>    Affects Versions: Airflow 1.7.1.3
>         Environment: OS: Red Hat Enterprise Linux Server 7.2 (Maipo)
> Python: 2.7.5
> Airflow: 1.7.1.3
>            Reporter: Hila Visan
>
> The 'airflow scheduler' command throws an exception while running.
> Despite the exception, the workers run the tasks from the queues as expected.
> Error details:
>  
> [2016-06-30 19:00:10,130] {jobs.py:758} ERROR - [Errno 104] Connection reset by peer
> Traceback (most recent call last):
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 755, in _execute
>     executor.heartbeat()
>   File "/usr/lib/python2.7/site-packages/airflow/executors/base_executor.py", line 107, in heartbeat
>     self.sync()
>   File "/usr/lib/python2.7/site-packages/airflow/executors/celery_executor.py", line 74, in sync
>     state = async.state
>   File "/usr/lib/python2.7/site-packages/celery/result.py", line 394, in state
>     return self._get_task_meta()['status']
>   File "/usr/lib/python2.7/site-packages/celery/result.py", line 339, in _get_task_meta
>     return self._maybe_set_cache(self.backend.get_task_meta(self.id))
>   File "/usr/lib/python2.7/site-packages/celery/backends/amqp.py", line 163, in get_task_meta
>     binding.declare()
>   File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 521, in declare
>     self.exchange.declare(nowait)
>   File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 174, in declare
>     nowait=nowait, passive=passive,
>   File "/usr/lib/python2.7/site-packages/amqp/channel.py", line 615, in exchange_declare
>     self._send_method((40, 10), args)
>   File "/usr/lib/python2.7/site-packages/amqp/abstract_channel.py", line 56, in _send_method
>     self.channel_id, method_sig, args, content,
>   File "/usr/lib/python2.7/site-packages/amqp/method_framing.py", line 221, in write_method
>     write_frame(1, channel, payload)
>   File "/usr/lib/python2.7/site-packages/amqp/transport.py", line 182, in write_frame
>     frame_type, channel, size, payload, 0xce,
>   File "/usr/lib64/python2.7/socket.py", line 224, in meth
>     return getattr(self._sock,name)(*args)
> error: [Errno 104] Connection reset by peer
> [2016-06-30 19:00:10,131] {jobs.py:759} ERROR - Tachycardia!



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
