dstandish commented on code in PR #64571:
URL: https://github.com/apache/airflow/pull/64571#discussion_r3343275160


##########
airflow-core/docs/authoring-and-scheduling/assets.rst:
##########
@@ -638,6 +638,87 @@ including ``partition_key`` in the request body):
         "partition_key": "us|2026-03-10T09:00:00"
       }'
 
+Rollup mappers
+~~~~~~~~~~~~~~
+
+.. versionadded:: 3.3.0
+
+The mappers shown above match upstream keys to a single downstream key one-for-one.
+For a coarser downstream period made up of many upstream events — an hourly upstream
+that drives a daily summary, daily inputs that compose a weekly report — use
+``RollupMapper``. ``RollupMapper`` composes an upstream mapper (which normalizes each
+upstream key to the downstream granularity) with a ``Window`` that declares the full
+set of upstream keys required for one downstream key. The scheduler holds the Dag
+run until every upstream key in the window has arrived; partial windows stay pending
+on the next-run-assets view so operators can see progress.
+
+The shipped windows are ``HourWindow`` (sixty minutes per hour), ``DayWindow``
+(twenty-four hours per day), ``WeekWindow`` (seven days per week), ``MonthWindow``,
+``QuarterWindow``, and ``YearWindow``. Pair each window with an upstream mapper that
+decodes to the same temporal grain — for example ``StartOfHourMapper`` with
+``DayWindow``.
+
+The following hourly-to-daily example produces a daily summary once all twenty-four
+upstream hourly partitions for a calendar day have arrived:
+
+.. code-block:: python
+
+    from airflow.sdk import (
+        DAG,
+        Asset,
+        CronPartitionTimetable,
+        DayWindow,
+        PartitionedAssetTimetable,
+        RollupMapper,
+        StartOfHourMapper,
+        task,
+    )
+
+    hourly_sales = Asset(uri="file://incoming/sales/hourly.csv", name="hourly_sales")
+
+    # Producer: emits one partitioned event per hour (key looks like 2026-03-10T09:00:00).
+    with DAG(
+        dag_id="ingest_hourly_sales",
+        schedule=CronPartitionTimetable("0 * * * *", timezone="UTC"),
+    ):
+
+        @task(outlets=[hourly_sales])
+        def ingest():
+            pass
+
+        ingest()
+
+    # Consumer: fires once a day's twenty-four hourly partitions are all in.
+    with DAG(
+        dag_id="daily_sales_summary",
+        schedule=PartitionedAssetTimetable(
+            assets=hourly_sales,
+            default_partition_mapper=RollupMapper(
+                upstream_mapper=StartOfHourMapper(),
+                window=DayWindow(),
+            ),
+        ),
+        catchup=False,
+    ):
+
+        @task
+        def summarize(dag_run=None):
+            # dag_run.partition_key is the day, e.g. "2026-03-10".
+            print(dag_run.partition_key)
+
+        summarize()
+
+A misconfigured ``RollupMapper`` — e.g. pairing an identity-decoding upstream mapper
+with a ``DayWindow`` — raises ``TypeError`` at Dag parse so the misconfiguration
+surfaces immediately instead of silently holding every downstream run forever.
+
+``DayWindow`` always enumerates twenty-four hourly steps. With an upstream mapper
+configured for a local timezone that observes daylight-saving time, the spring-forward
+day has only twenty-three real hours (one window member never has a matching event,
+so the run is held indefinitely) and the fall-back day has twenty-five (the repeated
+hour is dropped). Use a UTC-based upstream mapper for any rollup that crosses a DST
+boundary; see the ``DayWindow`` class docstring for the full discussion.

Review Comment:
   Doesn't that sound like a bug?
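To make the DST case in the last paragraph of the hunk concrete, here is a minimal standard-library sketch of the mismatch. It assumes ``America/New_York`` as the local timezone and models upstream keys as naive local start-of-hour ISO strings, like the ``2026-03-10T09:00:00`` key in the example. ``day_window_keys``, ``real_local_hours`` and ``local_hour_keys`` are hypothetical helpers written only for illustration. They are not Airflow's ``DayWindow`` or ``StartOfHourMapper``, and the real key format may differ.

```python
from datetime import date, datetime, time, timedelta, timezone
from zoneinfo import ZoneInfo


def day_window_keys(day: date) -> list[str]:
    """Hypothetical stand-in for a DayWindow: always 24 hourly keys, DST or not."""
    midnight = datetime.combine(day, time())
    return [(midnight + timedelta(hours=h)).isoformat() for h in range(24)]


def real_local_hours(day: date, tz: ZoneInfo) -> int:
    """Real elapsed hours between this local midnight and the next one."""
    start = datetime.combine(day, time(), tzinfo=tz).astimezone(timezone.utc)
    end = datetime.combine(day + timedelta(days=1), time(), tzinfo=tz).astimezone(timezone.utc)
    # Subtracting two aware datetimes that share one tzinfo uses wall-clock time,
    # so both ends are converted to UTC first.
    return int((end - start).total_seconds() // 3600)


def local_hour_keys(day: date, tz: ZoneInfo) -> list[str]:
    """Hypothetical upstream keys: one event per real hour, keyed by local start-of-hour."""
    start = datetime.combine(day, time(), tzinfo=tz).astimezone(timezone.utc)
    keys = []
    for h in range(real_local_hours(day, tz)):
        local = (start + timedelta(hours=h)).astimezone(tz)
        keys.append(local.replace(tzinfo=None).isoformat())
    return keys


if __name__ == "__main__":
    ny = ZoneInfo("America/New_York")
    # 2026-03-08 is spring-forward, 2026-03-10 is an ordinary day, 2026-11-01 is fall-back.
    for day in (date(2026, 3, 8), date(2026, 3, 10), date(2026, 11, 1)):
        window = day_window_keys(day)
        arrived = local_hour_keys(day, ny)
        never_arrives = sorted(set(window) - set(arrived))
        duplicates = len(arrived) - len(set(arrived))
        print(day, real_local_hours(day, ny), never_arrives, duplicates)
```

Under these assumptions, the ``02:00`` window key on 2026-03-08 never arrives, so a run gated on all 24 keys would wait forever. On 2026-11-01 the ``01:00`` key arrives twice and the second event collapses onto the first. Running the same helpers with ``ZoneInfo("UTC")`` yields 24 distinct keys every day, which is why the hunk recommends a UTC-based upstream mapper.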



-- 
This is an automated message from the Apache Git Service.