This is an automated email from the ASF dual-hosted git repository.

binh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c8d0b0e  TaskGroup add default_args (#16557)
c8d0b0e is described below

commit c8d0b0e1fb037345855598042d426c6ccc9968b3
Author: penggongkui <[email protected]>
AuthorDate: Wed Jun 23 13:13:04 2021 +0800

    TaskGroup add default_args (#16557)
    
    * TaskGroup add default_args
    
    * test case && pylint
    
    * TaskGroup default_args docs
    
    * Update docs/apache-airflow/concepts/dags.rst
    
    Co-authored-by: Xinbin Huang <[email protected]>
    
    Co-authored-by: Xinbin Huang <[email protected]>
---
 airflow/models/baseoperator.py        |  4 ++++
 airflow/utils/task_group.py           | 11 +++++++++++
 docs/apache-airflow/concepts/dags.rst |  9 +++++++++
 tests/utils/test_task_group.py        | 21 +++++++++++++++++++++
 4 files changed, 45 insertions(+)

diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 7af23d3..57b561c 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -136,6 +136,7 @@ class BaseOperatorMeta(abc.ABCMeta):
         @functools.wraps(func)
         def apply_defaults(self, *args: Any, **kwargs: Any) -> Any:
             from airflow.models.dag import DagContext
+            from airflow.utils.task_group import TaskGroupContext
 
             if len(args) > 0:
                 raise AirflowException("Use keyword arguments when 
initializing operators")
@@ -146,6 +147,9 @@ class BaseOperatorMeta(abc.ABCMeta):
             if dag:
                 dag_args = copy.copy(dag.default_args) or {}
                 dag_params = copy.copy(dag.params) or {}
+                task_group = TaskGroupContext.get_current_task_group(dag)
+                if task_group:
+                    dag_args.update(task_group.default_args)
 
             params = kwargs.get('params', {}) or {}
             dag_params.update(params)
diff --git a/airflow/utils/task_group.py b/airflow/utils/task_group.py
index 01a7132..71c2ae4 100644
--- a/airflow/utils/task_group.py
+++ b/airflow/utils/task_group.py
@@ -19,6 +19,7 @@
 A TaskGroup is a collection of closely related tasks on the same DAG that 
should be grouped
 together when the DAG is displayed graphically.
 """
+import copy
 import re
 from typing import TYPE_CHECKING, Dict, Generator, List, Optional, Sequence, 
Set, Union
 
@@ -48,6 +49,14 @@ class TaskGroup(TaskMixin):
     :type parent_group: TaskGroup
     :param dag: The DAG that this TaskGroup belongs to.
     :type dag: airflow.models.DAG
+    :param default_args: A dictionary of default parameters to be used
+        as constructor keyword parameters when initialising operators.
+        These override default_args defined at the DAG level.
+        Note that operators accept the same argument, and their values take
+        precedence over those defined here: if your dict contains
+        `'depends_on_past': True` here and the operator's own `default_args`
+        contains `'depends_on_past': False`, the actual value will be `False`.
+    :type default_args: dict
     :param tooltip: The tooltip of the TaskGroup node when displayed in the UI
     :type tooltip: str
     :param ui_color: The fill color of the TaskGroup node when displayed in 
the UI
@@ -65,6 +74,7 @@ class TaskGroup(TaskMixin):
         prefix_group_id: bool = True,
         parent_group: Optional["TaskGroup"] = None,
         dag: Optional["DAG"] = None,
+        default_args: Optional[Dict] = None,
         tooltip: str = "",
         ui_color: str = "CornflowerBlue",
         ui_fgcolor: str = "#000",
@@ -73,6 +83,7 @@ class TaskGroup(TaskMixin):
         from airflow.models.dag import DagContext
 
         self.prefix_group_id = prefix_group_id
+        self.default_args = copy.deepcopy(default_args or {})
 
         if group_id is None:
             # This creates a root TaskGroup.
diff --git a/docs/apache-airflow/concepts/dags.rst 
b/docs/apache-airflow/concepts/dags.rst
index c250e0c..89bde8b 100644
--- a/docs/apache-airflow/concepts/dags.rst
+++ b/docs/apache-airflow/concepts/dags.rst
@@ -441,6 +441,15 @@ Dependency relationships can be applied across all tasks 
in a TaskGroup with the
 
     group1 >> task3
 
+TaskGroup also supports ``default_args``, like DAG; these override the ``default_args`` set at the DAG level::
+
+    with DAG(dag_id='dag1', default_args={'start_date': datetime(2016, 1, 1), 
'owner': 'dag'}):
+        with TaskGroup('group1', default_args={'owner': 'group'}):
+            task1 = DummyOperator(task_id='task1')
+            task2 = DummyOperator(task_id='task2', owner='task2')
+            print(task1.owner) # "group"
+            print(task2.owner) # "task2"
+
 If you want to see a more advanced use of TaskGroup, you can look at the 
``example_task_group.py`` example DAG that comes with Airflow.
 
 .. note::
diff --git a/tests/utils/test_task_group.py b/tests/utils/test_task_group.py
index 2574ed6..cd84aee 100644
--- a/tests/utils/test_task_group.py
+++ b/tests/utils/test_task_group.py
@@ -823,6 +823,27 @@ def test_task_group_context_mix():
     assert extract_node_id(task_group_to_dict(dag.task_group)) == node_ids
 
 
+def test_default_args():
+    """Testing TaskGroup with default_args"""
+
+    execution_date = pendulum.parse("20201109")
+    with DAG(
+        dag_id='example_task_group_default_args',
+        start_date=execution_date,
+        default_args={
+            "owner": "dag",
+        },
+    ):
+        with TaskGroup("group1", default_args={"owner": "group"}):
+            task_1 = DummyOperator(task_id='task_1')
+            task_2 = DummyOperator(task_id='task_2', owner='task')
+            task_3 = DummyOperator(task_id='task_3', default_args={"owner": 
"task"})
+
+            assert task_1.owner == 'group'
+            assert task_2.owner == 'task'
+            assert task_3.owner == 'task'
+
+
 def test_duplicate_task_group_id():
     """Testing automatic suffix assignment for duplicate group_id"""
 

Reply via email to