kaxil commented on code in PR #67285:
URL: https://github.com/apache/airflow/pull/67285#discussion_r3454649933
##########
airflow-core/src/airflow/assets/manager.py:
##########
@@ -643,9 +651,18 @@ def _queue_partitioned_dags(
)
continue
+ # The producer's partition_date (threaded in from its DagRun via
+ # register_asset_change) is carried onto the APDR only by mappers
that
+ # opt in. IdentityMapper does, since its key carries no temporal
meaning
+ # for the scheduler to re-derive at run creation; temporal and
composite
+ # mappers return None here and are resolved from the key by the
scheduler
+ # via PartitionMapper.to_partition_date.
+ target_partition_date: datetime | None =
mapper.carry_partition_date(partition_date)
Review Comment:
`mapper.carry_partition_date()` sits outside the `try` that wraps
`mapper.to_downstream()` just above, so a custom mapper whose
`carry_partition_date` override raises would propagate out and abort the whole
`register_asset_change` write, whereas a `to_downstream` failure in the same
loop is caught, logged, and `continue`d. The two stock mappers can't raise
(`IdentityMapper` returns the date, temporal/composite return `None`), so the
shipped path is safe. For parity with the loop's "catch every possible
exception" intent, worth moving this call inside the `try` and falling back to
`None` on failure.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]