This is an automated email from the ASF dual-hosted git repository.
rahulvats pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-2-test by this push:
new 196f7639e2d [v3-2-test] Make temporal mapper timezone aware (#62709) (#64467)
196f7639e2d is described below
commit 196f7639e2d76f85f36f18c4c3cbe40acee65d47
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Mar 30 19:03:13 2026 +0530
[v3-2-test] Make temporal mapper timezone aware (#62709) (#64467)
(cherry picked from commit 774ad02f75557adeb9d9ef47019ec718ba7f1cd3)
Co-authored-by: Wei Lee <[email protected]>
---
.../src/airflow/partition_mappers/temporal.py | 19 +++++++++-
.../tests/unit/partition_mappers/test_temporal.py | 41 ++++++++++++++++++++--
2 files changed, 57 insertions(+), 3 deletions(-)
diff --git a/airflow-core/src/airflow/partition_mappers/temporal.py b/airflow-core/src/airflow/partition_mappers/temporal.py
index 9c86bace56b..49f4162a12e 100644
--- a/airflow-core/src/airflow/partition_mappers/temporal.py
+++ b/airflow-core/src/airflow/partition_mappers/temporal.py
@@ -18,10 +18,14 @@ from __future__ import annotations
from abc import ABC, abstractmethod
from datetime import datetime, timedelta
-from typing import Any
+from typing import TYPE_CHECKING, Any
+from airflow._shared.timezones.timezone import make_aware, parse_timezone
from airflow.partition_mappers.base import PartitionMapper
+if TYPE_CHECKING:
+ from pendulum import FixedTimezone, Timezone
+
class _BaseTemporalMapper(PartitionMapper, ABC):
"""Base class for Temporal Partition Mappers."""
@@ -30,14 +34,23 @@ class _BaseTemporalMapper(PartitionMapper, ABC):
def __init__(
self,
+ *,
+ timezone: str | Timezone | FixedTimezone = "UTC",
input_format: str = "%Y-%m-%dT%H:%M:%S",
output_format: str | None = None,
):
self.input_format = input_format
self.output_format = output_format or self.default_output_format
+ if isinstance(timezone, str):
+ timezone = parse_timezone(timezone)
+ self._timezone = timezone
def to_downstream(self, key: str) -> str:
dt = datetime.strptime(key, self.input_format)
+ if dt.tzinfo is None:
+ dt = make_aware(dt, self._timezone)
+ else:
+ dt = dt.astimezone(self._timezone)
normalized = self.normalize(dt)
return self.format(normalized)
@@ -50,7 +63,10 @@ class _BaseTemporalMapper(PartitionMapper, ABC):
return dt.strftime(self.output_format)
def serialize(self) -> dict[str, Any]:
+ from airflow.serialization.encoders import encode_timezone
+
return {
+ "timezone": encode_timezone(self._timezone),
"input_format": self.input_format,
"output_format": self.output_format,
}
@@ -58,6 +74,7 @@ class _BaseTemporalMapper(PartitionMapper, ABC):
@classmethod
def deserialize(cls, data: dict[str, Any]) -> PartitionMapper:
return cls(
+ timezone=parse_timezone(data.get("timezone", "UTC")),
input_format=data["input_format"],
output_format=data["output_format"],
)
diff --git a/airflow-core/tests/unit/partition_mappers/test_temporal.py b/airflow-core/tests/unit/partition_mappers/test_temporal.py
index 89ad98cebbc..c54ca8a51f9 100644
--- a/airflow-core/tests/unit/partition_mappers/test_temporal.py
+++ b/airflow-core/tests/unit/partition_mappers/test_temporal.py
@@ -49,6 +49,13 @@ class TestTemporalMappers:
pm = mapper_cls()
assert pm.to_downstream("2026-02-10T14:30:45") ==
expected_downstream_key
+ @pytest.mark.parametrize(
+ ("timezone", "expected_timezone"),
+ [
+ (None, "UTC"),
+ ("America/New_York", "America/New_York"),
+ ],
+ )
@pytest.mark.parametrize(
("mapper_cls", "expected_outut_format"),
[
@@ -60,9 +67,16 @@ class TestTemporalMappers:
(StartOfYearMapper, "%Y"),
],
)
- def test_serialize(self, mapper_cls: type[_BaseTemporalMapper],
expected_outut_format: str):
- pm = mapper_cls()
+ def test_serialize(
+ self,
+ mapper_cls: type[_BaseTemporalMapper],
+ expected_outut_format: str,
+ timezone: str | None,
+ expected_timezone: str,
+ ):
+ pm = mapper_cls() if timezone is None else
mapper_cls(timezone=timezone)
assert pm.serialize() == {
+ "timezone": expected_timezone,
"input_format": "%Y-%m-%dT%H:%M:%S",
"output_format": expected_outut_format,
}
@@ -81,6 +95,7 @@ class TestTemporalMappers:
def test_deserialize(self, mapper_cls):
pm = mapper_cls.deserialize(
{
+ "timezone": "UTC",
"input_format": "%Y-%m-%dT%H:%M:%S",
"output_format": "customized-format",
}
@@ -88,3 +103,25 @@ class TestTemporalMappers:
assert isinstance(pm, mapper_cls)
assert pm.input_format == "%Y-%m-%dT%H:%M:%S"
assert pm.output_format == "customized-format"
+
+ def test_to_downstream_timezone_aware(self):
+ """Input is interpreted as local time in the given timezone."""
+ pm = StartOfDayMapper(timezone="America/New_York")
+ # 2026-02-10T23:00:00 in New York local time → start-of-day is 2026-02-10
+ assert pm.to_downstream("2026-02-10T23:00:00") == "2026-02-10"
+ # 2026-02-11T01:00:00 in New York local time → start-of-day is 2026-02-11
+ assert pm.to_downstream("2026-02-11T01:00:00") == "2026-02-11"
+
+ def test_to_downstream_input_timezone_differs_from_mapper_timezone(self):
+ """When input_format includes %z and the parsed tz differs from the mapper tz,
+ the key is converted to the mapper timezone before normalization."""
+ pm = StartOfDayMapper(
+ timezone="America/New_York",
+ input_format="%Y-%m-%dT%H:%M:%S%z",
+ )
+ # 2026-02-11T04:00:00+00:00 UTC == 2026-02-10T23:00:00-05:00 New York
+ # → start-of-day in New York is 2026-02-10, not 2026-02-11
+ assert pm.to_downstream("2026-02-11T04:00:00+0000") == "2026-02-10"
+ # 2026-02-11T06:00:00+00:00 UTC == 2026-02-11T01:00:00-05:00 New York
+ # → start-of-day in New York is 2026-02-11
+ assert pm.to_downstream("2026-02-11T06:00:00+0000") == "2026-02-11"