This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9534c0690a9 Update Object Storage tutorial to match learning path tone 
(#49170)
9534c0690a9 is described below

commit 9534c0690a9aecd84f2f2b278e4e7e6152c91155
Author: Constance Martineau <[email protected]>
AuthorDate: Mon Apr 14 11:28:38 2025 -0400

    Update Object Storage tutorial to match learning path tone (#49170)
---
 .../docs/authoring-and-scheduling/assets.rst       |   2 +-
 .../docs/authoring-and-scheduling/datasets.rst     |   4 +-
 .../docs/authoring-and-scheduling/index.rst        |   7 +-
 airflow-core/docs/best-practices.rst               |   8 +-
 airflow-core/docs/core-concepts/taskflow.rst       |   2 +-
 airflow-core/docs/tutorial/fundamentals.rst        |   4 +-
 airflow-core/docs/tutorial/objectstorage.rst       | 160 +++--
 airflow-core/docs/tutorial/taskflow.rst            | 770 ++++++---------------
 docs/spelling_wordlist.txt                         |   1 +
 9 files changed, 311 insertions(+), 647 deletions(-)

diff --git a/airflow-core/docs/authoring-and-scheduling/assets.rst 
b/airflow-core/docs/authoring-and-scheduling/assets.rst
index a2cf3a5681b..f7f018c6d97 100644
--- a/airflow-core/docs/authoring-and-scheduling/assets.rst
+++ b/airflow-core/docs/authoring-and-scheduling/assets.rst
@@ -17,7 +17,7 @@
 
 .. _asset_definitions:
 
-Asset definitions
+Asset Definitions
 =================
 
 .. versionadded:: 2.4
diff --git a/airflow-core/docs/authoring-and-scheduling/datasets.rst 
b/airflow-core/docs/authoring-and-scheduling/datasets.rst
index 117481ad89a..1c8387600c4 100644
--- a/airflow-core/docs/authoring-and-scheduling/datasets.rst
+++ b/airflow-core/docs/authoring-and-scheduling/datasets.rst
@@ -15,8 +15,8 @@
     specific language governing permissions and limitations
     under the License.
 
-Data-aware scheduling
-=====================
+Asset-Aware Scheduling
+======================
 
 .. versionadded:: 2.4
 
diff --git a/airflow-core/docs/authoring-and-scheduling/index.rst 
b/airflow-core/docs/authoring-and-scheduling/index.rst
index 2143aca03b1..fab495298c4 100644
--- a/airflow-core/docs/authoring-and-scheduling/index.rst
+++ b/airflow-core/docs/authoring-and-scheduling/index.rst
@@ -21,6 +21,8 @@ Authoring and Scheduling
 Here you can find detailed documentation about advanced authoring and 
scheduling airflow dags.
 It's recommended that you first review the pages in :doc:`core concepts 
</core-concepts/index>`
 
+.. _authoring-section:
+
 **Authoring**
 
 .. toctree::
@@ -32,6 +34,7 @@ It's recommended that you first review the pages in 
:doc:`core concepts </core-c
     dynamic-task-mapping
     assets
 
+.. _scheduling-section:
 
 **Scheduling**
 
@@ -40,6 +43,6 @@ It's recommended that you first review the pages in 
:doc:`core concepts </core-c
 
     cron
     timezone
-    Data-ware scheduling with assets <datasets>
+    Asset-Aware Scheduling <datasets>
     timetable
-    Event-driven scheduling <event-scheduling>
+    Event-Driven Scheduling <event-scheduling>
diff --git a/airflow-core/docs/best-practices.rst 
b/airflow-core/docs/best-practices.rst
index 7813bd7f2cf..9f596a9d257 100644
--- a/airflow-core/docs/best-practices.rst
+++ b/airflow-core/docs/best-practices.rst
@@ -1015,7 +1015,7 @@ There are certain limitations and overhead introduced by 
this operator:
   same worker might be affected by previous tasks creating/modifying files etc.
 
 You can see detailed examples of using 
:class:`airflow.providers.standard.operators.python.PythonVirtualenvOperator` in
-:ref:`Taskflow Virtualenv example <taskflow/virtualenv_example>`
+:ref:`this section in the Taskflow API tutorial 
<taskflow-dynamically-created-virtualenv>`.
 
 
 Using ExternalPythonOperator
@@ -1083,7 +1083,7 @@ The nice thing about this is that you can switch the 
decorator back at any time
 developing it "dynamically" with ``PythonVirtualenvOperator``.
 
 You can see detailed examples of using 
:class:`airflow.providers.standard.operators.python.ExternalPythonOperator` in
-:ref:`Taskflow External Python example <taskflow/external_python_example>`
+:ref:`Taskflow External Python example <taskflow-external-python-environment>`
 
 Using DockerOperator or Kubernetes Pod Operator
 -----------------------------------------------
@@ -1147,9 +1147,9 @@ The drawbacks:
   containers etc. in order to author a DAG that uses those operators.
 
 You can see detailed examples of using 
:class:`airflow.operators.providers.Docker` in
-:ref:`Taskflow Docker example <taskflow/docker_example>`
+:ref:`Taskflow Docker example <taskflow-docker_environment>`
 and 
:class:`airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator`
-:ref:`Taskflow Kubernetes example <taskflow/kubernetes_example>`
+:ref:`Taskflow Kubernetes example <tasfklow-kpo>`
 
 Using multiple Docker Images and Celery Queues
 ----------------------------------------------
diff --git a/airflow-core/docs/core-concepts/taskflow.rst 
b/airflow-core/docs/core-concepts/taskflow.rst
index 54b9ffe0404..73150735bd4 100644
--- a/airflow-core/docs/core-concepts/taskflow.rst
+++ b/airflow-core/docs/core-concepts/taskflow.rst
@@ -199,7 +199,7 @@ Sensors and the TaskFlow API
 .. versionadded:: 2.5.0
 
 For an example of writing a Sensor using the TaskFlow API, see
-:ref:`Using the TaskFlow API with Sensor operators 
<taskflow/task_sensor_example>`.
+:ref:`Using the TaskFlow API with Sensor operators <taskflow-using-sensors>`.
 
 History
 -------
diff --git a/airflow-core/docs/tutorial/fundamentals.rst 
b/airflow-core/docs/tutorial/fundamentals.rst
index 7b4dbc5a984..e1fa0f3684b 100644
--- a/airflow-core/docs/tutorial/fundamentals.rst
+++ b/airflow-core/docs/tutorial/fundamentals.rst
@@ -18,8 +18,8 @@
 
 
 
-Airflow Fundamentals
-====================
+Airflow 101: Building Your First Workflow
+=========================================
 Welcome to the world of Apache Airflow! In this tutorial, we'll guide you through
the essential concepts of Airflow, helping
 you understand how to write your first DAG. Whether you're familiar with 
Python or just starting out, we'll make the
 journey enjoyable and straightforward.
diff --git a/airflow-core/docs/tutorial/objectstorage.rst 
b/airflow-core/docs/tutorial/objectstorage.rst
index 22f79ddd825..59c4142f7ce 100644
--- a/airflow-core/docs/tutorial/objectstorage.rst
+++ b/airflow-core/docs/tutorial/objectstorage.rst
@@ -18,117 +18,151 @@
 
 
 
-Object Storage
-==============
+Cloud-Native Workflows with Object Storage
+==========================================
 
-This tutorial shows how to use the Object Storage API to manage objects that
-reside on object storage, like S3, gcs and azure blob storage. The API is 
introduced
-as part of Airflow 2.8.
+.. versionadded:: 2.8
 
-The tutorial covers a simple pattern that is often used in data engineering 
and data
-science workflows: accessing a web api, saving and analyzing the result.
+Welcome to the final tutorial in our Airflow series! By now, you've built DAGs 
with Python and the TaskFlow API, passed
+data with XComs, and chained tasks together into clear, reusable workflows.
+
+In this tutorial, we'll take it a step further by introducing the **Object
Storage API**. This API makes it easier to
+read from and write to cloud storage -- like Amazon S3, Google Cloud Storage 
(GCS), or Azure Blob Storage -- without
+having to worry about provider-specific SDKs or low-level credentials 
management.
+
+We'll walk you through a real-world use case:
+
+1. Pulling data from a public API
+2. Saving that data to object storage in Parquet format
+3. Analyzing it using SQL with DuckDB
+
+Along the way, we'll highlight the new ``ObjectStoragePath`` abstraction, 
explain how Airflow handles cloud credentials via
+connections, and show how this enables portable, cloud-agnostic pipelines.
+
+Why This Matters
+----------------
+
+Many data workflows depend on files -- whether it's raw CSVs, intermediate 
Parquet files, or model artifacts.
+Traditionally, you'd need to write S3-specific or GCS-specific code for this. 
Now, with ``ObjectStoragePath``, you can
+write generic code that works across providers, as long as you've configured 
the right Airflow connection.
+
+Let's get started!
 
 Prerequisites
 -------------
-To complete this tutorial, you need a few things:
 
-- DuckDB, an in-process analytical database,
-  which can be installed by running ``pip install duckdb``.
-- An S3 bucket, along with the Amazon provider including ``s3fs``. You can 
install
-  the provider package by running
-  ``pip install apache-airflow-providers-amazon[s3fs]``.
-  Alternatively, you can use a different storage provider by changing the URL 
in
-  the ``create_object_storage_path`` function to the appropriate URL for your
-  provider, for example by replacing ``s3://`` with ``gs://`` for Google Cloud
-  Storage, and installing a different provider.
-- ``pandas``, which you can install by running ``pip install pandas``.
+Before diving in, make sure you have the following:
 
+- **DuckDB**, an in-process SQL database: Install with ``pip install duckdb``
+- **Amazon S3 access** and **Amazon Provider with s3fs**: ``pip install 
apache-airflow-providers-amazon[s3fs]``
+  (You can substitute your preferred provider by changing the storage URL 
protocol and installing the relevant provider.)
+- **Pandas** for working with tabular data: ``pip install pandas``
 
 Creating an ObjectStoragePath
 -----------------------------
 
-The ObjectStoragePath is a path-like object that represents a path on object 
storage.
-It is the fundamental building block of the Object Storage API.
+At the heart of this tutorial is ``ObjectStoragePath``, a new abstraction for 
handling paths on cloud object stores.
+Think of it like ``pathlib.Path``, but for buckets instead of filesystems.
 
 .. exampleinclude:: /../src/airflow/example_dags/tutorial_objectstorage.py
     :language: python
     :start-after: [START create_object_storage_path]
     :end-before: [END create_object_storage_path]
 
-The username part of the URL given to ObjectStoragePath should be a connection 
ID.
-The specified connection will be used to obtain the right credentials to access
-the backend. If it is omitted, the default connection for the backend will be 
used.
+|
+
+The URL syntax is simple: ``protocol://bucket/path/to/file``
 
-The connection ID can alternatively be passed in with a keyword argument:
+- The ``protocol`` (like ``s3``, ``gs``, or ``abfs``) determines the backend
+- The "username" part of the URL can be a ``conn_id``, telling Airflow how to 
authenticate
+- If the ``conn_id`` is omitted, Airflow will fall back to the default 
connection for that backend
+
+You can also provide the ``conn_id`` as a keyword argument for clarity:
 
 .. code-block:: python
 
     ObjectStoragePath("s3://airflow-tutorial-data/", conn_id="aws_default")
 
-This is useful when reusing a URL defined for another purpose (e.g. Asset),
-which generally does not contain a username part. The explicit keyword argument
-takes precedence over the URL's username value if both are specified.
+This is especially handy when reusing a path defined elsewhere (like in an 
Asset), or when the connection isn't baked
+into the URL. The keyword argument always takes precedence.
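To make the URL anatomy concrete, here is a quick standard-library sketch of how the pieces break apart, using ``urllib.parse`` as a stand-in for Airflow's own parsing (the bucket and key names are made up for illustration):

```python
from urllib.parse import urlsplit

# Hypothetical URL: the "username" slot carries the connection ID.
parts = urlsplit("s3://aws_default@airflow-tutorial-data/raw/data.parquet")

print(parts.scheme)    # the protocol, which selects the backend ("s3")
print(parts.username)  # interpreted by Airflow as the conn_id ("aws_default")
print(parts.hostname)  # the bucket ("airflow-tutorial-data")
print(parts.path)      # the key within the bucket ("/raw/data.parquet")
```

This is only an analogy for the URL shape; ``ObjectStoragePath`` does the equivalent parsing (and credential lookup) for you.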
 
-It is safe to instantiate an ObjectStoragePath at the root of your DAG. 
Connections
-will not be created until the path is used. This means that you can create the
-path in the global scope of your DAG and use it in multiple tasks.
+.. tip:: You can safely create an ``ObjectStoragePath`` in your global DAG 
scope. Connections are resolved only when the
+  path is used, not when it's created.
 
-Saving data to Object Storage
+Saving Data to Object Storage
 -----------------------------
 
-An ObjectStoragePath behaves mostly like a pathlib.Path object. You can
-use it to save and load data directly to and from object storage. So, a typical
-flow could look like this:
+Let's fetch some data and save it to the cloud.
 
 .. exampleinclude:: /../src/airflow/example_dags/tutorial_objectstorage.py
     :language: python
     :start-after: [START get_air_quality_data]
     :end-before: [END get_air_quality_data]
 
-The ``get_air_quality_data`` calls the API of the Finnish Meteorological 
Institute
-to obtain the air quality data for the region of Helsinki. It creates a
-Pandas DataFrame from the resulting json. It then saves the data to object 
storage
-and converts it on the fly to parquet.
+|
+
+Here's what's happening:
 
-The key of the object is automatically generated from the logical date of the 
task,
-so we could run this everyday and it would create a new object for each day. We
-concatenate this key with the base path to create the full path to the object. 
Finally,
-after writing the object to storage, we return the path to the object. This 
allows
-us to use the path in the next task.
+- We call a public API from the Finnish Meteorological Institute for Helsinki 
air quality data
+- The JSON response is parsed into a pandas DataFrame
+- We generate a filename based on the task's logical date
+- Using ``ObjectStoragePath``, we write the data directly to cloud storage as 
Parquet
 
-Analyzing the data
-------------------
+This is a classic TaskFlow pattern. The object key changes each day, allowing
us to run this daily and build a dataset
+over time. We return the final object path to be used in the next task.
 
-In understanding the data, you typically want to analyze it. Duck DB is a great
-tool for this. It is an in-process analytical database that allows you to run
-SQL queries on data in memory.
+Why this is cool: no boto3, no GCS client setup, no credential juggling. Just
simple file semantics that work across
+storage backends.
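The date-based key idea described above can be sketched without Airflow at all. This is a minimal illustration (the ``air_quality_`` prefix and bucket name are hypothetical, and in the real task the logical date comes from the run context):

```python
from datetime import datetime, timezone

base = "s3://airflow-tutorial-data/raw/"


def daily_key(logical_date: datetime) -> str:
    # One object per day: the key embeds the run's logical date,
    # so a daily schedule builds up a dataset over time.
    return f"{base}air_quality_{logical_date:%Y%m%d}.parquet"


print(daily_key(datetime(2025, 4, 14, tzinfo=timezone.utc)))
# s3://airflow-tutorial-data/raw/air_quality_20250414.parquet
```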
 
-Because the data is already in parquet format, we can use the ``read_parquet`` 
and
-because both Duck DB and the ObjectStoragePath use ``fsspec`` we can register 
the
-backend of the ObjectStoragePath with Duck DB. ObjectStoragePath exposes the 
``fs``
-property for this. We can then use the ``register_filesystem`` function from 
Duck DB
-to register the backend with Duck DB.
+Analyzing the Data with DuckDB
+------------------------------
 
-In Duck DB we can then create a table from the data and run a query on it. The
-query is returned as a dataframe, which could be used for further analysis or
-saved to object storage.
+Now let's analyze that data using SQL with DuckDB.
 
 .. exampleinclude:: /../src/airflow/example_dags/tutorial_objectstorage.py
     :language: python
     :start-after: [START analyze]
     :end-before: [END analyze]
 
-You might note that the ``analyze`` function does not know the original
-path to the object, but that it is passed in as a parameter and obtained
-through XCom. You do not need to re-instantiate the Path object. Also
-the connection details are handled transparently.
+|
 
-Putting it all together
------------------------
+A few key things to note:
 
-The final DAG looks like this, which wraps things so that we can run it:
+- DuckDB supports reading Parquet natively
+- DuckDB and ObjectStoragePath both rely on ``fsspec``, which makes it easy to 
register the object storage backend
+- We use ``path.fs`` to grab the right filesystem object and register it with 
DuckDB
+- Finally, we query the Parquet file using SQL and return a pandas DataFrame
+
+Notice that the function doesn't recreate the path manually -- it gets the 
full path from the previous task using XCom.
+This makes the task portable and decoupled from earlier logic.
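If you want to experiment with the SQL step without DuckDB or cloud access, the same query shape can be mimicked with the standard library's ``sqlite3``. This is a stand-in only (the table name and sample rows are invented); the real task queries Parquet through DuckDB and ``fsspec``:

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE air_quality_data (station TEXT, aqi REAL)")
con.executemany(
    "INSERT INTO air_quality_data VALUES (?, ?)",
    [("Kallio", 21.0), ("Vartiokyla", 17.5), ("Kallio", 25.0)],
)
# Same shape as the DuckDB step: aggregate with SQL, get tabular rows back.
rows = con.execute(
    "SELECT station, AVG(aqi) FROM air_quality_data GROUP BY station ORDER BY station"
).fetchall()
print(rows)
```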
+
+Bringing It All Together
+------------------------
+
+Here's the full DAG that ties everything together:
 
 .. exampleinclude:: /../src/airflow/example_dags/tutorial_objectstorage.py
     :language: python
     :start-after: [START tutorial]
     :end-before: [END tutorial]
+
+|
+
+You can trigger this DAG and view it in the Graph View in the Airflow UI. Each 
task logs its inputs and outputs clearly,
+and you can inspect returned paths in the XCom tab.
+
+What to Explore Next
+--------------------
+
+Here are some ways to take this further:
+
+- Use object sensors (like ``S3KeySensor``) to wait for files uploaded by 
external systems
+- Orchestrate S3-to-GCS transfers or cross-region data syncs
+- Add branching logic to handle missing or malformed files
+- Experiment with different formats like CSV or JSON
+
+**See Also**
+
+- Learn how to securely access cloud services by configuring Airflow 
connections in the :doc:`Managing Connections guide 
<../authoring-and-scheduling/connections>`
+- Build event-driven pipelines that respond to file uploads or external 
triggers using the :doc:`Event-Driven Scheduling framework 
<../authoring-and-scheduling/event-scheduling>`
+- Reinforce your understanding of decorators, return values, and task chaining 
with the :doc:`TaskFlow API guide <../core-concepts/taskflow>`
diff --git a/airflow-core/docs/tutorial/taskflow.rst 
b/airflow-core/docs/tutorial/taskflow.rst
index 82f4685d019..0d9d543ff93 100644
--- a/airflow-core/docs/tutorial/taskflow.rst
+++ b/airflow-core/docs/tutorial/taskflow.rst
@@ -16,68 +16,65 @@
     under the License.
 
 
+Pythonic DAGs with the TaskFlow API
+===================================
 
+In the first tutorial, you built your first Airflow DAG using traditional 
Operators like ``PythonOperator``.
+Now let's look at a more modern and Pythonic way to write workflows using the 
**TaskFlow API** — introduced in Airflow
+2.0.
 
-Working with TaskFlow
-=====================
+The TaskFlow API is designed to make your code simpler, cleaner, and easier to 
maintain. You write plain Python
+functions, decorate them, and Airflow handles the rest — including task 
creation, dependency wiring, and passing data
+between tasks.
 
-This tutorial builds on the regular Airflow Tutorial and focuses specifically
-on writing data pipelines using the TaskFlow API paradigm which is introduced 
as
-part of Airflow 2.0 and contrasts this with dags written using the traditional 
paradigm.
+In this tutorial, we'll create a simple ETL pipeline (Extract → Transform → Load) using the TaskFlow API.
+Let's dive in!
 
-The data pipeline chosen here is a simple pattern with
-three separate Extract, Transform, and Load tasks.
+The Big Picture: A TaskFlow Pipeline
+------------------------------------
 
-Example "TaskFlow API" Pipeline
--------------------------------
-
-Here is a very simple pipeline using the TaskFlow API paradigm. A more detailed
-explanation is given below.
+Here's what the full pipeline looks like using TaskFlow. Don't worry if some 
of it looks unfamiliar — we'll break it
+down step-by-step.
 
 .. exampleinclude:: /../src/airflow/example_dags/tutorial_taskflow_api.py
     :language: python
     :start-after: [START tutorial]
     :end-before: [END tutorial]
 
-It's a DAG definition file
---------------------------
-
-If this is the first DAG file you are looking at, please note that this Python 
script
-is interpreted by Airflow and is a configuration file for your data pipeline.
-For a complete introduction to DAG files, please look at the core 
:doc:`fundamentals tutorial<fundamentals>`
-which covers DAG structure and definitions extensively.
+Step 1: Define the DAG
+----------------------
 
-
-Instantiate a DAG
------------------
-
-We are creating a DAG which is the collection of our tasks with dependencies 
between
-the tasks. This is a very simple definition, since we just want the DAG to be 
run
-when we set this up with Airflow, without any retries or complex scheduling.
-In this example, please notice that we are creating this DAG using the 
``@dag`` decorator
-as shown below, with the Python function name acting as the DAG identifier.
+Just like before, your DAG is a Python script that Airflow loads and parses. 
But this time, we're using the ``@dag``
+decorator to define it.
 
 .. exampleinclude:: /../src/airflow/example_dags/tutorial_taskflow_api.py
     :language: python
     :start-after: [START instantiate_dag]
     :end-before: [END instantiate_dag]
 
-Now to actually enable this to be run as a DAG, we invoke the Python function
-``tutorial_taskflow_api`` set up using the ``@dag`` decorator earlier, as 
shown below.
+|
+
+To make this DAG discoverable by Airflow, we can call the Python function that 
was decorated with ``@dag``:
 
 .. exampleinclude:: /../src/airflow/example_dags/tutorial_taskflow_api.py
     :language: python
     :start-after: [START dag_invocation]
     :end-before: [END dag_invocation]
 
+|
+
 .. versionchanged:: 2.4
+  If you're using the ``@dag`` decorator or defining your DAG in a ``with`` 
block, you no longer need to assign it to a
+  global variable. Airflow will find it automatically.
+
+You can visualize your DAG in the Airflow UI! Once your DAG is loaded, 
navigate to the Graph View to see how tasks are
+connected.
 
-      It's no longer required to "register" the DAG into a global variable for 
Airflow to be able to detect the dag if that DAG is used inside a ``with`` 
block, or if it is the result of a ``@dag`` decorated function.
+Step 2: Write Your Tasks with ``@task``
+---------------------------------------
 
-Tasks
------
-In this data pipeline, tasks are created based on Python functions using the 
``@task`` decorator
-as shown below. The function name acts as a unique identifier for the task.
+With TaskFlow, each task is just a regular Python function. You can use the
``@task`` decorator to turn it into a task
+that Airflow can schedule and run. Here's the ``extract`` task:
 
 .. exampleinclude:: /../src/airflow/example_dags/tutorial_taskflow_api.py
     :language: python
@@ -85,14 +82,17 @@ as shown below. The function name acts as a unique 
identifier for the task.
     :start-after: [START extract]
     :end-before: [END extract]
 
-The returned value, which in this case is a dictionary, will be made available 
for use in later tasks.
+|
 
-The Transform and Load tasks are created in the same manner as the Extract 
task shown above.
+The function's return value is passed to the next task — no manual use of 
``XComs`` required. Under the hood, TaskFlow
+uses ``XComs`` to manage data passing automatically, abstracting away the 
complexity of manual XCom management from the
+previous methods. You'll define ``transform`` and ``load`` tasks using the 
same pattern.
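Stripped of Airflow entirely, the data handoff that TaskFlow automates is just function composition. Here is a rough, Airflow-free sketch using the same sample numbers as the tutorial DAG:

```python
import json


def extract():
    # Simulated source data, as in the tutorial example.
    data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
    return json.loads(data_string)


def transform(order_data: dict) -> dict:
    return {"total_order_value": sum(order_data.values())}


def load(summary: dict) -> str:
    return f"Total order value is: {summary['total_order_value']:.2f}"


# TaskFlow wires this chain (and ships the values via XComs) for you.
print(load(transform(extract())))  # Total order value is: 1236.70
```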
 
-Main flow of the DAG
---------------------
-Now that we have the Extract, Transform, and Load tasks defined based on the 
Python functions,
-we can move to the main part of the DAG.
+Step 3: Build the Flow
+----------------------
+
+Once the tasks are defined, you can build the pipeline by simply calling them 
like Python functions. Airflow uses this
+functional invocation to set task dependencies and manage data passing.
 
 .. exampleinclude:: /../src/airflow/example_dags/tutorial_taskflow_api.py
     :language: python
@@ -100,173 +100,169 @@ we can move to the main part of the DAG.
     :start-after: [START main_flow]
     :end-before: [END main_flow]
 
-That's it, we are done!
-We have invoked the Extract task, obtained the order data from there and sent 
it over to
-the Transform task for summarization, and then invoked the Load task with the 
summarized data.
-The dependencies between the tasks and the passing of data between these tasks 
which could be
-running on different workers on different nodes on the network is all handled 
by Airflow.
+|
 
-Now to actually enable this to be run as a DAG, we invoke the Python function
-``tutorial_taskflow_api`` set up using the ``@dag`` decorator earlier, as 
shown below.
+That's it! Airflow knows how to schedule and orchestrate your pipeline from 
this code alone.
 
-.. exampleinclude:: /../src/airflow/example_dags/tutorial_taskflow_api.py
-    :language: python
-    :start-after: [START dag_invocation]
-    :end-before: [END dag_invocation]
+Running Your DAG
+----------------
 
+To enable and trigger your DAG:
 
-But how?
---------
-For experienced Airflow DAG authors, this is startlingly simple! Let's 
contrast this with
-how this DAG had to be written before Airflow 2.0 below:
+1. Navigate to the Airflow UI.
+2. Find your DAG in the list and click the toggle to enable it.
+3. You can trigger it manually by clicking the "Trigger DAG" button, or wait 
for it to run on its schedule.
 
-.. exampleinclude:: /../src/airflow/example_dags/tutorial_dag.py
-    :language: python
-    :start-after: [START tutorial]
-    :end-before: [END tutorial]
+What's Happening Behind the Scenes?
+-----------------------------------
 
-All of the processing shown above is being done in the new Airflow 2.0 DAG as 
well, but
-it is all abstracted from the DAG developer.
+If you've used Airflow 1.x, this probably feels like magic. Let's compare 
what's happening under the hood.
 
-Let's examine this in detail by looking at the Transform task in isolation 
since it is
-in the middle of the data pipeline. In Airflow 1.x, this task is defined as 
shown below:
+The "Old Way": Manual Wiring and XComs
+''''''''''''''''''''''''''''''''''''''
 
-.. exampleinclude:: /../src/airflow/example_dags/tutorial_dag.py
-    :language: python
-    :dedent: 4
-    :start-after: [START transform_function]
-    :end-before: [END transform_function]
+Before the TaskFlow API, you had to use Operators like ``PythonOperator`` and 
pass data manually between tasks using
+``XComs``.
 
-As we see here, the data being processed in the Transform function is passed 
to it using XCom
-variables. In turn, the summarized data from the Transform function is also 
placed
-into another XCom variable which will then be used by the Load task.
+Here's what the same DAG might have looked like using the traditional approach:
 
-Contrasting that with TaskFlow API in Airflow 2.0 as shown below.
+.. code-block:: python
 
-.. exampleinclude:: /../src/airflow/example_dags/tutorial_taskflow_api.py
-    :language: python
-    :dedent: 4
-    :start-after: [START transform]
-    :end-before: [END transform]
+   import json
+   import pendulum
+   from airflow.sdk import DAG
+   from airflow.providers.standard.operators.python import PythonOperator
 
-All of the XCom usage for data passing between these tasks is abstracted away 
from the DAG author
-in Airflow 2.0. However, XCom variables are used behind the scenes and can be 
viewed using
-the Airflow UI as necessary for debugging or DAG monitoring.
 
-Similarly, task dependencies are automatically generated within TaskFlows 
based on the
-functional invocation of tasks. In Airflow 1.x, tasks had to be explicitly 
created and
-dependencies specified as shown below.
+   def extract():
+       # Old way: simulate extracting data from a JSON string
+       data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
+       return json.loads(data_string)
 
-.. exampleinclude:: /../src/airflow/example_dags/tutorial_dag.py
-    :language: python
-    :dedent: 4
-    :start-after: [START main_flow]
-    :end-before: [END main_flow]
 
-In contrast, with the TaskFlow API in Airflow 2.0, the invocation itself 
automatically generates
-the dependencies as shown below.
+   def transform(ti):
+       # Old way: manually pull from XCom
+       order_data_dict = ti.xcom_pull(task_ids="extract")
+       total_order_value = sum(order_data_dict.values())
+       return {"total_order_value": total_order_value}
+
+
+   def load(ti):
+       # Old way: manually pull from XCom
+       total = ti.xcom_pull(task_ids="transform")["total_order_value"]
+       print(f"Total order value is: {total:.2f}")
+
+
+   with DAG(
+       dag_id="legacy_etl_pipeline",
+       schedule=None,
+       start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+       catchup=False,
+       tags=["example"],
+   ) as dag:
+       extract_task = PythonOperator(task_id="extract", 
python_callable=extract)
+       transform_task = PythonOperator(task_id="transform", 
python_callable=transform)
+       load_task = PythonOperator(task_id="load", python_callable=load)
+
+       extract_task >> transform_task >> load_task
+
+.. note::
+   This version produces the same result as the TaskFlow API example, but 
requires explicit management of ``XComs`` and task dependencies.
+
+The TaskFlow Way
+''''''''''''''''
+
+Using TaskFlow, all of this is handled automatically.
 
 .. exampleinclude:: /../src/airflow/example_dags/tutorial_taskflow_api.py
     :language: python
-    :dedent: 4
-    :start-after: [START main_flow]
-    :end-before: [END main_flow]
+    :start-after: [START tutorial]
+    :end-before: [END tutorial]
 
+|
 
-Reusing a decorated task
--------------------------
+Airflow still uses ``XComs`` and builds a dependency graph — it's just 
abstracted away so you can focus on your business
+logic.
 
-Decorated tasks are flexible. You can reuse a decorated task in multiple dags, 
overriding the task
-parameters such as the ``task_id``, ``queue``, ``pool``, etc.
+How XComs Work
+--------------
 
-Below is an example of how you can reuse a decorated task in multiple dags:
+TaskFlow return values are stored as ``XComs`` automatically. These values can 
be inspected in the UI under the "XCom" tab.
+Manual ``xcom_pull()`` is still available when mixing TaskFlow tasks with traditional operators.
 
-.. code-block:: python
 
-    from airflow.sdk import task, dag
-    from datetime import datetime
+Error Handling and Retries
+---------------------------
 
+You can easily configure retries for your tasks using decorators. For example, 
you can set a maximum number of retries
+directly in the task decorator:
 
-    @task
-    def add_task(x, y):
-        print(f"Task args: x={x}, y={y}")
-        return x + y
+.. code-block:: python
 
+    @task(retries=3)
+    def my_task(): ...
 
-    @dag(start_date=datetime(2022, 1, 1))
-    def mydag():
-        start = add_task.override(task_id="start")(1, 2)
-        for i in range(3):
-            start >> add_task.override(task_id=f"add_start_{i}")(start, i)
+This helps ensure that transient failures don't immediately cause the whole task to fail.
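Conceptually, what the scheduler does with ``retries`` resembles this small retry loop (a plain-Python sketch only; it ignores retry delays, logging, and Airflow's actual executor machinery):

```python
def run_with_retries(fn, retries=3):
    # Try the callable up to `retries` extra times before giving up,
    # mirroring the semantics of @task(retries=3).
    for attempt in range(retries + 1):
        try:
            return fn()
        except Exception:
            if attempt == retries:
                raise


attempts = {"n": 0}


def flaky():
    # Fails twice, then succeeds: a stand-in for a transient error.
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise RuntimeError("transient failure")
    return "ok"


print(run_with_retries(flaky))  # succeeds on the third attempt
```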
 
+Task Parameterization
+---------------------
 
-    @dag(start_date=datetime(2022, 1, 1))
-    def mydag2():
-        start = add_task(1, 2)
-        for i in range(3):
-            start >> add_task.override(task_id=f"new_add_task_{i}")(start, i)
+You can reuse decorated tasks in multiple DAGs and override parameters like 
``task_id`` or ``retries``.
 
+.. code-block:: python
 
-    first_dag = mydag()
-    second_dag = mydag2()
+    start = add_task.override(task_id="start")(1, 2)
 
-You can also import the above ``add_task`` and use it in another DAG file.
-Suppose the ``add_task`` code lives in a file called ``common.py``. You can do 
this:
+|
 
-.. code-block:: python
+You can even import decorated tasks from a shared module.
 
-    from common import add_task
-    from airflow.sdk import dag
-    from datetime import datetime
+What to Explore Next
+--------------------
 
+Nice work! You've now written your first pipeline using the TaskFlow API. 
Curious where to go from here?
 
-    @dag(start_date=datetime(2022, 1, 1))
-    def use_add_task():
-        start = add_task.override(priority_weight=3)(1, 2)
-        for i in range(3):
-            start >> add_task.override(task_id=f"new_add_task_{i}", 
retries=4)(start, i)
+- Add a new task to the DAG -- maybe a filter or validation step
+- Modify return values and pass multiple outputs
+- Explore retries and overrides with ``.override(task_id="...")``
+- Open the Airflow UI and inspect how the data flows between tasks, including 
task logs and dependencies
 
+.. seealso::
 
-    created_dag = use_add_task()
+   - Continue to the next step: :doc:`/tutorial/pipeline`
+   - Learn more in the :doc:`TaskFlow API docs </core-concepts/taskflow>` or 
continue below for :ref:`advanced-taskflow-patterns`
+   - Read about Airflow concepts in :doc:`/core-concepts/index`
 
+.. _advanced-taskflow-patterns:
 
-Using the TaskFlow API with complex/conflicting Python dependencies
--------------------------------------------------------------------
+Advanced TaskFlow Patterns
+--------------------------
 
-If you have tasks that require complex or conflicting requirements then you 
will have the ability to use the
-TaskFlow API with either Python virtual environment (since 2.0.2), Docker 
container (since 2.2.0), ExternalPythonOperator (since 2.4.0) or 
KubernetesPodOperator (since 2.4.0).
+Once you're comfortable with the basics, here are a few powerful techniques 
you can try.
 
-This functionality allows a much more comprehensive range of use-cases for the 
TaskFlow API,
-as you are not limited to the packages and system libraries of the Airflow 
worker. For all cases of
-the decorated functions described below, you have to make sure the functions 
are serializable and that
-they only use local imports for additional dependencies you use. Those 
imported additional libraries must
-be available in the target environment - they do not need to be available in 
the main Airflow environment.
+Reusing Decorated Tasks
+'''''''''''''''''''''''
 
-Which of the operators you should use, depend on several factors:
+You can reuse decorated tasks across multiple DAGs or DAG runs. This is 
especially useful for common logic like reusable
+utilities or shared business rules. Use ``.override()`` to customize task 
metadata like ``task_id`` or ``retries``.
 
-* whether you are running Airflow with access to Docker engine or Kubernetes
-* whether you can afford an overhead to dynamically create a virtual 
environment with the new dependencies
-* whether you can deploy a pre-existing, immutable Python environment for all 
Airflow components.
+.. code-block:: python
 
-These options should allow for far greater flexibility for users who wish to 
keep their workflows simpler
-and more Pythonic - and allow you to keep complete logic of your DAG in the 
DAG itself.
+    start = add_task.override(task_id="start")(1, 2)
 
-You can also get more context about the approach of managing conflicting 
dependencies, including more detailed
-explanation on boundaries and consequences of each of the options in
-:ref:`Best practices for handling conflicting/complex Python dependencies 
<best_practices/handling_conflicting_complex_python_dependencies>`
+You can even import decorated tasks from a shared module.
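A sketch of what that reuse looks like (``add_task`` is an illustrative name; the tiny stub below mimics just enough of Airflow's decorated-task object, ``.override()`` plus calling, to be runnable without Airflow installed):

```python
class _TaskStub:
    """Tiny stand-in for an Airflow decorated task (sketch only)."""

    def __init__(self, fn, **meta):
        self.fn = fn
        self.meta = meta

    def override(self, **meta):
        # Return a copy with updated task metadata, like the real API.
        return _TaskStub(self.fn, **{**self.meta, **meta})

    def __call__(self, *args):
        return self.fn(*args)


def task(fn):
    """Stand-in for ``from airflow.sdk import task``."""
    return _TaskStub(fn, task_id=fn.__name__)


# In real code this could live in a shared module and be imported
# into several DAG files.
@task
def add_task(x: int, y: int) -> int:
    return x + y


start = add_task.override(task_id="start", retries=3)
print(start.meta)  # {'task_id': 'start', 'retries': 3}
print(start(1, 2))  # 3
```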
 
+Handling Conflicting Dependencies
+'''''''''''''''''''''''''''''''''
 
-Virtualenv created dynamically for each task
-............................................
+Sometimes tasks require different Python dependencies than the rest of your 
DAG — for example, specialized libraries or
+system-level packages. TaskFlow supports multiple execution environments to 
isolate those dependencies.
 
-The simplest approach is to create dynamically (every time a task is run) a 
separate virtual environment on the
-same machine, you can use the ``@task.virtualenv`` decorator. The decorator 
allows
-you to create dynamically a new virtualenv with custom libraries and even a 
different Python version to
-run your function.
+.. _taskflow-dynamically-created-virtualenv:
 
-.. _taskflow/virtualenv_example:
+**Dynamically Created Virtualenv**
 
-Example (dynamically created virtualenv):
+Creates a temporary virtualenv at task runtime. Great for experimental or 
dynamic tasks, but may have cold start
+overhead.
 
 .. exampleinclude:: /../src/airflow/example_dags/example_python_decorator.py
     :language: python
@@ -274,17 +270,13 @@ Example (dynamically created virtualenv):
     :start-after: [START howto_operator_python_venv]
     :end-before: [END howto_operator_python_venv]
 
-Using Python environment with pre-installed dependencies
-........................................................
+|
 
-A bit more involved ``@task.external_python`` decorator allows you to run an 
Airflow task in pre-defined,
-immutable virtualenv (or Python binary installed at system level without 
virtualenv).
-This virtualenv or system python can also have different set of custom 
libraries installed and must be
-made available in all workers that can execute the tasks in the same location.
+.. _taskflow-external-python-environment:
 
-.. _taskflow/external_python_example:
+**External Python Environment**
 
-Example with ``@task.external_python`` (using immutable, pre-existing 
virtualenv):
+Executes the task using a pre-installed Python interpreter — ideal for 
consistent environments or shared virtualenvs.
 
 .. exampleinclude:: /../src/airflow/example_dags/example_python_decorator.py
     :language: python
@@ -292,20 +284,14 @@ Example with ``@task.external_python`` (using immutable, 
pre-existing virtualenv
     :start-after: [START howto_operator_external_python]
     :end-before: [END howto_operator_external_python]
 
-Dependency separation using Docker Operator
-...........................................
-
-If your Airflow workers have access to a docker engine, you can instead use a 
``DockerOperator``
-and add any needed arguments to correctly run the task. Please note that the 
docker
-image must have a working Python installed and take in a bash command as the 
``command`` argument.
+|
 
-It is worth noting that the Python source code (extracted from the decorated 
function) and any
-callable args are sent to the container via (encoded and pickled) environment 
variables so the
-length of these is not boundless (the exact limit depends on system settings).
+.. _taskflow-docker_environment:
 
-Below is an example of using the ``@task.docker`` decorator to run a Python 
task.
+**Docker Environment**
 
-.. _taskflow/docker_example:
+Runs your task in a Docker container. Useful for packaging everything the task 
needs — but requires Docker to be
+available on your worker.
 
 .. exampleinclude:: 
/../../providers/docker/tests/system/docker/example_taskflow_api_docker_virtualenv.py
     :language: python
@@ -313,30 +299,16 @@ Below is an example of using the ``@task.docker`` 
decorator to run a Python task
     :start-after: [START transform_docker]
     :end-before: [END transform_docker]
 
+|
 
-Notes on using the operator:
-
-.. note:: Using ``@task.docker`` decorator in one of the earlier Airflow 
versions
-
-    Since ``@task.docker`` decorator is available in the docker provider, you 
might be tempted to use it in
-    Airflow version before 2.2, but this is not going to work. You will get 
this error if you try:
-
-    .. code-block:: text
+.. note:: Requires Airflow 2.2 or above and the Docker provider.
 
-        AttributeError: '_TaskDecorator' object has no attribute 'docker'
+.. _taskflow-kpo:
 
-    You should upgrade to Airflow 2.2 or above in order to use it.
+**KubernetesPodOperator**
 
-Dependency separation using Kubernetes Pod Operator
-...................................................
-
-
-If your Airflow workers have access to Kubernetes, you can instead use a 
``KubernetesPodOperator``
-and add any needed arguments to correctly run the task.
-
-Below is an example of using the ``@task.kubernetes`` decorator to run a 
Python task.
-
-.. _taskflow/kubernetes_example:
+Runs your task inside a Kubernetes pod, fully isolated from the main Airflow 
environment. Ideal for large tasks or tasks
+requiring custom runtimes.
 
 .. exampleinclude:: 
/../../providers/cncf/kubernetes/tests/system/cncf/kubernetes/example_kubernetes_decorator.py
     :language: python
@@ -344,408 +316,62 @@ Below is an example of using the ``@task.kubernetes`` 
decorator to run a Python
     :start-after: [START howto_operator_kubernetes]
     :end-before: [END howto_operator_kubernetes]
 
-Notes on using the operator:
-
-.. note:: Using ``@task.kubernetes`` decorator in one of the earlier Airflow 
versions
-
-    Since ``@task.kubernetes`` decorator is available in the cncf.kubernetes 
provider, you might be tempted to use it in
-    Airflow version before 2.4, but this is not going to work. You will get 
this error if you try:
-
-    .. code-block:: text
-
-        AttributeError: '_TaskDecorator' object has no attribute 'kubernetes'
-
-    You should upgrade to Airflow 2.4 or above in order to use it.
-
-
-Using the TaskFlow API with Sensor operators
---------------------------------------------
-
-You can apply the ``@task.sensor`` decorator to convert a regular Python 
function to an instance of the
-BaseSensorOperator class. The Python function implements the poke logic and 
returns an instance of
-the ``PokeReturnValue`` class as the ``poke()`` method in the 
BaseSensorOperator does.
-In Airflow 2.3, sensor operators will be able to return XCOM values. This is 
achieved by returning
-an instance of the ``PokeReturnValue`` object at the end of the ``poke()`` 
method:
-
-  .. code-block:: python
-
-    from airflow.sdk import PokeReturnValue
-
-
-    class SensorWithXcomValue(BaseSensorOperator):
-        def poke(self, context: Context) -> Union[bool, PokeReturnValue]:
-            # ...
-            is_done = ...  # set to true if the sensor should stop poking.
-            xcom_value = ...  # return value of the sensor operator to be 
pushed to XCOM.
-            return PokeReturnValue(is_done, xcom_value)
-
-
-To implement a sensor operator that pushes a XCOM value and supports both 
version 2.3 and
-pre-2.3, you need to explicitly push the XCOM value if the version is pre-2.3.
+|
 
-  .. code-block:: python
+.. note:: Requires Airflow 2.4 or above and the Kubernetes provider.
 
-    try:
-        from airflow.sdk import PokeReturnValue
-    except ImportError:
-        PokeReturnValue = None
+.. _taskflow-using-sensors:
 
+Using Sensors
+'''''''''''''
 
-    class SensorWithXcomValue(BaseSensorOperator):
-        def poke(self, context: Context) -> bool:
-            # ...
-            is_done = ...  # set to true if the sensor should stop poking.
-            xcom_value = ...  # return value of the sensor operator to be 
pushed to XCOM.
-            if PokeReturnValue is not None:
-                return PokeReturnValue(is_done, xcom_value)
-            else:
-                if is_done:
-                    context["ti"].xcom_push(key="xcom_key", value=xcom_value)
-                return is_done
-
-
-
-
-Alternatively in cases where the sensor doesn't need to push XCOM values:  
both ``poke()`` and the wrapped
-function can return a boolean-like value where ``True`` designates the 
sensor's operation as complete and
-``False`` designates the sensor's operation as incomplete.
-
-.. _taskflow/task_sensor_example:
+Use ``@task.sensor`` to build lightweight, reusable sensors using Python 
functions. These support both poke and reschedule
+modes.
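A sketch of the poke logic such a sensor wraps (the ``PokeReturnValue`` class below is a stand-in for ``airflow.sdk.PokeReturnValue``, and ``wait_for_upstream`` with its record count is illustrative):

```python
class PokeReturnValue:
    """Stand-in for airflow.sdk.PokeReturnValue (sketch only)."""

    def __init__(self, is_done: bool, xcom_value=None):
        self.is_done = is_done
        self.xcom_value = xcom_value


# With @task.sensor applied, Airflow calls this function repeatedly
# until is_done is True; xcom_value is then pushed to XCom.
def wait_for_upstream() -> PokeReturnValue:
    record_count = 42  # stands in for a real availability check
    return PokeReturnValue(is_done=record_count > 0, xcom_value=record_count)


result = wait_for_upstream()
print(result.is_done, result.xcom_value)  # True 42
```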
 
 .. exampleinclude:: /../src/airflow/example_dags/example_sensor_decorator.py
     :language: python
     :start-after: [START tutorial]
     :end-before: [END tutorial]
 
+Mixing with Traditional Tasks
+'''''''''''''''''''''''''''''
 
-Multiple outputs inference
---------------------------
-Tasks can also infer multiple outputs by using dict Python typing.
-
-.. code-block:: python
-
-   @task
-   def identity_dict(x: int, y: int) -> dict[str, int]:
-       return {"x": x, "y": y}
-
-By using the typing ``dict``, or any other class that conforms to the 
``typing.Mapping`` protocol,
-for the function return type, the ``multiple_outputs`` parameter is 
automatically set to true.
-
-Note, If you manually set the ``multiple_outputs`` parameter the inference is 
disabled and
-the parameter value is used.
-
-Adding dependencies between decorated and traditional tasks
------------------------------------------------------------
-The above tutorial shows how to create dependencies between TaskFlow 
functions. However, dependencies can also
-be set between traditional tasks (such as 
:class:`~airflow.providers.standard.operators.bash.BashOperator`
-or :class:`~airflow.providers.standard.sensors.filesystem.FileSensor`) and 
TaskFlow functions.
-
-Building this dependency is shown in the code below:
-
-.. code-block:: python
-
-    @task()
-    def extract_from_file():
-        """
-        #### Extract from file task
-        A simple Extract task to get data ready for the rest of the data
-        pipeline, by reading the data from a file into a pandas dataframe
-        """
-        order_data_file = "/tmp/order_data.csv"
-        order_data_df = pd.read_csv(order_data_file)
-        return order_data_df
-
-
-    file_task = FileSensor(task_id="check_file", 
filepath="/tmp/order_data.csv")
-    order_data = extract_from_file()
-
-    file_task >> order_data
-
-
-In the above code block, a new TaskFlow function is defined as 
``extract_from_file`` which
-reads the data from a known file location.
-In the main DAG, a new ``FileSensor`` task is defined to check for this file. 
Please note
-that this is a Sensor task which waits for the file.
-The TaskFlow function call is put in a variable ``order_data``.
-Finally, a dependency between this Sensor task and the TaskFlow function is 
specified using the variable.
-
-
-Consuming XComs between decorated and traditional tasks
--------------------------------------------------------
-As noted above, the TaskFlow API allows XComs to be consumed or passed between 
tasks in a manner that is
-abstracted away from the DAG author. This section dives further into detailed 
examples of how this is
-possible not only between TaskFlow functions but between both TaskFlow 
functions *and* traditional tasks.
+You can combine decorated tasks with classic Operators. This is helpful when 
using community providers or when migrating
+incrementally to TaskFlow.
 
-You may find it necessary to consume an XCom from traditional tasks, either 
pushed within the task's execution
-or via its return value, as an input into downstream tasks. You can access the 
pushed XCom (also known as an
-``XComArg``) by utilizing the ``.output`` property exposed for all operators.
+You can chain TaskFlow and traditional tasks using ``>>`` or pass data using 
the ``.output`` attribute.
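For example, a decorated task can parse the XCom pushed by a traditional operator via its ``.output`` property. A runnable sketch of the parsing side (the no-op ``task`` stub replaces Airflow's ``@task`` so the logic runs standalone; in a DAG you would wire it up as ``parse_results(get_api_results_task.output)``, where ``get_api_results_task`` is assumed to be an ``HttpOperator``):

```python
import json


def task(fn):
    """No-op stand-in for Airflow's @task decorator (sketch only)."""
    return fn


@task
def parse_results(api_results: str) -> dict:
    # Downstream TaskFlow task consuming a traditional operator's XCom.
    return json.loads(api_results)


print(parse_results('{"rows": 2}'))  # {'rows': 2}
```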
 
-By default, using the ``.output`` property to retrieve an XCom result is the 
equivalent of:
+Templating in TaskFlow
+''''''''''''''''''''''
+Like traditional tasks, decorated TaskFlow functions support templated 
arguments — including loading content from files
+or using runtime parameters.
 
-.. code-block:: python
-
-    task_instance.xcom_pull(task_ids="my_task_id", key="return_value")
-
-To retrieve an XCom result for a key other than ``return_value``, you can use:
-
-.. code-block:: python
-
-    my_op = MyOperator(...)
-    my_op_output = my_op.output["some_other_xcom_key"]
-    # OR
-    my_op_output = my_op.output.get("some_other_xcom_key")
-
-.. note::
-    Using the ``.output`` property as an input to another task is supported 
only for operator parameters
-    listed as a ``template_field``.
-
-In the code example below, a 
:class:`~airflow.providers.http.operators.http.HttpOperator` result
-is captured via :doc:`XComs </core-concepts/xcoms>`. This XCom result, which 
is the task output, is then passed
-to a TaskFlow function which parses the response as JSON.
-
-.. code-block:: python
-
-    get_api_results_task = HttpOperator(
-        task_id="get_api_results",
-        endpoint="/api/query",
-        do_xcom_push=True,
-        http_conn_id="http",
-    )
-
-
-    @task
-    def parse_results(api_results):
-        return json.loads(api_results)
-
-
-    parsed_results = parse_results(api_results=get_api_results_task.output)
-
-The reverse can also be done: passing the output of a TaskFlow function as an 
input to a traditional task.
-
-.. code-block:: python
-
-    @task(retries=3)
-    def create_queue():
-        """This is a Python function that creates an SQS queue"""
-        hook = SqsHook()
-        result = hook.create_queue(queue_name="sample-queue")
-
-        return result["QueueUrl"]
-
-
-    sqs_queue = create_queue()
-
-    publish_to_queue = SqsPublishOperator(
-        task_id="publish_to_queue",
-        sqs_queue=sqs_queue,
-        message_content="{{ task_instance }}-{{ execution_date }}",
-        message_attributes=None,
-        delay_seconds=0,
-    )
-
-Take note in the code example above, the output from the ``create_queue`` 
TaskFlow function, the URL of a
-newly-created Amazon SQS Queue, is then passed to a 
:class:`~airflow.providers.amazon.aws.operators.sqs.SqsPublishOperator`
-task as the ``sqs_queue`` arg.
-
-Finally, not only can you use traditional operator outputs as inputs for 
TaskFlow functions, but also as inputs to
-other traditional operators. In the example below, the output from the 
:class:`~airflow.providers.amazon.aws.transfers.salesforce_to_s3.SalesforceToS3Operator`
-task (which is an S3 URI for a destination file location) is used an input for 
the 
:class:`~airflow.providers.amazon.aws.operators.s3_copy_object.S3CopyObjectOperator`
-task to copy the same file to a date-partitioned storage location in S3 for 
long-term storage in a data lake.
-
-.. code-block:: python
-
-    BASE_PATH = "salesforce/customers"
-    FILE_NAME = "customer_daily_extract_{{ ds_nodash }}.csv"
-
-
-    upload_salesforce_data_to_s3_landing = SalesforceToS3Operator(
-        task_id="upload_salesforce_data_to_s3",
-        salesforce_query="SELECT Id, Name, Company, Phone, Email, 
LastModifiedDate, IsActive FROM Customers",
-        s3_bucket_name="landing-bucket",
-        s3_key=f"{BASE_PATH}/{FILE_NAME}",
-        salesforce_conn_id="salesforce",
-        aws_conn_id="s3",
-        replace=True,
-    )
-
-
-    store_to_s3_data_lake = S3CopyObjectOperator(
-        task_id="store_to_s3_data_lake",
-        aws_conn_id="s3",
-        source_bucket_key=upload_salesforce_data_to_s3_landing.output,
-        dest_bucket_name="data_lake",
-        dest_bucket_key=f"""{BASE_PATH}/{"{{ 
execution_date.strftime('%Y/%m/%d') }}"}/{FILE_NAME}""",
-    )
-
-.. _taskflow/accessing_context_variables:
-
-Accessing context variables in decorated tasks
-----------------------------------------------
-
-When running your callable, Airflow will pass a set of keyword arguments that
-can be used in your function. This set of kwargs correspond exactly to what you
-can use in your Jinja templates. For this to work, you can add context keys you
-would like to receive in the function as keyword arguments.
-
-For example, the callable in the code block below will get values of the ``ti``
-and ``next_ds`` context variables:
-
-.. code-block:: python
-
-   @task
-   def my_python_callable(*, ti, next_ds):
-       pass
-
-.. versionchanged:: 2.8
-    Previously the context key arguments must provide a default, e.g. 
``ti=None``.
-    This is no longer needed.
-
-You can also choose to receive the entire context with ``**kwargs``. Note that
-this can incur a slight performance penalty since Airflow will need to
-expand the entire context that likely contains many things you don't actually
-need. It is therefore more recommended for you to use explicit arguments, as
-demonstrated in the previous paragraph.
-
-.. code-block:: python
-
-   @task
-   def my_python_callable(**kwargs):
-       ti = kwargs["ti"]
-       next_ds = kwargs["next_ds"]
-
-Also, sometimes you might want to access the context somewhere deep in the 
stack, but you do not want to pass
-the context variables from the task callable. You can still access execution 
context via the ``get_current_context``
-method.
-
-.. code-block:: python
-
-    from airflow.providers.standard.operators.python import get_current_context
-
-
-    def some_function_in_your_library():
-        context = get_current_context()
-        ti = context["ti"]
-
-Current context is accessible only during the task execution. The context is 
not accessible during
-``pre_execute`` or ``post_execute``. Calling this method outside execution 
context will raise an error.
-
-Using templates in decorated tasks
-----------------------------------------------
-
-Arguments passed to your decorated function are automatically templated.
-
-You can also use the ``templates_exts`` parameter to template entire files.
+Arguments passed to decorated functions are automatically templated. You can 
also template entire files using
+``templates_exts``:
 
 .. code-block:: python
 
     @task(templates_exts=[".sql"])
-    def template_test(sql):
-        print(f"sql: {sql}")
-
-
-    template_test(sql="sql/test.sql")
-
-This will read the content of ``sql/test.sql`` and replace all template 
variables. You can also pass a list of files and all of them will be templated.
-
-You can pass additional parameters to the template engine through `the params 
parameter </concepts/params.html>`_.
+    def read_sql(sql): ...
 
-However, the ``params`` parameter must be passed to the decorator and not to 
your function directly, such as ``@task(templates_exts=['.sql'], 
params={'my_param'})`` and can then be used with ``{{ params.my_param }}`` in 
your templated files and function parameters.
+Conditional Execution
+'''''''''''''''''''''
 
-Alternatively, you can also pass it using the ``.override()`` method:
+Use ``@task.run_if()`` or ``@task.skip_if()`` to control whether a task runs 
based on dynamic conditions at runtime —
+without altering your DAG structure.
 
 .. code-block:: python
 
-    @task()
-    def template_test(input_var):
-        print(f"input_var: {input_var}")
-
-
-    template_test.override(params={"my_param": "wow"})(
-        input_var="my param is: {{ params.my_param }}",
-    )
-
-Finally, you can also manually render templates:
-
-.. code-block:: python
-
-    @task(params={"my_param": "wow"})
-    def template_test():
-        template_str = "run_id: {{ run_id }}; params.my_param: {{ 
params.my_param }}"
-
-        context = get_current_context()
-        rendered_template = context["task"].render_template(
-            template_str,
-            context,
-        )
-
-Here is a full example that demonstrates everything above:
-
-.. exampleinclude:: /../src/airflow/example_dags/tutorial_taskflow_templates.py
-    :language: python
-    :start-after: [START tutorial]
-    :end-before: [END tutorial]
-
-Conditionally skipping tasks
-----------------------------
-
-The ``run_if()`` and ``skip_if()`` are syntactic sugar for TaskFlow
-that allows you to skip a ``Task`` based on a condition.
-You can use them to simply set execution conditions
-without changing the structure of the ``DAG`` or ``Task``.
-
-It also allows you to set conditions using ``Context``,
-which is essentially the same as using ``pre_execute``.
-
-An example usage of ``run_if()`` is as follows:
-
-.. code-block:: python
-
-    @task.run_if(lambda context: context["task_instance"].task_id == "run")
+    @task.run_if(lambda ctx: ctx["task_instance"].task_id == "run")
     @task.bash()
-    def echo() -> str:
+    def echo():
         return "echo 'run'"
 
-The ``echo`` defined in the above code is only executed when the ``task_id`` 
is ``run``.
-
-If you want to leave a log when you skip a task, you have two options.
-
-.. tab-set::
-
-    .. tab-item:: Static message
-
-        .. code-block:: python
-
-            @task.run_if(lambda context: context["task_instance"].task_id == 
"run", skip_message="only task_id is 'run'")
-            @task.bash()
-            def echo() -> str:
-                return "echo 'run'"
+What's Next
+-----------
 
-    .. tab-item:: using Context
+Now that you've seen how to build clean, maintainable DAGs using the TaskFlow 
API, here are some good next steps:
 
-        .. code-block:: python
-
-            @task.run_if(
-                lambda context: (context["task_instance"].task_id == "run", 
f"{context['ts']}: only task_id is 'run'")
-            )
-            @task.bash()
-            def echo() -> str:
-                return "echo 'run'"
-
-There is also a ``skip_if()`` that works the opposite of ``run_if()``, and is 
used in the same way.
-
-.. code-block:: python
-
-    @task.skip_if(lambda context: context["task_instance"].task_id == "skip")
-    @task.bash()
-    def echo() -> str:
-        return "echo 'run'"
-
-What's Next?
-------------
-
-You have seen how simple it is to write dags using the TaskFlow API paradigm 
within Airflow 2.0. Here are a few steps you might want to take next:
-
-.. seealso::
-    - Continue to the next step of the tutorial: :doc:`/tutorial/pipeline`
-    - Read the :doc:`Concepts section </core-concepts/index>` for detailed 
explanation of Airflow concepts such as dags, Tasks, Operators, and more
-    - View the section on the :doc:`TaskFlow API </core-concepts/taskflow>` 
and the ``@task`` decorator.
+- Explore asset-aware workflows in :doc:`/authoring-and-scheduling/datasets`
+- Dive into scheduling patterns in :ref:`Scheduling Options 
<scheduling-section>`
+- Move to the next tutorial: :doc:`/tutorial/pipeline`
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index b3716fc9052..107ea9b9aac 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -1514,6 +1514,7 @@ RunQuerySensor
 runspace
 RunSubmitTaskSettings
 runtime
+runtimes
 SaaS
 sade
 Sagemaker
