Lee-W commented on code in PR #58289:
URL: https://github.com/apache/airflow/pull/58289#discussion_r2541556065


##########
airflow-core/src/airflow/timetables/simple.py:
##########
@@ -217,3 +218,65 @@ def next_dagrun_info(
         restriction: TimeRestriction,
     ) -> DagRunInfo | None:
         return None
+
+
+class PartitionMapper:
+    """
+    Base partition mapper class.
+
+    Maps keys from asset events to target dag run partitions.
+    """
+
+    def map(self, key: str) -> str: ...
+
+    def inverse_map(self, key: str) -> Iterable[str]: ...
+
+
+class IdentityMapper(PartitionMapper):
+    """Partition mapper that does not change the key."""
+
+    def map(self, key: str) -> str:
+        return key
+
+    def inverse_map(self, key: str) -> Iterable[str]:
+        yield key
+
+
+class PartitionedAssetTimetable(AssetTriggeredTimetable):
+    """Asset-driven timetable that listens for partitioned assets."""
+
+    @property
+    def summary(self) -> str:
+        return "Partitioned Asset"
+
+    def __init__(self, assets: BaseAsset, partition_mapper: PartitionMapper) -> None:
+        super().__init__(assets=assets)
+        self.asset_condition = assets
+        self.partition_mapper = partition_mapper
+
+    def serialize(self) -> dict[str, Any]:
+        from airflow.serialization.serialized_objects import encode_asset_condition
+
+        return {
+            "asset_condition": encode_asset_condition(self.asset_condition),
+            "partition_mapper_cls": self.partition_mapper.__class__.__name__,
+        }
+
+    @classmethod
+    def deserialize(cls, data: dict[str, Any]) -> Timetable:
+        from airflow.serialization.serialized_objects import decode_asset_condition
+
+        ser_asset_condition = data["asset_condition"]
+        mapper_class_name = data.get("partition_mapper_cls", None)
+        if not mapper_class_name:
+            mapper_class = IdentityMapper
+        else:
+            # todo: AIP-76 what is the right way to do this?
+            if mapper_class_name in globals():
+                mapper_class = globals()[mapper_class_name]
+            else:
+                mapper_class = import_string(mapper_class_name)

Review Comment:
   >  we could add something like encode_timetable / decode_timetable for 
partition mappers.
   
   I like this more
   
   > do we need to add partition mappers as a distinct pluggable entity 
alongside timetables? 
   
   Probably in the next phase, to keep the first phase as simple as possible.



##########
airflow-core/src/airflow/assets/manager.py:
##########
@@ -229,12 +242,39 @@ def notify_asset_alias_created(asset_assets: AssetAlias):
     def notify_asset_changed(asset: Asset):
         """Run applicable notification actions when an asset is changed."""
         try:
+            # todo: AIP-76 this will have to change. needs to know *what* happened to the asset (e.g. partition key)

Review Comment:
   ```suggestion
               # TODO: AIP-76 this will have to change. needs to know *what* happened to the asset (e.g. partition key)
   ```
   
   nit: AFAIK, many editors highlight uppercase `TODO`.



##########
airflow-core/src/airflow/timetables/simple.py:
##########
@@ -217,3 +218,65 @@ def next_dagrun_info(
         restriction: TimeRestriction,
     ) -> DagRunInfo | None:
         return None
+
+
+class PartitionMapper:
+    """
+    Base partition mapper class.
+
+    Maps keys from asset events to target dag run partitions.
+    """
+
+    def map(self, key: str) -> str: ...
+
+    def inverse_map(self, key: str) -> Iterable[str]: ...

Review Comment:
   IMO, it would be better 🤔 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to