This is an automated email from the ASF dual-hosted git repository.

rahulvats pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 10687f6a4cbae41eb06b305e0ec19914884a2987
Author: Wei Lee <[email protected]>
AuthorDate: Fri Mar 27 14:55:37 2026 +0800

    refactor(partition-mapper): rename ToXXXMapper to StartOfXXXMapper (#64160)
    
    (cherry picked from commit 3bb2dd5ec41fdd3ba81a5ff7890ee07dbbc8e0ae)
---
 .../docs/authoring-and-scheduling/assets.rst       | 24 +++----
 .../example_dags/example_asset_partition.py        | 22 +++---
 .../src/airflow/partition_mappers/temporal.py      | 12 ++--
 airflow-core/src/airflow/serialization/encoders.py | 50 ++++++-------
 airflow-core/tests/unit/jobs/test_scheduler_job.py |  4 +-
 .../tests/unit/partition_mappers/test_chain.py     | 12 ++--
 .../tests/unit/partition_mappers/test_product.py   | 34 ++++-----
 .../tests/unit/partition_mappers/test_temporal.py  | 45 +++++++-----
 .../unit/serialization/test_serialized_objects.py  | 82 +++++++++++-----------
 task-sdk/docs/api.rst                              | 12 ++--
 task-sdk/src/airflow/sdk/__init__.py               | 36 +++++-----
 task-sdk/src/airflow/sdk/__init__.pyi              | 24 +++----
 .../sdk/definitions/partition_mappers/temporal.py  | 12 ++--
 13 files changed, 188 insertions(+), 181 deletions(-)

diff --git a/airflow-core/docs/authoring-and-scheduling/assets.rst 
b/airflow-core/docs/authoring-and-scheduling/assets.rst
index 42bc78c0005..a764bc33449 100644
--- a/airflow-core/docs/authoring-and-scheduling/assets.rst
+++ b/airflow-core/docs/authoring-and-scheduling/assets.rst
@@ -434,13 +434,13 @@ For downstream partition-aware scheduling, use 
``PartitionedAssetTimetable``:
 
 .. code-block:: python
 
-    from airflow.sdk import DAG, HourlyMapper, PartitionedAssetTimetable
+    from airflow.sdk import DAG, StartOfHourMapper, PartitionedAssetTimetable
 
     with DAG(
         dag_id="clean_and_combine_player_stats",
         schedule=PartitionedAssetTimetable(
             assets=team_a_player_stats & team_b_player_stats & 
team_c_player_stats,
-            default_partition_mapper=HourlyMapper(),
+            default_partition_mapper=StartOfHourMapper(),
         ),
         catchup=False,
     ):
@@ -458,17 +458,17 @@ Partition mappers define how upstream partition keys are 
transformed to the
 downstream Dag partition key:
 
 * ``IdentityMapper`` keeps keys unchanged.
-* Temporal mappers such as ``HourlyMapper``, ``DailyMapper``, and
-  ``YearlyMapper`` normalize time keys to a chosen grain. For input key
+* Temporal mappers such as ``StartOfHourMapper``, ``StartOfDayMapper``, and
+  ``StartOfYearMapper`` normalize time keys to a chosen grain. For input key
   ``2026-03-10T09:37:51``, the default outputs are:
 
-  * ``HourlyMapper`` -> ``2026-03-10T09``
-  * ``DailyMapper`` -> ``2026-03-10``
-  * ``YearlyMapper`` -> ``2026``
+  * ``StartOfHourMapper`` -> ``2026-03-10T09``
+  * ``StartOfDayMapper`` -> ``2026-03-10``
+  * ``StartOfYearMapper`` -> ``2026``
 * ``ProductMapper`` maps composite keys segment-by-segment.
   It applies one mapper per segment and then rejoins the mapped segments.
   For example, with key ``us|2026-03-10T09:00:00``,
-  ``ProductMapper(IdentityMapper(), DailyMapper())`` produces
+  ``ProductMapper(IdentityMapper(), StartOfDayMapper())`` produces
   ``us|2026-03-10``.
 * ``AllowedKeyMapper`` validates that keys are in a fixed allow-list and
   passes the key through unchanged if valid.
@@ -481,10 +481,10 @@ Example of per-asset mapper configuration and 
composite-key mapping:
 
     from airflow.sdk import (
         Asset,
-        DailyMapper,
         IdentityMapper,
         PartitionedAssetTimetable,
         ProductMapper,
+        StartOfDayMapper,
     )
 
     regional_sales = Asset(uri="file://incoming/sales/regional.csv", 
name="regional_sales")
@@ -493,7 +493,7 @@ Example of per-asset mapper configuration and composite-key 
mapping:
         dag_id="aggregate_regional_sales",
         schedule=PartitionedAssetTimetable(
             assets=regional_sales,
-            default_partition_mapper=ProductMapper(IdentityMapper(), 
DailyMapper()),
+            default_partition_mapper=ProductMapper(IdentityMapper(), 
StartOfDayMapper()),
         ),
     ):
         ...
@@ -503,7 +503,7 @@ You can also override mappers for specific upstream assets 
with
 
 .. code-block:: python
 
