This is an automated email from the ASF dual-hosted git repository.
rahulvats pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 52734ea7cc6 docs: asset partition (#63262)
52734ea7cc6 is described below
commit 52734ea7cc67615f63a5c9f61502f5d18e9e6094
Author: Wei Lee <[email protected]>
AuthorDate: Tue Mar 24 18:58:48 2026 +0800
docs: asset partition (#63262)
* docs: asset partition
* fixup! docs: asset partition
---
.../docs/authoring-and-scheduling/assets.rst | 148 +++++++++++++++++++++
1 file changed, 148 insertions(+)
diff --git a/airflow-core/docs/authoring-and-scheduling/assets.rst
b/airflow-core/docs/authoring-and-scheduling/assets.rst
index b5b97b9979c..42bc78c0005 100644
--- a/airflow-core/docs/authoring-and-scheduling/assets.rst
+++ b/airflow-core/docs/authoring-and-scheduling/assets.rst
@@ -401,3 +401,151 @@ As mentioned in :ref:`Fetching information from
previously emitted asset events<
def consume_asset_alias_events(*, inlet_events):
events = inlet_events[AssetAlias("example-alias")]
last_row_count = events[-1].extra["row_count"]
+
+Asset partitions
+----------------
+
+.. versionadded:: 3.2.0
+
+Asset events can include a ``partition_key`` to make it _partitioned__. This
lets you model
+the same asset at partition granularity (for example, ``2026-03-10T09:00:00``
for an
+hourly partition).
+
+To produce partitioned events on a schedule, use
+``CronPartitionTimetable`` in the producer Dag (or ``@asset``). This timetable
+creates asset events with a partition key on each run.
+
+.. code-block:: python
+
+ from airflow.sdk import CronPartitionTimetable, asset
+
+
+ @asset(
+ uri="file://incoming/player-stats/team_b.csv",
+ schedule=CronPartitionTimetable("15 * * * *", timezone="UTC"),
+ )
+ def team_b_player_stats():
+ pass
+
+Partitioned events are intended for partition-aware downstream scheduling, and
+do not trigger non-partition-aware Dags.
+
+For downstream partition-aware scheduling, use ``PartitionedAssetTimetable``:
+
+.. code-block:: python
+
+ from airflow.sdk import DAG, HourlyMapper, PartitionedAssetTimetable
+
+ with DAG(
+ dag_id="clean_and_combine_player_stats",
+ schedule=PartitionedAssetTimetable(
+ assets=team_a_player_stats & team_b_player_stats &
team_c_player_stats,
+ default_partition_mapper=HourlyMapper(),
+ ),
+ catchup=False,
+ ):
+ ...
+
+``PartitionedAssetTimetable`` requires partitioned asset events. If an asset
+event does not contain a ``partition_key``, it will not trigger a downstream
+Dag that uses ``PartitionedAssetTimetable``.
+
+``default_partition_mapper`` is used for every upstream asset unless you
+override it via ``partition_mapper_config``. The default mapper is
+``IdentityMapper`` (no key transformation).
+
+Partition mappers define how upstream partition keys are transformed to the
+downstream Dag partition key:
+
+* ``IdentityMapper`` keeps keys unchanged.
+* Temporal mappers such as ``HourlyMapper``, ``DailyMapper``, and
+ ``YearlyMapper`` normalize time keys to a chosen grain. For input key
+ ``2026-03-10T09:37:51``, the default outputs are:
+
+ * ``HourlyMapper`` -> ``2026-03-10T09``
+ * ``DailyMapper`` -> ``2026-03-10``
+ * ``YearlyMapper`` -> ``2026``
+* ``ProductMapper`` maps composite keys segment-by-segment.
+ It applies one mapper per segment and then rejoins the mapped segments.
+ For example, with key ``us|2026-03-10T09:00:00``,
+ ``ProductMapper(IdentityMapper(), DailyMapper())`` produces
+ ``us|2026-03-10``.
+* ``AllowedKeyMapper`` validates that keys are in a fixed allow-list and
+ passes the key through unchanged if valid.
+ For example, ``AllowedKeyMapper(["us", "eu", "apac"])`` accepts only those
+ region keys and rejects all others.
+
+Example of per-asset mapper configuration and composite-key mapping:
+
+.. code-block:: python
+
+ from airflow.sdk import (
+ Asset,
+ DailyMapper,
+ IdentityMapper,
+ PartitionedAssetTimetable,
+ ProductMapper,
+ )
+
+ regional_sales = Asset(uri="file://incoming/sales/regional.csv",
name="regional_sales")
+
+ with DAG(
+ dag_id="aggregate_regional_sales",
+ schedule=PartitionedAssetTimetable(
+ assets=regional_sales,
+ default_partition_mapper=ProductMapper(IdentityMapper(),
DailyMapper()),
+ ),
+ ):
+ ...
+
+You can also override mappers for specific upstream assets with
+``partition_mapper_config``:
+
+.. code-block:: python
+
+ from airflow.sdk import Asset, DAG, DailyMapper, IdentityMapper,
PartitionedAssetTimetable
+
+ hourly_sales = Asset(uri="file://incoming/sales/hourly.csv",
name="hourly_sales")
+ daily_targets = Asset(uri="file://incoming/sales/targets.csv",
name="daily_targets")
+
+ with DAG(
+ dag_id="join_sales_and_targets",
+ schedule=PartitionedAssetTimetable(
+ assets=hourly_sales & daily_targets,
+ # Default behavior: map timestamp-like keys to daily keys.
+ default_partition_mapper=DailyMapper(),
+ # Override for assets that already emit daily partition keys.
+ partition_mapper_config={
+ daily_targets: IdentityMapper(),
+ },
+ ),
+ ):
+ ...
+
+If transformed partition keys from all required upstream assets do not align,
+the downstream Dag will not be triggered for that partition.
+
+The same applies when a mapper cannot transform a key. For example, if an
+upstream event has ``partition_key="random-text"`` and the downstream mapping
+uses ``DailyMapper`` (which expects a timestamp-like key), no downstream
+partition match can be produced, so the downstream Dag is not triggered for
+that key.
+
+Inside partitioned Dag runs, access the resolved partition through
+``dag_run.partition_key``.
+
+You can also trigger a DagRun manually with a partition key (for example,
+through the Trigger Dag window in the UI, or through the REST API by
+including ``partition_key`` in the request body):
+
+.. code-block:: bash
+
+ curl -X POST
"http://<airflow-host>/api/v2/dags/aggregate_regional_sales/dagRuns" \
+ -H "Content-Type: application/json" \
+ -d '{
+ "logical_date": "2026-03-10T00:00:00Z",
+ "partition_key": "us|2026-03-10T09:00:00"
+ }'
+
+For complete runnable examples, see
+``airflow-core/src/airflow/example_dags/example_asset_partition.py``.