This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 8a7ee215229316abf48eac5cee3b05b556ad3fed
Author: Bowrna <[email protected]>
AuthorDate: Fri Sep 16 16:29:21 2022 +0530

    changing to task decorator in docs from classic operator use (#25711)

    Co-authored-by: Tzu-ping Chung <[email protected]>
    (cherry picked from commit 3d7673346f005732e62ef64db2c9d0089852d67f)
---
 docs/apache-airflow/concepts/dags.rst      | 32 ++++++++++++++++++------------
 docs/apache-airflow/concepts/operators.rst | 12 +++++++++--
 2 files changed, 29 insertions(+), 15 deletions(-)

diff --git a/docs/apache-airflow/concepts/dags.rst b/docs/apache-airflow/concepts/dags.rst
index 4e52c1532a..541a2adad5 100644
--- a/docs/apache-airflow/concepts/dags.rst
+++ b/docs/apache-airflow/concepts/dags.rst
@@ -269,11 +269,11 @@ By default, a DAG will only run a Task when all the Tasks it depends on are succ
 Branching
 ~~~~~~~~~

-You can make use of branching in order to tell the DAG *not* to run all dependent tasks, but instead to pick and choose one or more paths to go down. This is where the branching Operators come in.
+You can make use of branching in order to tell the DAG *not* to run all dependent tasks, but instead to pick and choose one or more paths to go down. This is where the ``@task.branch`` decorator come in.

-The ``BranchPythonOperator`` is much like the PythonOperator except that it expects a ``python_callable`` that returns a task_id (or list of task_ids). The task_id returned is followed, and all of the other paths are skipped. It can also return None to skip all downstream task.
+The ``@task.branch`` decorator is much like ``@task``, except that it expects the decorated function to return an ID to a task (or a list of IDs). The specified task is followed, while all other paths are skipped. It can also return *None* to skip all downstream tasks.

-The task_id returned by the Python function has to reference a task directly downstream from the BranchPythonOperator task.
+The task_id returned by the Python function has to reference a task directly downstream from the ``@task.branch`` decorated task.

 .. note::
     When a Task is downstream of both the branching operator *and* downstream of one or more of the selected tasks, it will not be skipped:
@@ -282,10 +282,11 @@ The task_id returned by the Python function has to reference a task directly dow

     The paths of the branching task are ``branch_a``, ``join`` and ``branch_b``. Since ``join`` is a downstream task of ``branch_a``, it will still be run, even though it was not returned as part of the branch decision.

-The ``BranchPythonOperator`` can also be used with XComs allowing branching context to dynamically decide what branch to follow based on upstream tasks. For example:
+The ``@task.branch`` can also be used with XComs allowing branching context to dynamically decide what branch to follow based on upstream tasks. For example:

 .. code-block:: python

+    @task.branch(task_id="branch_task")
     def branch_func(ti):
         xcom_value = int(ti.xcom_pull(task_ids="start_task"))
         if xcom_value >= 5:
@@ -303,20 +304,19 @@ The ``BranchPythonOperator`` can also be used with XComs allowing branching cont
         dag=dag,
     )

-    branch_op = BranchPythonOperator(
-        task_id="branch_task",
-        python_callable=branch_func,
-        dag=dag,
-    )
+    branch_op = branch_func()

     continue_op = EmptyOperator(task_id="continue_task", dag=dag)
     stop_op = EmptyOperator(task_id="stop_task", dag=dag)

     start_op >> branch_op >> [continue_op, stop_op]

-If you wish to implement your own operators with branching functionality, you can inherit from :class:`~airflow.operators.branch.BaseBranchOperator`, which behaves similarly to ``BranchPythonOperator`` but expects you to provide an implementation of the method ``choose_branch``.
+If you wish to implement your own operators with branching functionality, you can inherit from :class:`~airflow.operators.branch.BaseBranchOperator`, which behaves similarly to ``@task.branch`` decorator but expects you to provide an implementation of the method ``choose_branch``.
+
+.. note::
+    The ``@task.branch`` decorator is recommended over directly instantiating :class:`~airflow.operators.python.BranchPythonOperator` in a DAG. The latter should generally only be subclassed to implement a custom operator.

-As with the callable for ``BranchPythonOperator``, this method can return the ID of a downstream task, or a list of task IDs, which will be run, and all others will be skipped. It can also return None to skip all downstream task::
+As with the callable for ``@task.branch``, this method can return the ID of a downstream task, or a list of task IDs, which will be run, and all others will be skipped. It can also return None to skip all downstream task::

     class MyBranchOperator(BaseBranchOperator):
         def choose_branch(self, context):
@@ -404,7 +404,6 @@ You can also combine this with the :ref:`concepts:depends-on-past` functionality

     from airflow.models import DAG
     from airflow.operators.empty import EmptyOperator
-    from airflow.operators.python import BranchPythonOperator

     dag = DAG(
         dag_id="branch_without_trigger",
@@ -413,7 +412,14 @@ You can also combine this with the :ref:`concepts:depends-on-past` functionality
     )

     run_this_first = EmptyOperator(task_id="run_this_first", dag=dag)
-    branching = BranchPythonOperator(task_id="branching", dag=dag, python_callable=lambda: "branch_a")
+
+
+    @task.branch(task_id="branching")
+    def do_branching():
+        return "branch_a"
+
+
+    branching = do_branching()

     branch_a = EmptyOperator(task_id="branch_a", dag=dag)
     follow_branch_a = EmptyOperator(task_id="follow_branch_a", dag=dag)
diff --git a/docs/apache-airflow/concepts/operators.rst b/docs/apache-airflow/concepts/operators.rst
index 944ff64a52..cb278402de 100644
--- a/docs/apache-airflow/concepts/operators.rst
+++ b/docs/apache-airflow/concepts/operators.rst
@@ -31,6 +31,11 @@ Airflow has a very extensive set of operators available, with some built-in to t
 - :class:`~airflow.operators.bash.BashOperator` - executes a bash command
 - :class:`~airflow.operators.python.PythonOperator` - calls an arbitrary Python function
 - :class:`~airflow.operators.email.EmailOperator` - sends an email
+- Use the ``@task`` decorator to execute an arbitrary Python function. It doesn't support rendering jinja templates passed as arguments.
+
+.. note::
+    The ``@task`` decorator is recommended over the classic :class:`~airflow.operators.python.PythonOperator`
+    to execute Python callables with no template rendering in its arguments.

 For a list of all core operators, see: :doc:`Core Operators and Hooks Reference </operators-and-hooks-ref>`.

@@ -103,6 +108,7 @@ You can also use Jinja templating with nested fields, as long as these nested fi
         dag=dag,
     )

+
 .. note:: The ``template_fields`` property can equally be a class variable or an instance variable.

 Deep nested fields can also be substituted, as long as all intermediate fields are marked as template fields:
@@ -134,6 +140,7 @@ Deep nested fields can also be substituted, as long as all intermediate fields a
         dag=dag,
     )

+
 You can pass custom options to the Jinja ``Environment`` when creating your DAG. One common usage is to avoid Jinja from dropping a trailing newline from a template string:

 .. code-block:: python
@@ -168,7 +175,6 @@ Now, when the following task is run, ``order_data`` argument is passed a string,
         python_callable=transform,
     )

-
 If you instead want the rendered template field to return a Native Python object (``dict`` in our example),
 you can pass ``render_template_as_native_obj=True`` to the DAG as follows:

@@ -183,11 +189,13 @@ you can pass ``render_template_as_native_obj=True`` to the DAG as follows:
     )


+    @task(task_id="extract")
     def extract():
         data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
         return json.loads(data_string)


+    @task(task_id="transform")
     def transform(order_data):
         print(type(order_data))
         for value in order_data.values():
@@ -195,7 +203,7 @@ you can pass ``render_template_as_native_obj=True`` to the DAG as follows:
         return {"total_order_value": total_order_value}


-    extract_task = PythonOperator(task_id="extract", python_callable=extract)
+    extract_task = extract()

     transform_task = PythonOperator(
         task_id="transform",
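
The skip rule described in the dags.rst note above (a task downstream of both the branching task and a selected task still runs) may be easier to follow as a small model. The following is a minimal sketch in plain Python that does not import Airflow and is not Airflow's implementation. The helper names ``downstream_of`` and ``tasks_to_skip`` and the ``edges`` dictionary are hypothetical, introduced only for illustration, and the model covers only tasks directly downstream of the branching task.

```python
# Simplified, Airflow-free model of the branch skip rule from the dags.rst note.
# downstream_of, tasks_to_skip and edges are hypothetical, not Airflow APIs.


def downstream_of(task_id, edges):
    """Return every task reachable from task_id by following edges."""
    seen = set()
    stack = [task_id]
    while stack:
        current = stack.pop()
        for child in edges.get(current, ()):
            if child not in seen:
                seen.add(child)
                stack.append(child)
    return seen


def tasks_to_skip(branch_task_id, chosen, edges):
    """Direct children of the branch task that are neither chosen
    nor downstream of a chosen task."""
    # Mirror the documented return types: None, a single ID, or a list of IDs.
    if chosen is None:
        chosen_ids = set()
    elif isinstance(chosen, str):
        chosen_ids = {chosen}
    else:
        chosen_ids = set(chosen)

    keep = set(chosen_ids)
    for task_id in chosen_ids:
        keep |= downstream_of(task_id, edges)

    return {t for t in edges.get(branch_task_id, ()) if t not in keep}


# The graph from the note: branching -> branch_a, join, branch_b; branch_a -> join.
edges = {
    "branching": ["branch_a", "join", "branch_b"],
    "branch_a": ["join"],
}

# Choosing branch_a keeps join, because join is downstream of branch_a.
print(sorted(tasks_to_skip("branching", "branch_a", edges)))
```

In this model, returning ``None`` from the branch function corresponds to skipping every directly downstream task, and returning a list keeps each listed task together with anything downstream of it.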
