Repository: incubator-airflow
Updated Branches:
  refs/heads/master 45b735bae -> 26c31d9bc


[AIRFLOW-180] Fix timeout behavior for sensors

In the previous state of the code, datetime.now was compared to
started_at and seconds was pulled out. It turns out that the seconds
attribute of a timedelta has a maximum of 86400 and the rolls up to 1 day.
The unintended consequence is that timeout larger than 86400 are
ignored, with sensors running forever.

To fix this we use the total_seconds method to get at the real
timedelta in seconds.


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c38a5c2a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c38a5c2a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c38a5c2a

Branch: refs/heads/master
Commit: c38a5c2a8b227194ec52d81e8a5a85c97751ecd9
Parents: 45b735b
Author: Arthur Wiedmer <arthur.wied...@gmail.com>
Authored: Thu May 26 10:27:55 2016 -0700
Committer: Arthur Wiedmer <arthur.wied...@gmail.com>
Committed: Mon Jun 20 15:45:44 2016 -0700

----------------------------------------------------------------------
 airflow/operators/sensors.py |  4 +-
 tests/operators/sensors.py   | 77 ++++++++++++++++++++++++++++++++++++++-
 2 files changed, 77 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c38a5c2a/airflow/operators/sensors.py
----------------------------------------------------------------------
diff --git a/airflow/operators/sensors.py b/airflow/operators/sensors.py
index 5276f6e..4e4cb3b 100644
--- a/airflow/operators/sensors.py
+++ b/airflow/operators/sensors.py
@@ -69,12 +69,12 @@ class BaseSensorOperator(BaseOperator):
     def execute(self, context):
         started_at = datetime.now()
         while not self.poke(context):
-            sleep(self.poke_interval)
-            if (datetime.now() - started_at).seconds > self.timeout:
+            if (datetime.now() - started_at).total_seconds() > self.timeout:
                 if self.soft_fail:
                     raise AirflowSkipException('Snap. Time is OUT.')
                 else:
                     raise AirflowSensorTimeout('Snap. Time is OUT.')
+            sleep(self.poke_interval)
         logging.info("Success criteria met. Exiting.")
 
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c38a5c2a/tests/operators/sensors.py
----------------------------------------------------------------------
diff --git a/tests/operators/sensors.py b/tests/operators/sensors.py
index 025790e..325ee8d 100644
--- a/tests/operators/sensors.py
+++ b/tests/operators/sensors.py
@@ -12,11 +12,84 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+
+import logging
 import os
+import time
 import unittest
 
-from airflow.operators.sensors import HttpSensor
-from airflow.exceptions import AirflowException
+from datetime import datetime, timedelta
+
+from airflow import DAG, configuration
+from airflow.operators.sensors import HttpSensor, BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+from airflow.exceptions import (AirflowException,
+                                AirflowSensorTimeout,
+                                AirflowSkipException)
+configuration.test_mode()
+
+DEFAULT_DATE = datetime(2015, 1, 1)
+TEST_DAG_ID = 'unit_test_dag'
+
+
+class TimeoutTestSensor(BaseSensorOperator):
+    """
+    Sensor that always returns the return_value provided
+
+    :param return_value: Set to true to mark the task as SKIPPED on failure
+    :type return_value: any
+    """
+
+    @apply_defaults
+    def __init__(
+            self,
+            return_value=False,
+            *args,
+            **kwargs):
+        self.return_value = return_value
+        super(TimeoutTestSensor, self).__init__(*args, **kwargs)
+
+    def poke(self, context):
+        return self.return_value
+
+    def execute(self, context):
+        started_at = datetime.now()
+        time_jump = self.params.get('time_jump')
+        while not self.poke(context):
+            if time_jump:
+                started_at -= time_jump
+            if (datetime.now() - started_at).total_seconds() > self.timeout:
+                if self.soft_fail:
+                    raise AirflowSkipException('Snap. Time is OUT.')
+                else:
+                    raise AirflowSensorTimeout('Snap. Time is OUT.')
+            time.sleep(self.poke_interval)
+        logging.info("Success criteria met. Exiting.")
+
+
+class SensorTimeoutTest(unittest.TestCase):
+    def setUp(self):
+        configuration.test_mode()
+        args = {
+            'owner': 'airflow',
+            'start_date': DEFAULT_DATE
+        }
+        dag = DAG(TEST_DAG_ID, default_args=args)
+        self.dag = dag
+
+    def test_timeout(self):
+        t = TimeoutTestSensor(
+            task_id='test_timeout',
+            execution_timeout=timedelta(days=2),
+            return_value=False,
+            poke_interval=5,
+            params={'time_jump': timedelta(days=2, seconds=1)},
+            dag=self.dag
+            )
+        self.assertRaises(
+            AirflowSensorTimeout,
+            t.run,
+            start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
 
 
 class HttpSensorTests(unittest.TestCase):

Reply via email to