This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 2e8b4ec Pass conf to subdags (#9956)
2e8b4ec is described below
commit 2e8b4ece36b2edf20e50331fbc55269033755954
Author: Daniel Cohen <[email protected]>
AuthorDate: Sat Sep 12 06:58:17 2020 -0400
Pass conf to subdags (#9956)
---
airflow/operators/subdag_operator.py | 7 ++++++-
tests/operators/test_subdag_operator.py | 31 +++++++++++++++++++++++++++++++
2 files changed, 37 insertions(+), 1 deletion(-)
diff --git a/airflow/operators/subdag_operator.py
b/airflow/operators/subdag_operator.py
index 6606870..8ba565b 100644
--- a/airflow/operators/subdag_operator.py
+++ b/airflow/operators/subdag_operator.py
@@ -19,7 +19,7 @@
The module which provides a way to nest your DAGs and so your levels of
complexity.
"""
from enum import Enum
-from typing import Optional
+from typing import Dict, Optional
from sqlalchemy.orm.session import Session
@@ -55,6 +55,8 @@ class SubDagOperator(BaseSensorOperator):
:param subdag: the DAG object to run as a subdag of the current DAG.
:param session: sqlalchemy session
+ :param conf: Configuration for the subdag
+ :type conf: dict
:param propagate_skipped_state: by setting this argument you can define
whether the skipped state of leaf task(s) should be propagated to the
parent dag's downstream task.
"""
@@ -68,10 +70,12 @@ class SubDagOperator(BaseSensorOperator):
*,
subdag: DAG,
session: Optional[Session] = None,
+ conf: Optional[Dict] = None,
propagate_skipped_state:
Optional[SkippedStatePropagationOptions] = None,
**kwargs) -> None:
super().__init__(**kwargs)
self.subdag = subdag
+ self.conf = conf
self.propagate_skipped_state = propagate_skipped_state
self._validate_dag(kwargs)
@@ -151,6 +155,7 @@ class SubDagOperator(BaseSensorOperator):
run_type=DagRunType.SCHEDULED,
execution_date=execution_date,
state=State.RUNNING,
+ conf=self.conf,
external_trigger=True,
)
self.log.info("Created DagRun: %s", dag_run.run_id)
diff --git a/tests/operators/test_subdag_operator.py
b/tests/operators/test_subdag_operator.py
index 5b3acec..f1f7396 100644
--- a/tests/operators/test_subdag_operator.py
+++ b/tests/operators/test_subdag_operator.py
@@ -163,6 +163,37 @@ class TestSubDagOperator(unittest.TestCase):
subdag.create_dagrun.assert_called_once_with(
run_type=DagRunType.SCHEDULED,
execution_date=DEFAULT_DATE,
+ conf=None,
+ state=State.RUNNING,
+ external_trigger=True,
+ )
+
+ self.assertEqual(3, len(subdag_task._get_dagrun.mock_calls))
+
+ def test_execute_create_dagrun_with_conf(self):
+ """
+ When SubDagOperator executes, it creates a DagRun if there is no
existing one
+ and wait until the DagRun succeeds.
+ """
+ conf = {"key": "value"}
+ dag = DAG('parent', default_args=default_args)
+ subdag = DAG('parent.test', default_args=default_args)
+ subdag_task = SubDagOperator(task_id='test', subdag=subdag, dag=dag,
poke_interval=1, conf=conf)
+
+ subdag.create_dagrun = Mock()
+ subdag.create_dagrun.return_value = self.dag_run_running
+
+ subdag_task._get_dagrun = Mock()
+ subdag_task._get_dagrun.side_effect = [None, self.dag_run_success,
self.dag_run_success]
+
+ subdag_task.pre_execute(context={'execution_date': DEFAULT_DATE})
+ subdag_task.execute(context={'execution_date': DEFAULT_DATE})
+ subdag_task.post_execute(context={'execution_date': DEFAULT_DATE})
+
+ subdag.create_dagrun.assert_called_once_with(
+ run_type=DagRunType.SCHEDULED,
+ execution_date=DEFAULT_DATE,
+ conf=conf,
state=State.RUNNING,
external_trigger=True,
)