nathadfield commented on code in PR #67285:
URL: https://github.com/apache/airflow/pull/67285#discussion_r3420263279


##########
airflow-core/src/airflow/assets/manager.py:
##########
@@ -639,9 +648,21 @@ def _queue_partitioned_dags(
                 )
                 continue
 
+            # Carry the producer's partition_date onto the APDR for IdentityMapper
+            # only: its key carries no temporal meaning, so the scheduler cannot
+            # re-derive the date at run creation and the producer's date (threaded
+            # in from its DagRun via register_asset_change) must be carried here.
+            # Temporal and composite mappers are resolved from the key by the
+            # scheduler via PartitionMapper.to_partition_date, so this is None for
+            # them.
+            target_partition_date: datetime | None = (
+                partition_date if isinstance(mapper, IdentityMapper) else None

Review Comment:
   @Lee-W yeah, I'd like to drop the `isinstance` too. The problem is 
`to_partition_date` only gets the downstream key, and what we're carrying for 
identity is the producer's source `partition_date`, which you can't get back 
from the key (something like `batch-42` tells you nothing about a date). It's 
also captured at queue time when the event arrives, whereas `to_partition_date` 
runs later at run creation, so it doesn't really fit in there.
   
   What does work is a small sibling method on the mapper, same idea as 
`to_partition_date`:
   
   ```python
   # PartitionMapper
   def carry_partition_date(self, source_partition_date: datetime | None) -> datetime | None:
       return None
   
   # IdentityMapper
   def carry_partition_date(self, source_partition_date):
       return source_partition_date  # consumer key == producer key
   ```
   
   Then the line becomes `target_partition_date = 
mapper.carry_partition_date(partition_date)`, no `isinstance`, and any future 
passthrough-style mapper can opt in. Sound ok?
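
To make the proposal concrete, here is a minimal, self-contained sketch of how the hook could behave. The classes below are simplified stand-ins, not Airflow's real `PartitionMapper` or `IdentityMapper`; `DailyMapper` and `resolve_target_partition_date` are hypothetical names used only for illustration, and `Optional[datetime]` is used in place of `datetime | None` so the snippet runs on older Python versions.

```python
from datetime import datetime, timezone
from typing import Optional


class PartitionMapper:
    """Stand-in base class; only the proposed hook is modelled here."""

    def carry_partition_date(self, source_partition_date: Optional[datetime]) -> Optional[datetime]:
        # Default: carry nothing. The date is expected to be derived
        # from the key later, at run creation.
        return None


class IdentityMapper(PartitionMapper):
    """Stand-in for the identity mapper: consumer key == producer key."""

    def carry_partition_date(self, source_partition_date: Optional[datetime]) -> Optional[datetime]:
        # The key has no temporal meaning, so pass the producer's date through.
        return source_partition_date


class DailyMapper(PartitionMapper):
    """Hypothetical temporal mapper; it inherits the default and carries nothing."""


def resolve_target_partition_date(
    mapper: PartitionMapper, partition_date: Optional[datetime]
) -> Optional[datetime]:
    # Hypothetical helper mirroring the proposed one-liner at the call site:
    # polymorphic dispatch replaces the isinstance check.
    return mapper.carry_partition_date(partition_date)


producer_date = datetime(2024, 1, 1, tzinfo=timezone.utc)
identity_result = resolve_target_partition_date(IdentityMapper(), producer_date)
temporal_result = resolve_target_partition_date(DailyMapper(), producer_date)
```

Under these assumptions, `identity_result` is the producer's date and `temporal_result` is `None`, which matches what the `isinstance` branch does today while leaving the opt-in decision to each mapper.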
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
