This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit e1b4fcfa7eba4fa3fd0af00242e1809f7ae75a04
Author: Charles Bournhonesque <[email protected]>
AuthorDate: Fri Jun 5 11:50:19 2020 -0400

    [AIRFLOW-5500] Fix the trigger_dag api in the case of nested subdags
    
    Co-authored-by: Charles Bournhonesque <[email protected]>
    (cherry picked from commit 16e06f802b30ce7c3972b1a8165e3d6327dee761)
---
 airflow/api/common/experimental/trigger_dag.py    | 12 ++++-------
 tests/api/common/experimental/test_trigger_dag.py | 25 +++++++++++++++++++++++
 2 files changed, 29 insertions(+), 8 deletions(-)

diff --git a/airflow/api/common/experimental/trigger_dag.py 
b/airflow/api/common/experimental/trigger_dag.py
index e7aad06..7adfac6 100644
--- a/airflow/api/common/experimental/trigger_dag.py
+++ b/airflow/api/common/experimental/trigger_dag.py
@@ -85,12 +85,10 @@ def _trigger_dag(
         else:
             run_conf = json.loads(conf)
 
-    triggers = list()
-    dags_to_trigger = list()
-    dags_to_trigger.append(dag)
-    while dags_to_trigger:
-        dag = dags_to_trigger.pop()
-        trigger = dag.create_dagrun(
+    triggers = []
+    dags_to_trigger = [dag] + dag.subdags
+    for _dag in dags_to_trigger:
+        trigger = _dag.create_dagrun(
             run_id=run_id,
             execution_date=execution_date,
             state=State.RUNNING,
@@ -98,8 +96,6 @@ def _trigger_dag(
             external_trigger=True,
         )
         triggers.append(trigger)
-        if dag.subdags:
-            dags_to_trigger.extend(dag.subdags)
     return triggers
 
 
diff --git a/tests/api/common/experimental/test_trigger_dag.py 
b/tests/api/common/experimental/test_trigger_dag.py
index 396268d..66fc007 100644
--- a/tests/api/common/experimental/test_trigger_dag.py
+++ b/tests/api/common/experimental/test_trigger_dag.py
@@ -89,6 +89,31 @@ class TriggerDagTests(unittest.TestCase):
 
         self.assertEqual(3, len(triggers))
 
+    @mock.patch('airflow.models.DAG')
+    @mock.patch('airflow.models.DagRun')
+    @mock.patch('airflow.models.DagBag')
+    def test_trigger_dag_include_nested_subdags(self, dag_bag_mock, 
dag_run_mock, dag_mock):
+        dag_id = "trigger_dag"
+        dag_bag_mock.dags = [dag_id]
+        dag_bag_mock.get_dag.return_value = dag_mock
+        dag_run_mock.find.return_value = None
+        dag1 = mock.MagicMock()
+        dag1.subdags = []
+        dag2 = mock.MagicMock()
+        dag2.subdags = [dag1]
+        dag_mock.subdags = [dag1, dag2]
+
+        triggers = _trigger_dag(
+            dag_id,
+            dag_bag_mock,
+            dag_run_mock,
+            run_id=None,
+            conf=None,
+            execution_date=None,
+            replace_microseconds=True)
+
+        self.assertEqual(3, len(triggers))
+
     @mock.patch('airflow.models.DagBag')
     def test_trigger_dag_with_str_conf(self, dag_bag_mock):
         dag_id = "trigger_dag_with_str_conf"

Reply via email to