This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e8b4ec  Pass conf to subdags (#9956)
2e8b4ec is described below

commit 2e8b4ece36b2edf20e50331fbc55269033755954
Author: Daniel Cohen <[email protected]>
AuthorDate: Sat Sep 12 06:58:17 2020 -0400

    Pass conf to subdags (#9956)
---
 airflow/operators/subdag_operator.py    |  7 ++++++-
 tests/operators/test_subdag_operator.py | 31 +++++++++++++++++++++++++++++++
 2 files changed, 37 insertions(+), 1 deletion(-)

diff --git a/airflow/operators/subdag_operator.py b/airflow/operators/subdag_operator.py
index 6606870..8ba565b 100644
--- a/airflow/operators/subdag_operator.py
+++ b/airflow/operators/subdag_operator.py
@@ -19,7 +19,7 @@
 The module which provides a way to nest your DAGs and so your levels of complexity.
 """
 from enum import Enum
-from typing import Optional
+from typing import Dict, Optional
 
 from sqlalchemy.orm.session import Session
 
@@ -55,6 +55,8 @@ class SubDagOperator(BaseSensorOperator):
 
     :param subdag: the DAG object to run as a subdag of the current DAG.
     :param session: sqlalchemy session
+    :param conf: Configuration for the subdag
+    :type conf: dict
     :param propagate_skipped_state: by setting this argument you can define
         whether the skipped state of leaf task(s) should be propagated to the parent dag's downstream task.
     """
@@ -68,10 +70,12 @@ class SubDagOperator(BaseSensorOperator):
                  *,
                  subdag: DAG,
                  session: Optional[Session] = None,
+                 conf: Optional[Dict] = None,
                  propagate_skipped_state: Optional[SkippedStatePropagationOptions] = None,
                  **kwargs) -> None:
         super().__init__(**kwargs)
         self.subdag = subdag
+        self.conf = conf
         self.propagate_skipped_state = propagate_skipped_state
 
         self._validate_dag(kwargs)
@@ -151,6 +155,7 @@ class SubDagOperator(BaseSensorOperator):
                 run_type=DagRunType.SCHEDULED,
                 execution_date=execution_date,
                 state=State.RUNNING,
+                conf=self.conf,
                 external_trigger=True,
             )
             self.log.info("Created DagRun: %s", dag_run.run_id)
diff --git a/tests/operators/test_subdag_operator.py b/tests/operators/test_subdag_operator.py
index 5b3acec..f1f7396 100644
--- a/tests/operators/test_subdag_operator.py
+++ b/tests/operators/test_subdag_operator.py
@@ -163,6 +163,37 @@ class TestSubDagOperator(unittest.TestCase):
         subdag.create_dagrun.assert_called_once_with(
             run_type=DagRunType.SCHEDULED,
             execution_date=DEFAULT_DATE,
+            conf=None,
+            state=State.RUNNING,
+            external_trigger=True,
+        )
+
+        self.assertEqual(3, len(subdag_task._get_dagrun.mock_calls))
+
+    def test_execute_create_dagrun_with_conf(self):
+        """
+        When SubDagOperator executes, it creates a DagRun if there is no existing one
+        and wait until the DagRun succeeds.
+        """
+        conf = {"key": "value"}
+        dag = DAG('parent', default_args=default_args)
+        subdag = DAG('parent.test', default_args=default_args)
+        subdag_task = SubDagOperator(task_id='test', subdag=subdag, dag=dag, poke_interval=1, conf=conf)
+
+        subdag.create_dagrun = Mock()
+        subdag.create_dagrun.return_value = self.dag_run_running
+
+        subdag_task._get_dagrun = Mock()
+        subdag_task._get_dagrun.side_effect = [None, self.dag_run_success, self.dag_run_success]
+
+        subdag_task.pre_execute(context={'execution_date': DEFAULT_DATE})
+        subdag_task.execute(context={'execution_date': DEFAULT_DATE})
+        subdag_task.post_execute(context={'execution_date': DEFAULT_DATE})
+
+        subdag.create_dagrun.assert_called_once_with(
+            run_type=DagRunType.SCHEDULED,
+            execution_date=DEFAULT_DATE,
+            conf=conf,
             state=State.RUNNING,
             external_trigger=True,
         )

Reply via email to