This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-1-test by this push:
new 4fb85274057 [v3-1-test] docs: Enhance triggering_asset_event retrieval documentation in DAGs (#52666) (#52674) (#56957)
4fb85274057 is described below
commit 4fb85274057e51ab731878e3882c746ef677740c
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Oct 21 17:41:14 2025 +0100
[v3-1-test] docs: Enhance triggering_asset_event retrieval documentation in DAGs (#52666) (#52674) (#56957)
Closes #52666
(cherry picked from commit 4f7908cc8830be4bdde7f53ca39749a1e1863e77)
Co-authored-by: Ranuga <[email protected]>
---
.../authoring-and-scheduling/asset-scheduling.rst | 88 +++++++++++++++++++---
1 file changed, 76 insertions(+), 12 deletions(-)
diff --git a/airflow-core/docs/authoring-and-scheduling/asset-scheduling.rst b/airflow-core/docs/authoring-and-scheduling/asset-scheduling.rst
index 51ae64c62c7..4feaf7e24a9 100644
--- a/airflow-core/docs/authoring-and-scheduling/asset-scheduling.rst
+++ b/airflow-core/docs/authoring-and-scheduling/asset-scheduling.rst
@@ -152,16 +152,35 @@ Fetching information from a triggering asset event
A triggered Dag can fetch information from the asset that triggered it using the ``triggering_asset_events`` template or parameter. See more at :ref:`templates-ref`.
-Example:
+The ``triggering_asset_events`` parameter is a dictionary that looks like this:
.. code-block:: python
-    example_snowflake_asset = Asset("snowflake://my_db/my_schema/my_table")
+    {
+        Asset("s3://asset-bucket/example.csv"): [
+            AssetEvent(uri="s3://asset-bucket/example.csv", source_dag_run=DagRun(...), ...),
+            ...,
+        ],
+        Asset("s3://another-bucket/another.csv"): [
+            AssetEvent(uri="s3://another-bucket/another.csv", source_dag_run=DagRun(...), ...),
+            ...,
+        ],
+    }
- with DAG(dag_id="load_snowflake_data", schedule="@hourly", ...):
- SQLExecuteQueryOperator(
- task_id="load", conn_id="snowflake_default",
outlets=[example_snowflake_asset], ...
- )
+You can access this information in your tasks using Jinja templating or directly in Python functions.
+
+Accessing triggering asset events with Jinja
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+You can use Jinja templating to pass information from the triggering asset events to your operators.
+
+**Example: Single Triggering Asset**
+
+If your DAG is triggered by a single asset, you can access its information like this:
+
+.. code-block:: python
+
+    example_snowflake_asset = Asset("snowflake://my_db/my_schema/my_table")
    with DAG(dag_id="query_snowflake_data", schedule=[example_snowflake_asset], ...):
        SQLExecuteQueryOperator(
@@ -175,13 +194,58 @@ Example:
""",
)
-    @task
-    def print_triggering_asset_events(triggering_asset_events=None):
-        for asset, asset_list in triggering_asset_events.items():
-            print(asset, asset_list)
-            print(asset_list[0].source_dag_run.dag_id)
+In this example, ``triggering_asset_events.values() | first | first`` does the following:
+
+1. ``triggering_asset_events.values()``: Gets a list of all lists of asset events.
+2. ``| first``: Gets the first list of asset events (since we only have one triggering asset).
+3. ``| first``: Gets the first ``AssetEvent`` from that list, as shown in the sketch below.
+
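+For illustration, here is a minimal sketch that uses the same filter chain to print the id of the Dag whose run created the event. The Dag and task names (``echo_triggering_dag``, ``echo_source``) are hypothetical, not part of the example above:
+
+.. code-block:: python
+
+    with DAG(dag_id="echo_triggering_dag", schedule=[example_snowflake_asset], ...):
+        BashOperator(
+            task_id="echo_source",
+            bash_command='echo "Triggered by {{ (triggering_asset_events.values() | first | first).source_dag_run.dag_id }}"',
+        )
+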
+**Example: Multiple Triggering Assets**
+
+When your DAG is triggered by multiple assets, you can iterate through them in your Jinja template.
+
+.. code-block:: python
+
+ with DAG(dag_id="process_assets", schedule=[asset1, asset2], ...):
+ BashOperator(
+ task_id="process",
+ bash_command="""
+ {% for asset_uri, events in triggering_asset_events.items() %}
+ echo "Processing asset: {{ asset_uri }}"
+ {% for event in events %}
+ echo " Triggered by DAG: {{ event.source_dag_run.dag_id }}"
+ echo " Data interval start: {{
event.source_dag_run.data_interval_start }}"
+ echo " Data interval end: {{
event.source_dag_run.data_interval_end }}"
+ {% endfor %}
+ {% endfor %}
+ """,
+ )
+
+
+Accessing triggering asset events in Python
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+You can also access ``triggering_asset_events`` directly in a Python function by accepting it as a parameter.
+
+.. code-block:: python
+
+    @task
+    def print_triggering_asset_events(triggering_asset_events=None):
+        if triggering_asset_events:
+            for asset, asset_events in triggering_asset_events.items():
+                print(f"Asset: {asset.uri}")
+                for event in asset_events:
+                    print(f"  - Triggered by DAG run: {event.source_dag_run.dag_id}")
+                    print(
+                        f"    Data interval: {event.source_dag_run.data_interval_start} to {event.source_dag_run.data_interval_end}"
+                    )
+                    print(f"    Run ID: {event.source_dag_run.run_id}")
+                    print(f"    Timestamp: {event.timestamp}")
+
+
+    print_triggering_asset_events()
-    print_triggering_asset_events()
+.. note::
+    When a DAG is scheduled by multiple assets, there may be multiple asset events for each asset. The logic for handling these events can be complex, and it is up to the DAG author to decide how to process them. For example, you might want to process all new data since the last run, or you might want to process each triggering event individually, as sketched below.
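+For instance, a minimal sketch of the per-event approach; ``process_event`` is a hypothetical helper standing in for your own logic:
+
+.. code-block:: python
+
+    @task
+    def handle_each_event(triggering_asset_events=None):
+        # Walk the {Asset: [AssetEvent, ...]} mapping and handle every event on its own.
+        for asset, asset_events in (triggering_asset_events or {}).items():
+            for event in asset_events:
+                process_event(asset.uri, event.timestamp)  # hypothetical helper
+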
Note that this example is using `(.values() | first | first)
<https://jinja.palletsprojects.com/en/3.1.x/templates/#jinja-filters.first>`_ to
fetch the first of one asset given to the Dag, and the first of one AssetEvent
for that asset. An implementation can be quite complex if you