dstandish commented on code in PR #58289:
URL: https://github.com/apache/airflow/pull/58289#discussion_r2549750970
##########
airflow-core/src/airflow/timetables/simple.py:
##########
@@ -217,3 +218,65 @@ def next_dagrun_info(
restriction: TimeRestriction,
) -> DagRunInfo | None:
return None
+
+
+class PartitionMapper:
+ """
+ Base partition mapper class.
+
+ Maps keys from asset events to target dag run partitions.
+ """
+
+ def map(self, key: str) -> str: ...
+
+ def inverse_map(self, key: str) -> Iterable[str]: ...
+
+
+class IdentityMapper(PartitionMapper):
+ """Partition mapper that does not change the key."""
+
+ def map(self, key: str) -> str:
+ return key
+
+ def inverse_map(self, key: str) -> Iterable[str]:
+ yield key
+
+
+class PartitionedAssetTimetable(AssetTriggeredTimetable):
+ """Asset-driven timetable that listens for partitioned assets."""
+
+ @property
+ def summary(self) -> str:
+ return "Partitioned Asset"
+
+ def __init__(self, assets: BaseAsset, partition_mapper: PartitionMapper)
-> None:
+ super().__init__(assets=assets)
+ self.asset_condition = assets
+ self.partition_mapper = partition_mapper
+
+ def serialize(self) -> dict[str, Any]:
+ from airflow.serialization.serialized_objects import
encode_asset_condition
+
+ return {
+ "asset_condition": encode_asset_condition(self.asset_condition),
+ "partition_mapper_cls": self.partition_mapper.__class__.__name__,
+ }
+
+ @classmethod
+ def deserialize(cls, data: dict[str, Any]) -> Timetable:
+ from airflow.serialization.serialized_objects import
decode_asset_condition
+
+ ser_asset_condition = data["asset_condition"]
+ mapper_class_name = data.get("partition_mapper_cls", None)
+ if not mapper_class_name:
+ mapper_class = IdentityMapper
+ else:
+ # todo: AIP-76 what is the right way to do this?
+ if mapper_class_name in globals():
+ mapper_class = globals()[mapper_class_name]
+ else:
+ mapper_class = import_string(mapper_class_name)
Review Comment:
OK, what I have done is update the serde logic to copy exactly what is done
for timetables, minus the pluggability. So we should be able to add
pluggability in the future while maintaining backward compatibility, and I'll
create a task for this.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]