This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d67133c  Remove default_args pattern + added get_current_context() use 
for Core Airflow example DAGs (#16866)
d67133c is described below

commit d67133c1b67f22b5bddb49b1e0e2caa18d160be2
Author: Josh Fell <[email protected]>
AuthorDate: Thu Sep 2 05:16:57 2021 -0400

    Remove default_args pattern + added get_current_context() use for Core 
Airflow example DAGs (#16866)
    
    Co-authored-by: Ash Berlin-Taylor <[email protected]>
---
 airflow/example_dags/example_bash_operator.py      |  5 ---
 .../example_branch_datetime_operator.py            |  6 ---
 .../example_branch_day_of_week_operator.py         |  5 ---
 airflow/example_dags/example_branch_operator.py    |  5 ---
 .../example_branch_python_dop_operator_3.py        |  7 +---
 airflow/example_dags/example_dag_decorator.py      |  4 +-
 .../example_dags/example_kubernetes_executor.py    |  5 ---
 .../example_kubernetes_executor_config.py          |  5 ---
 .../example_passing_params_via_test_command.py     |  3 --
 airflow/example_dags/example_python_operator.py    |  5 ---
 .../example_dags/example_short_circuit_operator.py |  5 ---
 airflow/example_dags/example_skip_dag.py           |  6 +--
 .../example_dags/example_trigger_controller_dag.py |  1 -
 airflow/example_dags/example_trigger_target_dag.py |  9 ++---
 airflow/example_dags/example_xcom.py               | 43 ++++++++++------------
 airflow/example_dags/example_xcomargs.py           |  2 -
 airflow/example_dags/tutorial_taskflow_api_etl.py  | 10 +----
 .../tutorial_taskflow_api_etl_virtualenv.py        | 14 ++-----
 18 files changed, 30 insertions(+), 110 deletions(-)

diff --git a/airflow/example_dags/example_bash_operator.py 
b/airflow/example_dags/example_bash_operator.py
index 6906fc8..b44dbcb 100644
--- a/airflow/example_dags/example_bash_operator.py
+++ b/airflow/example_dags/example_bash_operator.py
@@ -25,13 +25,8 @@ from airflow.operators.bash import BashOperator
 from airflow.operators.dummy import DummyOperator
 from airflow.utils.dates import days_ago
 
-args = {
-    'owner': 'airflow',
-}
-
 with DAG(
     dag_id='example_bash_operator',
-    default_args=args,
     schedule_interval='0 0 * * *',
     start_date=days_ago(2),
     dagrun_timeout=timedelta(minutes=60),
diff --git a/airflow/example_dags/example_branch_datetime_operator.py 
b/airflow/example_dags/example_branch_datetime_operator.py
index c0ddeef..a9e9c24 100644
--- a/airflow/example_dags/example_branch_datetime_operator.py
+++ b/airflow/example_dags/example_branch_datetime_operator.py
@@ -27,14 +27,9 @@ from airflow.operators.datetime import BranchDateTimeOperator
 from airflow.operators.dummy import DummyOperator
 from airflow.utils.dates import days_ago
 
-args = {
-    "owner": "airflow",
-}
-
 dag = DAG(
     dag_id="example_branch_datetime_operator",
     start_date=days_ago(2),
-    default_args=args,
     tags=["example"],
     schedule_interval="@daily",
 )
@@ -60,7 +55,6 @@ cond1 >> [dummy_task_1, dummy_task_2]
 dag = DAG(
     dag_id="example_branch_datetime_operator_2",
     start_date=days_ago(2),
-    default_args=args,
     tags=["example"],
     schedule_interval="@daily",
 )
diff --git a/airflow/example_dags/example_branch_day_of_week_operator.py 
b/airflow/example_dags/example_branch_day_of_week_operator.py
index 597ea90..4a7a87e 100644
--- a/airflow/example_dags/example_branch_day_of_week_operator.py
+++ b/airflow/example_dags/example_branch_day_of_week_operator.py
@@ -25,14 +25,9 @@ from airflow.operators.dummy import DummyOperator
 from airflow.operators.weekday import BranchDayOfWeekOperator
 from airflow.utils.dates import days_ago
 
-args = {
-    "owner": "airflow",
-}
-
 with DAG(
     dag_id="example_weekday_branch_operator",
     start_date=days_ago(2),
-    default_args=args,
     tags=["example"],
     schedule_interval="@daily",
 ) as dag:
diff --git a/airflow/example_dags/example_branch_operator.py 
b/airflow/example_dags/example_branch_operator.py
index ca85e6d..69e63d2 100644
--- a/airflow/example_dags/example_branch_operator.py
+++ b/airflow/example_dags/example_branch_operator.py
@@ -26,13 +26,8 @@ from airflow.operators.python import BranchPythonOperator
 from airflow.utils.dates import days_ago
 from airflow.utils.edgemodifier import Label
 
-args = {
-    'owner': 'airflow',
-}
-
 with DAG(
     dag_id='example_branch_operator',
-    default_args=args,
     start_date=days_ago(2),
     schedule_interval="@daily",
     tags=['example', 'example2'],
diff --git a/airflow/example_dags/example_branch_python_dop_operator_3.py 
b/airflow/example_dags/example_branch_python_dop_operator_3.py
index badad5a..fd69ab1 100644
--- a/airflow/example_dags/example_branch_python_dop_operator_3.py
+++ b/airflow/example_dags/example_branch_python_dop_operator_3.py
@@ -26,11 +26,6 @@ from airflow.operators.dummy import DummyOperator
 from airflow.operators.python import BranchPythonOperator
 from airflow.utils.dates import days_ago
 
-args = {
-    'owner': 'airflow',
-    'depends_on_past': True,
-}
-
 
 def should_run(**kwargs):
     """
@@ -55,7 +50,7 @@ with DAG(
     dag_id='example_branch_dop_operator_v3',
     schedule_interval='*/1 * * * *',
     start_date=days_ago(2),
-    default_args=args,
+    default_args={'depends_on_past': True},
     tags=['example'],
 ) as dag:
 
diff --git a/airflow/example_dags/example_dag_decorator.py 
b/airflow/example_dags/example_dag_decorator.py
index fe1ecf8..6bc150e 100644
--- a/airflow/example_dags/example_dag_decorator.py
+++ b/airflow/example_dags/example_dag_decorator.py
@@ -25,8 +25,6 @@ from airflow.models.baseoperator import BaseOperator
 from airflow.operators.email import EmailOperator
 from airflow.utils.dates import days_ago
 
-DEFAULT_ARGS = {"owner": "airflow"}
-
 
 class GetRequestOperator(BaseOperator):
     """Custom operator to sand GET request to provided url"""
@@ -40,7 +38,7 @@ class GetRequestOperator(BaseOperator):
 
 
 # [START dag_decorator_usage]
-@dag(default_args=DEFAULT_ARGS, schedule_interval=None, 
start_date=days_ago(2), tags=['example'])
+@dag(schedule_interval=None, start_date=days_ago(2), tags=['example'])
 def example_dag_decorator(email: str = '[email protected]'):
     """
     DAG to send server IP to email.
diff --git a/airflow/example_dags/example_kubernetes_executor.py 
b/airflow/example_dags/example_kubernetes_executor.py
index 35180e1..c76c119 100644
--- a/airflow/example_dags/example_kubernetes_executor.py
+++ b/airflow/example_dags/example_kubernetes_executor.py
@@ -25,13 +25,8 @@ from airflow.example_dags.libs.helper import print_stuff
 from airflow.operators.python import PythonOperator
 from airflow.utils.dates import days_ago
 
-args = {
-    'owner': 'airflow',
-}
-
 with DAG(
     dag_id='example_kubernetes_executor',
-    default_args=args,
     schedule_interval=None,
     start_date=days_ago(2),
     tags=['example', 'example2'],
diff --git a/airflow/example_dags/example_kubernetes_executor_config.py 
b/airflow/example_dags/example_kubernetes_executor_config.py
index 5290dd8..201220b 100644
--- a/airflow/example_dags/example_kubernetes_executor_config.py
+++ b/airflow/example_dags/example_kubernetes_executor_config.py
@@ -27,10 +27,6 @@ from airflow.operators.python import PythonOperator
 from airflow.settings import AIRFLOW_HOME
 from airflow.utils.dates import days_ago
 
-default_args = {
-    'owner': 'airflow',
-}
-
 log = logging.getLogger(__name__)
 
 try:
@@ -38,7 +34,6 @@ try:
 
     with DAG(
         dag_id='example_kubernetes_executor_config',
-        default_args=default_args,
         schedule_interval=None,
         start_date=days_ago(2),
         tags=['example3'],
diff --git a/airflow/example_dags/example_passing_params_via_test_command.py 
b/airflow/example_dags/example_passing_params_via_test_command.py
index d2ecdfa..e9e004a 100644
--- a/airflow/example_dags/example_passing_params_via_test_command.py
+++ b/airflow/example_dags/example_passing_params_via_test_command.py
@@ -59,9 +59,6 @@ def print_env_vars(test_mode):
 
 with DAG(
     "example_passing_params_via_test_command",
-    default_args={
-        "owner": "airflow",
-    },
     schedule_interval='*/1 * * * *',
     start_date=days_ago(1),
     dagrun_timeout=timedelta(minutes=4),
diff --git a/airflow/example_dags/example_python_operator.py 
b/airflow/example_dags/example_python_operator.py
index a9db342..4da7564 100644
--- a/airflow/example_dags/example_python_operator.py
+++ b/airflow/example_dags/example_python_operator.py
@@ -24,13 +24,8 @@ from airflow import DAG
 from airflow.operators.python import PythonOperator, PythonVirtualenvOperator
 from airflow.utils.dates import days_ago
 
-args = {
-    'owner': 'airflow',
-}
-
 with DAG(
     dag_id='example_python_operator',
-    default_args=args,
     schedule_interval=None,
     start_date=days_ago(2),
     tags=['example'],
diff --git a/airflow/example_dags/example_short_circuit_operator.py 
b/airflow/example_dags/example_short_circuit_operator.py
index 3836ef9..d919f91 100644
--- a/airflow/example_dags/example_short_circuit_operator.py
+++ b/airflow/example_dags/example_short_circuit_operator.py
@@ -23,13 +23,8 @@ from airflow.operators.dummy import DummyOperator
 from airflow.operators.python import ShortCircuitOperator
 from airflow.utils import dates
 
-args = {
-    'owner': 'airflow',
-}
-
 with DAG(
     dag_id='example_short_circuit_operator',
-    default_args=args,
     start_date=dates.days_ago(2),
     tags=['example'],
 ) as dag:
diff --git a/airflow/example_dags/example_skip_dag.py 
b/airflow/example_dags/example_skip_dag.py
index 77fbf4a..ba90e22 100644
--- a/airflow/example_dags/example_skip_dag.py
+++ b/airflow/example_dags/example_skip_dag.py
@@ -23,10 +23,6 @@ from airflow.exceptions import AirflowSkipException
 from airflow.operators.dummy import DummyOperator
 from airflow.utils.dates import days_ago
 
-args = {
-    'owner': 'airflow',
-}
-
 
 # Create some placeholder operators
 class DummySkipOperator(DummyOperator):
@@ -56,6 +52,6 @@ def create_test_pipeline(suffix, trigger_rule, dag_):
     join >> final
 
 
-with DAG(dag_id='example_skip_dag', default_args=args, start_date=days_ago(2), 
tags=['example']) as dag:
+with DAG(dag_id='example_skip_dag', start_date=days_ago(2), tags=['example']) 
as dag:
     create_test_pipeline('1', 'all_success', dag)
     create_test_pipeline('2', 'one_success', dag)
diff --git a/airflow/example_dags/example_trigger_controller_dag.py 
b/airflow/example_dags/example_trigger_controller_dag.py
index 9d02399..2987b8b 100644
--- a/airflow/example_dags/example_trigger_controller_dag.py
+++ b/airflow/example_dags/example_trigger_controller_dag.py
@@ -27,7 +27,6 @@ from airflow.utils.dates import days_ago
 
 with DAG(
     dag_id="example_trigger_controller_dag",
-    default_args={"owner": "airflow"},
     start_date=days_ago(2),
     schedule_interval="@once",
     tags=['example'],
diff --git a/airflow/example_dags/example_trigger_target_dag.py 
b/airflow/example_dags/example_trigger_target_dag.py
index 2b4661b..4612539 100644
--- a/airflow/example_dags/example_trigger_target_dag.py
+++ b/airflow/example_dags/example_trigger_target_dag.py
@@ -28,19 +28,18 @@ from airflow.operators.python import PythonOperator
 from airflow.utils.dates import days_ago
 
 
-def run_this_func(**context):
+def run_this_func(dag_run):
     """
     Print the payload "message" passed to the DagRun conf attribute.
 
-    :param context: The execution context
-    :type context: dict
+    :param dag_run: The DagRun object
+    :type dag_run: DagRun
     """
-    print(f"Remotely received value of {context['dag_run'].conf['message']} 
for key=message")
+    print(f"Remotely received value of {dag_run.conf['message']} for 
key=message")
 
 
 with DAG(
     dag_id="example_trigger_target_dag",
-    default_args={"owner": "airflow"},
     start_date=days_ago(2),
     schedule_interval=None,
     tags=['example'],
diff --git a/airflow/example_dags/example_xcom.py 
b/airflow/example_dags/example_xcom.py
index de5e14e..60cf3f5 100644
--- a/airflow/example_dags/example_xcom.py
+++ b/airflow/example_dags/example_xcom.py
@@ -36,26 +36,17 @@ def push_by_returning(**kwargs):
     return value_2
 
 
-def puller(**kwargs):
-    """Pull all previously pushed XComs and check if the pushed values match 
the pulled values."""
-    ti = kwargs['ti']
+def _compare_values(pulled_value, check_value):
+    if pulled_value != check_value:
+        raise ValueError(f'The two values differ {pulled_value} and 
{check_value}')
 
-    # get value_1
-    pulled_value_1 = ti.xcom_pull(key=None, task_ids='push')
-    if pulled_value_1 != value_1:
-        raise ValueError(f'The two values differ {pulled_value_1} and 
{value_1}')
 
-    # get value_2
-    pulled_value_2 = ti.xcom_pull(task_ids='push_by_returning')
-    if pulled_value_2 != value_2:
-        raise ValueError(f'The two values differ {pulled_value_2} and 
{value_2}')
+def puller(pulled_value_1, pulled_value_2, **kwargs):
+    """Pull all previously pushed XComs and check if the pushed values match 
the pulled values."""
 
-    # get both value_1 and value_2
-    pulled_value_1, pulled_value_2 = ti.xcom_pull(key=None, task_ids=['push', 
'push_by_returning'])
-    if pulled_value_1 != value_1:
-        raise ValueError(f'The two values differ {pulled_value_1} and 
{value_1}')
-    if pulled_value_2 != value_2:
-        raise ValueError(f'The two values differ {pulled_value_2} and 
{value_2}')
+    # Check pulled values from function args
+    _compare_values(pulled_value_1, value_1)
+    _compare_values(pulled_value_2, value_2)
 
 
 def pull_value_from_bash_push(**kwargs):
@@ -70,7 +61,6 @@ with DAG(
     'example_xcom',
     schedule_interval="@once",
     start_date=days_ago(2),
-    default_args={'owner': 'airflow'},
     tags=['example'],
 ) as dag:
 
@@ -87,10 +77,12 @@ with DAG(
     pull = PythonOperator(
         task_id='puller',
         python_callable=puller,
+        op_kwargs={
+            'pulled_value_1': push1.output['value from pusher 1'],
+            'pulled_value_2': push2.output,
+        },
     )
 
-    pull << [push1, push2]
-
     bash_push = BashOperator(
         task_id='bash_push',
         bash_command='echo "bash_push demo"  && '
@@ -98,13 +90,12 @@ with DAG(
         '{{ ti.xcom_push(key="manually_pushed_value", 
value="manually_pushed_value") }}" && '
         'echo "value_by_return"',
     )
+
     bash_pull = BashOperator(
         task_id='bash_pull',
         bash_command='echo "bash pull demo" && '
-        'echo "The xcom pushed manually is '
-        '"{{ ti.xcom_pull(task_ids="bash_push", key="manually_pushed_value") 
}}" && '
-        'echo "The returned_value xcom is '
-        '"{{ ti.xcom_pull(task_ids="bash_push", key="return_value") }}" && '
+        f'echo "The xcom pushed manually is 
{bash_push.output["manually_pushed_value"]}" && '
+        f'echo "The returned_value xcom is {bash_push.output}" && '
         'echo "finished"',
         do_xcom_push=False,
     )
@@ -115,3 +106,7 @@ with DAG(
     )
 
     [bash_pull, python_pull_from_bash] << bash_push
+
+    # Task dependencies created via `XComArgs`:
+    #   push1 >> pull
+    #   push2 >> pull
diff --git a/airflow/example_dags/example_xcomargs.py 
b/airflow/example_dags/example_xcomargs.py
index 0fb728e..bf7dc8a 100644
--- a/airflow/example_dags/example_xcomargs.py
+++ b/airflow/example_dags/example_xcomargs.py
@@ -42,7 +42,6 @@ def print_value(value):
 
 with DAG(
     dag_id='example_xcom_args',
-    default_args={'owner': 'airflow'},
     start_date=days_ago(2),
     schedule_interval=None,
     tags=['example'],
@@ -57,7 +56,6 @@ with DAG(
 
 with DAG(
     "example_xcom_args_with_operators",
-    default_args={'owner': 'airflow'},
     start_date=days_ago(2),
     schedule_interval=None,
     tags=['example'],
diff --git a/airflow/example_dags/tutorial_taskflow_api_etl.py 
b/airflow/example_dags/tutorial_taskflow_api_etl.py
index edc589c..bc24c3f 100644
--- a/airflow/example_dags/tutorial_taskflow_api_etl.py
+++ b/airflow/example_dags/tutorial_taskflow_api_etl.py
@@ -26,17 +26,9 @@ from airflow.utils.dates import days_ago
 
 # [END import_module]
 
-# [START default_args]
-# These args will get passed on to each operator
-# You can override them on a per-task basis during operator initialization
-default_args = {
-    'owner': 'airflow',
-}
-# [END default_args]
-
 
 # [START instantiate_dag]
-@dag(default_args=default_args, schedule_interval=None, 
start_date=days_ago(2), tags=['example'])
+@dag(schedule_interval=None, start_date=days_ago(2), tags=['example'])
 def tutorial_taskflow_api_etl():
     """
     ### TaskFlow API Tutorial Documentation
diff --git a/airflow/example_dags/tutorial_taskflow_api_etl_virtualenv.py 
b/airflow/example_dags/tutorial_taskflow_api_etl_virtualenv.py
index d2723be..bdbf476 100644
--- a/airflow/example_dags/tutorial_taskflow_api_etl_virtualenv.py
+++ b/airflow/example_dags/tutorial_taskflow_api_etl_virtualenv.py
@@ -19,24 +19,14 @@
 
 # [START tutorial]
 # [START import_module]
-import json
-
 from airflow.decorators import dag, task
 from airflow.utils.dates import days_ago
 
 # [END import_module]
 
-# [START default_args]
-# These args will get passed on to each operator
-# You can override them on a per-task basis during operator initialization
-default_args = {
-    'owner': 'airflow',
-}
-# [END default_args]
-
 
 # [START instantiate_dag]
-@dag(default_args=default_args, schedule_interval=None, 
start_date=days_ago(2), tags=['example'])
+@dag(schedule_interval=None, start_date=days_ago(2), tags=['example'])
 def tutorial_taskflow_api_etl_virtualenv():
     """
     ### TaskFlow API Tutorial Documentation
@@ -61,6 +51,8 @@ def tutorial_taskflow_api_etl_virtualenv():
         pipeline. In this case, getting data is simulated by reading from a
         hardcoded JSON string.
         """
+        import json
+
         data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
 
         order_data_dict = json.loads(data_string)

Reply via email to the mailing list.