[ https://issues.apache.org/jira/browse/AIRFLOW-1488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723488#comment-16723488 ]

ASF GitHub Bot commented on AIRFLOW-1488:
-----------------------------------------

stale[bot] closed pull request #2500: [AIRFLOW-1488] Add the DagRunSensor operator.
URL: https://github.com/apache/incubator-airflow/pull/2500
 
 
   

As this is a pull request merged from a forked repository, GitHub hides the
original diff on merge; it is therefore reproduced below for the sake of
provenance:

diff --git a/airflow/contrib/operators/dagrun_sensor.py b/airflow/contrib/operators/dagrun_sensor.py
new file mode 100644
index 0000000000..f4465626af
--- /dev/null
+++ b/airflow/contrib/operators/dagrun_sensor.py
@@ -0,0 +1,86 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+from airflow import settings
+from airflow.utils.state import State
+from airflow.utils.decorators import apply_defaults
+from airflow.models import DagRun
+from airflow.operators.sensors import BaseSensorOperator
+
+
+class DagRunSensor(BaseSensorOperator):
+    """
+    Waits for a DAG run to complete.
+
+    :param external_dag_id: The dag_id that you want to wait for
+    :type external_dag_id: string
+    :param allowed_states: list of allowed states, default is ``['success']``
+    :type allowed_states: list
+    :param execution_delta: time difference with the previous execution to
+        look at; the default is the same execution_date as the current task.
+        For yesterday, use [positive!] datetime.timedelta(days=1). Either
+        execution_delta or execution_date_fn can be passed, but not both.
+    :type execution_delta: datetime.timedelta
+    :param execution_date_fn: function that receives the current execution
+        date and returns the desired execution date(s) to query. Either
+        execution_delta or execution_date_fn can be passed, but not both.
+    :type execution_date_fn: callable
+    """
+    @apply_defaults
+    def __init__(
+            self,
+            external_dag_id,
+            allowed_states=None,
+            execution_delta=None,
+            execution_date_fn=None,
+            *args, **kwargs):
+        super(DagRunSensor, self).__init__(*args, **kwargs)
+
+        if execution_delta is not None and execution_date_fn is not None:
+            raise ValueError(
+                'Only one of `execution_delta` or `execution_date_fn` may '
+                'be provided to DagRunSensor, not both.')
+
+        self.allowed_states = allowed_states or [State.SUCCESS]
+        self.execution_delta = execution_delta
+        self.execution_date_fn = execution_date_fn
+        self.external_dag_id = external_dag_id
+
+    def poke(self, context):
+        if self.execution_delta:
+            dttm = context['execution_date'] - self.execution_delta
+        elif self.execution_date_fn:
+            dttm = self.execution_date_fn(context['execution_date'])
+        else:
+            dttm = context['execution_date']
+
+        dttm_filter = dttm if isinstance(dttm, list) else [dttm]
+        serialized_dttm_filter = ','.join(
+            dt.isoformat() for dt in dttm_filter)
+
+        logging.info(
+            'Poking for '
+            '{self.external_dag_id}.'
+            '{serialized_dttm_filter} ... '.format(**locals()))
+
+        session = settings.Session()
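+        # Count the external DAG's runs that are in an allowed state for
+        # each target execution date; every date must be matched to succeed.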
+        count = session.query(DagRun).filter(
+            DagRun.dag_id == self.external_dag_id,
+            DagRun.state.in_(self.allowed_states),
+            DagRun.execution_date.in_(dttm_filter),
+        ).count()
+        session.close()
+        return count == len(dttm_filter)
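
For reference, a minimal usage sketch of the sensor added above (hedged:
`upstream_dag`, `downstream_dag`, the schedule, and the timeout values are
illustrative assumptions, not part of this PR):

    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.contrib.operators.dagrun_sensor import DagRunSensor

    # Hypothetical downstream DAG; ids and dates are illustrative only.
    with DAG(dag_id='downstream_dag',
             start_date=datetime(2017, 1, 1),
             schedule_interval=timedelta(days=1)) as dag:
        # Wait for yesterday's run of `upstream_dag` to succeed; note the
        # positive delta, as documented in the sensor's docstring.
        wait = DagRunSensor(
            task_id='wait_for_upstream',
            external_dag_id='upstream_dag',
            execution_delta=timedelta(days=1),
            timeout=60 * 60,
            poke_interval=60,
        )
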
diff --git a/tests/contrib/operators/test_dagrun_sensor.py b/tests/contrib/operators/test_dagrun_sensor.py
new file mode 100644
index 0000000000..74e4d46ccb
--- /dev/null
+++ b/tests/contrib/operators/test_dagrun_sensor.py
@@ -0,0 +1,119 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+from os.path import dirname, realpath
+import unittest
+from datetime import timedelta, datetime
+
+from airflow.models import DagBag, TaskInstance, DagRun
+from airflow.settings import Session
+from airflow.utils.state import State
+from airflow import configuration
+from airflow.exceptions import AirflowException
+
+DEFAULT_DATE = datetime(2017, 1, 1)
+TEST_DAG_ID = 'test_dagrun_sensor_dag'
+TEST_DAG_FOLDER = os.path.join(
+    dirname(dirname(dirname(realpath(__file__)))), 'dags')
+
+
+class TestDagRunSensor(unittest.TestCase):
+
+    def setUp(self):
+        configuration.load_test_config()
+        self.default_scheduler_args = {
+            "file_process_interval": 0,
+            "processor_poll_interval": 0.5,
+            "num_runs": 1
+        }
+        self.dagbag = DagBag(dag_folder=TEST_DAG_FOLDER)
+
+    def test_poke(self):
+        dag_parent = self.dagbag.get_dag(TEST_DAG_ID+'_parent_clean')
+        dag_parent.run(
+            start_date=DEFAULT_DATE+timedelta(seconds=0),
+            end_date=DEFAULT_DATE+timedelta(seconds=8),
+        )
+
+        dag_child = self.dagbag.get_dag(TEST_DAG_ID+'_child_clean')
+
+        # One of the following two runs should succeed (00:00:00), while the
+        # other (00:00:05) should have its sensor time out, since 00:00:09
+        # will never be run for the parent dag.
+
+        # first (safe) run
+        dag_child.run(
+            start_date=DEFAULT_DATE+timedelta(seconds=0),
+            end_date=DEFAULT_DATE+timedelta(seconds=0),
+        )
+
+        sess = Session()
+        TI = TaskInstance
+        sensor_tis = sess.query(TI).filter(
+            TI.dag_id == TEST_DAG_ID+'_child_clean',
+            TI.task_id == 'sense_parent',
+            TI.state == State.SUCCESS,
+        ).all()
+        self.assertEqual(len(sensor_tis), 1)
+
+        do_stuff_tis = sess.query(TI).filter(
+            TI.dag_id == TEST_DAG_ID+'_child_clean',
+            TI.task_id == 'do_stuff',
+            TI.state == State.SUCCESS,
+        ).all()
+        self.assertEqual(len(do_stuff_tis), 1)
+
+        DR = DagRun
+        drs = sess.query(DR).filter(
+            DR.dag_id == TEST_DAG_ID+'_child_clean',
+            DR.state == State.SUCCESS,
+            DR.execution_date == DEFAULT_DATE,
+        ).all()
+        self.assertEqual(len(drs), 1)
+
+        # second run
+        with self.assertRaises(AirflowException):
+            # the AirflowTaskTimeout raised by the sensor is caught by
+            # the executor, and what we see is an AirflowException for
+            # the dependent task which fails because of a failed upstream
+            # task.
+            dag_child.run(
+                start_date=DEFAULT_DATE+timedelta(seconds=5),
+                end_date=DEFAULT_DATE+timedelta(seconds=5),
+            )
+
+        failed_tis = sess.query(TI).filter(
+            TI.state == State.FAILED,
+        ).all()
+        self.assertEqual(len(failed_tis), 1)
+        failed_ti = failed_tis[0]
+        self.assertEqual(failed_ti.task_id, 'sense_parent')
+        self.assertEqual(failed_ti.dag_id, TEST_DAG_ID+'_child_clean')
+        self.assertEqual(failed_ti.execution_date,
+                         DEFAULT_DATE+timedelta(seconds=5))
+
+        failed_drs = sess.query(DR).filter(
+            DR.dag_id == TEST_DAG_ID+'_child_clean',
+            DR.state == State.FAILED,
+        ).all()
+        self.assertEqual(len(failed_drs), 1)
+
+        self.assertEqual(failed_drs[0].execution_date,
+                         DEFAULT_DATE+timedelta(seconds=5))
+
+
+if __name__ == '__main__':
+    unittest.main()
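
For completeness, a minimal sketch of invoking the new test case directly
(assuming the repository root is on PYTHONPATH and a test Airflow
environment is configured, as with the other contrib tests):

    import unittest

    from tests.contrib.operators.test_dagrun_sensor import TestDagRunSensor

    # Load and run just this PR's test case with verbose output.
    suite = unittest.TestLoader().loadTestsFromTestCase(TestDagRunSensor)
    unittest.TextTestRunner(verbosity=2).run(suite)
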
diff --git a/tests/dags/test_dagrun_sensor.py b/tests/dags/test_dagrun_sensor.py
new file mode 100644
index 0000000000..daee9e0801
--- /dev/null
+++ b/tests/dags/test_dagrun_sensor.py
@@ -0,0 +1,61 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from datetime import timedelta
+
+from airflow import DAG
+from airflow.operators.bash_operator import BashOperator
+from airflow.contrib.operators.dagrun_sensor import DagRunSensor
+from tests.contrib.operators.test_dagrun_sensor import (DEFAULT_DATE,
+                                                        TEST_DAG_ID)
+
+args = {
+    'start_date': DEFAULT_DATE,
+    'owner': 'airflow',
+    'depends_on_past': False
+}
+
+with DAG(dag_id=TEST_DAG_ID+'_parent_clean',
+         default_args=args,
+         start_date=DEFAULT_DATE,
+         schedule_interval=timedelta(seconds=1)) as dag_parent:
+    t1 = BashOperator(
+        task_id='task_1',
+        bash_command="echo 'one'",
+    )
+    t2 = BashOperator(
+        task_id='task_2',
+        bash_command="echo 'two'",
+    )
+    t1 >> t2
+
+
+# A workflow that runs every five seconds; each run depends on the five
+# per-second runs of the parent DAG above that cover its interval.
+with DAG(dag_id=TEST_DAG_ID+'_child_clean',
+         default_args=args,
+         start_date=DEFAULT_DATE,
+         schedule_interval=timedelta(seconds=5)) as dag_child:
+    t1 = DagRunSensor(
+        task_id='sense_parent',
+        external_dag_id=TEST_DAG_ID+'_parent_clean',
+        execution_date_fn=lambda d: [d+timedelta(seconds=i) for i in range(5)],
+        timeout=5,
+        poke_interval=1,
+    )
+    t2 = BashOperator(
+        task_id='do_stuff',
+        bash_command="echo 'finished'",
+    )
+    t1 >> t2
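
To make the test's timing concrete, a small sketch (plain Python, no
Airflow required) of the execution dates that the child DAG's
execution_date_fn asks the sensor to poke for:

    from datetime import datetime, timedelta

    DEFAULT_DATE = datetime(2017, 1, 1)

    def parent_dates(d):
        # Same mapping as the lambda passed to DagRunSensor above.
        return [d + timedelta(seconds=i) for i in range(5)]

    # First child run (00:00:00): needs parent runs at seconds 0..4,
    # all of which the test backfills (it runs the parent up to second 8).
    print(parent_dates(DEFAULT_DATE))

    # Second child run (00:00:05): needs seconds 5..9, but second 9 is
    # never scheduled for the parent, so this sensor times out.
    print(parent_dates(DEFAULT_DATE + timedelta(seconds=5)))
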


 



> Add a sensor operator to wait on DagRuns
> ----------------------------------------
>
>                 Key: AIRFLOW-1488
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1488
>             Project: Apache Airflow
>          Issue Type: New Feature
>          Components: contrib, operators
>            Reporter: Yati
>            Assignee: Yati
>            Priority: Major
>
> The [ExternalTaskSensor|https://airflow.incubator.apache.org/code.html#airflow.operators.ExternalTaskSensor]
> operator already allows for encoding dependencies on tasks in external DAGs.
> However, when you have teams, each owning multiple small-to-medium sized 
> DAGs, it is desirable to be able to wait on an external DagRun as a whole. 
> This allows the owners of an upstream DAG to refactor their code freely by 
> splitting/squashing task responsibilities, without worrying about dependent 
> DAGs breaking.
> I'll now enumerate the easiest ways of achieving this that come to mind:
> * Make all DAGs always end with a join DummyOperator whose task id follows 
> some convention, e.g., "{{ dag_id }}.__end__".
> * Make ExternalTaskSensor poke for a DagRun instead of TaskInstances when the 
> external_task_id argument is None.
> * Implement a separate DagRunSensor operator.
> After consideration, we decided to implement a separate operator, which 
> we've been using in our team's workflows, and I think it would make a good 
> addition to contrib.


