dstandish commented on code in PR #58289:
URL: https://github.com/apache/airflow/pull/58289#discussion_r2530004636
##########
airflow-core/src/airflow/timetables/simple.py:
##########
@@ -217,3 +218,65 @@ def next_dagrun_info(
restriction: TimeRestriction,
) -> DagRunInfo | None:
return None
+
+
+class PartitionMapper:
+ """
+ Base partition mapper class.
+
+ Maps keys from asset events to target dag run partitions.
+ """
+
+ def map(self, key: str) -> str: ...
+
+ def inverse_map(self, key: str) -> Iterable[str]: ...
+
+
+class IdentityMapper(PartitionMapper):
+ """Partition mapper that does not change the key."""
+
+ def map(self, key: str) -> str:
+ return key
+
+ def inverse_map(self, key: str) -> Iterable[str]:
+ yield key
+
+
+class PartitionedAssetTimetable(AssetTriggeredTimetable):
+ """Asset-driven timetable that listens for partitioned assets."""
+
+ @property
+ def summary(self) -> str:
+ return "Partitioned Asset"
+
+ def __init__(self, assets: BaseAsset, partition_mapper: PartitionMapper) -> None:
+ super().__init__(assets=assets)
+ self.asset_condition = assets
+ self.partition_mapper = partition_mapper
+
+ def serialize(self) -> dict[str, Any]:
+ from airflow.serialization.serialized_objects import encode_asset_condition
+
+ return {
+ "asset_condition": encode_asset_condition(self.asset_condition),
+ "partition_mapper_cls": self.partition_mapper.__class__.__name__,
+ }
+
+ @classmethod
+ def deserialize(cls, data: dict[str, Any]) -> Timetable:
+ from airflow.serialization.serialized_objects import decode_asset_condition
+
+ ser_asset_condition = data["asset_condition"]
+ mapper_class_name = data.get("partition_mapper_cls", None)
+ if not mapper_class_name:
+ mapper_class = IdentityMapper
+ else:
+ # todo: AIP-76 what is the right way to do this?
+ if mapper_class_name in globals():
+ mapper_class = globals()[mapper_class_name]
+ else:
+ mapper_class = import_string(mapper_class_name)
Review Comment:
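One thought occurs to me: we could add something like `encode_timetable` /
`decode_timetable` for _partition mappers_. (As written, `serialize` stores only
`__class__.__name__`, so the `import_string` fallback receives a bare class name
rather than a dotted module path and cannot locate a mapper defined outside this
module.) But that raises the question: do we need to add partition mappers as a
distinct pluggable entity alongside timetables? Maybe so. I'd appreciate your
thoughts on this.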
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]