-    from airflow.sdk import Asset, DAG, DailyMapper, IdentityMapper, 
PartitionedAssetTimetable
+    from airflow.sdk import Asset, DAG, StartOfDayMapper, IdentityMapper, 
PartitionedAssetTimetable
 
     hourly_sales = Asset(uri="file://incoming/sales/hourly.csv", 
name="hourly_sales")
     daily_targets = Asset(uri="file://incoming/sales/targets.csv", 
name="daily_targets")
@@ -513,7 +513,7 @@ You can also override mappers for specific upstream assets 
with
         schedule=PartitionedAssetTimetable(
             assets=hourly_sales & daily_targets,
             # Default behavior: map timestamp-like keys to daily keys.
-            default_partition_mapper=DailyMapper(),
+            default_partition_mapper=StartOfDayMapper(),
             # Override for assets that already emit daily partition keys.
             partition_mapper_config={
                 daily_targets: IdentityMapper(),
diff --git a/airflow-core/src/airflow/example_dags/example_asset_partition.py 
b/airflow-core/src/airflow/example_dags/example_asset_partition.py
index 853d0453d26..75d582f6cad 100644
--- a/airflow-core/src/airflow/example_dags/example_asset_partition.py
+++ b/airflow-core/src/airflow/example_dags/example_asset_partition.py
@@ -27,9 +27,9 @@ from airflow.sdk import (
     IdentityMapper,
     PartitionedAssetTimetable,
     ProductMapper,
-    ToDailyMapper,
-    ToHourlyMapper,
-    ToYearlyMapper,
+    StartOfDayMapper,
+    StartOfHourMapper,
+    StartOfYearMapper,
     asset,
     task,
 )
@@ -77,7 +77,7 @@ with DAG(
     dag_id="clean_and_combine_player_stats",
     schedule=PartitionedAssetTimetable(
         assets=team_a_player_stats & team_b_player_stats & team_c_player_stats,
-        default_partition_mapper=ToHourlyMapper(),
+        default_partition_mapper=StartOfHourMapper(),
     ),
     catchup=False,
     tags=["player-stats", "cleanup"],
@@ -85,7 +85,7 @@ with DAG(
     """
     Combine hourly partitions from Team A, B and C into a single curated 
dataset.
 
-    This Dag demonstrates multi-asset partition alignment using ToHourlyMapper.
+    This Dag demonstrates multi-asset partition alignment using 
StartOfHourMapper.
     """
 
     @task(outlets=[combined_player_stats])
@@ -101,7 +101,7 @@ with DAG(
 @asset(
     uri="file://analytics/player-stats/computed-player-odds.csv",
     # Fallback to IdentityMapper if no partition_mapper is specified.
-    # If we want to other temporal mapper (e.g., ToHourlyMapper) here,
+    # If we want to other temporal mapper (e.g., StartOfHourMapper) here,
     # make sure the input_format is changed since the partition_key is now in 
"%Y-%m-%dT%H" format
     # instead of a valid timestamp
     schedule=PartitionedAssetTimetable(assets=combined_player_stats),
@@ -121,9 +121,9 @@ with DAG(
     schedule=PartitionedAssetTimetable(
         assets=(combined_player_stats & team_a_player_stats & 
Asset.ref(name="team_b_player_stats")),
         partition_mapper_config={
-            combined_player_stats: ToYearlyMapper(),  # incompatible on purpose
-            team_a_player_stats: ToHourlyMapper(),
-            Asset.ref(name="team_b_player_stats"): ToHourlyMapper(),
+            combined_player_stats: StartOfYearMapper(),  # incompatible on 
purpose
+            team_a_player_stats: StartOfHourMapper(),
+            Asset.ref(name="team_b_player_stats"): StartOfHourMapper(),
         },
     ),
     catchup=False,
@@ -164,7 +164,7 @@ with DAG(
     dag_id="aggregate_regional_sales",
     schedule=PartitionedAssetTimetable(
         assets=regional_sales,
-        default_partition_mapper=ProductMapper(IdentityMapper(), 
ToDailyMapper()),
+        default_partition_mapper=ProductMapper(IdentityMapper(), 
StartOfDayMapper()),
     ),
     catchup=False,
     tags=["sales", "aggregation"],
@@ -173,7 +173,7 @@ with DAG(
     Aggregate regional sales using ProductMapper.
 
     The ProductMapper splits the composite key "region|timestamp" and applies
-    IdentityMapper to the region segment and ToDailyMapper to the timestamp 
segment,
+    IdentityMapper to the region segment and StartOfDayMapper to the timestamp 
segment,
     aligning hourly partitions to daily granularity per region.
     """
 
diff --git a/airflow-core/src/airflow/partition_mappers/temporal.py 
b/airflow-core/src/airflow/partition_mappers/temporal.py
index b30e1879d53..9c86bace56b 100644
--- a/airflow-core/src/airflow/partition_mappers/temporal.py
+++ b/airflow-core/src/airflow/partition_mappers/temporal.py
@@ -63,7 +63,7 @@ class _BaseTemporalMapper(PartitionMapper, ABC):
         )
 
 
-class ToHourlyMapper(_BaseTemporalMapper):
+class StartOfHourMapper(_BaseTemporalMapper):
     """Map a time-based partition key to hour."""
 
     default_output_format = "%Y-%m-%dT%H"
@@ -72,7 +72,7 @@ class ToHourlyMapper(_BaseTemporalMapper):
         return dt.replace(minute=0, second=0, microsecond=0)
 
 
-class ToDailyMapper(_BaseTemporalMapper):
+class StartOfDayMapper(_BaseTemporalMapper):
     """Map a time-based partition key to day."""
 
     default_output_format = "%Y-%m-%d"
@@ -81,7 +81,7 @@ class ToDailyMapper(_BaseTemporalMapper):
         return dt.replace(hour=0, minute=0, second=0, microsecond=0)
 
 
-class ToWeeklyMapper(_BaseTemporalMapper):
+class StartOfWeekMapper(_BaseTemporalMapper):
     """Map a time-based partition key to week."""
 
     default_output_format = "%Y-%m-%d (W%V)"
@@ -91,7 +91,7 @@ class ToWeeklyMapper(_BaseTemporalMapper):
         return start.replace(hour=0, minute=0, second=0, microsecond=0)
 
 
-class ToMonthlyMapper(_BaseTemporalMapper):
+class StartOfMonthMapper(_BaseTemporalMapper):
     """Map a time-based partition key to month."""
 
     default_output_format = "%Y-%m"
@@ -106,7 +106,7 @@ class ToMonthlyMapper(_BaseTemporalMapper):
         )
 
 
-class ToQuarterlyMapper(_BaseTemporalMapper):
+class StartOfQuarterMapper(_BaseTemporalMapper):
     """Map a time-based partition key to quarter."""
 
     default_output_format = "%Y-Q{quarter}"
@@ -128,7 +128,7 @@ class ToQuarterlyMapper(_BaseTemporalMapper):
         return dt.strftime(self.output_format).format(quarter=quarter)
 
 
-class ToYearlyMapper(_BaseTemporalMapper):
+class StartOfYearMapper(_BaseTemporalMapper):
     """Map a time-based partition key to year."""
 
     default_output_format = "%Y"
diff --git a/airflow-core/src/airflow/serialization/encoders.py 
b/airflow-core/src/airflow/serialization/encoders.py
index eca84065888..dcb064dcde0 100644
--- a/airflow-core/src/airflow/serialization/encoders.py
+++ b/airflow-core/src/airflow/serialization/encoders.py
@@ -44,15 +44,15 @@ from airflow.sdk import (
     MultipleCronTriggerTimetable,
     PartitionMapper,
     ProductMapper,
-    ToDailyMapper,
-    ToMonthlyMapper,
-    ToQuarterlyMapper,
-    ToWeeklyMapper,
-    ToYearlyMapper,
+    StartOfDayMapper,
+    StartOfMonthMapper,
+    StartOfQuarterMapper,
+    StartOfWeekMapper,
+    StartOfYearMapper,
 )
 from airflow.sdk.bases.timetable import BaseTimetable
 from airflow.sdk.definitions.asset import AssetRef
-from airflow.sdk.definitions.partition_mappers.temporal import ToHourlyMapper
+from airflow.sdk.definitions.partition_mappers.temporal import 
StartOfHourMapper
 from airflow.sdk.definitions.timetables.assets import (
     AssetTriggeredTimetable,
     PartitionedAssetTimetable,
@@ -393,16 +393,16 @@ class _Serializer:
         }
 
     BUILTIN_PARTITION_MAPPERS: dict[type, str] = {
+        AllowedKeyMapper: 
"airflow.partition_mappers.allowed_key.AllowedKeyMapper",
         ChainMapper: "airflow.partition_mappers.chain.ChainMapper",
         IdentityMapper: "airflow.partition_mappers.identity.IdentityMapper",
-        ToHourlyMapper: "airflow.partition_mappers.temporal.ToHourlyMapper",
-        ToDailyMapper: "airflow.partition_mappers.temporal.ToDailyMapper",
-        ToWeeklyMapper: "airflow.partition_mappers.temporal.ToWeeklyMapper",
-        ToMonthlyMapper: "airflow.partition_mappers.temporal.ToMonthlyMapper",
-        ToQuarterlyMapper: 
"airflow.partition_mappers.temporal.ToQuarterlyMapper",
-        ToYearlyMapper: "airflow.partition_mappers.temporal.ToYearlyMapper",
         ProductMapper: "airflow.partition_mappers.product.ProductMapper",
-        AllowedKeyMapper: 
"airflow.partition_mappers.allowed_key.AllowedKeyMapper",
+        StartOfDayMapper: 
"airflow.partition_mappers.temporal.StartOfDayMapper",
+        StartOfHourMapper: 
"airflow.partition_mappers.temporal.StartOfHourMapper",
+        StartOfMonthMapper: 
"airflow.partition_mappers.temporal.StartOfMonthMapper",
+        StartOfQuarterMapper: 
"airflow.partition_mappers.temporal.StartOfQuarterMapper",
+        StartOfWeekMapper: 
"airflow.partition_mappers.temporal.StartOfWeekMapper",
+        StartOfYearMapper: 
"airflow.partition_mappers.temporal.StartOfYearMapper",
     }
 
     @functools.singledispatchmethod
@@ -421,20 +421,20 @@ class _Serializer:
     def _(self, partition_mapper: IdentityMapper) -> dict[str, Any]:
         return {}
 
-    @serialize_partition_mapper.register(ToHourlyMapper)
-    @serialize_partition_mapper.register(ToDailyMapper)
-    @serialize_partition_mapper.register(ToWeeklyMapper)
-    @serialize_partition_mapper.register(ToMonthlyMapper)
-    @serialize_partition_mapper.register(ToQuarterlyMapper)
-    @serialize_partition_mapper.register(ToYearlyMapper)
+    @serialize_partition_mapper.register(StartOfHourMapper)
+    @serialize_partition_mapper.register(StartOfDayMapper)
+    @serialize_partition_mapper.register(StartOfWeekMapper)
+    @serialize_partition_mapper.register(StartOfMonthMapper)
+    @serialize_partition_mapper.register(StartOfQuarterMapper)
+    @serialize_partition_mapper.register(StartOfYearMapper)
     def _(
         self,
-        partition_mapper: ToHourlyMapper
-        | ToDailyMapper
-        | ToWeeklyMapper
-        | ToMonthlyMapper
-        | ToQuarterlyMapper
-        | ToYearlyMapper,
+        partition_mapper: StartOfHourMapper
+        | StartOfDayMapper
+        | StartOfWeekMapper
+        | StartOfMonthMapper
+        | StartOfQuarterMapper
+        | StartOfYearMapper,
     ) -> dict[str, Any]:
         return {
             "input_format": partition_mapper.input_format,
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py 
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index f9ae60a9c35..fd77644bc44 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -92,7 +92,7 @@ from airflow.sdk import (
     AssetWatcher,
     CronPartitionTimetable,
     IdentityMapper,
-    ToHourlyMapper,
+    StartOfHourMapper,
     task,
 )
 from airflow.sdk.definitions.callback import AsyncCallback, SyncCallback
@@ -8982,7 +8982,7 @@ def 
test_partitioned_dag_run_with_invalid_mapping(dag_maker: DagMaker, session:
         dag_id="asset-event-consumer",
         schedule=PartitionedAssetTimetable(
             assets=asset_1,
-            default_partition_mapper=ToHourlyMapper(),
+            default_partition_mapper=StartOfHourMapper(),
         ),
         session=session,
     ):
diff --git a/airflow-core/tests/unit/partition_mappers/test_chain.py 
b/airflow-core/tests/unit/partition_mappers/test_chain.py
index cdce301deb0..79f888af75f 100644
--- a/airflow-core/tests/unit/partition_mappers/test_chain.py
+++ b/airflow-core/tests/unit/partition_mappers/test_chain.py
@@ -22,7 +22,7 @@ import pytest
 from airflow.partition_mappers.base import PartitionMapper
 from airflow.partition_mappers.chain import ChainMapper
 from airflow.partition_mappers.identity import IdentityMapper
-from airflow.partition_mappers.temporal import ToDailyMapper, ToHourlyMapper
+from airflow.partition_mappers.temporal import StartOfDayMapper, 
StartOfHourMapper
 
 
 class _InvalidReturnMapper(PartitionMapper):
@@ -37,7 +37,7 @@ class _InvalidIterableMapper(PartitionMapper):
 
 class TestChainMapper:
     def test_to_downstream(self):
-        sm = ChainMapper(ToHourlyMapper(), 
ToDailyMapper(input_format="%Y-%m-%dT%H"))
+        sm = ChainMapper(StartOfHourMapper(), 
StartOfDayMapper(input_format="%Y-%m-%dT%H"))
         assert sm.to_downstream("2024-01-15T10:30:00") == "2024-01-15"
 
     def test_to_downstream_invalid_non_iterable_return(self):
@@ -53,17 +53,17 @@ class TestChainMapper:
     def test_serialize(self):
         from airflow.serialization.encoders import encode_partition_mapper
 
-        sm = ChainMapper(ToHourlyMapper(), 
ToDailyMapper(input_format="%Y-%m-%dT%H"))
+        sm = ChainMapper(StartOfHourMapper(), 
StartOfDayMapper(input_format="%Y-%m-%dT%H"))
         result = sm.serialize()
         assert result == {
             "mappers": [
-                encode_partition_mapper(ToHourlyMapper()),
-                
encode_partition_mapper(ToDailyMapper(input_format="%Y-%m-%dT%H")),
+                encode_partition_mapper(StartOfHourMapper()),
+                
encode_partition_mapper(StartOfDayMapper(input_format="%Y-%m-%dT%H")),
             ],
         }
 
     def test_deserialize(self):
-        sm = ChainMapper(ToHourlyMapper(), 
ToDailyMapper(input_format="%Y-%m-%dT%H"))
+        sm = ChainMapper(StartOfHourMapper(), 
StartOfDayMapper(input_format="%Y-%m-%dT%H"))
         serialized = sm.serialize()
         restored = ChainMapper.deserialize(serialized)
         assert isinstance(restored, ChainMapper)
diff --git a/airflow-core/tests/unit/partition_mappers/test_product.py 
b/airflow-core/tests/unit/partition_mappers/test_product.py
index fba3ae868a8..17269f261ea 100644
--- a/airflow-core/tests/unit/partition_mappers/test_product.py
+++ b/airflow-core/tests/unit/partition_mappers/test_product.py
@@ -21,61 +21,61 @@ import pytest
 
 from airflow.partition_mappers.identity import IdentityMapper
 from airflow.partition_mappers.product import ProductMapper
-from airflow.partition_mappers.temporal import ToDailyMapper, ToHourlyMapper
+from airflow.partition_mappers.temporal import StartOfDayMapper, 
StartOfHourMapper
 
 
 class TestProductMapper:
     def test_to_downstream(self):
-        pm = ProductMapper(ToHourlyMapper(), ToDailyMapper())
+        pm = ProductMapper(StartOfHourMapper(), StartOfDayMapper())
         assert pm.to_downstream("2024-01-15T10:30:00|2024-01-15T10:30:00") == 
"2024-01-15T10|2024-01-15"
 
     def test_to_downstream_wrong_segment_count(self):
-        pm = ProductMapper(ToHourlyMapper(), ToDailyMapper())
+        pm = ProductMapper(StartOfHourMapper(), StartOfDayMapper())
         with pytest.raises(ValueError, match="Expected 2 segments"):
             pm.to_downstream("2024-01-15T10:30:00|2024-01-15T10:30:00|extra")
 
     def test_to_downstream_single_segment_for_two_mappers(self):
-        pm = ProductMapper(ToHourlyMapper(), ToDailyMapper())
+        pm = ProductMapper(StartOfHourMapper(), StartOfDayMapper())
         with pytest.raises(ValueError, match="Expected 2 segments"):
             pm.to_downstream("2024-01-15T10:30:00")
 
     def test_custom_delimiter(self):
-        pm = ProductMapper(ToHourlyMapper(), ToDailyMapper(), delimiter="::")
+        pm = ProductMapper(StartOfHourMapper(), StartOfDayMapper(), 
delimiter="::")
         assert pm.to_downstream("2024-01-15T10:30:00::2024-01-15T10:30:00") == 
"2024-01-15T10::2024-01-15"
 
     def test_custom_delimiter_wrong_segment_count(self):
-        pm = ProductMapper(ToHourlyMapper(), ToDailyMapper(), delimiter="::")
+        pm = ProductMapper(StartOfHourMapper(), StartOfDayMapper(), 
delimiter="::")
         with pytest.raises(ValueError, match="Expected 2 segments"):
             pm.to_downstream("2024-01-15T10:30:00::2024-01-15T10:30:00::extra")
 
     def test_serialize(self):
         from airflow.serialization.encoders import encode_partition_mapper
 
-        pm = ProductMapper(ToHourlyMapper(), ToDailyMapper())
+        pm = ProductMapper(StartOfHourMapper(), StartOfDayMapper())
         result = pm.serialize()
         assert result == {
             "delimiter": "|",
             "mappers": [
-                encode_partition_mapper(ToHourlyMapper()),
-                encode_partition_mapper(ToDailyMapper()),
+                encode_partition_mapper(StartOfHourMapper()),
+                encode_partition_mapper(StartOfDayMapper()),
             ],
         }
 
     def test_serialize_custom_delimiter(self):
         from airflow.serialization.encoders import encode_partition_mapper
 
-        pm = ProductMapper(ToHourlyMapper(), ToDailyMapper(), delimiter="::")
+        pm = ProductMapper(StartOfHourMapper(), StartOfDayMapper(), 
delimiter="::")
         result = pm.serialize()
         assert result == {
             "delimiter": "::",
             "mappers": [
-                encode_partition_mapper(ToHourlyMapper()),
-                encode_partition_mapper(ToDailyMapper()),
+                encode_partition_mapper(StartOfHourMapper()),
+                encode_partition_mapper(StartOfDayMapper()),
             ],
         }
 
     def test_deserialize(self):
-        pm = ProductMapper(ToHourlyMapper(), ToDailyMapper())
+        pm = ProductMapper(StartOfHourMapper(), StartOfDayMapper())
         serialized = pm.serialize()
         restored = ProductMapper.deserialize(serialized)
         assert isinstance(restored, ProductMapper)
@@ -84,7 +84,7 @@ class TestProductMapper:
         assert 
restored.to_downstream("2024-01-15T10:30:00|2024-01-15T10:30:00") == 
"2024-01-15T10|2024-01-15"
 
     def test_deserialize_custom_delimiter(self):
-        pm = ProductMapper(ToHourlyMapper(), ToDailyMapper(), delimiter="::")
+        pm = ProductMapper(StartOfHourMapper(), StartOfDayMapper(), 
delimiter="::")
         serialized = pm.serialize()
         restored = ProductMapper.deserialize(serialized)
         assert isinstance(restored, ProductMapper)
@@ -99,15 +99,15 @@ class TestProductMapper:
 
         data = {
             "mappers": [
-                encode_partition_mapper(ToHourlyMapper()),
-                encode_partition_mapper(ToDailyMapper()),
+                encode_partition_mapper(StartOfHourMapper()),
+                encode_partition_mapper(StartOfDayMapper()),
             ],
         }
         restored = ProductMapper.deserialize(data)
         assert restored.delimiter == "|"
 
     def test_three_mappers(self):
-        pm = ProductMapper(ToHourlyMapper(), ToDailyMapper(), IdentityMapper())
+        pm = ProductMapper(StartOfHourMapper(), StartOfDayMapper(), 
IdentityMapper())
         assert (
             pm.to_downstream("2024-01-15T10:30:00|2024-01-15T10:30:00|raw") == 
"2024-01-15T10|2024-01-15|raw"
         )
diff --git a/airflow-core/tests/unit/partition_mappers/test_temporal.py 
b/airflow-core/tests/unit/partition_mappers/test_temporal.py
index 9bcb9679ea8..89ad98cebbc 100644
--- a/airflow-core/tests/unit/partition_mappers/test_temporal.py
+++ b/airflow-core/tests/unit/partition_mappers/test_temporal.py
@@ -19,12 +19,12 @@ from __future__ import annotations
 import pytest
 
 from airflow.partition_mappers.temporal import (
-    ToDailyMapper,
-    ToHourlyMapper,
-    ToMonthlyMapper,
-    ToQuarterlyMapper,
-    ToWeeklyMapper,
-    ToYearlyMapper,
+    StartOfDayMapper,
+    StartOfHourMapper,
+    StartOfMonthMapper,
+    StartOfQuarterMapper,
+    StartOfWeekMapper,
+    StartOfYearMapper,
     _BaseTemporalMapper,
 )
 
@@ -33,12 +33,12 @@ class TestTemporalMappers:
     @pytest.mark.parametrize(
         ("mapper_cls", "expected_downstream_key"),
         [
-            (ToHourlyMapper, "2026-02-10T14"),
-            (ToDailyMapper, "2026-02-10"),
-            (ToWeeklyMapper, "2026-02-09 (W07)"),
-            (ToMonthlyMapper, "2026-02"),
-            (ToQuarterlyMapper, "2026-Q1"),
-            (ToYearlyMapper, "2026"),
+            (StartOfHourMapper, "2026-02-10T14"),
+            (StartOfDayMapper, "2026-02-10"),
+            (StartOfWeekMapper, "2026-02-09 (W07)"),
+            (StartOfMonthMapper, "2026-02"),
+            (StartOfQuarterMapper, "2026-Q1"),
+            (StartOfYearMapper, "2026"),
         ],
     )
     def test_to_downstream(
@@ -52,12 +52,12 @@ class TestTemporalMappers:
     @pytest.mark.parametrize(
         ("mapper_cls", "expected_outut_format"),
         [
-            (ToHourlyMapper, "%Y-%m-%dT%H"),
-            (ToDailyMapper, "%Y-%m-%d"),
-            (ToWeeklyMapper, "%Y-%m-%d (W%V)"),
-            (ToMonthlyMapper, "%Y-%m"),
-            (ToQuarterlyMapper, "%Y-Q{quarter}"),
-            (ToYearlyMapper, "%Y"),
+            (StartOfHourMapper, "%Y-%m-%dT%H"),
+            (StartOfDayMapper, "%Y-%m-%d"),
+            (StartOfWeekMapper, "%Y-%m-%d (W%V)"),
+            (StartOfMonthMapper, "%Y-%m"),
+            (StartOfQuarterMapper, "%Y-Q{quarter}"),
+            (StartOfYearMapper, "%Y"),
         ],
     )
     def test_serialize(self, mapper_cls: type[_BaseTemporalMapper], 
expected_outut_format: str):
@@ -69,7 +69,14 @@ class TestTemporalMappers:
 
     @pytest.mark.parametrize(
         "mapper_cls",
-        [ToHourlyMapper, ToDailyMapper, ToWeeklyMapper, ToMonthlyMapper, 
ToQuarterlyMapper, ToYearlyMapper],
+        [
+            StartOfHourMapper,
+            StartOfDayMapper,
+            StartOfWeekMapper,
+            StartOfMonthMapper,
+            StartOfQuarterMapper,
+            StartOfYearMapper,
+        ],
     )
     def test_deserialize(self, mapper_cls):
         pm = mapper_cls.deserialize(
diff --git a/airflow-core/tests/unit/serialization/test_serialized_objects.py 
b/airflow-core/tests/unit/serialization/test_serialized_objects.py
index 7be81c2fe3b..f59939be9b2 100644
--- a/airflow-core/tests/unit/serialization/test_serialized_objects.py
+++ b/airflow-core/tests/unit/serialization/test_serialized_objects.py
@@ -47,12 +47,12 @@ from airflow.models.taskinstance import TaskInstance
 from airflow.models.xcom_arg import XComArg
 from airflow.partition_mappers.identity import IdentityMapper as 
CoreIdentityMapper
 from airflow.partition_mappers.temporal import (
-    ToDailyMapper as CoreToDailyMapper,
-    ToHourlyMapper as CoreToHourlyMapper,
-    ToMonthlyMapper as CoreToMonthlyMapper,
-    ToQuarterlyMapper as CoreToQuarterlyMapper,
-    ToWeeklyMapper as CoreToWeeklyMapper,
-    ToYearlyMapper as CoreToYearlyMapper,
+    StartOfDayMapper as CoreStartOfDayMapper,
+    StartOfHourMapper as CoreStartOfHourMapper,
+    StartOfMonthMapper as CoreStartOfMonthMapper,
+    StartOfQuarterMapper as CoreStartOfQuarterMapper,
+    StartOfWeekMapper as CoreStartOfWeekMapper,
+    StartOfYearMapper as CoreStartOfYearMapper,
 )
 from airflow.providers.standard.operators.bash import BashOperator
 from airflow.providers.standard.operators.empty import EmptyOperator
@@ -61,12 +61,12 @@ from airflow.providers.standard.triggers.file import 
FileDeleteTrigger
 from airflow.sdk import (
     BaseOperator,
     IdentityMapper,
-    ToDailyMapper,
-    ToHourlyMapper,
-    ToMonthlyMapper,
-    ToQuarterlyMapper,
-    ToWeeklyMapper,
-    ToYearlyMapper,
+    StartOfDayMapper,
+    StartOfHourMapper,
+    StartOfMonthMapper,
+    StartOfQuarterMapper,
+    StartOfWeekMapper,
+    StartOfYearMapper,
 )
 from airflow.sdk.definitions.asset import (
     Asset,
@@ -768,39 +768,39 @@ def test_encode_timezone():
     [
         (IdentityMapper, [], 
"airflow.partition_mappers.identity.IdentityMapper", {}),
         (
-            ToHourlyMapper,
+            StartOfHourMapper,
             [],
-            "airflow.partition_mappers.temporal.ToHourlyMapper",
+            "airflow.partition_mappers.temporal.StartOfHourMapper",
             {"input_format": "%Y-%m-%dT%H:%M:%S", "output_format": 
"%Y-%m-%dT%H"},
         ),
         (
-            ToDailyMapper,
+            StartOfDayMapper,
             [],
-            "airflow.partition_mappers.temporal.ToDailyMapper",
+            "airflow.partition_mappers.temporal.StartOfDayMapper",
             {"input_format": "%Y-%m-%dT%H:%M:%S", "output_format": "%Y-%m-%d"},
         ),
         (
-            ToWeeklyMapper,
+            StartOfWeekMapper,
             [],
-            "airflow.partition_mappers.temporal.ToWeeklyMapper",
+            "airflow.partition_mappers.temporal.StartOfWeekMapper",
             {"input_format": "%Y-%m-%dT%H:%M:%S", "output_format": "%Y-%m-%d 
(W%V)"},
         ),
         (
-            ToMonthlyMapper,
+            StartOfMonthMapper,
             [],
-            "airflow.partition_mappers.temporal.ToMonthlyMapper",
+            "airflow.partition_mappers.temporal.StartOfMonthMapper",
             {"input_format": "%Y-%m-%dT%H:%M:%S", "output_format": "%Y-%m"},
         ),
         (
-            ToQuarterlyMapper,
+            StartOfQuarterMapper,
             [],
-            "airflow.partition_mappers.temporal.ToQuarterlyMapper",
+            "airflow.partition_mappers.temporal.StartOfQuarterMapper",
             {"input_format": "%Y-%m-%dT%H:%M:%S", "output_format": 
"%Y-Q{quarter}"},
         ),
         (
-            ToYearlyMapper,
+            StartOfYearMapper,
             [],
-            "airflow.partition_mappers.temporal.ToYearlyMapper",
+            "airflow.partition_mappers.temporal.StartOfYearMapper",
             {"input_format": "%Y-%m-%dT%H:%M:%S", "output_format": "%Y"},
         ),
     ],
@@ -819,12 +819,12 @@ def test_encode_partition_mapper(cls, args, encode_type, 
encode_var):
     ("sdk_cls", "core_cls"),
     [
         (IdentityMapper, CoreIdentityMapper),
-        (ToHourlyMapper, CoreToHourlyMapper),
-        (ToDailyMapper, CoreToDailyMapper),
-        (ToWeeklyMapper, CoreToWeeklyMapper),
-        (ToMonthlyMapper, CoreToMonthlyMapper),
-        (ToQuarterlyMapper, CoreToQuarterlyMapper),
-        (ToYearlyMapper, CoreToYearlyMapper),
+        (StartOfHourMapper, CoreStartOfHourMapper),
+        (StartOfDayMapper, CoreStartOfDayMapper),
+        (StartOfWeekMapper, CoreStartOfWeekMapper),
+        (StartOfMonthMapper, CoreStartOfMonthMapper),
+        (StartOfQuarterMapper, CoreStartOfQuarterMapper),
+        (StartOfYearMapper, CoreStartOfYearMapper),
     ],
 )
 def test_decode_partition_mapper(sdk_cls, core_cls):
@@ -853,10 +853,10 @@ def test_decode_partition_mapper_not_exists():
 
 
 def test_encode_product_mapper():
-    from airflow.sdk import IdentityMapper, ProductMapper, ToHourlyMapper
+    from airflow.sdk import IdentityMapper, ProductMapper, StartOfHourMapper
     from airflow.serialization.encoders import encode_partition_mapper
 
-    partition_mapper = ProductMapper(IdentityMapper(), ToHourlyMapper())
+    partition_mapper = ProductMapper(IdentityMapper(), StartOfHourMapper())
     assert encode_partition_mapper(partition_mapper) == {
         Encoding.TYPE: "airflow.partition_mappers.product.ProductMapper",
         Encoding.VAR: {
@@ -867,7 +867,7 @@ def test_encode_product_mapper():
                     Encoding.VAR: {},
                 },
                 {
-                    Encoding.TYPE: 
"airflow.partition_mappers.temporal.ToHourlyMapper",
+                    Encoding.TYPE: 
"airflow.partition_mappers.temporal.StartOfHourMapper",
                     Encoding.VAR: {
                         "input_format": "%Y-%m-%dT%H:%M:%S",
                         "output_format": "%Y-%m-%dT%H",
@@ -880,11 +880,11 @@ def test_encode_product_mapper():
 
 def test_decode_product_mapper():
     from airflow.partition_mappers.product import ProductMapper as 
CoreProductMapper
-    from airflow.sdk import ProductMapper, ToDailyMapper, ToHourlyMapper
+    from airflow.sdk import ProductMapper, StartOfDayMapper, StartOfHourMapper
     from airflow.serialization.decoders import decode_partition_mapper
     from airflow.serialization.encoders import encode_partition_mapper
 
-    partition_mapper = ProductMapper(ToHourlyMapper(), ToDailyMapper())
+    partition_mapper = ProductMapper(StartOfHourMapper(), StartOfDayMapper())
     encoded_pm = encode_partition_mapper(partition_mapper)
 
     core_pm = decode_partition_mapper(encoded_pm)
@@ -896,23 +896,23 @@ def test_decode_product_mapper():
 
 
 def test_encode_chain_mapper():
-    from airflow.sdk import ChainMapper, ToDailyMapper, ToHourlyMapper
+    from airflow.sdk import ChainMapper, StartOfDayMapper, StartOfHourMapper
     from airflow.serialization.encoders import encode_partition_mapper
 
-    partition_mapper = ChainMapper(ToHourlyMapper(), 
ToDailyMapper(input_format="%Y-%m-%dT%H"))
+    partition_mapper = ChainMapper(StartOfHourMapper(), 
StartOfDayMapper(input_format="%Y-%m-%dT%H"))
     assert encode_partition_mapper(partition_mapper) == {
         Encoding.TYPE: "airflow.partition_mappers.chain.ChainMapper",
         Encoding.VAR: {
             "mappers": [
                 {
-                    Encoding.TYPE: 
"airflow.partition_mappers.temporal.ToHourlyMapper",
+                    Encoding.TYPE: 
"airflow.partition_mappers.temporal.StartOfHourMapper",
                     Encoding.VAR: {
                         "input_format": "%Y-%m-%dT%H:%M:%S",
                         "output_format": "%Y-%m-%dT%H",
                     },
                 },
                 {
-                    Encoding.TYPE: 
"airflow.partition_mappers.temporal.ToDailyMapper",
+                    Encoding.TYPE: 
"airflow.partition_mappers.temporal.StartOfDayMapper",
                     Encoding.VAR: {
                         "input_format": "%Y-%m-%dT%H",
                         "output_format": "%Y-%m-%d",
@@ -925,11 +925,11 @@ def test_encode_chain_mapper():
 
 def test_decode_chain_mapper():
     from airflow.partition_mappers.chain import ChainMapper as CoreChainMapper
-    from airflow.sdk import ChainMapper, ToDailyMapper, ToHourlyMapper
+    from airflow.sdk import ChainMapper, StartOfDayMapper, StartOfHourMapper
     from airflow.serialization.decoders import decode_partition_mapper
     from airflow.serialization.encoders import encode_partition_mapper
 
-    partition_mapper = ChainMapper(ToHourlyMapper(), 
ToDailyMapper(input_format="%Y-%m-%dT%H"))
+    partition_mapper = ChainMapper(StartOfHourMapper(), 
StartOfDayMapper(input_format="%Y-%m-%dT%H"))
     encoded_pm = encode_partition_mapper(partition_mapper)
 
     core_pm = decode_partition_mapper(encoded_pm)
diff --git a/task-sdk/docs/api.rst b/task-sdk/docs/api.rst
index 03b4d3999ba..9aa11c5f94d 100644
--- a/task-sdk/docs/api.rst
+++ b/task-sdk/docs/api.rst
@@ -205,17 +205,17 @@ Partition Mapper
 
 .. autoapiclass:: airflow.sdk.IdentityMapper
 
-.. autoapiclass:: airflow.sdk.ToHourlyMapper
+.. autoapiclass:: airflow.sdk.StartOfHourMapper
 
-.. autoapiclass:: airflow.sdk.ToDailyMapper
+.. autoapiclass:: airflow.sdk.StartOfDayMapper
 
-.. autoapiclass:: airflow.sdk.ToWeeklyMapper
+.. autoapiclass:: airflow.sdk.StartOfWeekMapper
 
-.. autoapiclass:: airflow.sdk.ToMonthlyMapper
+.. autoapiclass:: airflow.sdk.StartOfMonthMapper
 
-.. autoapiclass:: airflow.sdk.ToQuarterlyMapper
+.. autoapiclass:: airflow.sdk.StartOfQuarterMapper
 
-.. autoapiclass:: airflow.sdk.ToYearlyMapper
+.. autoapiclass:: airflow.sdk.StartOfYearMapper
 
 .. autoapiclass:: airflow.sdk.ProductMapper
 
diff --git a/task-sdk/src/airflow/sdk/__init__.py 
b/task-sdk/src/airflow/sdk/__init__.py
index 8aa55a0e623..d84d327a89c 100644
--- a/task-sdk/src/airflow/sdk/__init__.py
+++ b/task-sdk/src/airflow/sdk/__init__.py
@@ -64,15 +64,15 @@ __all__ = [
     "ProductMapper",
     "SkipMixin",
     "SyncCallback",
+    "StartOfDayMapper",
+    "StartOfHourMapper",
+    "StartOfMonthMapper",
+    "StartOfQuarterMapper",
+    "StartOfWeekMapper",
+    "StartOfYearMapper",
     "TaskGroup",
     "TaskInstance",
     "TaskInstanceState",
-    "ToDailyMapper",
-    "ToHourlyMapper",
-    "ToMonthlyMapper",
-    "ToQuarterlyMapper",
-    "ToWeeklyMapper",
-    "ToYearlyMapper",
     "TriggerRule",
     "Variable",
     "WeightRule",
@@ -130,12 +130,12 @@ if TYPE_CHECKING:
     from airflow.sdk.definitions.partition_mappers.identity import 
IdentityMapper
     from airflow.sdk.definitions.partition_mappers.product import ProductMapper
     from airflow.sdk.definitions.partition_mappers.temporal import (
-        ToDailyMapper,
-        ToHourlyMapper,
-        ToMonthlyMapper,
-        ToQuarterlyMapper,
-        ToWeeklyMapper,
-        ToYearlyMapper,
+        StartOfDayMapper,
+        StartOfHourMapper,
+        StartOfMonthMapper,
+        StartOfQuarterMapper,
+        StartOfWeekMapper,
+        StartOfYearMapper,
     )
     from airflow.sdk.definitions.taskgroup import TaskGroup
     from airflow.sdk.definitions.template import literal
@@ -208,15 +208,15 @@ __lazy_imports: dict[str, str] = {
     "SecretCache": ".execution_time.cache",
     "SkipMixin": ".bases.skipmixin",
     "SyncCallback": ".definitions.callback",
+    "StartOfDayMapper": ".definitions.partition_mappers.temporal",
+    "StartOfHourMapper": ".definitions.partition_mappers.temporal",
+    "StartOfMonthMapper": ".definitions.partition_mappers.temporal",
+    "StartOfQuarterMapper": ".definitions.partition_mappers.temporal",
+    "StartOfWeekMapper": ".definitions.partition_mappers.temporal",
+    "StartOfYearMapper": ".definitions.partition_mappers.temporal",
     "TaskGroup": ".definitions.taskgroup",
     "TaskInstance": ".types",
     "TaskInstanceState": ".api.datamodels._generated",
-    "ToDailyMapper": ".definitions.partition_mappers.temporal",
-    "ToHourlyMapper": ".definitions.partition_mappers.temporal",
-    "ToMonthlyMapper": ".definitions.partition_mappers.temporal",
-    "ToQuarterlyMapper": ".definitions.partition_mappers.temporal",
-    "ToWeeklyMapper": ".definitions.partition_mappers.temporal",
-    "ToYearlyMapper": ".definitions.partition_mappers.temporal",
     "TriggerRule": ".api.datamodels._generated",
     "Variable": ".definitions.variable",
     "WeightRule": ".api.datamodels._generated",
diff --git a/task-sdk/src/airflow/sdk/__init__.pyi 
b/task-sdk/src/airflow/sdk/__init__.pyi
index a898ff27a70..4f5177d85cb 100644
--- a/task-sdk/src/airflow/sdk/__init__.pyi
+++ b/task-sdk/src/airflow/sdk/__init__.pyi
@@ -68,12 +68,12 @@ from airflow.sdk.definitions.partition_mappers.chain import 
ChainMapper
 from airflow.sdk.definitions.partition_mappers.identity import IdentityMapper
 from airflow.sdk.definitions.partition_mappers.product import ProductMapper
 from airflow.sdk.definitions.partition_mappers.temporal import (
-    ToDailyMapper,
-    ToHourlyMapper,
-    ToMonthlyMapper,
-    ToQuarterlyMapper,
-    ToWeeklyMapper,
-    ToYearlyMapper,
+    StartOfDayMapper,
+    StartOfHourMapper,
+    StartOfMonthMapper,
+    StartOfQuarterMapper,
+    StartOfWeekMapper,
+    StartOfYearMapper,
 )
 from airflow.sdk.definitions.taskgroup import TaskGroup as TaskGroup
 from airflow.sdk.definitions.template import literal as literal
@@ -142,14 +142,14 @@ __all__ = [
     "ProductMapper",
     "SecretCache",
     "SkipMixin",
+    "StartOfDayMapper",
+    "StartOfHourMapper",
+    "StartOfMonthMapper",
+    "StartOfQuarterMapper",
+    "StartOfWeekMapper",
+    "StartOfYearMapper",
     "TaskGroup",
     "TaskInstanceState",
-    "ToDailyMapper",
-    "ToHourlyMapper",
-    "ToMonthlyMapper",
-    "ToQuarterlyMapper",
-    "ToWeeklyMapper",
-    "ToYearlyMapper",
     "TriggerRule",
     "Variable",
     "WeightRule",
diff --git a/task-sdk/src/airflow/sdk/definitions/partition_mappers/temporal.py 
b/task-sdk/src/airflow/sdk/definitions/partition_mappers/temporal.py
index 4e6fc4647b1..60ca18f5044 100644
--- a/task-sdk/src/airflow/sdk/definitions/partition_mappers/temporal.py
+++ b/task-sdk/src/airflow/sdk/definitions/partition_mappers/temporal.py
@@ -31,37 +31,37 @@ class _BaseTemporalMapper(PartitionMapper):
         self.output_format = output_format or self.default_output_format
 
 
-class ToHourlyMapper(_BaseTemporalMapper):
+class StartOfHourMapper(_BaseTemporalMapper):
     """Map a time-based partition key to hour."""
 
     default_output_format = "%Y-%m-%dT%H"
 
 
-class ToDailyMapper(_BaseTemporalMapper):
+class StartOfDayMapper(_BaseTemporalMapper):
     """Map a time-based partition key to day."""
 
     default_output_format = "%Y-%m-%d"
 
 
-class ToWeeklyMapper(_BaseTemporalMapper):
+class StartOfWeekMapper(_BaseTemporalMapper):
     """Map a time-based partition key to week."""
 
     default_output_format = "%Y-%m-%d (W%V)"
 
 
-class ToMonthlyMapper(_BaseTemporalMapper):
+class StartOfMonthMapper(_BaseTemporalMapper):
     """Map a time-based partition key to month."""
 
     default_output_format = "%Y-%m"
 
 
-class ToQuarterlyMapper(_BaseTemporalMapper):
+class StartOfQuarterMapper(_BaseTemporalMapper):
     """Map a time-based partition key to quarter."""
 
     default_output_format = "%Y-Q{quarter}"
 
 
-class ToYearlyMapper(_BaseTemporalMapper):
+class StartOfYearMapper(_BaseTemporalMapper):
     """Map a time-based partition key to year."""
 
     default_output_format = "%Y"


Reply via email to