This is an automated email from the ASF dual-hosted git repository.
Lee-W pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9cfd9943d68 Support timezone in SDK temporal partition mappers (#67164)
9cfd9943d68 is described below
commit 9cfd9943d68afa288dead1780a1ca48883b9aa3d
Author: Wei Lee <[email protected]>
AuthorDate: Fri May 22 09:25:20 2026 +0800
Support timezone in SDK temporal partition mappers (#67164)
---
airflow-core/newsfragments/67164.significant.rst | 34 ++++++
airflow-core/src/airflow/serialization/encoders.py | 3 +-
.../tests/unit/partition_mappers/test_temporal.py | 136 ++++++++++++++++++++-
.../unit/serialization/test_serialized_objects.py | 79 +++++++++---
.../sdk/definitions/partition_mappers/temporal.py | 13 ++
5 files changed, 243 insertions(+), 22 deletions(-)
diff --git a/airflow-core/newsfragments/67164.significant.rst b/airflow-core/newsfragments/67164.significant.rst
new file mode 100644
index 00000000000..283ac07d7af
--- /dev/null
+++ b/airflow-core/newsfragments/67164.significant.rst
@@ -0,0 +1,34 @@
+SDK temporal partition mappers: ``timezone`` kwarg and keyword-only constructor
+
+``StartOfHourMapper``, ``StartOfDayMapper``, ``StartOfWeekMapper``,
+``StartOfMonthMapper``, ``StartOfQuarterMapper``, and ``StartOfYearMapper``
+(imported from ``airflow.sdk``) now accept a ``timezone`` keyword argument,
+matching the core ``_BaseTemporalMapper`` signature. The constructor is now
+keyword-only.
+
+**Behaviour changes:**
+
+- ``input_format`` and ``output_format`` are no longer accepted positionally.
+ Callers that relied on ``StartOfDayMapper("%Y-%m-%dT%H:%M:%S")`` (valid in
+ ``task-sdk`` 1.2.1) must switch to
+ ``StartOfDayMapper(input_format="%Y-%m-%dT%H:%M:%S")``.
+- A string ``timezone`` is now resolved via ``parse_timezone`` at
+ construction, so unknown names raise ``pendulum.tz.exceptions.InvalidTimezone``
+ immediately instead of being stored verbatim and failing later (or, in some
+ paths, being silently dropped during serialization).
+
+**Migration:**
+
+- Update any ``StartOf*Mapper(...)`` call sites to pass ``input_format`` and
+ ``output_format`` by name.
+
+* Types of change
+
+ * [ ] Dag changes
+ * [ ] Config changes
+ * [ ] API changes
+ * [ ] CLI changes
+ * [x] Behaviour changes
+ * [ ] Plugin changes
+ * [ ] Dependency changes
+ * [x] Code interface changes
diff --git a/airflow-core/src/airflow/serialization/encoders.py b/airflow-core/src/airflow/serialization/encoders.py
index 741347943ca..e97dcff2623 100644
--- a/airflow-core/src/airflow/serialization/encoders.py
+++ b/airflow-core/src/airflow/serialization/encoders.py
@@ -45,6 +45,7 @@ from airflow.sdk import (
PartitionMapper,
ProductMapper,
StartOfDayMapper,
+ StartOfHourMapper,
StartOfMonthMapper,
StartOfQuarterMapper,
StartOfWeekMapper,
@@ -52,7 +53,6 @@ from airflow.sdk import (
)
from airflow.sdk.bases.timetable import BaseTimetable
from airflow.sdk.definitions.asset import AssetRef
-from airflow.sdk.definitions.partition_mappers.temporal import StartOfHourMapper
from airflow.sdk.definitions.timetables.assets import (
AssetTriggeredTimetable,
PartitionAtRuntime,
@@ -463,6 +463,7 @@ class _Serializer:
| StartOfYearMapper,
) -> dict[str, Any]:
return {
+ "timezone": encode_timezone(partition_mapper._timezone),
"input_format": partition_mapper.input_format,
"output_format": partition_mapper.output_format,
}
diff --git a/airflow-core/tests/unit/partition_mappers/test_temporal.py b/airflow-core/tests/unit/partition_mappers/test_temporal.py
index c54ca8a51f9..0eb10991f31 100644
--- a/airflow-core/tests/unit/partition_mappers/test_temporal.py
+++ b/airflow-core/tests/unit/partition_mappers/test_temporal.py
@@ -16,8 +16,10 @@
# under the License.
from __future__ import annotations
+import pendulum
import pytest
+from airflow import sdk
from airflow.partition_mappers.temporal import (
StartOfDayMapper,
StartOfHourMapper,
@@ -27,6 +29,8 @@ from airflow.partition_mappers.temporal import (
StartOfYearMapper,
_BaseTemporalMapper,
)
+from airflow.serialization.decoders import decode_partition_mapper
+from airflow.serialization.encoders import encode_partition_mapper
class TestTemporalMappers:
@@ -57,7 +61,7 @@ class TestTemporalMappers:
],
)
@pytest.mark.parametrize(
- ("mapper_cls", "expected_outut_format"),
+ ("mapper_cls", "expected_output_format"),
[
(StartOfHourMapper, "%Y-%m-%dT%H"),
(StartOfDayMapper, "%Y-%m-%d"),
@@ -70,7 +74,7 @@ class TestTemporalMappers:
def test_serialize(
self,
mapper_cls: type[_BaseTemporalMapper],
- expected_outut_format: str,
+ expected_output_format: str,
timezone: str | None,
expected_timezone: str,
):
@@ -78,9 +82,16 @@ class TestTemporalMappers:
assert pm.serialize() == {
"timezone": expected_timezone,
"input_format": "%Y-%m-%dT%H:%M:%S",
- "output_format": expected_outut_format,
+ "output_format": expected_output_format,
}
+ @pytest.mark.parametrize(
+ ("timezone_payload", "expected_tz_name"),
+ [
+ ("UTC", "UTC"),
+ ("Asia/Taipei", "Asia/Taipei"),
+ ],
+ )
@pytest.mark.parametrize(
"mapper_cls",
[
@@ -92,10 +103,10 @@ class TestTemporalMappers:
StartOfYearMapper,
],
)
- def test_deserialize(self, mapper_cls):
+ def test_deserialize(self, mapper_cls, timezone_payload, expected_tz_name):
pm = mapper_cls.deserialize(
{
- "timezone": "UTC",
+ "timezone": timezone_payload,
"input_format": "%Y-%m-%dT%H:%M:%S",
"output_format": "customized-format",
}
@@ -103,6 +114,31 @@ class TestTemporalMappers:
assert isinstance(pm, mapper_cls)
assert pm.input_format == "%Y-%m-%dT%H:%M:%S"
assert pm.output_format == "customized-format"
+ assert pm._timezone.name == expected_tz_name
+
+ @pytest.mark.parametrize(
+ "mapper_cls",
+ [
+ StartOfHourMapper,
+ StartOfDayMapper,
+ StartOfWeekMapper,
+ StartOfMonthMapper,
+ StartOfQuarterMapper,
+ StartOfYearMapper,
+ ],
+ )
+ def test_deserialize_legacy_payload_without_timezone(self, mapper_cls):
+ """Payloads written by ``task-sdk`` 1.2.1 omit ``timezone`` from the
+ SDK-mapper wire format; the core decoder must default it to UTC so
+ those serialized Dags can still be loaded."""
+ pm = mapper_cls.deserialize(
+ {
+ "input_format": "%Y-%m-%dT%H:%M:%S",
+ "output_format": "customized-format",
+ }
+ )
+ assert isinstance(pm, mapper_cls)
+ assert pm._timezone.name == "UTC"
def test_to_downstream_timezone_aware(self):
"""Input is interpreted as local time in the given timezone."""
@@ -125,3 +161,93 @@ class TestTemporalMappers:
# 2026-02-11T06:00:00+00:00 UTC == 2026-02-11T01:00:00-05:00 New York
# → start-of-day in New York is 2026-02-11
assert pm.to_downstream("2026-02-11T06:00:00+0000") == "2026-02-11"
+
+
+class TestSdkTemporalMappersTimezoneSerialization:
+ """
+ SDK-side temporal mappers (``airflow.sdk.StartOf*Mapper``) must accept a
+ ``timezone`` kwarg in their constructor and round-trip it through the
+ encoder/decoder path. Previously the SDK class signature omitted timezone
+ entirely and the dispatch handler in ``encoders._Serializer`` dropped it,
+ so a Dag using ``StartOfDayMapper(timezone="Asia/Taipei")`` silently fell
+ back to UTC after serialization.
+ """
+
+ @pytest.mark.parametrize("timezone", ["UTC", "America/New_York", "Asia/Taipei"])
+ @pytest.mark.parametrize(
+ "sdk_mapper_name",
+ [
+ "StartOfHourMapper",
+ "StartOfDayMapper",
+ "StartOfWeekMapper",
+ "StartOfMonthMapper",
+ "StartOfQuarterMapper",
+ "StartOfYearMapper",
+ ],
+ )
+ def test_sdk_constructor_accepts_timezone(self, sdk_mapper_name, timezone):
+ sdk_cls = getattr(sdk, sdk_mapper_name)
+ mapper = sdk_cls(timezone=timezone)
+ assert mapper._timezone.name == timezone
+
+ @pytest.mark.parametrize("timezone", ["UTC", "America/New_York", "Asia/Taipei"])
+ @pytest.mark.parametrize(
+ ("sdk_mapper_name", "core_cls"),
+ [
+ ("StartOfHourMapper", StartOfHourMapper),
+ ("StartOfDayMapper", StartOfDayMapper),
+ ("StartOfWeekMapper", StartOfWeekMapper),
+ ("StartOfMonthMapper", StartOfMonthMapper),
+ ("StartOfQuarterMapper", StartOfQuarterMapper),
+ ("StartOfYearMapper", StartOfYearMapper),
+ ],
+ )
+ def test_encode_decode_round_trip_preserves_timezone(self, sdk_mapper_name, core_cls, timezone):
+ sdk_cls = getattr(sdk, sdk_mapper_name)
+ original = sdk_cls(timezone=timezone)
+ restored = decode_partition_mapper(encode_partition_mapper(original))
+
+ # decode resolves to the Core class via BUILTIN_PARTITION_MAPPERS.
+ assert isinstance(restored, core_cls)
+ assert restored._timezone.name == timezone
+
+ @pytest.mark.parametrize(
+ "sdk_mapper_name",
+ [
+ "StartOfHourMapper",
+ "StartOfDayMapper",
+ "StartOfWeekMapper",
+ "StartOfMonthMapper",
+ "StartOfQuarterMapper",
+ "StartOfYearMapper",
+ ],
+ )
+ def test_encode_decode_round_trip_accepts_pendulum_tzinfo(self, sdk_mapper_name):
+ """The SDK ``timezone`` kwarg is advertised as ``str | Timezone | FixedTimezone``;
+ a pendulum tz object must survive the encoder pipeline (encode_timezone handles
+ the object branch) and land on the core class with the matching IANA name."""
+ sdk_cls = getattr(sdk, sdk_mapper_name)
+ original = sdk_cls(timezone=pendulum.timezone("Asia/Taipei"))
+ restored = decode_partition_mapper(encode_partition_mapper(original))
+
+ assert restored._timezone.name == "Asia/Taipei"
+
+ @pytest.mark.parametrize(
+ "sdk_mapper_name",
+ [
+ "StartOfHourMapper",
+ "StartOfDayMapper",
+ "StartOfWeekMapper",
+ "StartOfMonthMapper",
+ "StartOfQuarterMapper",
+ "StartOfYearMapper",
+ ],
+ )
+ def test_sdk_constructor_invalid_timezone_raises_eagerly(self, sdk_mapper_name):
+ """Passing an unknown timezone string must raise at construction time
+ (via ``parse_timezone``), not silently fall back to UTC or fail later."""
+ from pendulum.tz.exceptions import InvalidTimezone
+
+ sdk_cls = getattr(sdk, sdk_mapper_name)
+ with pytest.raises(InvalidTimezone):
+ sdk_cls(timezone="Not/A/Real/Zone")
diff --git a/airflow-core/tests/unit/serialization/test_serialized_objects.py b/airflow-core/tests/unit/serialization/test_serialized_objects.py
index 05463439e1f..1767209ca73 100644
--- a/airflow-core/tests/unit/serialization/test_serialized_objects.py
+++ b/airflow-core/tests/unit/serialization/test_serialized_objects.py
@@ -808,51 +808,95 @@ def test_encode_timezone():
@pytest.mark.parametrize(
- ("cls", "args", "encode_type", "encode_var"),
+ ("cls", "kwargs", "encode_type", "encode_var"),
[
- (IdentityMapper, [], "airflow.partition_mappers.identity.IdentityMapper", {}),
+ (IdentityMapper, {}, "airflow.partition_mappers.identity.IdentityMapper", {}),
(
StartOfHourMapper,
- [],
+ {},
"airflow.partition_mappers.temporal.StartOfHourMapper",
- {"input_format": "%Y-%m-%dT%H:%M:%S", "output_format": "%Y-%m-%dT%H"},
+ {"timezone": "UTC", "input_format": "%Y-%m-%dT%H:%M:%S", "output_format": "%Y-%m-%dT%H"},
+ ),
+ (
+ StartOfHourMapper,
+ {"timezone": "Asia/Taipei"},
+ "airflow.partition_mappers.temporal.StartOfHourMapper",
+ {"timezone": "Asia/Taipei", "input_format": "%Y-%m-%dT%H:%M:%S", "output_format": "%Y-%m-%dT%H"},
+ ),
+ (
+ StartOfDayMapper,
+ {},
+ "airflow.partition_mappers.temporal.StartOfDayMapper",
+ {"timezone": "UTC", "input_format": "%Y-%m-%dT%H:%M:%S", "output_format": "%Y-%m-%d"},
),
(
StartOfDayMapper,
- [],
+ {"timezone": "Asia/Taipei"},
"airflow.partition_mappers.temporal.StartOfDayMapper",
- {"input_format": "%Y-%m-%dT%H:%M:%S", "output_format": "%Y-%m-%d"},
+ {"timezone": "Asia/Taipei", "input_format": "%Y-%m-%dT%H:%M:%S", "output_format": "%Y-%m-%d"},
),
(
StartOfWeekMapper,
- [],
+ {},
"airflow.partition_mappers.temporal.StartOfWeekMapper",
- {"input_format": "%Y-%m-%dT%H:%M:%S", "output_format": "%Y-%m-%d (W%V)"},
+ {"timezone": "UTC", "input_format": "%Y-%m-%dT%H:%M:%S", "output_format": "%Y-%m-%d (W%V)"},
+ ),
+ (
+ StartOfWeekMapper,
+ {"timezone": "Asia/Taipei"},
+ "airflow.partition_mappers.temporal.StartOfWeekMapper",
+ {
+ "timezone": "Asia/Taipei",
+ "input_format": "%Y-%m-%dT%H:%M:%S",
+ "output_format": "%Y-%m-%d (W%V)",
+ },
),
(
StartOfMonthMapper,
- [],
+ {},
"airflow.partition_mappers.temporal.StartOfMonthMapper",
- {"input_format": "%Y-%m-%dT%H:%M:%S", "output_format": "%Y-%m"},
+ {"timezone": "UTC", "input_format": "%Y-%m-%dT%H:%M:%S", "output_format": "%Y-%m"},
+ ),
+ (
+ StartOfMonthMapper,
+ {"timezone": "Asia/Taipei"},
+ "airflow.partition_mappers.temporal.StartOfMonthMapper",
+ {"timezone": "Asia/Taipei", "input_format": "%Y-%m-%dT%H:%M:%S", "output_format": "%Y-%m"},
+ ),
+ (
+ StartOfQuarterMapper,
+ {},
+ "airflow.partition_mappers.temporal.StartOfQuarterMapper",
+ {"timezone": "UTC", "input_format": "%Y-%m-%dT%H:%M:%S", "output_format": "%Y-Q{quarter}"},
),
(
StartOfQuarterMapper,
- [],
+ {"timezone": "Asia/Taipei"},
"airflow.partition_mappers.temporal.StartOfQuarterMapper",
- {"input_format": "%Y-%m-%dT%H:%M:%S", "output_format": "%Y-Q{quarter}"},
+ {
+ "timezone": "Asia/Taipei",
+ "input_format": "%Y-%m-%dT%H:%M:%S",
+ "output_format": "%Y-Q{quarter}",
+ },
+ ),
+ (
+ StartOfYearMapper,
+ {},
+ "airflow.partition_mappers.temporal.StartOfYearMapper",
+ {"timezone": "UTC", "input_format": "%Y-%m-%dT%H:%M:%S", "output_format": "%Y"},
),
(
StartOfYearMapper,
- [],
+ {"timezone": "Asia/Taipei"},
"airflow.partition_mappers.temporal.StartOfYearMapper",
- {"input_format": "%Y-%m-%dT%H:%M:%S", "output_format": "%Y"},
+ {"timezone": "Asia/Taipei", "input_format": "%Y-%m-%dT%H:%M:%S", "output_format": "%Y"},
),
],
)
-def test_encode_partition_mapper(cls, args, encode_type, encode_var):
+def test_encode_partition_mapper(cls, kwargs, encode_type, encode_var):
from airflow.serialization.encoders import encode_partition_mapper
- partition_mapper = cls(*args)
+ partition_mapper = cls(**kwargs)
assert encode_partition_mapper(partition_mapper) == {
Encoding.TYPE: encode_type,
Encoding.VAR: encode_var,
@@ -913,6 +957,7 @@ def test_encode_product_mapper():
{
Encoding.TYPE: "airflow.partition_mappers.temporal.StartOfHourMapper",
Encoding.VAR: {
+ "timezone": "UTC",
"input_format": "%Y-%m-%dT%H:%M:%S",
"output_format": "%Y-%m-%dT%H",
},
@@ -951,6 +996,7 @@ def test_encode_chain_mapper():
{
Encoding.TYPE: "airflow.partition_mappers.temporal.StartOfHourMapper",
Encoding.VAR: {
+ "timezone": "UTC",
"input_format": "%Y-%m-%dT%H:%M:%S",
"output_format": "%Y-%m-%dT%H",
},
@@ -958,6 +1004,7 @@ def test_encode_chain_mapper():
{
Encoding.TYPE: "airflow.partition_mappers.temporal.StartOfDayMapper",
Encoding.VAR: {
+ "timezone": "UTC",
"input_format": "%Y-%m-%dT%H",
"output_format": "%Y-%m-%d",
},
diff --git a/task-sdk/src/airflow/sdk/definitions/partition_mappers/temporal.py b/task-sdk/src/airflow/sdk/definitions/partition_mappers/temporal.py
index 60ca18f5044..afcbcac6beb 100644
--- a/task-sdk/src/airflow/sdk/definitions/partition_mappers/temporal.py
+++ b/task-sdk/src/airflow/sdk/definitions/partition_mappers/temporal.py
@@ -16,19 +16,32 @@
# under the License.
from __future__ import annotations
+from typing import TYPE_CHECKING
+
+from airflow.sdk._shared.timezones.timezone import parse_timezone
from airflow.sdk.definitions.partition_mappers.base import PartitionMapper
+if TYPE_CHECKING:
+ from pendulum import FixedTimezone, Timezone
+
class _BaseTemporalMapper(PartitionMapper):
+ """Base class for Temporal Partition Mappers."""
+
default_output_format: str
def __init__(
self,
+ *,
+ timezone: str | Timezone | FixedTimezone = "UTC",
input_format: str = "%Y-%m-%dT%H:%M:%S",
output_format: str | None = None,
) -> None:
self.input_format = input_format
self.output_format = output_format or self.default_output_format
+ if isinstance(timezone, str):
+ timezone = parse_timezone(timezone)
+ self._timezone = timezone
class StartOfHourMapper(_BaseTemporalMapper):