This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 16e06f8 [AIRFLOW-5500] Fix the trigger_dag api in the case of nested subdags
16e06f8 is described below
commit 16e06f802b30ce7c3972b1a8165e3d6327dee761
Author: Charles Bournhonesque <[email protected]>
AuthorDate: Fri Jun 5 11:50:19 2020 -0400
[AIRFLOW-5500] Fix the trigger_dag api in the case of nested subdags
Co-authored-by: Charles Bournhonesque <[email protected]>
---
airflow/api/common/experimental/trigger_dag.py | 9 +++-----
tests/api/common/experimental/test_trigger_dag.py | 25 +++++++++++++++++++++++
2 files changed, 28 insertions(+), 6 deletions(-)
diff --git a/airflow/api/common/experimental/trigger_dag.py b/airflow/api/common/experimental/trigger_dag.py
index a7a64b8..05dadc7 100644
--- a/airflow/api/common/experimental/trigger_dag.py
+++ b/airflow/api/common/experimental/trigger_dag.py
@@ -81,10 +81,9 @@ def _trigger_dag(
run_conf = conf if isinstance(conf, dict) else json.loads(conf)
triggers = []
- dags_to_trigger = [dag]
- while dags_to_trigger:
- dag = dags_to_trigger.pop()
- trigger = dag.create_dagrun(
+ dags_to_trigger = [dag] + dag.subdags
+ for _dag in dags_to_trigger:
+ trigger = _dag.create_dagrun(
run_id=run_id,
execution_date=execution_date,
state=State.RUNNING,
@@ -93,8 +92,6 @@ def _trigger_dag(
)
triggers.append(trigger)
- if dag.subdags:
- dags_to_trigger.extend(dag.subdags)
return triggers
diff --git a/tests/api/common/experimental/test_trigger_dag.py b/tests/api/common/experimental/test_trigger_dag.py
index d0c904c..ad6423c 100644
--- a/tests/api/common/experimental/test_trigger_dag.py
+++ b/tests/api/common/experimental/test_trigger_dag.py
@@ -89,6 +89,31 @@ class TestTriggerDag(unittest.TestCase):
self.assertEqual(3, len(triggers))
+ @mock.patch('airflow.models.DAG')
+ @mock.patch('airflow.models.DagRun')
+ @mock.patch('airflow.models.DagBag')
+ def test_trigger_dag_include_nested_subdags(self, dag_bag_mock, dag_run_mock, dag_mock):
+ dag_id = "trigger_dag"
+ dag_bag_mock.dags = [dag_id]
+ dag_bag_mock.get_dag.return_value = dag_mock
+ dag_run_mock.find.return_value = None
+ dag1 = mock.MagicMock()
+ dag1.subdags = []
+ dag2 = mock.MagicMock()
+ dag2.subdags = [dag1]
+ dag_mock.subdags = [dag1, dag2]
+
+ triggers = _trigger_dag(
+ dag_id,
+ dag_bag_mock,
+ dag_run_mock,
+ run_id=None,
+ conf=None,
+ execution_date=None,
+ replace_microseconds=True)
+
+ self.assertEqual(3, len(triggers))
+
@mock.patch('airflow.models.DagBag')
def test_trigger_dag_with_str_conf(self, dag_bag_mock):
dag_id = "trigger_dag_with_str_conf"