Lee-W commented on code in PR #67285:
URL: https://github.com/apache/airflow/pull/67285#discussion_r3387236293
##########
airflow-core/docs/authoring-and-scheduling/assets.rst:
##########
@@ -623,7 +623,17 @@ partition match can be produced, so the downstream Dag is
not triggered for
that key.
Inside partitioned Dag runs, access the resolved partition through
-``dag_run.partition_key``.
+``dag_run.partition_key``. When the consumer's partition mapper is
+``IdentityMapper`` or one of the ``StartOf*Mapper`` family, the
+underlying ``datetime`` is also available as ``dag_run.partition_date``,
+so templates can use ``{{ partition_date | ds }}``. Other mappers
+(``ProductMapper``, ``ChainMapper``, ``AllowedKeyMapper``, custom
+mappers) leave ``partition_date`` ``None`` even when the resulting key
+is date-shaped, so those consumers should keep parsing
+``partition_key``. ``RollupMapper`` also leaves ``partition_date``
+``None`` — a rollup collapses many upstream partitions, each with its
Review Comment:
I think `RollupMapper` should have `partition_date` in some cases, but I
have another open PR handling that.
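
For the consumers that the doc text tells to "keep parsing ``partition_key``", a fallback might look roughly like the sketch below. `resolve_partition_date` and its `key_format` parameter are hypothetical names for illustration, not part of this PR or of Airflow, and the sketch assumes the key is a plain `strptime`-parseable string.

```python
from __future__ import annotations

from datetime import datetime


def resolve_partition_date(
    partition_date: datetime | None,
    partition_key: str | None,
    key_format: str = "%Y-%m-%d",  # assumed key layout; adjust per asset
) -> datetime | None:
    """Prefer the stamped date; otherwise try to parse the key (hypothetical helper)."""
    if partition_date is not None:
        # Mapper already produced a date (e.g. identity or temporal mappers).
        return partition_date
    if partition_key is None:
        return None
    try:
        # Date-shaped key from a mapper that leaves partition_date as None.
        return datetime.strptime(partition_key, key_format)
    except ValueError:
        # Key is not date-shaped (e.g. a region name), so there is no date.
        return None
```

A task could call this with `dag_run.partition_date` and `dag_run.partition_key` so the same code path works whichever mapper the consumer uses.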
##########
airflow-core/src/airflow/assets/manager.py:
##########
@@ -63,6 +66,47 @@
log = structlog.get_logger(__name__)
+def _compute_target_partition_date(
+ *,
+ mapper: PartitionMapper,
+ source_partition_key: str,
+ source_partition_date: datetime | None,
+) -> datetime | None:
+ """
+ Derive the consumer's ``partition_date`` from the partition mapper.
+
+ Computed once at APDR creation and stored on the row, so the consumer
+ DagRun's ``partition_date`` is locked to the mapper output at the time
+ the source event was queued — later mapper code or config changes do
+ not retroactively shift the date.
+
+ - ``RollupMapper``: ``None``. A rollup collapses many upstream partitions
+ (each with its own date) into one downstream partition, so no single
+ source ``partition_date`` describes the rolled-up window.
+ - ``IdentityMapper``: passes the source ``partition_date`` through.
+ - ``_BaseTemporalMapper`` subclasses (``StartOf*Mapper``): re-parse the
+ source key with the mapper's ``input_format`` and apply ``normalize``.
+ - All other mappers: ``None``.
+ """
+ from airflow.partition_mappers.base import is_rollup
Review Comment:
Let's move these imports to the top of the module.
##########
airflow-core/src/airflow/assets/manager.py:
##########
@@ -63,6 +66,47 @@
log = structlog.get_logger(__name__)
+def _compute_target_partition_date(
+ *,
+ mapper: PartitionMapper,
+ source_partition_key: str,
+ source_partition_date: datetime | None,
+) -> datetime | None:
+ """
+ Derive the consumer's ``partition_date`` from the partition mapper.
+
+ Computed once at APDR creation and stored on the row, so the consumer
+ DagRun's ``partition_date`` is locked to the mapper output at the time
+ the source event was queued — later mapper code or config changes do
+ not retroactively shift the date.
+
+ - ``RollupMapper``: ``None``. A rollup collapses many upstream partitions
+ (each with its own date) into one downstream partition, so no single
+ source ``partition_date`` describes the rolled-up window.
+ - ``IdentityMapper``: passes the source ``partition_date`` through.
+ - ``_BaseTemporalMapper`` subclasses (``StartOf*Mapper``): re-parse the
+ source key with the mapper's ``input_format`` and apply ``normalize``.
+ - All other mappers: ``None``.
+ """
+ from airflow.partition_mappers.base import is_rollup
+ from airflow.partition_mappers.identity import IdentityMapper
+ from airflow.partition_mappers.temporal import _BaseTemporalMapper
+
+ if is_rollup(mapper):
Review Comment:
I'm working on another PR to avoid this type switch. #68266 adds a
polymorphic `PartitionMapper.to_partition_date(key)`: composites delegate
(`RollupMapper → upstream_mapper`, `FanOut`, `Chain`) and temporal mappers
return the anchor. Adopting it here would support rollup/fan-out/chain and
remove the `isinstance`/`is_rollup` branching.

Keep from this PR (not in #68266):
* IdentityMapper passthrough (its key can't yield a date, so the threaded
source date is needed)
* the exposure layer: `Context["partition_date"]`
* execution-API field + Cadwyn versioning
* `DAGRunResponse`, docs

#68266 only stamps `DagRun.partition_date` in the scheduler.
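
To make the suggestion concrete, here is a minimal sketch of the polymorphic shape, not the code in #68266. The `Toy*` classes, their constructor arguments, and the default `input_format` are stand-ins invented for illustration; only the `to_partition_date(key)` method name and the "rollup delegates to its upstream mapper" idea come from the comment above.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime


class ToyPartitionMapper:
    """Stand-in base class: mappers without date semantics return None."""

    def to_partition_date(self, key: str) -> datetime | None:
        return None


@dataclass
class ToyStartOfDayMapper(ToyPartitionMapper):
    """Stand-in temporal mapper: parse the key, then anchor to midnight."""

    input_format: str = "%Y-%m-%dT%H:%M:%S"  # assumed format for the sketch

    def to_partition_date(self, key: str) -> datetime | None:
        try:
            parsed = datetime.strptime(key, self.input_format)
        except ValueError:
            return None
        return parsed.replace(hour=0, minute=0, second=0, microsecond=0)


@dataclass
class ToyRollupMapper(ToyPartitionMapper):
    """Stand-in composite: delegate instead of being special-cased by the caller."""

    upstream_mapper: ToyPartitionMapper

    def to_partition_date(self, key: str) -> datetime | None:
        return self.upstream_mapper.to_partition_date(key)


def compute_target_partition_date(mapper: ToyPartitionMapper, key: str) -> datetime | None:
    # No isinstance / is_rollup branching: every mapper answers for itself.
    return mapper.to_partition_date(key)
```

With this shape the caller collapses to a single method call, and a new composite only needs to decide which inner mapper it delegates to.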