This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e3c10eae08c refactor: rename temporal partition mappers (#63783)
e3c10eae08c is described below

commit e3c10eae08cd8c0d28f1edaab24e11bd25606d6c
Author: Wei Lee <[email protected]>
AuthorDate: Thu Mar 19 16:02:29 2026 +0800

    refactor: rename temporal partition mappers (#63783)
    
    Co-authored-by: Tzu-ping Chung <[email protected]>
---
 .../example_dags/example_asset_partition.py        | 22 +++----
 .../src/airflow/partition_mappers/temporal.py      | 12 ++--
 airflow-core/src/airflow/serialization/encoders.py | 48 +++++++--------
 airflow-core/tests/unit/jobs/test_scheduler_job.py |  4 +-
 .../tests/unit/partition_mappers/test_product.py   | 34 +++++------
 .../tests/unit/partition_mappers/test_temporal.py  | 38 ++++++------
 .../unit/serialization/test_serialized_objects.py  | 70 +++++++++++-----------
 task-sdk/docs/api.rst                              | 12 ++--
 task-sdk/src/airflow/sdk/__init__.py               | 36 +++++------
 task-sdk/src/airflow/sdk/__init__.pyi              | 24 ++++----
 .../sdk/definitions/partition_mappers/temporal.py  | 12 ++--
 11 files changed, 156 insertions(+), 156 deletions(-)

diff --git a/airflow-core/src/airflow/example_dags/example_asset_partition.py b/airflow-core/src/airflow/example_dags/example_asset_partition.py
index ecc9d7427af..853d0453d26 100644
--- a/airflow-core/src/airflow/example_dags/example_asset_partition.py
+++ b/airflow-core/src/airflow/example_dags/example_asset_partition.py
@@ -24,12 +24,12 @@ from airflow.sdk import (
     AllowedKeyMapper,
     Asset,
     CronPartitionTimetable,
-    DailyMapper,
-    HourlyMapper,
     IdentityMapper,
     PartitionedAssetTimetable,
     ProductMapper,
-    YearlyMapper,
+    ToDailyMapper,
+    ToHourlyMapper,
+    ToYearlyMapper,
     asset,
     task,
 )
@@ -77,7 +77,7 @@ with DAG(
     dag_id="clean_and_combine_player_stats",
     schedule=PartitionedAssetTimetable(
         assets=team_a_player_stats & team_b_player_stats & team_c_player_stats,
-        default_partition_mapper=HourlyMapper(),
+        default_partition_mapper=ToHourlyMapper(),
     ),
     catchup=False,
     tags=["player-stats", "cleanup"],
@@ -85,7 +85,7 @@ with DAG(
     """
     Combine hourly partitions from Team A, B and C into a single curated dataset.
 
-    This Dag demonstrates multi-asset partition alignment using HourlyMapper.
+    This Dag demonstrates multi-asset partition alignment using ToHourlyMapper.
     """
 
     @task(outlets=[combined_player_stats])
@@ -101,7 +101,7 @@ with DAG(
 @asset(
     uri="file://analytics/player-stats/computed-player-odds.csv",
     # Fallback to IdentityMapper if no partition_mapper is specified.
-    # If we want to other temporal mapper (e.g., HourlyMapper) here,
+    # If we want to other temporal mapper (e.g., ToHourlyMapper) here,
     # make sure the input_format is changed since the partition_key is now in "%Y-%m-%dT%H" format
     # instead of a valid timestamp
     schedule=PartitionedAssetTimetable(assets=combined_player_stats),
@@ -121,9 +121,9 @@ with DAG(
     schedule=PartitionedAssetTimetable(
         assets=(combined_player_stats & team_a_player_stats & Asset.ref(name="team_b_player_stats")),
         partition_mapper_config={
-            combined_player_stats: YearlyMapper(),  # incompatible on purpose
-            team_a_player_stats: HourlyMapper(),
-            Asset.ref(name="team_b_player_stats"): HourlyMapper(),
+            combined_player_stats: ToYearlyMapper(),  # incompatible on purpose
+            team_a_player_stats: ToHourlyMapper(),
+            Asset.ref(name="team_b_player_stats"): ToHourlyMapper(),
         },
     ),
     catchup=False,
@@ -164,7 +164,7 @@ with DAG(
     dag_id="aggregate_regional_sales",
     schedule=PartitionedAssetTimetable(
         assets=regional_sales,
-        default_partition_mapper=ProductMapper(IdentityMapper(), DailyMapper()),
+        default_partition_mapper=ProductMapper(IdentityMapper(), ToDailyMapper()),
     ),
     catchup=False,
     tags=["sales", "aggregation"],
@@ -173,7 +173,7 @@ with DAG(
     Aggregate regional sales using ProductMapper.
 
     The ProductMapper splits the composite key "region|timestamp" and applies
-    IdentityMapper to the region segment and DailyMapper to the timestamp segment,
+    IdentityMapper to the region segment and ToDailyMapper to the timestamp segment,
     aligning hourly partitions to daily granularity per region.
     """
 
diff --git a/airflow-core/src/airflow/partition_mappers/temporal.py b/airflow-core/src/airflow/partition_mappers/temporal.py
index f40c037247f..b30e1879d53 100644
--- a/airflow-core/src/airflow/partition_mappers/temporal.py
+++ b/airflow-core/src/airflow/partition_mappers/temporal.py
@@ -63,7 +63,7 @@ class _BaseTemporalMapper(PartitionMapper, ABC):
         )
 
 
-class HourlyMapper(_BaseTemporalMapper):
+class ToHourlyMapper(_BaseTemporalMapper):
     """Map a time-based partition key to hour."""
 
     default_output_format = "%Y-%m-%dT%H"
@@ -72,7 +72,7 @@ class HourlyMapper(_BaseTemporalMapper):
         return dt.replace(minute=0, second=0, microsecond=0)
 
 
-class DailyMapper(_BaseTemporalMapper):
+class ToDailyMapper(_BaseTemporalMapper):
     """Map a time-based partition key to day."""
 
     default_output_format = "%Y-%m-%d"
@@ -81,7 +81,7 @@ class DailyMapper(_BaseTemporalMapper):
         return dt.replace(hour=0, minute=0, second=0, microsecond=0)
 
 
-class WeeklyMapper(_BaseTemporalMapper):
+class ToWeeklyMapper(_BaseTemporalMapper):
     """Map a time-based partition key to week."""
 
     default_output_format = "%Y-%m-%d (W%V)"
@@ -91,7 +91,7 @@ class WeeklyMapper(_BaseTemporalMapper):
         return start.replace(hour=0, minute=0, second=0, microsecond=0)
 
 
-class MonthlyMapper(_BaseTemporalMapper):
+class ToMonthlyMapper(_BaseTemporalMapper):
     """Map a time-based partition key to month."""
 
     default_output_format = "%Y-%m"
@@ -106,7 +106,7 @@ class MonthlyMapper(_BaseTemporalMapper):
         )
 
 
-class QuarterlyMapper(_BaseTemporalMapper):
+class ToQuarterlyMapper(_BaseTemporalMapper):
     """Map a time-based partition key to quarter."""
 
     default_output_format = "%Y-Q{quarter}"
@@ -128,7 +128,7 @@ class QuarterlyMapper(_BaseTemporalMapper):
         return dt.strftime(self.output_format).format(quarter=quarter)
 
 
-class YearlyMapper(_BaseTemporalMapper):
+class ToYearlyMapper(_BaseTemporalMapper):
     """Map a time-based partition key to year."""
 
     default_output_format = "%Y"
diff --git a/airflow-core/src/airflow/serialization/encoders.py b/airflow-core/src/airflow/serialization/encoders.py
index 239b2b1b97b..506cdb07cdb 100644
--- a/airflow-core/src/airflow/serialization/encoders.py
+++ b/airflow-core/src/airflow/serialization/encoders.py
@@ -36,22 +36,22 @@ from airflow.sdk import (
     AssetOrTimeSchedule,
     CronDataIntervalTimetable,
     CronTriggerTimetable,
-    DailyMapper,
     DeltaDataIntervalTimetable,
     DeltaTriggerTimetable,
     EventsTimetable,
     IdentityMapper,
-    MonthlyMapper,
     MultipleCronTriggerTimetable,
     PartitionMapper,
     ProductMapper,
-    QuarterlyMapper,
-    WeeklyMapper,
-    YearlyMapper,
+    ToDailyMapper,
+    ToMonthlyMapper,
+    ToQuarterlyMapper,
+    ToWeeklyMapper,
+    ToYearlyMapper,
 )
 from airflow.sdk.bases.timetable import BaseTimetable
 from airflow.sdk.definitions.asset import AssetRef
-from airflow.sdk.definitions.partition_mappers.temporal import HourlyMapper
+from airflow.sdk.definitions.partition_mappers.temporal import ToHourlyMapper
 from airflow.sdk.definitions.timetables.assets import (
     AssetTriggeredTimetable,
     PartitionedAssetTimetable,
@@ -393,12 +393,12 @@ class _Serializer:
 
     BUILTIN_PARTITION_MAPPERS: dict[type, str] = {
         IdentityMapper: "airflow.partition_mappers.identity.IdentityMapper",
-        HourlyMapper: "airflow.partition_mappers.temporal.HourlyMapper",
-        DailyMapper: "airflow.partition_mappers.temporal.DailyMapper",
-        WeeklyMapper: "airflow.partition_mappers.temporal.WeeklyMapper",
-        MonthlyMapper: "airflow.partition_mappers.temporal.MonthlyMapper",
-        QuarterlyMapper: "airflow.partition_mappers.temporal.QuarterlyMapper",
-        YearlyMapper: "airflow.partition_mappers.temporal.YearlyMapper",
+        ToHourlyMapper: "airflow.partition_mappers.temporal.ToHourlyMapper",
+        ToDailyMapper: "airflow.partition_mappers.temporal.ToDailyMapper",
+        ToWeeklyMapper: "airflow.partition_mappers.temporal.ToWeeklyMapper",
+        ToMonthlyMapper: "airflow.partition_mappers.temporal.ToMonthlyMapper",
+        ToQuarterlyMapper: "airflow.partition_mappers.temporal.ToQuarterlyMapper",
+        ToYearlyMapper: "airflow.partition_mappers.temporal.ToYearlyMapper",
         ProductMapper: "airflow.partition_mappers.product.ProductMapper",
         AllowedKeyMapper: "airflow.partition_mappers.allowed_key.AllowedKeyMapper",
     }
@@ -415,20 +415,20 @@ class _Serializer:
     def _(self, partition_mapper: IdentityMapper) -> dict[str, Any]:
         return {}
 
-    @serialize_partition_mapper.register(HourlyMapper)
-    @serialize_partition_mapper.register(DailyMapper)
-    @serialize_partition_mapper.register(WeeklyMapper)
-    @serialize_partition_mapper.register(MonthlyMapper)
-    @serialize_partition_mapper.register(QuarterlyMapper)
-    @serialize_partition_mapper.register(YearlyMapper)
+    @serialize_partition_mapper.register(ToHourlyMapper)
+    @serialize_partition_mapper.register(ToDailyMapper)
+    @serialize_partition_mapper.register(ToWeeklyMapper)
+    @serialize_partition_mapper.register(ToMonthlyMapper)
+    @serialize_partition_mapper.register(ToQuarterlyMapper)
+    @serialize_partition_mapper.register(ToYearlyMapper)
     def _(
         self,
-        partition_mapper: HourlyMapper
-        | DailyMapper
-        | WeeklyMapper
-        | MonthlyMapper
-        | QuarterlyMapper
-        | YearlyMapper,
+        partition_mapper: ToHourlyMapper
+        | ToDailyMapper
+        | ToWeeklyMapper
+        | ToMonthlyMapper
+        | ToQuarterlyMapper
+        | ToYearlyMapper,
     ) -> dict[str, Any]:
         return {
             "input_format": partition_mapper.input_format,
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 386eeedb608..a6719409900 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -91,8 +91,8 @@ from airflow.sdk import (
     AssetAlias,
     AssetWatcher,
     CronPartitionTimetable,
-    HourlyMapper,
     IdentityMapper,
+    ToHourlyMapper,
     task,
 )
 from airflow.sdk.definitions.callback import AsyncCallback, SyncCallback
@@ -8982,7 +8982,7 @@ def test_partitioned_dag_run_with_invalid_mapping(dag_maker: DagMaker, session:
         dag_id="asset-event-consumer",
         schedule=PartitionedAssetTimetable(
             assets=asset_1,
-            default_partition_mapper=HourlyMapper(),
+            default_partition_mapper=ToHourlyMapper(),
         ),
         session=session,
     ):
diff --git a/airflow-core/tests/unit/partition_mappers/test_product.py b/airflow-core/tests/unit/partition_mappers/test_product.py
index a1c46abc629..fba3ae868a8 100644
--- a/airflow-core/tests/unit/partition_mappers/test_product.py
+++ b/airflow-core/tests/unit/partition_mappers/test_product.py
@@ -21,61 +21,61 @@ import pytest
 
 from airflow.partition_mappers.identity import IdentityMapper
 from airflow.partition_mappers.product import ProductMapper
-from airflow.partition_mappers.temporal import DailyMapper, HourlyMapper
+from airflow.partition_mappers.temporal import ToDailyMapper, ToHourlyMapper
 
 
 class TestProductMapper:
     def test_to_downstream(self):
-        pm = ProductMapper(HourlyMapper(), DailyMapper())
+        pm = ProductMapper(ToHourlyMapper(), ToDailyMapper())
         assert pm.to_downstream("2024-01-15T10:30:00|2024-01-15T10:30:00") == "2024-01-15T10|2024-01-15"
 
     def test_to_downstream_wrong_segment_count(self):
-        pm = ProductMapper(HourlyMapper(), DailyMapper())
+        pm = ProductMapper(ToHourlyMapper(), ToDailyMapper())
         with pytest.raises(ValueError, match="Expected 2 segments"):
             pm.to_downstream("2024-01-15T10:30:00|2024-01-15T10:30:00|extra")
 
     def test_to_downstream_single_segment_for_two_mappers(self):
-        pm = ProductMapper(HourlyMapper(), DailyMapper())
+        pm = ProductMapper(ToHourlyMapper(), ToDailyMapper())
         with pytest.raises(ValueError, match="Expected 2 segments"):
             pm.to_downstream("2024-01-15T10:30:00")
 
     def test_custom_delimiter(self):
-        pm = ProductMapper(HourlyMapper(), DailyMapper(), delimiter="::")
+        pm = ProductMapper(ToHourlyMapper(), ToDailyMapper(), delimiter="::")
         assert pm.to_downstream("2024-01-15T10:30:00::2024-01-15T10:30:00") == "2024-01-15T10::2024-01-15"
 
     def test_custom_delimiter_wrong_segment_count(self):
-        pm = ProductMapper(HourlyMapper(), DailyMapper(), delimiter="::")
+        pm = ProductMapper(ToHourlyMapper(), ToDailyMapper(), delimiter="::")
         with pytest.raises(ValueError, match="Expected 2 segments"):
             pm.to_downstream("2024-01-15T10:30:00::2024-01-15T10:30:00::extra")
 
     def test_serialize(self):
         from airflow.serialization.encoders import encode_partition_mapper
 
-        pm = ProductMapper(HourlyMapper(), DailyMapper())
+        pm = ProductMapper(ToHourlyMapper(), ToDailyMapper())
         result = pm.serialize()
         assert result == {
             "delimiter": "|",
             "mappers": [
-                encode_partition_mapper(HourlyMapper()),
-                encode_partition_mapper(DailyMapper()),
+                encode_partition_mapper(ToHourlyMapper()),
+                encode_partition_mapper(ToDailyMapper()),
             ],
         }
 
     def test_serialize_custom_delimiter(self):
         from airflow.serialization.encoders import encode_partition_mapper
 
-        pm = ProductMapper(HourlyMapper(), DailyMapper(), delimiter="::")
+        pm = ProductMapper(ToHourlyMapper(), ToDailyMapper(), delimiter="::")
         result = pm.serialize()
         assert result == {
             "delimiter": "::",
             "mappers": [
-                encode_partition_mapper(HourlyMapper()),
-                encode_partition_mapper(DailyMapper()),
+                encode_partition_mapper(ToHourlyMapper()),
+                encode_partition_mapper(ToDailyMapper()),
             ],
         }
 
     def test_deserialize(self):
-        pm = ProductMapper(HourlyMapper(), DailyMapper())
+        pm = ProductMapper(ToHourlyMapper(), ToDailyMapper())
         serialized = pm.serialize()
         restored = ProductMapper.deserialize(serialized)
         assert isinstance(restored, ProductMapper)
@@ -84,7 +84,7 @@ class TestProductMapper:
         assert restored.to_downstream("2024-01-15T10:30:00|2024-01-15T10:30:00") == "2024-01-15T10|2024-01-15"
 
     def test_deserialize_custom_delimiter(self):
-        pm = ProductMapper(HourlyMapper(), DailyMapper(), delimiter="::")
+        pm = ProductMapper(ToHourlyMapper(), ToDailyMapper(), delimiter="::")
         serialized = pm.serialize()
         restored = ProductMapper.deserialize(serialized)
         assert isinstance(restored, ProductMapper)
@@ -99,15 +99,15 @@ class TestProductMapper:
 
         data = {
             "mappers": [
-                encode_partition_mapper(HourlyMapper()),
-                encode_partition_mapper(DailyMapper()),
+                encode_partition_mapper(ToHourlyMapper()),
+                encode_partition_mapper(ToDailyMapper()),
             ],
         }
         restored = ProductMapper.deserialize(data)
         assert restored.delimiter == "|"
 
     def test_three_mappers(self):
-        pm = ProductMapper(HourlyMapper(), DailyMapper(), IdentityMapper())
+        pm = ProductMapper(ToHourlyMapper(), ToDailyMapper(), IdentityMapper())
         assert (
             pm.to_downstream("2024-01-15T10:30:00|2024-01-15T10:30:00|raw") == "2024-01-15T10|2024-01-15|raw"
         )
diff --git a/airflow-core/tests/unit/partition_mappers/test_temporal.py b/airflow-core/tests/unit/partition_mappers/test_temporal.py
index 94c04f08d91..9bcb9679ea8 100644
--- a/airflow-core/tests/unit/partition_mappers/test_temporal.py
+++ b/airflow-core/tests/unit/partition_mappers/test_temporal.py
@@ -19,12 +19,12 @@ from __future__ import annotations
 import pytest
 
 from airflow.partition_mappers.temporal import (
-    DailyMapper,
-    HourlyMapper,
-    MonthlyMapper,
-    QuarterlyMapper,
-    WeeklyMapper,
-    YearlyMapper,
+    ToDailyMapper,
+    ToHourlyMapper,
+    ToMonthlyMapper,
+    ToQuarterlyMapper,
+    ToWeeklyMapper,
+    ToYearlyMapper,
     _BaseTemporalMapper,
 )
 
@@ -33,12 +33,12 @@ class TestTemporalMappers:
     @pytest.mark.parametrize(
         ("mapper_cls", "expected_downstream_key"),
         [
-            (HourlyMapper, "2026-02-10T14"),
-            (DailyMapper, "2026-02-10"),
-            (WeeklyMapper, "2026-02-09 (W07)"),
-            (MonthlyMapper, "2026-02"),
-            (QuarterlyMapper, "2026-Q1"),
-            (YearlyMapper, "2026"),
+            (ToHourlyMapper, "2026-02-10T14"),
+            (ToDailyMapper, "2026-02-10"),
+            (ToWeeklyMapper, "2026-02-09 (W07)"),
+            (ToMonthlyMapper, "2026-02"),
+            (ToQuarterlyMapper, "2026-Q1"),
+            (ToYearlyMapper, "2026"),
         ],
     )
     def test_to_downstream(
@@ -52,12 +52,12 @@ class TestTemporalMappers:
     @pytest.mark.parametrize(
         ("mapper_cls", "expected_outut_format"),
         [
-            (HourlyMapper, "%Y-%m-%dT%H"),
-            (DailyMapper, "%Y-%m-%d"),
-            (WeeklyMapper, "%Y-%m-%d (W%V)"),
-            (MonthlyMapper, "%Y-%m"),
-            (QuarterlyMapper, "%Y-Q{quarter}"),
-            (YearlyMapper, "%Y"),
+            (ToHourlyMapper, "%Y-%m-%dT%H"),
+            (ToDailyMapper, "%Y-%m-%d"),
+            (ToWeeklyMapper, "%Y-%m-%d (W%V)"),
+            (ToMonthlyMapper, "%Y-%m"),
+            (ToQuarterlyMapper, "%Y-Q{quarter}"),
+            (ToYearlyMapper, "%Y"),
         ],
     )
     def test_serialize(self, mapper_cls: type[_BaseTemporalMapper], expected_outut_format: str):
@@ -69,7 +69,7 @@ class TestTemporalMappers:
 
     @pytest.mark.parametrize(
         "mapper_cls",
-        [HourlyMapper, DailyMapper, WeeklyMapper, MonthlyMapper, QuarterlyMapper, YearlyMapper],
+        [ToHourlyMapper, ToDailyMapper, ToWeeklyMapper, ToMonthlyMapper, ToQuarterlyMapper, ToYearlyMapper],
     )
     def test_deserialize(self, mapper_cls):
         pm = mapper_cls.deserialize(
diff --git a/airflow-core/tests/unit/serialization/test_serialized_objects.py b/airflow-core/tests/unit/serialization/test_serialized_objects.py
index 50e1621c4af..514abb618aa 100644
--- a/airflow-core/tests/unit/serialization/test_serialized_objects.py
+++ b/airflow-core/tests/unit/serialization/test_serialized_objects.py
@@ -47,12 +47,12 @@ from airflow.models.taskinstance import TaskInstance
 from airflow.models.xcom_arg import XComArg
 from airflow.partition_mappers.identity import IdentityMapper as CoreIdentityMapper
 from airflow.partition_mappers.temporal import (
-    DailyMapper as CoreDailyMapper,
-    HourlyMapper as CoureHourlyMapper,
-    MonthlyMapper as CoreMonthlyMapper,
-    QuarterlyMapper as CoreQuarterlyMapper,
-    WeeklyMapper as CoreWeeklyMapper,
-    YearlyMapper as CoreYearlyMapper,
+    ToDailyMapper as CoreToDailyMapper,
+    ToHourlyMapper as CoreToHourlyMapper,
+    ToMonthlyMapper as CoreToMonthlyMapper,
+    ToQuarterlyMapper as CoreToQuarterlyMapper,
+    ToWeeklyMapper as CoreToWeeklyMapper,
+    ToYearlyMapper as CoreToYearlyMapper,
 )
 from airflow.providers.standard.operators.bash import BashOperator
 from airflow.providers.standard.operators.empty import EmptyOperator
@@ -60,13 +60,13 @@ from airflow.providers.standard.operators.python import PythonOperator
 from airflow.providers.standard.triggers.file import FileDeleteTrigger
 from airflow.sdk import (
     BaseOperator,
-    DailyMapper,
-    HourlyMapper,
     IdentityMapper,
-    MonthlyMapper,
-    QuarterlyMapper,
-    WeeklyMapper,
-    YearlyMapper,
+    ToDailyMapper,
+    ToHourlyMapper,
+    ToMonthlyMapper,
+    ToQuarterlyMapper,
+    ToWeeklyMapper,
+    ToYearlyMapper,
 )
 from airflow.sdk.definitions.asset import (
     Asset,
@@ -768,39 +768,39 @@ def test_encode_timezone():
     [
         (IdentityMapper, [], "airflow.partition_mappers.identity.IdentityMapper", {}),
         (
-            HourlyMapper,
+            ToHourlyMapper,
             [],
-            "airflow.partition_mappers.temporal.HourlyMapper",
+            "airflow.partition_mappers.temporal.ToHourlyMapper",
             {"input_format": "%Y-%m-%dT%H:%M:%S", "output_format": "%Y-%m-%dT%H"},
         ),
         (
-            DailyMapper,
+            ToDailyMapper,
             [],
-            "airflow.partition_mappers.temporal.DailyMapper",
+            "airflow.partition_mappers.temporal.ToDailyMapper",
             {"input_format": "%Y-%m-%dT%H:%M:%S", "output_format": "%Y-%m-%d"},
         ),
         (
-            WeeklyMapper,
+            ToWeeklyMapper,
             [],
-            "airflow.partition_mappers.temporal.WeeklyMapper",
+            "airflow.partition_mappers.temporal.ToWeeklyMapper",
             {"input_format": "%Y-%m-%dT%H:%M:%S", "output_format": "%Y-%m-%d (W%V)"},
         ),
         (
-            MonthlyMapper,
+            ToMonthlyMapper,
             [],
-            "airflow.partition_mappers.temporal.MonthlyMapper",
+            "airflow.partition_mappers.temporal.ToMonthlyMapper",
             {"input_format": "%Y-%m-%dT%H:%M:%S", "output_format": "%Y-%m"},
         ),
         (
-            QuarterlyMapper,
+            ToQuarterlyMapper,
             [],
-            "airflow.partition_mappers.temporal.QuarterlyMapper",
+            "airflow.partition_mappers.temporal.ToQuarterlyMapper",
             {"input_format": "%Y-%m-%dT%H:%M:%S", "output_format": "%Y-Q{quarter}"},
         ),
         (
-            YearlyMapper,
+            ToYearlyMapper,
             [],
-            "airflow.partition_mappers.temporal.YearlyMapper",
+            "airflow.partition_mappers.temporal.ToYearlyMapper",
             {"input_format": "%Y-%m-%dT%H:%M:%S", "output_format": "%Y"},
         ),
     ],
@@ -819,12 +819,12 @@ def test_encode_partition_mapper(cls, args, encode_type, encode_var):
     ("sdk_cls", "core_cls"),
     [
         (IdentityMapper, CoreIdentityMapper),
-        (HourlyMapper, CoureHourlyMapper),
-        (DailyMapper, CoreDailyMapper),
-        (WeeklyMapper, CoreWeeklyMapper),
-        (MonthlyMapper, CoreMonthlyMapper),
-        (QuarterlyMapper, CoreQuarterlyMapper),
-        (YearlyMapper, CoreYearlyMapper),
+        (ToHourlyMapper, CoreToHourlyMapper),
+        (ToDailyMapper, CoreToDailyMapper),
+        (ToWeeklyMapper, CoreToWeeklyMapper),
+        (ToMonthlyMapper, CoreToMonthlyMapper),
+        (ToQuarterlyMapper, CoreToQuarterlyMapper),
+        (ToYearlyMapper, CoreToYearlyMapper),
     ],
 )
 def test_decode_partition_mapper(sdk_cls, core_cls):
@@ -853,10 +853,10 @@ def test_decode_partition_mapper_not_exists():
 
 
 def test_encode_product_mapper():
-    from airflow.sdk import HourlyMapper, IdentityMapper, ProductMapper
+    from airflow.sdk import IdentityMapper, ProductMapper, ToHourlyMapper
     from airflow.serialization.encoders import encode_partition_mapper
 
-    partition_mapper = ProductMapper(IdentityMapper(), HourlyMapper())
+    partition_mapper = ProductMapper(IdentityMapper(), ToHourlyMapper())
     assert encode_partition_mapper(partition_mapper) == {
         Encoding.TYPE: "airflow.partition_mappers.product.ProductMapper",
         Encoding.VAR: {
@@ -867,7 +867,7 @@ def test_encode_product_mapper():
                     Encoding.VAR: {},
                 },
                 {
-                    Encoding.TYPE: "airflow.partition_mappers.temporal.HourlyMapper",
+                    Encoding.TYPE: "airflow.partition_mappers.temporal.ToHourlyMapper",
                     Encoding.VAR: {
                         "input_format": "%Y-%m-%dT%H:%M:%S",
                         "output_format": "%Y-%m-%dT%H",
@@ -880,11 +880,11 @@ def test_encode_product_mapper():
 
 def test_decode_product_mapper():
     from airflow.partition_mappers.product import ProductMapper as CoreProductMapper
-    from airflow.sdk import DailyMapper, HourlyMapper, ProductMapper
+    from airflow.sdk import ProductMapper, ToDailyMapper, ToHourlyMapper
     from airflow.serialization.decoders import decode_partition_mapper
     from airflow.serialization.encoders import encode_partition_mapper
 
-    partition_mapper = ProductMapper(HourlyMapper(), DailyMapper())
+    partition_mapper = ProductMapper(ToHourlyMapper(), ToDailyMapper())
     encoded_pm = encode_partition_mapper(partition_mapper)
 
     core_pm = decode_partition_mapper(encoded_pm)
diff --git a/task-sdk/docs/api.rst b/task-sdk/docs/api.rst
index 0565eeca0fb..5c0e6b07632 100644
--- a/task-sdk/docs/api.rst
+++ b/task-sdk/docs/api.rst
@@ -203,17 +203,17 @@ Partition Mapper
 
 .. autoapiclass:: airflow.sdk.IdentityMapper
 
-.. autoapiclass:: airflow.sdk.HourlyMapper
+.. autoapiclass:: airflow.sdk.ToHourlyMapper
 
-.. autoapiclass:: airflow.sdk.DailyMapper
+.. autoapiclass:: airflow.sdk.ToDailyMapper
 
-.. autoapiclass:: airflow.sdk.WeeklyMapper
+.. autoapiclass:: airflow.sdk.ToWeeklyMapper
 
-.. autoapiclass:: airflow.sdk.MonthlyMapper
+.. autoapiclass:: airflow.sdk.ToMonthlyMapper
 
-.. autoapiclass:: airflow.sdk.QuarterlyMapper
+.. autoapiclass:: airflow.sdk.ToQuarterlyMapper
 
-.. autoapiclass:: airflow.sdk.YearlyMapper
+.. autoapiclass:: airflow.sdk.ToYearlyMapper
 
 .. autoapiclass:: airflow.sdk.ProductMapper
 
diff --git a/task-sdk/src/airflow/sdk/__init__.py b/task-sdk/src/airflow/sdk/__init__.py
index 1044dfa2a44..70521464908 100644
--- a/task-sdk/src/airflow/sdk/__init__.py
+++ b/task-sdk/src/airflow/sdk/__init__.py
@@ -44,18 +44,15 @@ __all__ = [
     "CronPartitionTimetable",
     "DAG",
     "DagRunState",
-    "DailyMapper",
     "DeadlineAlert",
     "DeadlineReference",
     "DeltaDataIntervalTimetable",
     "DeltaTriggerTimetable",
     "EdgeModifier",
     "EventsTimetable",
-    "HourlyMapper",
     "IdentityMapper",
     "Label",
     "Metadata",
-    "MonthlyMapper",
     "MultipleCronTriggerTimetable",
     "ObjectStoragePath",
     "Param",
@@ -64,18 +61,21 @@ __all__ = [
     "PartitionMapper",
     "PokeReturnValue",
     "ProductMapper",
-    "QuarterlyMapper",
     "SkipMixin",
     "SyncCallback",
     "TaskGroup",
     "TaskInstance",
     "TaskInstanceState",
+    "ToDailyMapper",
+    "ToHourlyMapper",
+    "ToMonthlyMapper",
+    "ToQuarterlyMapper",
+    "ToWeeklyMapper",
+    "ToYearlyMapper",
     "TriggerRule",
     "Variable",
-    "WeeklyMapper",
     "WeightRule",
     "XComArg",
-    "YearlyMapper",
     "asset",
     "chain",
     "chain_linear",
@@ -128,12 +128,12 @@ if TYPE_CHECKING:
     from airflow.sdk.definitions.partition_mappers.identity import IdentityMapper
     from airflow.sdk.definitions.partition_mappers.product import ProductMapper
     from airflow.sdk.definitions.partition_mappers.temporal import (
-        DailyMapper,
-        HourlyMapper,
-        MonthlyMapper,
-        QuarterlyMapper,
-        WeeklyMapper,
-        YearlyMapper,
+        ToDailyMapper,
+        ToHourlyMapper,
+        ToMonthlyMapper,
+        ToQuarterlyMapper,
+        ToWeeklyMapper,
+        ToYearlyMapper,
     )
     from airflow.sdk.definitions.taskgroup import TaskGroup
     from airflow.sdk.definitions.template import literal
@@ -185,18 +185,15 @@ __lazy_imports: dict[str, str] = {
     "CronPartitionTimetable": ".definitions.timetables.trigger",
     "DAG": ".definitions.dag",
     "DagRunState": ".api.datamodels._generated",
-    "DailyMapper": ".definitions.partition_mappers.temporal",
     "DeadlineAlert": ".definitions.deadline",
     "DeadlineReference": ".definitions.deadline",
     "DeltaDataIntervalTimetable": ".definitions.timetables.interval",
     "DeltaTriggerTimetable": ".definitions.timetables.trigger",
     "EdgeModifier": ".definitions.edges",
     "EventsTimetable": ".definitions.timetables.events",
-    "HourlyMapper": ".definitions.partition_mappers.temporal",
     "IdentityMapper": ".definitions.partition_mappers.identity",
     "Label": ".definitions.edges",
     "Metadata": ".definitions.asset.metadata",
-    "MonthlyMapper": ".definitions.partition_mappers.temporal",
     "MultipleCronTriggerTimetable": ".definitions.timetables.trigger",
     "ObjectStoragePath": ".io.path",
     "Param": ".definitions.param",
@@ -205,19 +202,22 @@ __lazy_imports: dict[str, str] = {
     "PartitionMapper": ".definitions.partition_mappers.base",
     "PokeReturnValue": ".bases.sensor",
     "ProductMapper": ".definitions.partition_mappers.product",
-    "QuarterlyMapper": ".definitions.partition_mappers.temporal",
     "SecretCache": ".execution_time.cache",
     "SkipMixin": ".bases.skipmixin",
     "SyncCallback": ".definitions.callback",
     "TaskGroup": ".definitions.taskgroup",
     "TaskInstance": ".types",
     "TaskInstanceState": ".api.datamodels._generated",
+    "ToDailyMapper": ".definitions.partition_mappers.temporal",
+    "ToHourlyMapper": ".definitions.partition_mappers.temporal",
+    "ToMonthlyMapper": ".definitions.partition_mappers.temporal",
+    "ToQuarterlyMapper": ".definitions.partition_mappers.temporal",
+    "ToWeeklyMapper": ".definitions.partition_mappers.temporal",
+    "ToYearlyMapper": ".definitions.partition_mappers.temporal",
     "TriggerRule": ".api.datamodels._generated",
     "Variable": ".definitions.variable",
-    "WeeklyMapper": ".definitions.partition_mappers.temporal",
     "WeightRule": ".api.datamodels._generated",
     "XComArg": ".definitions.xcom_arg",
-    "YearlyMapper": ".definitions.partition_mappers.temporal",
     "asset": ".definitions.asset.decorators",
     "chain": ".bases.operator",
     "chain_linear": ".bases.operator",
diff --git a/task-sdk/src/airflow/sdk/__init__.pyi b/task-sdk/src/airflow/sdk/__init__.pyi
index 2670163899b..905500c6164 100644
--- a/task-sdk/src/airflow/sdk/__init__.pyi
+++ b/task-sdk/src/airflow/sdk/__init__.pyi
@@ -67,12 +67,12 @@ from airflow.sdk.definitions.partition_mappers.base import PartitionMapper
 from airflow.sdk.definitions.partition_mappers.identity import IdentityMapper
 from airflow.sdk.definitions.partition_mappers.product import ProductMapper
 from airflow.sdk.definitions.partition_mappers.temporal import (
-    DailyMapper,
-    HourlyMapper,
-    MonthlyMapper,
-    QuarterlyMapper,
-    WeeklyMapper,
-    YearlyMapper,
+    ToDailyMapper,
+    ToHourlyMapper,
+    ToMonthlyMapper,
+    ToQuarterlyMapper,
+    ToWeeklyMapper,
+    ToYearlyMapper,
 )
 from airflow.sdk.definitions.taskgroup import TaskGroup as TaskGroup
 from airflow.sdk.definitions.template import literal as literal
@@ -124,16 +124,13 @@ __all__ = [
     "CronPartitionTimetable",
     "DAG",
     "DagRunState",
-    "DailyMapper",
     "DeltaDataIntervalTimetable",
     "DeltaTriggerTimetable",
     "EdgeModifier",
     "EventsTimetable",
-    "HourlyMapper",
     "IdentityMapper",
     "Label",
     "Metadata",
-    "MonthlyMapper",
     "MultipleCronTriggerTimetable",
     "ObjectStoragePath",
     "Param",
@@ -141,17 +138,20 @@ __all__ = [
     "PartitionedAssetTimetable",
     "PartitionMapper",
     "ProductMapper",
-    "QuarterlyMapper",
     "SecretCache",
     "SkipMixin",
     "TaskGroup",
     "TaskInstanceState",
+    "ToDailyMapper",
+    "ToHourlyMapper",
+    "ToMonthlyMapper",
+    "ToQuarterlyMapper",
+    "ToWeeklyMapper",
+    "ToYearlyMapper",
     "TriggerRule",
     "Variable",
-    "WeeklyMapper",
     "WeightRule",
     "XComArg",
-    "YearlyMapper",
     "asset",
     "chain",
     "chain_linear",
diff --git a/task-sdk/src/airflow/sdk/definitions/partition_mappers/temporal.py b/task-sdk/src/airflow/sdk/definitions/partition_mappers/temporal.py
index 0d225155c7b..4e6fc4647b1 100644
--- a/task-sdk/src/airflow/sdk/definitions/partition_mappers/temporal.py
+++ b/task-sdk/src/airflow/sdk/definitions/partition_mappers/temporal.py
@@ -31,37 +31,37 @@ class _BaseTemporalMapper(PartitionMapper):
         self.output_format = output_format or self.default_output_format
 
 
-class HourlyMapper(_BaseTemporalMapper):
+class ToHourlyMapper(_BaseTemporalMapper):
     """Map a time-based partition key to hour."""
 
     default_output_format = "%Y-%m-%dT%H"
 
 
-class DailyMapper(_BaseTemporalMapper):
+class ToDailyMapper(_BaseTemporalMapper):
     """Map a time-based partition key to day."""
 
     default_output_format = "%Y-%m-%d"
 
 
-class WeeklyMapper(_BaseTemporalMapper):
+class ToWeeklyMapper(_BaseTemporalMapper):
     """Map a time-based partition key to week."""
 
     default_output_format = "%Y-%m-%d (W%V)"
 
 
-class MonthlyMapper(_BaseTemporalMapper):
+class ToMonthlyMapper(_BaseTemporalMapper):
     """Map a time-based partition key to month."""
 
     default_output_format = "%Y-%m"
 
 
-class QuarterlyMapper(_BaseTemporalMapper):
+class ToQuarterlyMapper(_BaseTemporalMapper):
     """Map a time-based partition key to quarter."""
 
     default_output_format = "%Y-Q{quarter}"
 
 
-class YearlyMapper(_BaseTemporalMapper):
+class ToYearlyMapper(_BaseTemporalMapper):
     """Map a time-based partition key to year."""
 
     default_output_format = "%Y"

Reply via email to