This is an automated email from the ASF dual-hosted git repository. potiuk pushed a commit to branch v1-10-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit e1b4fcfa7eba4fa3fd0af00242e1809f7ae75a04
Author: Charles Bournhonesque <[email protected]>
AuthorDate: Fri Jun 5 11:50:19 2020 -0400

    [AIRFLOW-5500] Fix the trigger_dag api in the case of nested subdags

    Co-authored-by: Charles Bournhonesque <[email protected]>
    (cherry picked from commit 16e06f802b30ce7c3972b1a8165e3d6327dee761)
---
 airflow/api/common/experimental/trigger_dag.py    | 12 ++++-------
 tests/api/common/experimental/test_trigger_dag.py | 25 +++++++++++++++++++++++
 2 files changed, 29 insertions(+), 8 deletions(-)

diff --git a/airflow/api/common/experimental/trigger_dag.py b/airflow/api/common/experimental/trigger_dag.py
index e7aad06..7adfac6 100644
--- a/airflow/api/common/experimental/trigger_dag.py
+++ b/airflow/api/common/experimental/trigger_dag.py
@@ -85,12 +85,10 @@ def _trigger_dag(
         else:
             run_conf = json.loads(conf)
 
-    triggers = list()
-    dags_to_trigger = list()
-    dags_to_trigger.append(dag)
-    while dags_to_trigger:
-        dag = dags_to_trigger.pop()
-        trigger = dag.create_dagrun(
+    triggers = []
+    dags_to_trigger = [dag] + dag.subdags
+    for _dag in dags_to_trigger:
+        trigger = _dag.create_dagrun(
             run_id=run_id,
             execution_date=execution_date,
             state=State.RUNNING,
@@ -98,8 +96,6 @@ def _trigger_dag(
             external_trigger=True,
         )
         triggers.append(trigger)
-        if dag.subdags:
-            dags_to_trigger.extend(dag.subdags)
 
     return triggers
 
diff --git a/tests/api/common/experimental/test_trigger_dag.py b/tests/api/common/experimental/test_trigger_dag.py
index 396268d..66fc007 100644
--- a/tests/api/common/experimental/test_trigger_dag.py
+++ b/tests/api/common/experimental/test_trigger_dag.py
@@ -89,6 +89,31 @@ class TriggerDagTests(unittest.TestCase):
 
         self.assertEqual(3, len(triggers))
 
+    @mock.patch('airflow.models.DAG')
+    @mock.patch('airflow.models.DagRun')
+    @mock.patch('airflow.models.DagBag')
+    def test_trigger_dag_include_nested_subdags(self, dag_bag_mock, dag_run_mock, dag_mock):
+        dag_id = "trigger_dag"
+        dag_bag_mock.dags = [dag_id]
+        dag_bag_mock.get_dag.return_value = dag_mock
+        dag_run_mock.find.return_value = None
+        dag1 = mock.MagicMock()
+        dag1.subdags = []
+        dag2 = mock.MagicMock()
+        dag2.subdags = [dag1]
+        dag_mock.subdags = [dag1, dag2]
+
+        triggers = _trigger_dag(
+            dag_id,
+            dag_bag_mock,
+            dag_run_mock,
+            run_id=None,
+            conf=None,
+            execution_date=None,
+            replace_microseconds=True)
+
+        self.assertEqual(3, len(triggers))
+
     @mock.patch('airflow.models.DagBag')
     def test_trigger_dag_with_str_conf(self, dag_bag_mock):
         dag_id = "trigger_dag_with_str_conf"
