Lee-W commented on code in PR #68458:
URL: https://github.com/apache/airflow/pull/68458#discussion_r3426847388
##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py:
##########
@@ -158,6 +158,11 @@ def trigger_dag_run(
status.HTTP_400_BAD_REQUEST,
detail={"reason": "not_partitioned", "message": str(e)},
)
+ except InvalidPartitionKeyError as e:
+ raise HTTPException(
+ status.HTTP_400_BAD_REQUEST,
+ detail={"reason": "invalid_partition_key", "message": str(e)},
+ ) from e
Review Comment:
sounds good. added `test_trigger_partitioned_dag_invalid_key_returns_400`
##########
airflow-core/src/airflow/timetables/simple.py:
##########
@@ -355,6 +356,36 @@ def partition_mapper_info(self) ->
list[PartitionMapperInfo]:
entries.append(PartitionMapperInfo(uri=s_asset_ref.uri,
is_rollup=mapper.is_rollup))
return entries
+ def _decode_partition_date(self, partition_key: str) -> datetime | None:
+ """
+ Decode *partition_key* into the period-start datetime shared by all
asset mappers.
+
+ Iterates every asset (and asset ref) reachable from the asset
condition, asks
+ each mapper for the temporal anchor of *partition_key*, and returns it
when all
+ temporal mappers agree. Returns ``None`` when no mapper is temporal or
when the
+ mappers disagree — consistent with how the scheduler resolves
``partition_date``
+ for asset-triggered runs.
+ """
+ anchors: set[datetime] = set()
+ for unique_key, _ in self.asset_condition.iter_assets():
+ mapper = self.get_partition_mapper(name=unique_key.name,
uri=unique_key.uri)
+ anchor = mapper.to_partition_date(partition_key)
+ if anchor is not None:
+ anchors.add(anchor)
+ for s_asset_ref in self.asset_condition.iter_asset_refs():
+ if isinstance(s_asset_ref, SerializedAssetNameRef):
+ mapper = self.get_partition_mapper(name=s_asset_ref.name)
+ elif isinstance(s_asset_ref, SerializedAssetUriRef):
+ mapper = self.get_partition_mapper(uri=s_asset_ref.uri)
+ else:
+ continue
+ anchor = mapper.to_partition_date(partition_key)
+ if anchor is not None:
+ anchors.add(anchor)
+ if len(anchors) == 1:
+ return anchors.pop()
Review Comment:
Thanks! Now wrap `to_partition_date()` in a `try/except ValueError` and
re-raise as `InvalidPartitionKeyError`, so a malformed key surfaces as a 400
instead of a 500.
##########
airflow-core/src/airflow/timetables/simple.py:
##########
@@ -355,6 +356,36 @@ def partition_mapper_info(self) ->
list[PartitionMapperInfo]:
entries.append(PartitionMapperInfo(uri=s_asset_ref.uri,
is_rollup=mapper.is_rollup))
return entries
+ def _decode_partition_date(self, partition_key: str) -> datetime | None:
Review Comment:
sounds good. added
`test_decode_partition_date_returns_period_start_for_valid_key`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]