[AIRFLOW-100] Add execution_date_fn to ExternalTaskSensor

Currently, ExternalTaskSensor only supports querying execution dates that are either the same as the ExternalTaskSensor's own execution_date or offset from that date by a fixed interval (using `execution_delta`). This adds the ability to provide a function (`execution_date_fn`) that accepts the current execution_date and returns any desired date to query. This is much more flexible: for example, the function could return the last date of the previous month.
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/efdbbb5d Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/efdbbb5d Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/efdbbb5d Branch: refs/heads/master Commit: efdbbb5d3beba49f9b633f0a25ce768f896c0a6a Parents: 0965648 Author: jlowin <jlo...@users.noreply.github.com> Authored: Thu Jun 30 16:55:16 2016 -0400 Committer: jlowin <jlo...@users.noreply.github.com> Committed: Thu Jun 30 18:34:16 2016 -0400 ---------------------------------------------------------------------- airflow/operators/sensors.py | 17 ++++++++++++++++- airflow/utils/tests.py | 5 ++++- tests/core.py | 39 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 59 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/efdbbb5d/airflow/operators/sensors.py ---------------------------------------------------------------------- diff --git a/airflow/operators/sensors.py b/airflow/operators/sensors.py index e9b8885..9f7f380 100644 --- a/airflow/operators/sensors.py +++ b/airflow/operators/sensors.py @@ -179,8 +179,14 @@ class ExternalTaskSensor(BaseSensorOperator): :type allowed_states: list :param execution_delta: time difference with the previous execution to look at, the default is the same execution_date as the current task. - For yesterday, use [positive!] datetime.timedelta(days=1) + For yesterday, use [positive!] datetime.timedelta(days=1). Either + execution_delta or execution_date_fn can be passed to + ExternalTaskSensor, but not both. :type execution_delta: datetime.timedelta + :param execution_date_fn: function that receives the current execution date + and returns the desired execution date to query. Either execution_delta + or execution_date_fn can be passed to ExternalTaskSensor, but not both. 
+ :type execution_date_fn: callable """ @apply_defaults @@ -190,16 +196,25 @@ class ExternalTaskSensor(BaseSensorOperator): external_task_id, allowed_states=None, execution_delta=None, + execution_date_fn=None, *args, **kwargs): super(ExternalTaskSensor, self).__init__(*args, **kwargs) self.allowed_states = allowed_states or [State.SUCCESS] + if execution_delta is not None and execution_date_fn is not None: + raise ValueError( + 'Only one of `execution_date` or `execution_date_fn` may' + 'be provided to ExternalTaskSensor; not both.') + self.execution_delta = execution_delta + self.execution_date_fn = execution_date_fn self.external_dag_id = external_dag_id self.external_task_id = external_task_id def poke(self, context): if self.execution_delta: dttm = context['execution_date'] - self.execution_delta + elif self.execution_date_fn: + dttm = self.execution_date_fn(context['execution_date']) else: dttm = context['execution_date'] http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/efdbbb5d/airflow/utils/tests.py ---------------------------------------------------------------------- diff --git a/airflow/utils/tests.py b/airflow/utils/tests.py index 50490d3..83db6e8 100644 --- a/airflow/utils/tests.py +++ b/airflow/utils/tests.py @@ -16,7 +16,10 @@ import unittest def skipUnlessImported(module, obj): import importlib - m = importlib.import_module(module) + try: + m = importlib.import_module(module) + except ImportError: + m = None return unittest.skipUnless( obj in dir(m), "Skipping test because {} could not be imported from {}".format( http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/efdbbb5d/tests/core.py ---------------------------------------------------------------------- diff --git a/tests/core.py b/tests/core.py index 24d2938..4f3197d 100644 --- a/tests/core.py +++ b/tests/core.py @@ -437,6 +437,45 @@ class CoreTest(unittest.TestCase): dag=self.dag) t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) + def 
test_external_task_sensor_fn(self): + self.test_time_sensor() + # check that the execution_fn works + t = sensors.ExternalTaskSensor( + task_id='test_external_task_sensor_check_delta', + external_dag_id=TEST_DAG_ID, + external_task_id='time_sensor_check', + execution_date_fn=lambda dt: dt + timedelta(0), + allowed_states=['success'], + dag=self.dag) + t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) + + # double check that the execution is being called by failing the test + t2 = sensors.ExternalTaskSensor( + task_id='test_external_task_sensor_check_delta', + external_dag_id=TEST_DAG_ID, + external_task_id='time_sensor_check', + execution_date_fn=lambda dt: dt + timedelta(days=1), + allowed_states=['success'], + timeout=1, + poke_interval=1, + dag=self.dag) + with self.assertRaises(exceptions.AirflowSensorTimeout): + t2.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) + + def test_external_task_sensor_error_delta_and_fn(self): + """ + Test that providing execution_delta and a function raises an error + """ + with self.assertRaises(ValueError): + t = sensors.ExternalTaskSensor( + task_id='test_external_task_sensor_check_delta', + external_dag_id=TEST_DAG_ID, + external_task_id='time_sensor_check', + execution_delta=timedelta(0), + execution_date_fn=lambda dt: dt, + allowed_states=['success'], + dag=self.dag) + def test_timeout(self): t = PythonOperator( task_id='test_timeout',