This is an automated email from the ASF dual-hosted git repository.

rahulvats pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-2-test by this push:
     new 196f7639e2d [v3-2-test] Make temporal mapper timezone aware (#62709) (#64467)
196f7639e2d is described below

commit 196f7639e2d76f85f36f18c4c3cbe40acee65d47
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Mar 30 19:03:13 2026 +0530

    [v3-2-test] Make temporal mapper timezone aware (#62709) (#64467)
    
    (cherry picked from commit 774ad02f75557adeb9d9ef47019ec718ba7f1cd3)
    
    Co-authored-by: Wei Lee <[email protected]>
---
 .../src/airflow/partition_mappers/temporal.py      | 19 +++++++++-
 .../tests/unit/partition_mappers/test_temporal.py  | 41 ++++++++++++++++++++--
 2 files changed, 57 insertions(+), 3 deletions(-)

diff --git a/airflow-core/src/airflow/partition_mappers/temporal.py b/airflow-core/src/airflow/partition_mappers/temporal.py
index 9c86bace56b..49f4162a12e 100644
--- a/airflow-core/src/airflow/partition_mappers/temporal.py
+++ b/airflow-core/src/airflow/partition_mappers/temporal.py
@@ -18,10 +18,14 @@ from __future__ import annotations
 
 from abc import ABC, abstractmethod
 from datetime import datetime, timedelta
-from typing import Any
+from typing import TYPE_CHECKING, Any
 
+from airflow._shared.timezones.timezone import make_aware, parse_timezone
 from airflow.partition_mappers.base import PartitionMapper
 
+if TYPE_CHECKING:
+    from pendulum import FixedTimezone, Timezone
+
 
 class _BaseTemporalMapper(PartitionMapper, ABC):
     """Base class for Temporal Partition Mappers."""
@@ -30,14 +34,23 @@ class _BaseTemporalMapper(PartitionMapper, ABC):
 
     def __init__(
         self,
+        *,
+        timezone: str | Timezone | FixedTimezone = "UTC",
         input_format: str = "%Y-%m-%dT%H:%M:%S",
         output_format: str | None = None,
     ):
         self.input_format = input_format
         self.output_format = output_format or self.default_output_format
+        if isinstance(timezone, str):
+            timezone = parse_timezone(timezone)
+        self._timezone = timezone
 
     def to_downstream(self, key: str) -> str:
         dt = datetime.strptime(key, self.input_format)
+        if dt.tzinfo is None:
+            dt = make_aware(dt, self._timezone)
+        else:
+            dt = dt.astimezone(self._timezone)
         normalized = self.normalize(dt)
         return self.format(normalized)
 
@@ -50,7 +63,10 @@ class _BaseTemporalMapper(PartitionMapper, ABC):
         return dt.strftime(self.output_format)
 
     def serialize(self) -> dict[str, Any]:
+        from airflow.serialization.encoders import encode_timezone
+
         return {
+            "timezone": encode_timezone(self._timezone),
             "input_format": self.input_format,
             "output_format": self.output_format,
         }
@@ -58,6 +74,7 @@ class _BaseTemporalMapper(PartitionMapper, ABC):
     @classmethod
     def deserialize(cls, data: dict[str, Any]) -> PartitionMapper:
         return cls(
+            timezone=parse_timezone(data.get("timezone", "UTC")),
             input_format=data["input_format"],
             output_format=data["output_format"],
         )
diff --git a/airflow-core/tests/unit/partition_mappers/test_temporal.py b/airflow-core/tests/unit/partition_mappers/test_temporal.py
index 89ad98cebbc..c54ca8a51f9 100644
--- a/airflow-core/tests/unit/partition_mappers/test_temporal.py
+++ b/airflow-core/tests/unit/partition_mappers/test_temporal.py
@@ -49,6 +49,13 @@ class TestTemporalMappers:
         pm = mapper_cls()
         assert pm.to_downstream("2026-02-10T14:30:45") == expected_downstream_key
 
+    @pytest.mark.parametrize(
+        ("timezone", "expected_timezone"),
+        [
+            (None, "UTC"),
+            ("America/New_York", "America/New_York"),
+        ],
+    )
     @pytest.mark.parametrize(
         ("mapper_cls", "expected_outut_format"),
         [
@@ -60,9 +67,16 @@ class TestTemporalMappers:
             (StartOfYearMapper, "%Y"),
         ],
     )
-    def test_serialize(self, mapper_cls: type[_BaseTemporalMapper], expected_outut_format: str):
-        pm = mapper_cls()
+    def test_serialize(
+        self,
+        mapper_cls: type[_BaseTemporalMapper],
+        expected_outut_format: str,
+        timezone: str | None,
+        expected_timezone: str,
+    ):
+        pm = mapper_cls() if timezone is None else mapper_cls(timezone=timezone)
         assert pm.serialize() == {
+            "timezone": expected_timezone,
             "input_format": "%Y-%m-%dT%H:%M:%S",
             "output_format": expected_outut_format,
         }
@@ -81,6 +95,7 @@ class TestTemporalMappers:
     def test_deserialize(self, mapper_cls):
         pm = mapper_cls.deserialize(
             {
+                "timezone": "UTC",
                 "input_format": "%Y-%m-%dT%H:%M:%S",
                 "output_format": "customized-format",
             }
@@ -88,3 +103,25 @@ class TestTemporalMappers:
         assert isinstance(pm, mapper_cls)
         assert pm.input_format == "%Y-%m-%dT%H:%M:%S"
         assert pm.output_format == "customized-format"
+
+    def test_to_downstream_timezone_aware(self):
+        """Input is interpreted as local time in the given timezone."""
+        pm = StartOfDayMapper(timezone="America/New_York")
+        # 2026-02-10T23:00:00 in New York local time → start-of-day is 2026-02-10
+        assert pm.to_downstream("2026-02-10T23:00:00") == "2026-02-10"
+        # 2026-02-11T01:00:00 in New York local time → start-of-day is 2026-02-11
+        assert pm.to_downstream("2026-02-11T01:00:00") == "2026-02-11"
+
+    def test_to_downstream_input_timezone_differs_from_mapper_timezone(self):
+        """When input_format includes %z and the parsed tz differs from the mapper tz,
+        the key is converted to the mapper timezone before normalization."""
+        pm = StartOfDayMapper(
+            timezone="America/New_York",
+            input_format="%Y-%m-%dT%H:%M:%S%z",
+        )
+        # 2026-02-11T04:00:00+00:00 UTC == 2026-02-10T23:00:00-05:00 New York
+        # → start-of-day in New York is 2026-02-10, not 2026-02-11
+        assert pm.to_downstream("2026-02-11T04:00:00+0000") == "2026-02-10"
+        # 2026-02-11T06:00:00+00:00 UTC == 2026-02-11T01:00:00-05:00 New York
+        # → start-of-day in New York is 2026-02-11
+        assert pm.to_downstream("2026-02-11T06:00:00+0000") == "2026-02-11"

Reply via email to