This is an automated email from the ASF dual-hosted git repository. potiuk pushed a commit to branch v2-0-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 90ce6b29c31b3bc3d74a812af0f1437c8821ce04 Author: Jennifer Melot <[email protected]> AuthorDate: Wed Jan 20 07:16:12 2021 -0500 Use DAG context manager in examples (#13297) (cherry picked from commit 9923d606d2887c52390a30639fc1ee0d4000149c) --- airflow/example_dags/example_bash_operator.py | 54 ++++----- airflow/example_dags/example_branch_operator.py | 51 ++++----- .../example_branch_python_dop_operator_3.py | 31 +++-- airflow/example_dags/example_latest_only.py | 10 +- .../example_latest_only_with_trigger.py | 18 +-- .../example_passing_params_via_test_command.py | 66 +++++------ airflow/example_dags/example_python_operator.py | 127 ++++++++++----------- .../example_dags/example_short_circuit_operator.py | 30 +++-- airflow/example_dags/example_skip_dag.py | 6 +- airflow/example_dags/example_subdag_operator.py | 45 ++++---- .../example_dags/example_trigger_controller_dag.py | 15 ++- airflow/example_dags/example_trigger_target_dag.py | 29 +++-- airflow/example_dags/example_xcom.py | 45 ++++---- airflow/example_dags/test_utils.py | 15 ++- airflow/example_dags/tutorial.py | 92 +++++++-------- airflow/example_dags/tutorial_etl_dag.py | 41 ++++--- docs/apache-airflow/executor/kubernetes.rst | 2 + docs/apache-airflow/howto/operator/bash.rst | 2 + .../howto/operator/external_task_sensor.rst | 2 + docs/apache-airflow/howto/operator/python.rst | 3 + docs/apache-airflow/tutorial.rst | 3 + docs/apache-airflow/tutorial_taskflow_api.rst | 6 + 22 files changed, 344 insertions(+), 349 deletions(-) diff --git a/airflow/example_dags/example_bash_operator.py b/airflow/example_dags/example_bash_operator.py index 1c22fff..0665971 100644 --- a/airflow/example_dags/example_bash_operator.py +++ b/airflow/example_dags/example_bash_operator.py @@ -29,7 +29,7 @@ args = { 'owner': 'airflow', } -dag = DAG( +with DAG( dag_id='example_bash_operator', default_args=args, schedule_interval='0 0 * * *', @@ -37,39 +37,35 @@ dag = DAG( dagrun_timeout=timedelta(minutes=60), tags=['example', 'example2'], params={"example_key": "example_value"}, -) +) as dag: -run_this_last = DummyOperator( - task_id='run_this_last', - dag=dag, -) + run_this_last = DummyOperator( + task_id='run_this_last', + ) + + # [START howto_operator_bash] + run_this = BashOperator( + task_id='run_after_loop', + bash_command='echo 1', + ) + # [END howto_operator_bash] -# [START howto_operator_bash] -run_this = BashOperator( - task_id='run_after_loop', - bash_command='echo 1', - dag=dag, -) -# [END howto_operator_bash] + run_this >> run_this_last -run_this >> run_this_last + for i in range(3): + task = BashOperator( + task_id='runme_' + str(i), + bash_command='echo "{{ task_instance_key_str }}" && sleep 1', + ) + task >> run_this -for i in range(3): - task = BashOperator( - task_id='runme_' + str(i), - bash_command='echo "{{ task_instance_key_str }}" && sleep 1', - dag=dag, + # [START howto_operator_bash_template] + also_run_this = BashOperator( + task_id='also_run_this', + bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"', ) - task >> run_this - -# [START howto_operator_bash_template] -also_run_this = BashOperator( - task_id='also_run_this', - bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"', - dag=dag, -) -# [END howto_operator_bash_template] -also_run_this >> run_this_last + # [END howto_operator_bash_template] + also_run_this >> run_this_last if __name__ == "__main__": dag.cli() diff --git a/airflow/example_dags/example_branch_operator.py b/airflow/example_dags/example_branch_operator.py index 50eb328..7c5e166 100644 --- a/airflow/example_dags/example_branch_operator.py +++ b/airflow/example_dags/example_branch_operator.py @@ -29,43 +29,38 @@ args = { 'owner': 'airflow', } -dag = DAG( +with DAG( dag_id='example_branch_operator', default_args=args, start_date=days_ago(2), schedule_interval="@daily", tags=['example', 'example2'], -) +) as dag: -run_this_first = DummyOperator( - task_id='run_this_first', - dag=dag, -) - -options = ['branch_a', 'branch_b', 'branch_c', 'branch_d'] - -branching = BranchPythonOperator( - task_id='branching', - python_callable=lambda: random.choice(options), - dag=dag, -) -run_this_first >> branching + run_this_first = DummyOperator( + task_id='run_this_first', + ) -join = DummyOperator( - task_id='join', - trigger_rule='none_failed_or_skipped', - dag=dag, -) + options = ['branch_a', 'branch_b', 'branch_c', 'branch_d'] -for option in options: - t = DummyOperator( - task_id=option, - dag=dag, + branching = BranchPythonOperator( + task_id='branching', + python_callable=lambda: random.choice(options), ) + run_this_first >> branching - dummy_follow = DummyOperator( - task_id='follow_' + option, - dag=dag, + join = DummyOperator( + task_id='join', + trigger_rule='none_failed_or_skipped', ) - branching >> t >> dummy_follow >> join + for option in options: + t = DummyOperator( + task_id=option, + ) + + dummy_follow = DummyOperator( + task_id='follow_' + option, + ) + + branching >> t >> dummy_follow >> join diff --git a/airflow/example_dags/example_branch_python_dop_operator_3.py b/airflow/example_dags/example_branch_python_dop_operator_3.py index f01fc50..badad5a 100644 --- a/airflow/example_dags/example_branch_python_dop_operator_3.py +++ b/airflow/example_dags/example_branch_python_dop_operator_3.py @@ -31,14 +31,6 @@ args = { 'depends_on_past': True, } -dag = DAG( - dag_id='example_branch_dop_operator_v3', - schedule_interval='*/1 * * * *', - start_date=days_ago(2), - default_args=args, - tags=['example'], -) - def should_run(**kwargs): """ @@ -59,12 +51,19 @@ def should_run(**kwargs): return "dummy_task_2" -cond = BranchPythonOperator( - task_id='condition', - python_callable=should_run, - dag=dag, -) +with DAG( + dag_id='example_branch_dop_operator_v3', + schedule_interval='*/1 * * * *', + start_date=days_ago(2), + default_args=args, + tags=['example'], +) as dag: + + cond = BranchPythonOperator( + task_id='condition', + python_callable=should_run, + ) -dummy_task_1 = DummyOperator(task_id='dummy_task_1', dag=dag) -dummy_task_2 = DummyOperator(task_id='dummy_task_2', dag=dag) -cond >> [dummy_task_1, dummy_task_2] + dummy_task_1 = DummyOperator(task_id='dummy_task_1') + dummy_task_2 = DummyOperator(task_id='dummy_task_2') + cond >> [dummy_task_1, dummy_task_2] diff --git a/airflow/example_dags/example_latest_only.py b/airflow/example_dags/example_latest_only.py index 272a054..d0d5db0 100644 --- a/airflow/example_dags/example_latest_only.py +++ b/airflow/example_dags/example_latest_only.py @@ -25,14 +25,14 @@ from airflow.operators.dummy import DummyOperator from airflow.operators.latest_only import LatestOnlyOperator from airflow.utils.dates import days_ago -dag = DAG( +with DAG( dag_id='latest_only', schedule_interval=dt.timedelta(hours=4), start_date=days_ago(2), tags=['example2', 'example3'], -) +) as dag: -latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag) -task1 = DummyOperator(task_id='task1', dag=dag) + latest_only = LatestOnlyOperator(task_id='latest_only') + task1 = DummyOperator(task_id='task1') -latest_only >> task1 + latest_only >> task1 diff --git a/airflow/example_dags/example_latest_only_with_trigger.py b/airflow/example_dags/example_latest_only_with_trigger.py index 9178278..a8e96e7 100644 --- a/airflow/example_dags/example_latest_only_with_trigger.py +++ b/airflow/example_dags/example_latest_only_with_trigger.py @@ -28,19 +28,19 @@ from airflow.operators.latest_only import LatestOnlyOperator from airflow.utils.dates import days_ago from airflow.utils.trigger_rule import TriggerRule -dag = DAG( +with DAG( dag_id='latest_only_with_trigger', schedule_interval=dt.timedelta(hours=4), start_date=days_ago(2), tags=['example3'], -) +) as dag: -latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag) -task1 = DummyOperator(task_id='task1', dag=dag) -task2 = DummyOperator(task_id='task2', dag=dag) -task3 = DummyOperator(task_id='task3', dag=dag) -task4 = DummyOperator(task_id='task4', dag=dag, trigger_rule=TriggerRule.ALL_DONE) + latest_only = LatestOnlyOperator(task_id='latest_only') + task1 = DummyOperator(task_id='task1') + task2 = DummyOperator(task_id='task2') + task3 = DummyOperator(task_id='task3') + task4 = DummyOperator(task_id='task4', trigger_rule=TriggerRule.ALL_DONE) -latest_only >> task1 >> [task3, task4] -task2 >> [task3, task4] + latest_only >> task1 >> [task3, task4] + task2 >> [task3, task4] # [END example] diff --git a/airflow/example_dags/example_passing_params_via_test_command.py b/airflow/example_dags/example_passing_params_via_test_command.py index 8eaadd7..2c930cc 100644 --- a/airflow/example_dags/example_passing_params_via_test_command.py +++ b/airflow/example_dags/example_passing_params_via_test_command.py @@ -20,23 +20,13 @@ import os from datetime import timedelta +from textwrap import dedent from airflow import DAG from airflow.operators.bash import BashOperator from airflow.operators.python import PythonOperator from airflow.utils.dates import days_ago -dag = DAG( - "example_passing_params_via_test_command", - default_args={ - "owner": "airflow", - }, - schedule_interval='*/1 * * * *', - start_date=days_ago(1), - dagrun_timeout=timedelta(minutes=4), - tags=['example'], -) - def my_py_command(test_mode, params): """ @@ -56,26 +46,6 @@ def my_py_command(test_mode, params): return 1 -my_templated_command = """ - echo " 'foo was passed in via Airflow CLI Test command with value {{ params.foo }} " - echo " 'miff was passed in via BashOperator with value {{ params.miff }} " -""" - -run_this = PythonOperator( - task_id='run_this', - python_callable=my_py_command, - params={"miff": "agg"}, - dag=dag, -) - -also_run_this = BashOperator( - task_id='also_run_this', - bash_command=my_templated_command, - params={"miff": "agg"}, - dag=dag, -) - - def print_env_vars(test_mode): """ Print out the "foo" param passed in via @@ -87,6 +57,36 @@ def print_env_vars(test_mode): print("AIRFLOW_TEST_MODE={}".format(os.environ.get('AIRFLOW_TEST_MODE'))) -env_var_test_task = PythonOperator(task_id='env_var_test_task', python_callable=print_env_vars, dag=dag) +with DAG( + "example_passing_params_via_test_command", + default_args={ + "owner": "airflow", + }, + schedule_interval='*/1 * * * *', + start_date=days_ago(1), + dagrun_timeout=timedelta(minutes=4), + tags=['example'], +) as dag: + + my_templated_command = dedent( + """ + echo " 'foo was passed in via Airflow CLI Test command with value {{ params.foo }} " + echo " 'miff was passed in via BashOperator with value {{ params.miff }} " + """ + ) + + run_this = PythonOperator( + task_id='run_this', + python_callable=my_py_command, + params={"miff": "agg"}, + ) + + also_run_this = BashOperator( + task_id='also_run_this', + bash_command=my_templated_command, + params={"miff": "agg"}, + ) + + env_var_test_task = PythonOperator(task_id='env_var_test_task', python_callable=print_env_vars) -run_this >> also_run_this + run_this >> also_run_this diff --git a/airflow/example_dags/example_python_operator.py b/airflow/example_dags/example_python_operator.py index d5e16a5..a9db342 100644 --- a/airflow/example_dags/example_python_operator.py +++ b/airflow/example_dags/example_python_operator.py @@ -28,77 +28,68 @@ args = { 'owner': 'airflow', } -dag = DAG( +with DAG( dag_id='example_python_operator', default_args=args, schedule_interval=None, start_date=days_ago(2), tags=['example'], -) - - -# [START howto_operator_python] -def print_context(ds, **kwargs): - """Print the Airflow context and ds variable from the context.""" - pprint(kwargs) - print(ds) - return 'Whatever you return gets printed in the logs' - - -run_this = PythonOperator( - task_id='print_the_context', - python_callable=print_context, - dag=dag, -) -# [END howto_operator_python] - - -# [START howto_operator_python_kwargs] -def my_sleeping_function(random_base): - """This is a function that will run within the DAG execution""" - time.sleep(random_base) - - -# Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively -for i in range(5): - task = PythonOperator( - task_id='sleep_for_' + str(i), - python_callable=my_sleeping_function, - op_kwargs={'random_base': float(i) / 10}, - dag=dag, +) as dag: + + # [START howto_operator_python] + def print_context(ds, **kwargs): + """Print the Airflow context and ds variable from the context.""" + pprint(kwargs) + print(ds) + return 'Whatever you return gets printed in the logs' + + run_this = PythonOperator( + task_id='print_the_context', + python_callable=print_context, ) - - run_this >> task -# [END howto_operator_python_kwargs] - - -# [START howto_operator_python_venv] -def callable_virtualenv(): - """ - Example function that will be performed in a virtual environment. - - Importing at the module level ensures that it will not attempt to import the - library before it is installed. - """ - from time import sleep - - from colorama import Back, Fore, Style - - print(Fore.RED + 'some red text') - print(Back.GREEN + 'and with a green background') - print(Style.DIM + 'and in dim text') - print(Style.RESET_ALL) - for _ in range(10): - print(Style.DIM + 'Please wait...', flush=True) - sleep(10) - print('Finished') - - -virtualenv_task = PythonVirtualenvOperator( - task_id="virtualenv_python", - python_callable=callable_virtualenv, - requirements=["colorama==0.4.0"], - system_site_packages=False, - dag=dag, -) -# [END howto_operator_python_venv] + # [END howto_operator_python] + + # [START howto_operator_python_kwargs] + def my_sleeping_function(random_base): + """This is a function that will run within the DAG execution""" + time.sleep(random_base) + + # Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively + for i in range(5): + task = PythonOperator( + task_id='sleep_for_' + str(i), + python_callable=my_sleeping_function, + op_kwargs={'random_base': float(i) / 10}, + ) + + run_this >> task + # [END howto_operator_python_kwargs] + + # [START howto_operator_python_venv] + def callable_virtualenv(): + """ + Example function that will be performed in a virtual environment. + + Importing at the module level ensures that it will not attempt to import the + library before it is installed. + """ + from time import sleep + + from colorama import Back, Fore, Style + + print(Fore.RED + 'some red text') + print(Back.GREEN + 'and with a green background') + print(Style.DIM + 'and in dim text') + print(Style.RESET_ALL) + for _ in range(10): + print(Style.DIM + 'Please wait...', flush=True) + sleep(10) + print('Finished') + + virtualenv_task = PythonVirtualenvOperator( + task_id="virtualenv_python", + python_callable=callable_virtualenv, + requirements=["colorama==0.4.0"], + system_site_packages=False, + ) + # [END howto_operator_python_venv] diff --git a/airflow/example_dags/example_short_circuit_operator.py b/airflow/example_dags/example_short_circuit_operator.py index 38163a0..3836ef9 100644 --- a/airflow/example_dags/example_short_circuit_operator.py +++ b/airflow/example_dags/example_short_circuit_operator.py @@ -27,27 +27,25 @@ args = { 'owner': 'airflow', } -dag = DAG( +with DAG( dag_id='example_short_circuit_operator', default_args=args, start_date=dates.days_ago(2), tags=['example'], -) +) as dag: -cond_true = ShortCircuitOperator( - task_id='condition_is_True', - python_callable=lambda: True, - dag=dag, -) + cond_true = ShortCircuitOperator( + task_id='condition_is_True', + python_callable=lambda: True, + ) -cond_false = ShortCircuitOperator( - task_id='condition_is_False', - python_callable=lambda: False, - dag=dag, -) + cond_false = ShortCircuitOperator( + task_id='condition_is_False', + python_callable=lambda: False, + ) -ds_true = [DummyOperator(task_id='true_' + str(i), dag=dag) for i in [1, 2]] -ds_false = [DummyOperator(task_id='false_' + str(i), dag=dag) for i in [1, 2]] + ds_true = [DummyOperator(task_id='true_' + str(i)) for i in [1, 2]] + ds_false = [DummyOperator(task_id='false_' + str(i)) for i in [1, 2]] -chain(cond_true, *ds_true) -chain(cond_false, *ds_false) + chain(cond_true, *ds_true) + chain(cond_false, *ds_false) diff --git a/airflow/example_dags/example_skip_dag.py b/airflow/example_dags/example_skip_dag.py index 633dc5e..77fbf4a 100644 --- a/airflow/example_dags/example_skip_dag.py +++ b/airflow/example_dags/example_skip_dag.py @@ -56,6 +56,6 @@ def create_test_pipeline(suffix, trigger_rule, dag_): join >> final -dag = DAG(dag_id='example_skip_dag', default_args=args, start_date=days_ago(2), tags=['example']) -create_test_pipeline('1', 'all_success', dag) -create_test_pipeline('2', 'one_success', dag) +with DAG(dag_id='example_skip_dag', default_args=args, start_date=days_ago(2), tags=['example']) as dag: + create_test_pipeline('1', 'all_success', dag) + create_test_pipeline('2', 'one_success', dag) diff --git a/airflow/example_dags/example_subdag_operator.py b/airflow/example_dags/example_subdag_operator.py index be88281..f27aec7 100644 --- a/airflow/example_dags/example_subdag_operator.py +++ b/airflow/example_dags/example_subdag_operator.py @@ -31,36 +31,31 @@ args = { 'owner': 'airflow', } -dag = DAG( +with DAG( dag_id=DAG_NAME, default_args=args, start_date=days_ago(2), schedule_interval="@once", tags=['example'] -) +) as dag: -start = DummyOperator( - task_id='start', - dag=dag, -) + start = DummyOperator( + task_id='start', + ) -section_1 = SubDagOperator( - task_id='section-1', - subdag=subdag(DAG_NAME, 'section-1', args), - dag=dag, -) + section_1 = SubDagOperator( + task_id='section-1', + subdag=subdag(DAG_NAME, 'section-1', args), + ) -some_other_task = DummyOperator( - task_id='some-other-task', - dag=dag, -) + some_other_task = DummyOperator( + task_id='some-other-task', + ) -section_2 = SubDagOperator( - task_id='section-2', - subdag=subdag(DAG_NAME, 'section-2', args), - dag=dag, -) + section_2 = SubDagOperator( + task_id='section-2', + subdag=subdag(DAG_NAME, 'section-2', args), + ) -end = DummyOperator( - task_id='end', - dag=dag, -) + end = DummyOperator( + task_id='end', + ) -start >> section_1 >> some_other_task >> section_2 >> end + start >> section_1 >> some_other_task >> section_2 >> end # [END example_subdag_operator] diff --git a/airflow/example_dags/example_trigger_controller_dag.py b/airflow/example_dags/example_trigger_controller_dag.py index 0f706c7..9d02399 100644 --- a/airflow/example_dags/example_trigger_controller_dag.py +++ b/airflow/example_dags/example_trigger_controller_dag.py @@ -25,17 +25,16 @@ from airflow import DAG from airflow.operators.trigger_dagrun import TriggerDagRunOperator from airflow.utils.dates import days_ago -dag = DAG( +with DAG( dag_id="example_trigger_controller_dag", default_args={"owner": "airflow"}, start_date=days_ago(2), schedule_interval="@once", tags=['example'], -) +) as dag: -trigger = TriggerDagRunOperator( - task_id="test_trigger_dagrun", - trigger_dag_id="example_trigger_target_dag", # Ensure this equals the dag_id of the DAG to trigger - conf={"message": "Hello World"}, - dag=dag, -) + trigger = TriggerDagRunOperator( + task_id="test_trigger_dagrun", + trigger_dag_id="example_trigger_target_dag", # Ensure this equals the dag_id of the DAG to trigger + conf={"message": "Hello World"}, + ) diff --git a/airflow/example_dags/example_trigger_target_dag.py b/airflow/example_dags/example_trigger_target_dag.py index 0355275..39ecefc 100644 --- a/airflow/example_dags/example_trigger_target_dag.py +++ b/airflow/example_dags/example_trigger_target_dag.py @@ -27,14 +27,6 @@ from airflow.operators.bash import BashOperator from airflow.operators.python import PythonOperator from airflow.utils.dates import days_ago -dag = DAG( - dag_id="example_trigger_target_dag", - default_args={"owner": "airflow"}, - start_date=days_ago(2), - schedule_interval=None, - tags=['example'], -) - def run_this_func(**context): """ @@ -46,11 +38,18 @@ def run_this_func(**context): print("Remotely received value of {} for key=message".format(context["dag_run"].conf["message"])) -run_this = PythonOperator(task_id="run_this", python_callable=run_this_func, dag=dag) +with DAG( + dag_id="example_trigger_target_dag", + default_args={"owner": "airflow"}, + start_date=days_ago(2), + schedule_interval=None, + tags=['example'], +) as dag: + + run_this = PythonOperator(task_id="run_this", python_callable=run_this_func) -bash_task = BashOperator( - task_id="bash_task", - bash_command='echo "Here is the message: $message"', - env={'message': '{{ dag_run.conf["message"] if dag_run else "" }}'}, - dag=dag, -) + bash_task = BashOperator( + task_id="bash_task", + bash_command='echo "Here is the message: $message"', + env={'message': '{{ dag_run.conf["message"] if dag_run else "" }}'}, + ) diff --git a/airflow/example_dags/example_xcom.py b/airflow/example_dags/example_xcom.py index 779e392..03f85d9 100644 --- a/airflow/example_dags/example_xcom.py +++ b/airflow/example_dags/example_xcom.py @@ -21,14 +21,6 @@ from airflow import DAG from airflow.operators.python import PythonOperator from airflow.utils.dates import days_ago -dag = DAG( - 'example_xcom', - schedule_interval="@once", - start_date=days_ago(2), - default_args={'owner': 'airflow'}, - tags=['example'], -) - value_1 = [1, 2, 3] value_2 = {'a': 'b'} @@ -65,22 +57,27 @@ def puller(**kwargs): raise ValueError(f'The two values differ {pulled_value_2} and {value_2}') -push1 = PythonOperator( - task_id='push', - dag=dag, - python_callable=push, -) +with DAG( + 'example_xcom', + schedule_interval="@once", + start_date=days_ago(2), + default_args={'owner': 'airflow'}, + tags=['example'], +) as dag: + + push1 = PythonOperator( + task_id='push', + python_callable=push, + ) -push2 = PythonOperator( - task_id='push_by_returning', - dag=dag, - python_callable=push_by_returning, -) + push2 = PythonOperator( + task_id='push_by_returning', + python_callable=push_by_returning, + ) -pull = PythonOperator( - task_id='puller', - dag=dag, - python_callable=puller, -) + pull = PythonOperator( + task_id='puller', + python_callable=puller, + ) -pull << [push1, push2] + pull << [push1, push2] diff --git a/airflow/example_dags/test_utils.py b/airflow/example_dags/test_utils.py index a1a2ed0..0211dfb 100644 --- a/airflow/example_dags/test_utils.py +++ b/airflow/example_dags/test_utils.py @@ -20,12 +20,11 @@ from airflow import DAG from airflow.operators.bash import BashOperator from airflow.utils.dates import days_ago -dag = DAG(dag_id='test_utils', schedule_interval=None, tags=['example']) +with DAG(dag_id='test_utils', schedule_interval=None, tags=['example']) as dag: -task = BashOperator( - task_id='sleeps_forever', - dag=dag, - bash_command="sleep 10000000000", - start_date=days_ago(2), - owner='airflow', -) + task = BashOperator( + task_id='sleeps_forever', + bash_command="sleep 10000000000", + start_date=days_ago(2), + owner='airflow', + ) diff --git a/airflow/example_dags/tutorial.py b/airflow/example_dags/tutorial.py index a00051c..518c801 100644 --- a/airflow/example_dags/tutorial.py +++ b/airflow/example_dags/tutorial.py @@ -24,6 +24,7 @@ Documentation that goes along with the Airflow tutorial located # [START tutorial] # [START import_module] from datetime import timedelta +from textwrap import dedent # The DAG object; we'll need this to instantiate a DAG from airflow import DAG @@ -62,62 +63,63 @@ default_args = { # [END default_args] # [START instantiate_dag] -dag = DAG( +with DAG( 'tutorial', default_args=default_args, description='A simple tutorial DAG', schedule_interval=timedelta(days=1), start_date=days_ago(2), tags=['example'], -) -# [END instantiate_dag] +) as dag: + # [END instantiate_dag] -# t1, t2 and t3 are examples of tasks created by instantiating operators -# [START basic_task] -t1 = BashOperator( - task_id='print_date', - bash_command='date', - dag=dag, -) + # t1, t2 and t3 are examples of tasks created by instantiating operators + # [START basic_task] + t1 = BashOperator( + task_id='print_date', + bash_command='date', + ) -t2 = BashOperator( - task_id='sleep', - depends_on_past=False, - bash_command='sleep 5', - retries=3, - dag=dag, -) -# [END basic_task] + t2 = BashOperator( + task_id='sleep', + depends_on_past=False, + bash_command='sleep 5', + retries=3, + ) + # [END basic_task] -# [START documentation] -dag.doc_md = __doc__ + # [START documentation] + dag.doc_md = __doc__ -t1.doc_md = """\ -#### Task Documentation -You can document your task using the attributes `doc_md` (markdown), -`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets -rendered in the UI's Task Instance Details page. - -""" -# [END documentation] + t1.doc_md = dedent( + """\ + #### Task Documentation + You can document your task using the attributes `doc_md` (markdown), + `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets + rendered in the UI's Task Instance Details page. +  + """ + ) + # [END documentation] -# [START jinja_template] -templated_command = """ -{% for i in range(5) %} - echo "{{ ds }}" - echo "{{ macros.ds_add(ds, 7)}}" - echo "{{ params.my_param }}" -{% endfor %} -""" + # [START jinja_template] + templated_command = dedent( + """ + {% for i in range(5) %} + echo "{{ ds }}" + echo "{{ macros.ds_add(ds, 7)}}" + echo "{{ params.my_param }}" + {% endfor %} + """ + ) -t3 = BashOperator( - task_id='templated', - depends_on_past=False, - bash_command=templated_command, - params={'my_param': 'Parameter I passed in'}, - dag=dag, -) -# [END jinja_template] + t3 = BashOperator( + task_id='templated', + depends_on_past=False, + bash_command=templated_command, + params={'my_param': 'Parameter I passed in'}, + ) + # [END jinja_template] -t1 >> [t2, t3] + t1 >> [t2, t3] # [END tutorial] diff --git a/airflow/example_dags/tutorial_etl_dag.py b/airflow/example_dags/tutorial_etl_dag.py index 48b519b..8b45600 100644 --- a/airflow/example_dags/tutorial_etl_dag.py +++ b/airflow/example_dags/tutorial_etl_dag.py @@ -27,6 +27,7 @@ as part of the documentation that goes along with the Airflow Functional DAG tut # [START tutorial] # [START import_module] import json +from textwrap import dedent # The DAG object; we'll need this to instantiate a DAG from airflow import DAG @@ -98,33 +99,39 @@ with DAG( task_id='extract', python_callable=extract, ) - extract_task.doc_md = """\ -#### Extract task -A simple Extract task to get data ready for the rest of the data pipeline. -In this case, getting data is simulated by reading from a hardcoded JSON string. -This data is then put into xcom, so that it can be processed by the next task. -""" + extract_task.doc_md = dedent( + """\ + #### Extract task + A simple Extract task to get data ready for the rest of the data pipeline. + In this case, getting data is simulated by reading from a hardcoded JSON string. + This data is then put into xcom, so that it can be processed by the next task. + """ + ) transform_task = PythonOperator( task_id='transform', python_callable=transform, ) - transform_task.doc_md = """\ -#### Transform task -A simple Transform task which takes in the collection of order data from xcom -and computes the total order value. -This computed value is then put into xcom, so that it can be processed by the next task. -""" + transform_task.doc_md = dedent( + """\ + #### Transform task + A simple Transform task which takes in the collection of order data from xcom + and computes the total order value. + This computed value is then put into xcom, so that it can be processed by the next task. + """ + ) load_task = PythonOperator( task_id='load', python_callable=load, ) - load_task.doc_md = """\ -#### Load task -A simple Load task which takes in the result of the Transform task, by reading it -from xcom and instead of saving it to end user review, just prints it out. -""" + load_task.doc_md = dedent( + """\ + #### Load task + A simple Load task which takes in the result of the Transform task, by reading it + from xcom and instead of saving it to end user review, just prints it out. + """ + ) extract_task >> transform_task >> load_task diff --git a/docs/apache-airflow/executor/kubernetes.rst b/docs/apache-airflow/executor/kubernetes.rst index 9b774cf..a0df9db 100644 --- a/docs/apache-airflow/executor/kubernetes.rst +++ b/docs/apache-airflow/executor/kubernetes.rst @@ -120,6 +120,7 @@ name ``base`` and a second container containing your desired sidecar. .. exampleinclude:: /../../airflow/example_dags/example_kubernetes_executor_config.py :language: python + :dedent: 8 :start-after: [START task_with_sidecar] :end-before: [END task_with_sidecar] @@ -130,6 +131,7 @@ Here is an example of a task with both features: .. exampleinclude:: /../../airflow/example_dags/example_kubernetes_executor_config.py :language: python + :dedent: 8 :start-after: [START task_with_template] :end-before: [END task_with_template] diff --git a/docs/apache-airflow/howto/operator/bash.rst b/docs/apache-airflow/howto/operator/bash.rst index c8a923f..3d2195f 100644 --- a/docs/apache-airflow/howto/operator/bash.rst +++ b/docs/apache-airflow/howto/operator/bash.rst @@ -27,6 +27,7 @@ commands in a `Bash <https://www.gnu.org/software/bash/>`__ shell. .. exampleinclude:: /../../airflow/example_dags/example_bash_operator.py :language: python + :dedent: 4 :start-after: [START howto_operator_bash] :end-before: [END howto_operator_bash] @@ -38,6 +39,7 @@ You can use :ref:`Jinja templates <jinja-templating>` to parameterize the .. exampleinclude:: /../../airflow/example_dags/example_bash_operator.py :language: python + :dedent: 4 :start-after: [START howto_operator_bash_template] :end-before: [END howto_operator_bash_template] diff --git a/docs/apache-airflow/howto/operator/external_task_sensor.rst b/docs/apache-airflow/howto/operator/external_task_sensor.rst index eec8074..420bd13 100644 --- a/docs/apache-airflow/howto/operator/external_task_sensor.rst +++ b/docs/apache-airflow/howto/operator/external_task_sensor.rst @@ -46,6 +46,7 @@ via ``allowed_states`` and ``failed_states`` parameters. .. exampleinclude:: /../../airflow/example_dags/example_external_task_marker_dag.py :language: python + :dedent: 4 :start-after: [START howto_operator_external_task_sensor] :end-before: [END howto_operator_external_task_sensor] @@ -60,5 +61,6 @@ user clears ``parent_task``. .. exampleinclude:: /../../airflow/example_dags/example_external_task_marker_dag.py :language: python + :dedent: 4 :start-after: [START howto_operator_external_task_marker] :end-before: [END howto_operator_external_task_marker] diff --git a/docs/apache-airflow/howto/operator/python.rst b/docs/apache-airflow/howto/operator/python.rst index 7f4d2b8..4a59df6 100644 --- a/docs/apache-airflow/howto/operator/python.rst +++ b/docs/apache-airflow/howto/operator/python.rst @@ -27,6 +27,7 @@ Python callables. .. exampleinclude:: /../../airflow/example_dags/example_python_operator.py :language: python + :dedent: 4 :start-after: [START howto_operator_python] :end-before: [END howto_operator_python] @@ -38,6 +39,7 @@ to the Python callable. .. exampleinclude:: /../../airflow/example_dags/example_python_operator.py :language: python + :dedent: 4 :start-after: [START howto_operator_python_kwargs] :end-before: [END howto_operator_python_kwargs] @@ -63,6 +65,7 @@ Python callables inside a new Python virtual environment. .. exampleinclude:: /../../airflow/example_dags/example_python_operator.py :language: python + :dedent: 4 :start-after: [START howto_operator_python_venv] :end-before: [END howto_operator_python_venv] diff --git a/docs/apache-airflow/tutorial.rst b/docs/apache-airflow/tutorial.rst index 9324014..3a6b7ce9 100644 --- a/docs/apache-airflow/tutorial.rst +++ b/docs/apache-airflow/tutorial.rst @@ -109,6 +109,7 @@ instantiated from an operator is called a task. The first argument .. exampleinclude:: /../../airflow/example_dags/tutorial.py :language: python + :dedent: 4 :start-after: [START basic_task] :end-before: [END basic_task] @@ -144,6 +145,7 @@ stamp"). .. exampleinclude:: /../../airflow/example_dags/tutorial.py :language: python + :dedent: 4 :start-after: [START jinja_template] :end-before: [END jinja_template] @@ -186,6 +188,7 @@ json, yaml. .. exampleinclude:: /../../airflow/example_dags/tutorial.py :language: python + :dedent: 4 :start-after: [START documentation] :end-before: [END documentation] diff --git a/docs/apache-airflow/tutorial_taskflow_api.rst b/docs/apache-airflow/tutorial_taskflow_api.rst index cea1438..b089e03 100644 --- a/docs/apache-airflow/tutorial_taskflow_api.rst +++ b/docs/apache-airflow/tutorial_taskflow_api.rst @@ -69,6 +69,7 @@ as shown below. The function name acts as a unique identifier for the task. .. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api_etl.py :language: python + :dedent: 4 :start-after: [START extract] :end-before: [END extract] @@ -83,6 +84,7 @@ we can move to the main part of the DAG. .. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api_etl.py :language: python + :dedent: 4 :start-after: [START main_flow] :end-before: [END main_flow] @@ -119,6 +121,7 @@ in the middle of the data pipeline. In Airflow 1.x, this task is defined as show .. exampleinclude:: /../../airflow/example_dags/tutorial_etl_dag.py :language: python + :dedent: 4 :start-after: [START transform_function] :end-before: [END transform_function] @@ -130,6 +133,7 @@ Contrasting that with Taskflow API in Airflow 2.0 as shown below. .. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api_etl.py :language: python + :dedent: 4 :start-after: [START transform] :end-before: [END transform] @@ -143,6 +147,7 @@ dependencies specified as shown below. .. exampleinclude:: /../../airflow/example_dags/tutorial_etl_dag.py :language: python + :dedent: 4 :start-after: [START main_flow] :end-before: [END main_flow] @@ -151,6 +156,7 @@ the dependencies as shown below. .. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api_etl.py :language: python + :dedent: 4 :start-after: [START main_flow] :end-before: [END main_flow]
