This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d67133c Remove default_args pattern + added get_current_context() use
for Core Airflow example DAGs (#16866)
d67133c is described below
commit d67133c1b67f22b5bddb49b1e0e2caa18d160be2
Author: Josh Fell <[email protected]>
AuthorDate: Thu Sep 2 05:16:57 2021 -0400
Remove default_args pattern + added get_current_context() use for Core
Airflow example DAGs (#16866)
Co-authored-by: Ash Berlin-Taylor <[email protected]>
---
airflow/example_dags/example_bash_operator.py | 5 ---
.../example_branch_datetime_operator.py | 6 ---
.../example_branch_day_of_week_operator.py | 5 ---
airflow/example_dags/example_branch_operator.py | 5 ---
.../example_branch_python_dop_operator_3.py | 7 +---
airflow/example_dags/example_dag_decorator.py | 4 +-
.../example_dags/example_kubernetes_executor.py | 5 ---
.../example_kubernetes_executor_config.py | 5 ---
.../example_passing_params_via_test_command.py | 3 --
airflow/example_dags/example_python_operator.py | 5 ---
.../example_dags/example_short_circuit_operator.py | 5 ---
airflow/example_dags/example_skip_dag.py | 6 +--
.../example_dags/example_trigger_controller_dag.py | 1 -
airflow/example_dags/example_trigger_target_dag.py | 9 ++---
airflow/example_dags/example_xcom.py | 43 ++++++++++------------
airflow/example_dags/example_xcomargs.py | 2 -
airflow/example_dags/tutorial_taskflow_api_etl.py | 10 +----
.../tutorial_taskflow_api_etl_virtualenv.py | 14 ++-----
18 files changed, 30 insertions(+), 110 deletions(-)
diff --git a/airflow/example_dags/example_bash_operator.py
b/airflow/example_dags/example_bash_operator.py
index 6906fc8..b44dbcb 100644
--- a/airflow/example_dags/example_bash_operator.py
+++ b/airflow/example_dags/example_bash_operator.py
@@ -25,13 +25,8 @@ from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago
-args = {
- 'owner': 'airflow',
-}
-
with DAG(
dag_id='example_bash_operator',
- default_args=args,
schedule_interval='0 0 * * *',
start_date=days_ago(2),
dagrun_timeout=timedelta(minutes=60),
diff --git a/airflow/example_dags/example_branch_datetime_operator.py
b/airflow/example_dags/example_branch_datetime_operator.py
index c0ddeef..a9e9c24 100644
--- a/airflow/example_dags/example_branch_datetime_operator.py
+++ b/airflow/example_dags/example_branch_datetime_operator.py
@@ -27,14 +27,9 @@ from airflow.operators.datetime import BranchDateTimeOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago
-args = {
- "owner": "airflow",
-}
-
dag = DAG(
dag_id="example_branch_datetime_operator",
start_date=days_ago(2),
- default_args=args,
tags=["example"],
schedule_interval="@daily",
)
@@ -60,7 +55,6 @@ cond1 >> [dummy_task_1, dummy_task_2]
dag = DAG(
dag_id="example_branch_datetime_operator_2",
start_date=days_ago(2),
- default_args=args,
tags=["example"],
schedule_interval="@daily",
)
diff --git a/airflow/example_dags/example_branch_day_of_week_operator.py
b/airflow/example_dags/example_branch_day_of_week_operator.py
index 597ea90..4a7a87e 100644
--- a/airflow/example_dags/example_branch_day_of_week_operator.py
+++ b/airflow/example_dags/example_branch_day_of_week_operator.py
@@ -25,14 +25,9 @@ from airflow.operators.dummy import DummyOperator
from airflow.operators.weekday import BranchDayOfWeekOperator
from airflow.utils.dates import days_ago
-args = {
- "owner": "airflow",
-}
-
with DAG(
dag_id="example_weekday_branch_operator",
start_date=days_ago(2),
- default_args=args,
tags=["example"],
schedule_interval="@daily",
) as dag:
diff --git a/airflow/example_dags/example_branch_operator.py
b/airflow/example_dags/example_branch_operator.py
index ca85e6d..69e63d2 100644
--- a/airflow/example_dags/example_branch_operator.py
+++ b/airflow/example_dags/example_branch_operator.py
@@ -26,13 +26,8 @@ from airflow.operators.python import BranchPythonOperator
from airflow.utils.dates import days_ago
from airflow.utils.edgemodifier import Label
-args = {
- 'owner': 'airflow',
-}
-
with DAG(
dag_id='example_branch_operator',
- default_args=args,
start_date=days_ago(2),
schedule_interval="@daily",
tags=['example', 'example2'],
diff --git a/airflow/example_dags/example_branch_python_dop_operator_3.py
b/airflow/example_dags/example_branch_python_dop_operator_3.py
index badad5a..fd69ab1 100644
--- a/airflow/example_dags/example_branch_python_dop_operator_3.py
+++ b/airflow/example_dags/example_branch_python_dop_operator_3.py
@@ -26,11 +26,6 @@ from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.utils.dates import days_ago
-args = {
- 'owner': 'airflow',
- 'depends_on_past': True,
-}
-
def should_run(**kwargs):
"""
@@ -55,7 +50,7 @@ with DAG(
dag_id='example_branch_dop_operator_v3',
schedule_interval='*/1 * * * *',
start_date=days_ago(2),
- default_args=args,
+ default_args={'depends_on_past': True},
tags=['example'],
) as dag:
diff --git a/airflow/example_dags/example_dag_decorator.py
b/airflow/example_dags/example_dag_decorator.py
index fe1ecf8..6bc150e 100644
--- a/airflow/example_dags/example_dag_decorator.py
+++ b/airflow/example_dags/example_dag_decorator.py
@@ -25,8 +25,6 @@ from airflow.models.baseoperator import BaseOperator
from airflow.operators.email import EmailOperator
from airflow.utils.dates import days_ago
-DEFAULT_ARGS = {"owner": "airflow"}
-
class GetRequestOperator(BaseOperator):
"""Custom operator to sand GET request to provided url"""
@@ -40,7 +38,7 @@ class GetRequestOperator(BaseOperator):
# [START dag_decorator_usage]
-@dag(default_args=DEFAULT_ARGS, schedule_interval=None,
start_date=days_ago(2), tags=['example'])
+@dag(schedule_interval=None, start_date=days_ago(2), tags=['example'])
def example_dag_decorator(email: str = '[email protected]'):
"""
DAG to send server IP to email.
diff --git a/airflow/example_dags/example_kubernetes_executor.py
b/airflow/example_dags/example_kubernetes_executor.py
index 35180e1..c76c119 100644
--- a/airflow/example_dags/example_kubernetes_executor.py
+++ b/airflow/example_dags/example_kubernetes_executor.py
@@ -25,13 +25,8 @@ from airflow.example_dags.libs.helper import print_stuff
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
-args = {
- 'owner': 'airflow',
-}
-
with DAG(
dag_id='example_kubernetes_executor',
- default_args=args,
schedule_interval=None,
start_date=days_ago(2),
tags=['example', 'example2'],
diff --git a/airflow/example_dags/example_kubernetes_executor_config.py
b/airflow/example_dags/example_kubernetes_executor_config.py
index 5290dd8..201220b 100644
--- a/airflow/example_dags/example_kubernetes_executor_config.py
+++ b/airflow/example_dags/example_kubernetes_executor_config.py
@@ -27,10 +27,6 @@ from airflow.operators.python import PythonOperator
from airflow.settings import AIRFLOW_HOME
from airflow.utils.dates import days_ago
-default_args = {
- 'owner': 'airflow',
-}
-
log = logging.getLogger(__name__)
try:
@@ -38,7 +34,6 @@ try:
with DAG(
dag_id='example_kubernetes_executor_config',
- default_args=default_args,
schedule_interval=None,
start_date=days_ago(2),
tags=['example3'],
diff --git a/airflow/example_dags/example_passing_params_via_test_command.py
b/airflow/example_dags/example_passing_params_via_test_command.py
index d2ecdfa..e9e004a 100644
--- a/airflow/example_dags/example_passing_params_via_test_command.py
+++ b/airflow/example_dags/example_passing_params_via_test_command.py
@@ -59,9 +59,6 @@ def print_env_vars(test_mode):
with DAG(
"example_passing_params_via_test_command",
- default_args={
- "owner": "airflow",
- },
schedule_interval='*/1 * * * *',
start_date=days_ago(1),
dagrun_timeout=timedelta(minutes=4),
diff --git a/airflow/example_dags/example_python_operator.py
b/airflow/example_dags/example_python_operator.py
index a9db342..4da7564 100644
--- a/airflow/example_dags/example_python_operator.py
+++ b/airflow/example_dags/example_python_operator.py
@@ -24,13 +24,8 @@ from airflow import DAG
from airflow.operators.python import PythonOperator, PythonVirtualenvOperator
from airflow.utils.dates import days_ago
-args = {
- 'owner': 'airflow',
-}
-
with DAG(
dag_id='example_python_operator',
- default_args=args,
schedule_interval=None,
start_date=days_ago(2),
tags=['example'],
diff --git a/airflow/example_dags/example_short_circuit_operator.py
b/airflow/example_dags/example_short_circuit_operator.py
index 3836ef9..d919f91 100644
--- a/airflow/example_dags/example_short_circuit_operator.py
+++ b/airflow/example_dags/example_short_circuit_operator.py
@@ -23,13 +23,8 @@ from airflow.operators.dummy import DummyOperator
from airflow.operators.python import ShortCircuitOperator
from airflow.utils import dates
-args = {
- 'owner': 'airflow',
-}
-
with DAG(
dag_id='example_short_circuit_operator',
- default_args=args,
start_date=dates.days_ago(2),
tags=['example'],
) as dag:
diff --git a/airflow/example_dags/example_skip_dag.py
b/airflow/example_dags/example_skip_dag.py
index 77fbf4a..ba90e22 100644
--- a/airflow/example_dags/example_skip_dag.py
+++ b/airflow/example_dags/example_skip_dag.py
@@ -23,10 +23,6 @@ from airflow.exceptions import AirflowSkipException
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago
-args = {
- 'owner': 'airflow',
-}
-
# Create some placeholder operators
class DummySkipOperator(DummyOperator):
@@ -56,6 +52,6 @@ def create_test_pipeline(suffix, trigger_rule, dag_):
join >> final
-with DAG(dag_id='example_skip_dag', default_args=args, start_date=days_ago(2),
tags=['example']) as dag:
+with DAG(dag_id='example_skip_dag', start_date=days_ago(2), tags=['example'])
as dag:
create_test_pipeline('1', 'all_success', dag)
create_test_pipeline('2', 'one_success', dag)
diff --git a/airflow/example_dags/example_trigger_controller_dag.py
b/airflow/example_dags/example_trigger_controller_dag.py
index 9d02399..2987b8b 100644
--- a/airflow/example_dags/example_trigger_controller_dag.py
+++ b/airflow/example_dags/example_trigger_controller_dag.py
@@ -27,7 +27,6 @@ from airflow.utils.dates import days_ago
with DAG(
dag_id="example_trigger_controller_dag",
- default_args={"owner": "airflow"},
start_date=days_ago(2),
schedule_interval="@once",
tags=['example'],
diff --git a/airflow/example_dags/example_trigger_target_dag.py
b/airflow/example_dags/example_trigger_target_dag.py
index 2b4661b..4612539 100644
--- a/airflow/example_dags/example_trigger_target_dag.py
+++ b/airflow/example_dags/example_trigger_target_dag.py
@@ -28,19 +28,18 @@ from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
-def run_this_func(**context):
+def run_this_func(dag_run):
"""
Print the payload "message" passed to the DagRun conf attribute.
- :param context: The execution context
- :type context: dict
+ :param dag_run: The DagRun object
+ :type dag_run: DagRun
"""
- print(f"Remotely received value of {context['dag_run'].conf['message']}
for key=message")
+ print(f"Remotely received value of {dag_run.conf['message']} for
key=message")
with DAG(
dag_id="example_trigger_target_dag",
- default_args={"owner": "airflow"},
start_date=days_ago(2),
schedule_interval=None,
tags=['example'],
diff --git a/airflow/example_dags/example_xcom.py
b/airflow/example_dags/example_xcom.py
index de5e14e..60cf3f5 100644
--- a/airflow/example_dags/example_xcom.py
+++ b/airflow/example_dags/example_xcom.py
@@ -36,26 +36,17 @@ def push_by_returning(**kwargs):
return value_2
-def puller(**kwargs):
- """Pull all previously pushed XComs and check if the pushed values match
the pulled values."""
- ti = kwargs['ti']
+def _compare_values(pulled_value, check_value):
+ if pulled_value != check_value:
+ raise ValueError(f'The two values differ {pulled_value} and
{check_value}')
- # get value_1
- pulled_value_1 = ti.xcom_pull(key=None, task_ids='push')
- if pulled_value_1 != value_1:
- raise ValueError(f'The two values differ {pulled_value_1} and
{value_1}')
- # get value_2
- pulled_value_2 = ti.xcom_pull(task_ids='push_by_returning')
- if pulled_value_2 != value_2:
- raise ValueError(f'The two values differ {pulled_value_2} and
{value_2}')
+def puller(pulled_value_1, pulled_value_2, **kwargs):
+ """Pull all previously pushed XComs and check if the pushed values match
the pulled values."""
- # get both value_1 and value_2
- pulled_value_1, pulled_value_2 = ti.xcom_pull(key=None, task_ids=['push',
'push_by_returning'])
- if pulled_value_1 != value_1:
- raise ValueError(f'The two values differ {pulled_value_1} and
{value_1}')
- if pulled_value_2 != value_2:
- raise ValueError(f'The two values differ {pulled_value_2} and
{value_2}')
+ # Check pulled values from function args
+ _compare_values(pulled_value_1, value_1)
+ _compare_values(pulled_value_2, value_2)
def pull_value_from_bash_push(**kwargs):
@@ -70,7 +61,6 @@ with DAG(
'example_xcom',
schedule_interval="@once",
start_date=days_ago(2),
- default_args={'owner': 'airflow'},
tags=['example'],
) as dag:
@@ -87,10 +77,12 @@ with DAG(
pull = PythonOperator(
task_id='puller',
python_callable=puller,
+ op_kwargs={
+ 'pulled_value_1': push1.output['value from pusher 1'],
+ 'pulled_value_2': push2.output,
+ },
)
- pull << [push1, push2]
-
bash_push = BashOperator(
task_id='bash_push',
bash_command='echo "bash_push demo" && '
@@ -98,13 +90,12 @@ with DAG(
'{{ ti.xcom_push(key="manually_pushed_value",
value="manually_pushed_value") }}" && '
'echo "value_by_return"',
)
+
bash_pull = BashOperator(
task_id='bash_pull',
bash_command='echo "bash pull demo" && '
- 'echo "The xcom pushed manually is '
- '"{{ ti.xcom_pull(task_ids="bash_push", key="manually_pushed_value")
}}" && '
- 'echo "The returned_value xcom is '
- '"{{ ti.xcom_pull(task_ids="bash_push", key="return_value") }}" && '
+ f'echo "The xcom pushed manually is
{bash_push.output["manually_pushed_value"]}" && '
+ f'echo "The returned_value xcom is {bash_push.output}" && '
'echo "finished"',
do_xcom_push=False,
)
@@ -115,3 +106,7 @@ with DAG(
)
[bash_pull, python_pull_from_bash] << bash_push
+
+ # Task dependencies created via `XComArgs`:
+ # push1 >> pull
+ # push2 >> pull
diff --git a/airflow/example_dags/example_xcomargs.py
b/airflow/example_dags/example_xcomargs.py
index 0fb728e..bf7dc8a 100644
--- a/airflow/example_dags/example_xcomargs.py
+++ b/airflow/example_dags/example_xcomargs.py
@@ -42,7 +42,6 @@ def print_value(value):
with DAG(
dag_id='example_xcom_args',
- default_args={'owner': 'airflow'},
start_date=days_ago(2),
schedule_interval=None,
tags=['example'],
@@ -57,7 +56,6 @@ with DAG(
with DAG(
"example_xcom_args_with_operators",
- default_args={'owner': 'airflow'},
start_date=days_ago(2),
schedule_interval=None,
tags=['example'],
diff --git a/airflow/example_dags/tutorial_taskflow_api_etl.py
b/airflow/example_dags/tutorial_taskflow_api_etl.py
index edc589c..bc24c3f 100644
--- a/airflow/example_dags/tutorial_taskflow_api_etl.py
+++ b/airflow/example_dags/tutorial_taskflow_api_etl.py
@@ -26,17 +26,9 @@ from airflow.utils.dates import days_ago
# [END import_module]
-# [START default_args]
-# These args will get passed on to each operator
-# You can override them on a per-task basis during operator initialization
-default_args = {
- 'owner': 'airflow',
-}
-# [END default_args]
-
# [START instantiate_dag]
-@dag(default_args=default_args, schedule_interval=None,
start_date=days_ago(2), tags=['example'])
+@dag(schedule_interval=None, start_date=days_ago(2), tags=['example'])
def tutorial_taskflow_api_etl():
"""
### TaskFlow API Tutorial Documentation
diff --git a/airflow/example_dags/tutorial_taskflow_api_etl_virtualenv.py
b/airflow/example_dags/tutorial_taskflow_api_etl_virtualenv.py
index d2723be..bdbf476 100644
--- a/airflow/example_dags/tutorial_taskflow_api_etl_virtualenv.py
+++ b/airflow/example_dags/tutorial_taskflow_api_etl_virtualenv.py
@@ -19,24 +19,14 @@
# [START tutorial]
# [START import_module]
-import json
-
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
# [END import_module]
-# [START default_args]
-# These args will get passed on to each operator
-# You can override them on a per-task basis during operator initialization
-default_args = {
- 'owner': 'airflow',
-}
-# [END default_args]
-
# [START instantiate_dag]
-@dag(default_args=default_args, schedule_interval=None,
start_date=days_ago(2), tags=['example'])
+@dag(schedule_interval=None, start_date=days_ago(2), tags=['example'])
def tutorial_taskflow_api_etl_virtualenv():
"""
### TaskFlow API Tutorial Documentation
@@ -61,6 +51,8 @@ def tutorial_taskflow_api_etl_virtualenv():
pipeline. In this case, getting data is simulated by reading from a
hardcoded JSON string.
"""
+ import json
+
data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
order_data_dict = json.loads(data_string)