Lee-W commented on code in PR #64571:
URL: https://github.com/apache/airflow/pull/64571#discussion_r3323000572


##########
task-sdk/src/airflow/sdk/definitions/partition_mappers/base.py:
##########
@@ -16,10 +16,34 @@
 # under the License.
 from __future__ import annotations
 
+from typing import TYPE_CHECKING, ClassVar
+
+if TYPE_CHECKING:
+    from airflow.sdk.definitions.partition_mappers.window import Window
+
 
 class PartitionMapper:
     """
     Base partition mapper class.
 
-    Maps keys from asset events to target dag run partitions.
+    Maps keys from asset events to target Dag run partitions.
+    """
+
+    is_rollup: ClassVar[bool] = False
+
+
+class RollupMapper(PartitionMapper):
+    """
+    Partition mapper that rolls up many upstream keys into one downstream key.
+
+    Compose a ``upstream_mapper`` (which normalizes each upstream key to the
+    downstream granularity) with a ``window`` that declares the full set of
+    upstream keys required for a given downstream key. The scheduler holds
+    the Dag run until every upstream key in the window has arrived.
     """
+
+    is_rollup: ClassVar[bool] = True
+
+    def __init__(self, *, upstream_mapper: PartitionMapper, window: Window) -> 
None:
+        self.upstream_mapper = upstream_mapper
+        self.window = window

Review Comment:
   SDK side declares `expected_decoded_type` on both `PartitionMapper` (default 
str, _BaseTemporalMapper overrides to datetime) and `Window` (default str, six 
temporal windows override to datetime). The SDK RollupMapper.__init__ mirrors 
the core check, so a mis-paired upstream/window raises TypeError at Dag parse.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to