This is an automated email from the ASF dual-hosted git repository.

Lee-W pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e6fc8734077 fix(scheduler): populate partition_date for temporal asset partitions (#68266)
e6fc8734077 is described below

commit e6fc873407711f5e254e35c81b51d82fee94f765
Author: Wei Lee <[email protected]>
AuthorDate: Wed Jun 10 22:31:43 2026 +0800

    fix(scheduler): populate partition_date for temporal asset partitions (#68266)
---
 airflow-core/newsfragments/68266.bugfix.rst        |   1 +
 .../src/airflow/jobs/scheduler_job_runner.py       |  79 +++++++++-
 airflow-core/src/airflow/partition_mappers/base.py |  19 +++
 .../src/airflow/partition_mappers/chain.py         |   9 +-
 .../src/airflow/partition_mappers/temporal.py      |  12 ++
 airflow-core/tests/unit/jobs/test_scheduler_job.py | 167 ++++++++++++++++++++-
 .../tests/unit/partition_mappers/test_chain.py     |  18 +++
 .../tests/unit/partition_mappers/test_temporal.py  |  95 ++++++++++++
 8 files changed, 392 insertions(+), 8 deletions(-)

diff --git a/airflow-core/newsfragments/68266.bugfix.rst b/airflow-core/newsfragments/68266.bugfix.rst
new file mode 100644
index 00000000000..0d41b654186
--- /dev/null
+++ b/airflow-core/newsfragments/68266.bugfix.rst
@@ -0,0 +1 @@
+Asset-triggered partitioned Dag runs now set ``partition_date`` when the consumer's partition mapper is temporal (directly, or wrapped in ``RollupMapper`` / ``FanOutMapper`` / ``ChainMapper``). Non-temporal mappers leave ``partition_date`` unset.
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index a6496665e8b..a860810b746 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -113,8 +113,8 @@ from airflow.partition_mappers.base import is_rollup
 from airflow.serialization.definitions.assets import SerializedAssetUniqueKey
 from airflow.serialization.definitions.notset import NOTSET
 from airflow.ti_deps.dependencies_states import ACTIVE_STATES, EXECUTION_STATES
-from airflow.timetables.base import compute_rollup_fingerprint
-from airflow.timetables.simple import AssetTriggeredTimetable, PartitionedAssetTimetable
+from airflow.timetables.base import Timetable, compute_rollup_fingerprint
+from airflow.timetables.simple import AssetTriggeredTimetable
 from airflow.triggers.base import TriggerEvent
 from airflow.utils.event_scheduler import EventScheduler
 from airflow.utils.log.logging_mixin import LoggingMixin
@@ -1916,7 +1916,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         name: str,
         uri: str,
         apdr: AssetPartitionDagRun,
-        timetable: PartitionedAssetTimetable,
+        timetable: Timetable,
         actual_by_asset: dict[int, set[str]],
     ) -> bool:
         """
@@ -1953,6 +1953,68 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
             )
             return False
 
+    def _resolve_partition_date(
+        self,
+        *,
+        timetable: Timetable,
+        asset_infos: Iterable[tuple[str, str]],
+        partition_key: str,
+        dag_id: str,
+    ) -> datetime | None:
+        """
+        Return the temporal anchor (period-start datetime) for *partition_key*.
+
+        Resolves the temporal anchor (period-start datetime) for *partition_key*
+        across *asset_infos* — the ``(name, uri)`` pairs of the upstream assets
+        that contributed to it. Each upstream mapper resolves the key via
+        :meth:`~airflow.partition_mappers.base.PartitionMapper.to_partition_date`:
+        temporal mappers decode the key, composite mappers delegate to their
+        child, and non-temporal mappers (e.g.
+        :class:`~airflow.partition_mappers.identity.IdentityMapper`) return ``None``.
+
+        A partitioned consumer has a single partition identity, so every temporal
+        mapper feeding it must resolve the same key to the same instant. Anchors
+        are compared by instant (timezone-aware), so equivalent moments collapse
+        to one. When the temporal mappers agree, that anchor is returned; when
+        they disagree — a misconfiguration, e.g. assets mapping the same key under
+        different timezones — ``partition_date`` is left unset and a warning is
+        logged rather than silently picking one by scan order. Returns ``None`` if
+        no mapper is temporal.
+
+        A failure in any mapper aborts the whole resolution and returns ``None``
+        (logged) — anchors accumulated from earlier mappers are discarded rather
+        than used as a partial result, since a partial set could hide a conflict.
+        A broken mapper must not crash the scheduler tick.
+        """
+        anchors: set[datetime] = set()
+        try:
+            for name, uri in asset_infos:
+                mapper = timetable.get_partition_mapper(name=name, uri=uri)
+                anchor = mapper.to_partition_date(partition_key)
+                if anchor is not None:
+                    anchors.add(anchor)
+        except Exception:
+            self.log.exception(
+                "Failed to resolve partition_date for asset-triggered Dag run; partition_date will be None.",
+                dag_id=dag_id,
+                partition_key=partition_key,
+            )
+            return None
+
+        if not anchors:
+            return None
+        if len(anchors) > 1:
+            self.log.warning(
+                "Upstream partition mappers resolved conflicting partition_date values for the same "
+                "key; leaving partition_date unset. The consumer's assets likely use inconsistent "
+                "partition mappers.",
+                dag_id=dag_id,
+                partition_key=partition_key,
+                partition_dates=sorted(anchor.isoformat() for anchor in anchors),
+            )
+            return None
+        return anchors.pop()
+
     def _create_dagruns_for_partitioned_asset_dags(self, session: Session) -> set[str]:
         """
         Create Dag runs for pending :class:`AssetPartitionDagRun` rows whose partition is satisfied.
@@ -2119,8 +2181,6 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
             for asset_id, (name, uri) in asset_info_per_apdr[apdr.id].items():
                 key = SerializedAssetUniqueKey(name=name, uri=uri)
                 if timetable.partitioned:
-                    if TYPE_CHECKING:
-                        assert isinstance(timetable, PartitionedAssetTimetable)
                     statuses[key] = self._resolve_asset_partition_status(
                         session=session,
                         asset_id=asset_id,
@@ -2137,6 +2197,14 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
 
             partition_dag_ids.add(apdr.target_dag_id)
             run_after = timezone.utcnow()
+            partition_date: datetime | None = None
+            if timetable.partitioned:
+                partition_date = self._resolve_partition_date(
+                    timetable=timetable,
+                    asset_infos=asset_info_per_apdr[apdr.id].values(),
+                    partition_key=apdr.partition_key,
+                    dag_id=apdr.target_dag_id,
+                )
             dag_run = dag.create_dagrun(
                 run_id=DagRun.generate_run_id(
                     run_type=DagRunType.ASSET_TRIGGERED, logical_date=None, run_after=run_after
@@ -2144,6 +2212,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                 logical_date=None,
                 data_interval=None,
                 partition_key=apdr.partition_key,
+                partition_date=partition_date,
                 run_after=run_after,
                 run_type=DagRunType.ASSET_TRIGGERED,
                 triggered_by=DagRunTriggeredByType.ASSET,
diff --git a/airflow-core/src/airflow/partition_mappers/base.py b/airflow-core/src/airflow/partition_mappers/base.py
index ba4a3ee658e..ee70842cdcf 100644
--- a/airflow-core/src/airflow/partition_mappers/base.py
+++ b/airflow-core/src/airflow/partition_mappers/base.py
@@ -22,6 +22,7 @@ from typing import TYPE_CHECKING, Any, ClassVar, TypeGuard
 
 if TYPE_CHECKING:
     from collections.abc import Iterable
+    from datetime import datetime
 
     from airflow.partition_mappers.window import Window
 
@@ -92,6 +93,19 @@ class PartitionMapper(ABC):
         """
         return decoded
 
+    def to_partition_date(self, downstream_key: str) -> datetime | None:
+        """
+        Return the temporal anchor (period-start datetime) for *downstream_key*.
+
+        The scheduler stamps this on the asset-triggered Dag run as its
+        ``partition_date``. The base implementation returns ``None`` — a plain
+        partition key carries no temporal meaning. Temporal mappers override to
+        decode the key into its window anchor; composite mappers
+        (:class:`RollupMapper`, :class:`~airflow.partition_mappers.temporal.FanOutMapper`)
+        delegate to whichever child owns the downstream key's identity.
+        """
+        return None
+
     def serialize(self) -> dict[str, Any]:
         return {}
 
@@ -138,6 +152,11 @@ class RollupMapper(PartitionMapper):
             for expected_upstream in self.window.to_upstream(decoded)
         )
 
+    def to_partition_date(self, downstream_key: str) -> datetime | None:
+        # The downstream key is in upstream_mapper's format (to_downstream delegates
+        # to it), so the anchor is the upstream_mapper's to resolve.
+        return self.upstream_mapper.to_partition_date(downstream_key)
+
     def serialize(self) -> dict[str, Any]:
         from airflow.serialization.encoders import encode_partition_mapper, encode_window
 
diff --git a/airflow-core/src/airflow/partition_mappers/chain.py b/airflow-core/src/airflow/partition_mappers/chain.py
index 850517482cf..a4a2110c256 100644
--- a/airflow-core/src/airflow/partition_mappers/chain.py
+++ b/airflow-core/src/airflow/partition_mappers/chain.py
@@ -18,10 +18,13 @@
 from __future__ import annotations
 
 from collections.abc import Iterable
-from typing import Any
+from typing import TYPE_CHECKING, Any
 
 from airflow.partition_mappers.base import PartitionMapper
 
+if TYPE_CHECKING:
+    from datetime import datetime
+
 
 class ChainMapper(PartitionMapper):
     """Partition mapper that applies multiple mappers sequentially."""
@@ -60,6 +63,10 @@ class ChainMapper(PartitionMapper):
             keys = next_keys
         return keys[0] if len(keys) == 1 else keys
 
+    def to_partition_date(self, downstream_key: str) -> datetime | None:
+        # The last mapper in the chain formats the final downstream key, so it owns the anchor.
+        return self.mappers[-1].to_partition_date(downstream_key)
+
     def serialize(self) -> dict[str, Any]:
         from airflow.serialization.encoders import encode_partition_mapper
 
diff --git a/airflow-core/src/airflow/partition_mappers/temporal.py b/airflow-core/src/airflow/partition_mappers/temporal.py
index 4fc1ed45ed3..622f0c6a556 100644
--- a/airflow-core/src/airflow/partition_mappers/temporal.py
+++ b/airflow-core/src/airflow/partition_mappers/temporal.py
@@ -198,6 +198,14 @@ class _BaseTemporalMapper(PartitionMapper):
         """
         return datetime.strptime(downstream_key, self.output_format)
 
+    def to_partition_date(self, downstream_key: str) -> datetime:
+        anchor = self.normalize(self.decode_downstream(downstream_key))
+        # decode_downstream returns a naive datetime; localise it with the mapper's
+        # own timezone, mirroring to_downstream, so the stored instant is correct.
+        if anchor.tzinfo is None:
+            anchor = make_aware(anchor, self._timezone)
+        return anchor
+
     def encode_upstream(self, dt: datetime) -> str:
         """
         Format *dt* as an upstream partition key string.
@@ -522,6 +530,10 @@ class FanOutMapper(PartitionMapper):
         coarse = self.upstream_mapper.decode_downstream(formatted)
         return [_format_with(self.downstream_mapper, item) for item in self.window.to_upstream(coarse)]
 
+    def to_partition_date(self, downstream_key: str) -> datetime | None:
+        # Fan-out keys are formatted by downstream_mapper, so it owns the anchor.
+        return self.downstream_mapper.to_partition_date(downstream_key)
+
     def serialize(self) -> dict[str, Any]:
         from airflow.serialization.encoders import encode_partition_mapper, encode_window
 
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index d08628a51fb..6b9c6e34309 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -93,8 +93,18 @@ from airflow.partition_mappers.base import (
     PartitionMapper as CorePartitionMapper,
     RollupMapper as CoreRollupMapper,
 )
-from airflow.partition_mappers.temporal import StartOfHourMapper as CoreStartOfHourMapper
-from airflow.partition_mappers.window import DayWindow as CoreDayWindow, HourWindow as CoreHourWindow
+from airflow.partition_mappers.identity import IdentityMapper as CoreIdentityMapper
+from airflow.partition_mappers.temporal import (
+    FanOutMapper as CoreFanOutMapper,
+    StartOfDayMapper as CoreStartOfDayMapper,
+    StartOfHourMapper as CoreStartOfHourMapper,
+    StartOfWeekMapper as CoreStartOfWeekMapper,
+)
+from airflow.partition_mappers.window import (
+    DayWindow as CoreDayWindow,
+    HourWindow as CoreHourWindow,
+    WeekWindow as CoreWeekWindow,
+)
 from airflow.providers.standard.operators.bash import BashOperator
 from airflow.providers.standard.operators.empty import EmptyOperator
 from airflow.providers.standard.triggers.file import FileDeleteTrigger
@@ -109,6 +119,7 @@ from airflow.sdk import (
     IdentityMapper,
     RollupMapper,
     SegmentWindow,
+    StartOfDayMapper,
     StartOfHourMapper,
     task,
 )
@@ -11712,3 +11723,155 @@ class TestReapStaleConnectionTests:
         session.expire_all()
         assert session.get(ConnectionTestRequest, ct_success.id).state == ConnectionTestState.SUCCESS
         assert session.get(ConnectionTestRequest, ct_failed.id).state == ConnectionTestState.FAILED
+
+
+@pytest.mark.need_serialized_dag
+@pytest.mark.usefixtures("clear_asset_partition_rows")
+@pytest.mark.parametrize(
+    ("sdk_mapper", "upstream_partition_key", "expected_downstream_key", "expected_partition_date"),
+    [
+        (
+            StartOfDayMapper(),
+            "2024-03-15T10:30:00",
+            "2024-03-15",
+            datetime.datetime(2024, 3, 15, 0, 0, 0, tzinfo=datetime.timezone.utc),
+        ),
+        (
+            RollupMapper(upstream_mapper=StartOfHourMapper(), window=HourWindow()),
+            "2024-01-01T00:00:00",
+            "2024-01-01T00",
+            datetime.datetime(2024, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc),
+        ),
+        (
+            IdentityMapper(),
+            "key-abc",
+            "key-abc",
+            None,
+        ),
+    ],
+)
+def test_partition_date_populated_on_dagrun(
+    dag_maker: DagMaker,
+    session: Session,
+    sdk_mapper,
+    upstream_partition_key,
+    expected_downstream_key,
+    expected_partition_date,
+):
+    """DagRun.partition_date is set correctly for temporal / rollup-of-temporal mappers."""
+    asset_1 = Asset(name="asset-pd-test")
+
+    with dag_maker(
+        dag_id="partition-date-consumer",
+        schedule=PartitionedAssetTimetable(
+            assets=asset_1,
+            default_partition_mapper=sdk_mapper,
+        ),
+        session=session,
+    ):
+        EmptyOperator(task_id="hi")
+    session.commit()
+
+    runner = SchedulerJobRunner(
+        job=Job(job_type=SchedulerJobRunner.job_type), executors=[MockExecutor(do_update=False)]
+    )
+
+    apdr = _produce_and_register_asset_event(
+        dag_id="partition-date-producer",
+        asset=asset_1,
+        partition_key=upstream_partition_key,
+        session=session,
+        dag_maker=dag_maker,
+        expected_partition_key=expected_downstream_key,
+    )
+
+    # For the rollup case, send all 60 minute keys so the window is complete.
+    if isinstance(sdk_mapper, RollupMapper):
+        for minute in range(1, 60):
+            _produce_and_register_asset_event(
+                dag_id=f"partition-date-producer-{minute}",
+                asset=asset_1,
+                partition_key=f"2024-01-01T00:{minute:02d}:00",
+                session=session,
+                dag_maker=dag_maker,
+                expected_partition_key=expected_downstream_key,
+            )
+
+    runner._create_dagruns_for_partitioned_asset_dags(session=session)
+    session.refresh(apdr)
+
+    assert apdr.created_dag_run_id is not None
+    dag_run = session.scalar(select(DagRun).where(DagRun.id == apdr.created_dag_run_id))
+    assert dag_run is not None
+    assert dag_run.partition_date == expected_partition_date
+
+
+def _make_runner() -> SchedulerJobRunner:
+    return SchedulerJobRunner(
+        job=Job(job_type=SchedulerJobRunner.job_type), executors=[MockExecutor(do_update=False)]
+    )
+
+
+@pytest.mark.parametrize(
+    ("mappers", "partition_key", "expected"),
+    [
+        # Non-temporal mapper → no anchor.
+        pytest.param([CoreIdentityMapper()], "some-key", None, id="non-temporal-none"),
+        # StartOfDayMapper(NY): "2024-03-15" → NY midnight = 04:00 UTC (EDT, DST since 2024-03-10),
+        # localised with the mapper's own timezone rather than the global default.
+        pytest.param(
+            [CoreStartOfDayMapper(timezone="America/New_York")],
+            "2024-03-15",
+            datetime.datetime(2024, 3, 15, 4, 0, 0, tzinfo=datetime.timezone.utc),
+            id="non-utc-uses-mapper-timezone",
+        ),
+        # Key cannot be decoded by the mapper's format → caught → None (no raise).
+        pytest.param([CoreStartOfDayMapper()], "not-a-date", None, id="decode-failure-none"),
+        # FanOutMapper unwraps to its downstream_mapper (daily), which owns the per-day key.
+        pytest.param(
+            [CoreFanOutMapper(upstream_mapper=CoreStartOfWeekMapper(), window=CoreWeekWindow())],
+            "2024-01-16",
+            datetime.datetime(2024, 1, 16, 0, 0, 0, tzinfo=datetime.timezone.utc),
+            id="fanout-uses-downstream-mapper",
+        ),
+        # Two temporal mappers resolving the same instant → that single anchor.
+        pytest.param(
+            [CoreStartOfDayMapper(), CoreStartOfDayMapper()],
+            "2024-03-15",
+            datetime.datetime(2024, 3, 15, 0, 0, 0, tzinfo=datetime.timezone.utc),
+            id="agreeing-mappers-anchor",
+        ),
+        # Same key, UTC midnight (00:00Z) vs NY midnight (04:00Z) — distinct instants → None.
+        pytest.param(
+            [CoreStartOfDayMapper(timezone="UTC"), CoreStartOfDayMapper(timezone="America/New_York")],
+            "2024-03-15",
+            None,
+            id="conflicting-mappers-none",
+        ),
+        # Second mapper (hour format) raises on the day key → whole resolution aborts → None
+        # (the first mapper's anchor is discarded; all-or-nothing).
+        pytest.param(
+            [CoreStartOfDayMapper(), CoreStartOfHourMapper()],
+            "2024-03-15",
+            None,
+            id="one-failing-mapper-aborts",
+        ),
+    ],
+)
+def test_resolve_partition_date(mappers, partition_key, expected):
+    """_resolve_partition_date over mapper compositions: temporal / fan-out / agree / conflict / failure.
+
+    The mappers are consumed one per upstream asset, so ``asset_infos`` is sized to ``mappers``.
+    """
+    runner = _make_runner()
+    timetable = mock.MagicMock()
+    timetable.get_partition_mapper.side_effect = mappers
+    asset_infos = [(f"asset-{i}-name", f"asset-{i}-uri") for i in range(len(mappers))]
+
+    result = runner._resolve_partition_date(
+        timetable=timetable,
+        asset_infos=asset_infos,
+        partition_key=partition_key,
+        dag_id="test-dag",
+    )
+    assert result == expected
diff --git a/airflow-core/tests/unit/partition_mappers/test_chain.py b/airflow-core/tests/unit/partition_mappers/test_chain.py
index 79f888af75f..528a176b1e0 100644
--- a/airflow-core/tests/unit/partition_mappers/test_chain.py
+++ b/airflow-core/tests/unit/partition_mappers/test_chain.py
@@ -17,6 +17,8 @@
 
 from __future__ import annotations
 
+from datetime import datetime, timezone
+
 import pytest
 
 from airflow.partition_mappers.base import PartitionMapper
@@ -40,6 +42,22 @@ class TestChainMapper:
         sm = ChainMapper(StartOfHourMapper(), StartOfDayMapper(input_format="%Y-%m-%dT%H"))
         assert sm.to_downstream("2024-01-15T10:30:00") == "2024-01-15"
 
+    @pytest.mark.parametrize(
+        ("chain", "downstream_key", "expected"),
+        [
+            # Last mapper temporal → it owns the final downstream key, so it owns the anchor.
+            (
+                ChainMapper(IdentityMapper(), StartOfDayMapper()),
+                "2024-03-15",
+                datetime(2024, 3, 15, 0, 0, 0, tzinfo=timezone.utc),
+            ),
+            # Last mapper non-temporal → no anchor.
+            (ChainMapper(StartOfDayMapper(), IdentityMapper()), "anything", None),
+        ],
+    )
+    def test_to_partition_date_delegates_to_last_mapper(self, chain, downstream_key, expected):
+        assert chain.to_partition_date(downstream_key) == expected
+
     def test_to_downstream_invalid_non_iterable_return(self):
         sm = ChainMapper(IdentityMapper(), _InvalidReturnMapper())
         with pytest.raises(TypeError, match="must return a string or iterable of strings"):
diff --git a/airflow-core/tests/unit/partition_mappers/test_temporal.py b/airflow-core/tests/unit/partition_mappers/test_temporal.py
index 6ec84893506..ab2a79f52ce 100644
--- a/airflow-core/tests/unit/partition_mappers/test_temporal.py
+++ b/airflow-core/tests/unit/partition_mappers/test_temporal.py
@@ -16,11 +16,16 @@
 # under the License.
 from __future__ import annotations
 
+from datetime import datetime, timezone as dt_timezone
+
 import pendulum
 import pytest
 
 from airflow import sdk
+from airflow.partition_mappers.base import RollupMapper
+from airflow.partition_mappers.identity import IdentityMapper
 from airflow.partition_mappers.temporal import (
+    FanOutMapper,
     StartOfDayMapper,
     StartOfHourMapper,
     StartOfMonthMapper,
@@ -30,6 +35,7 @@ from airflow.partition_mappers.temporal import (
     _BaseTemporalMapper,
     _compile_output_format_regex,
 )
+from airflow.partition_mappers.window import HourWindow, WeekWindow
 from airflow.serialization.decoders import decode_partition_mapper
 from airflow.serialization.encoders import encode_partition_mapper
 
@@ -352,3 +358,92 @@ class TestOutputFormatValidation:
         assert match is not None
         assert match.group("first") == "foo"
         assert match.group("last") == "bar"
+
+
+class TestTemporalMapperDecodeNormalizeRoundTrip:
+    """
+    ``normalize(decode_downstream(to_downstream(dt)))`` must equal the anchor
+    produced by ``normalize(dt)`` for every ``_BaseTemporalMapper`` subclass.
+
+    This is the "Step 2" semantic guarantee: ``decode_downstream`` reconstructs
+    the period-start, and ``normalize`` is idempotent, so the composed call
+    used in ``_resolve_partition_date`` must not drift from the direct anchor.
+    """
+
+    SAMPLE_DT = datetime(2024, 3, 15, 10, 42, 35)
+
+    @pytest.mark.parametrize(
+        "mapper",
+        [
+            StartOfHourMapper(),
+            StartOfDayMapper(),
+            StartOfWeekMapper(),
+            StartOfMonthMapper(),
+            StartOfQuarterMapper(),
+            StartOfYearMapper(),
+        ],
+    )
+    def test_round_trip_anchor_is_stable(self, mapper: _BaseTemporalMapper):
+        """``normalize(decode_downstream(to_downstream(dt)))`` == ``normalize(dt)``."""
+        downstream_key = mapper.to_downstream(self.SAMPLE_DT.strftime(mapper.input_format))
+        decoded = mapper.decode_downstream(downstream_key)
+        round_tripped = mapper.normalize(decoded)
+        direct_anchor = mapper.normalize(self.SAMPLE_DT)
+        assert round_tripped == direct_anchor, (
+            f"{type(mapper).__name__}: round-trip anchor {round_tripped!r} "
+            f"differs from direct anchor {direct_anchor!r}"
+        )
+
+    @pytest.mark.parametrize(
+        ("mapper", "expected_aware"),
+        [
+            # UTC mapper: UTC midnight stays at 00:00 UTC.
+            (
+                StartOfDayMapper(timezone="UTC"),
+                datetime(2024, 3, 15, 0, 0, 0, tzinfo=dt_timezone.utc),
+            ),
+            # Non-UTC mapper: NY midnight (EDT = UTC-4) → 04:00 UTC.
+            (
+                StartOfDayMapper(timezone="America/New_York"),
+                datetime(2024, 3, 15, 4, 0, 0, tzinfo=dt_timezone.utc),
+            ),
+        ],
+    )
+    def test_to_partition_date_uses_mapper_timezone(
+        self, mapper: _BaseTemporalMapper, expected_aware: datetime
+    ):
+        """``to_partition_date`` localises the anchor with ``mapper._timezone``, not the global default."""
+        downstream_key = mapper.to_downstream(self.SAMPLE_DT.strftime(mapper.input_format))
+        aware = mapper.to_partition_date(downstream_key)
+        # Convert to UTC for a timezone-neutral comparison.
+        aware_utc = aware.astimezone(dt_timezone.utc)
+        assert aware_utc == expected_aware, (
+            f"{type(mapper).__name__} (tz={mapper._timezone}): "
+            f"to_partition_date produced {aware_utc!r}, expected {expected_aware!r}"
+        )
+
+
+class TestToPartitionDateDelegation:
+    """Composite mappers delegate ``to_partition_date`` to the child that owns the downstream key."""
+
+    @pytest.mark.parametrize(
+        ("mapper", "downstream_key", "expected"),
+        [
+            # RollupMapper (fan-in): downstream key is the upstream_mapper's format → it owns it.
+            (
+                RollupMapper(upstream_mapper=StartOfHourMapper(), window=HourWindow()),
+                "2024-01-01T00",
+                datetime(2024, 1, 1, 0, 0, 0, tzinfo=dt_timezone.utc),
+            ),
+            # FanOutMapper (fan-out): downstream keys are the downstream_mapper's format → it owns them.
+            (
+                FanOutMapper(upstream_mapper=StartOfWeekMapper(), window=WeekWindow()),
+                "2024-01-16",
+                datetime(2024, 1, 16, 0, 0, 0, tzinfo=dt_timezone.utc),
+            ),
+            # Non-temporal mapper → no anchor.
+            (IdentityMapper(), "anything", None),
+        ],
+    )
+    def test_to_partition_date(self, mapper, downstream_key, expected):
+        assert mapper.to_partition_date(downstream_key) == expected

Reply via email to