nathadfield commented on code in PR #67285:
URL: https://github.com/apache/airflow/pull/67285#discussion_r3457956913
##########
airflow-core/src/airflow/assets/manager.py:
##########
@@ -643,9 +651,18 @@ def _queue_partitioned_dags(
)
continue
+ # The producer's partition_date (threaded in from its DagRun via
+ # register_asset_change) is carried onto the APDR only by mappers
that
+ # opt in. IdentityMapper does, since its key carries no temporal
meaning
+ # for the scheduler to re-derive at run creation; temporal and
composite
+ # mappers return None here and are resolved from the key by the
scheduler
+ # via PartitionMapper.to_partition_date.
+ target_partition_date: datetime | None =
mapper.carry_partition_date(partition_date)
Review Comment:
@kaxil makes sense, done. Wrapped the `carry_partition_date` call in a
`try/except` that logs and falls back to `None`, so a custom mapper raising
there degrades the same way a `to_downstream` failure does (consumer still
queued via `partition_key`, just no carried date) rather than aborting the
write. Added a test with a mapper that raises in `carry_partition_date`
asserting the APDR is still created with `partition_date=None`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]