phanikumv commented on code in PR #68458:
URL: https://github.com/apache/airflow/pull/68458#discussion_r3425876719


##########
airflow-core/src/airflow/timetables/simple.py:
##########
@@ -355,6 +356,36 @@ def partition_mapper_info(self) -> 
list[PartitionMapperInfo]:
                 entries.append(PartitionMapperInfo(uri=s_asset_ref.uri, 
is_rollup=mapper.is_rollup))
         return entries
 
+    def _decode_partition_date(self, partition_key: str) -> datetime | None:
+        """
+        Decode *partition_key* into the period-start datetime shared by all 
asset mappers.
+
+        Iterates every asset (and asset ref) reachable from the asset 
condition, asks
+        each mapper for the temporal anchor of *partition_key*, and returns it 
when all
+        temporal mappers agree. Returns ``None`` when no mapper is temporal or 
when the
+        mappers disagree — consistent with how the scheduler resolves 
``partition_date``
+        for asset-triggered runs.
+        """
+        anchors: set[datetime] = set()
+        for unique_key, _ in self.asset_condition.iter_assets():
+            mapper = self.get_partition_mapper(name=unique_key.name, 
uri=unique_key.uri)
+            anchor = mapper.to_partition_date(partition_key)
+            if anchor is not None:
+                anchors.add(anchor)
+        for s_asset_ref in self.asset_condition.iter_asset_refs():
+            if isinstance(s_asset_ref, SerializedAssetNameRef):
+                mapper = self.get_partition_mapper(name=s_asset_ref.name)
+            elif isinstance(s_asset_ref, SerializedAssetUriRef):
+                mapper = self.get_partition_mapper(uri=s_asset_ref.uri)
+            else:
+                continue
+            anchor = mapper.to_partition_date(partition_key)
+            if anchor is not None:
+                anchors.add(anchor)
+        if len(anchors) == 1:
+            return anchors.pop()

Review Comment:
   This calls mapper.to_partition_date() unguarded, but temporal mappers do a 
raw
    strptime and raise plain ValueError on a bad key.
   
   Suggest to wrap this in a try except block.
   
   ```
   from airflow.exceptions import InvalidPartitionKeyError
   
   try:
       for....
   
                   anchors.add(anchor)
   except ValueError as exc:
        raise InvalidPartitionKeyError(
                     f"Partition key {partition_key!r} is invalid for this 
timetable's mappers: {exc}"
                 ) from exc
        return anchors.pop()
   return None
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to