This is an automated email from the ASF dual-hosted git repository. rahulvats pushed a commit to branch v3-2-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 10687f6a4cbae41eb06b305e0ec19914884a2987 Author: Wei Lee <[email protected]> AuthorDate: Fri Mar 27 14:55:37 2026 +0800 refactor(partition-mapper): rename ToXXXMapper to StartOfXXXMapper (#64160) (cherry picked from commit 3bb2dd5ec41fdd3ba81a5ff7890ee07dbbc8e0ae) --- .../docs/authoring-and-scheduling/assets.rst | 24 +++---- .../example_dags/example_asset_partition.py | 22 +++--- .../src/airflow/partition_mappers/temporal.py | 12 ++-- airflow-core/src/airflow/serialization/encoders.py | 50 ++++++------- airflow-core/tests/unit/jobs/test_scheduler_job.py | 4 +- .../tests/unit/partition_mappers/test_chain.py | 12 ++-- .../tests/unit/partition_mappers/test_product.py | 34 ++++----- .../tests/unit/partition_mappers/test_temporal.py | 45 +++++++----- .../unit/serialization/test_serialized_objects.py | 82 +++++++++++----------- task-sdk/docs/api.rst | 12 ++-- task-sdk/src/airflow/sdk/__init__.py | 36 +++++----- task-sdk/src/airflow/sdk/__init__.pyi | 24 +++---- .../sdk/definitions/partition_mappers/temporal.py | 12 ++-- 13 files changed, 188 insertions(+), 181 deletions(-) diff --git a/airflow-core/docs/authoring-and-scheduling/assets.rst b/airflow-core/docs/authoring-and-scheduling/assets.rst index 42bc78c0005..a764bc33449 100644 --- a/airflow-core/docs/authoring-and-scheduling/assets.rst +++ b/airflow-core/docs/authoring-and-scheduling/assets.rst @@ -434,13 +434,13 @@ For downstream partition-aware scheduling, use ``PartitionedAssetTimetable``: .. code-block:: python - from airflow.sdk import DAG, HourlyMapper, PartitionedAssetTimetable + from airflow.sdk import DAG, StartOfHourMapper, PartitionedAssetTimetable with DAG( dag_id="clean_and_combine_player_stats", schedule=PartitionedAssetTimetable( assets=team_a_player_stats & team_b_player_stats & team_c_player_stats, - default_partition_mapper=HourlyMapper(), + default_partition_mapper=StartOfHourMapper(), ), catchup=False, ): @@ -458,17 +458,17 @@ Partition mappers define how upstream partition keys are transformed to the downstream Dag partition key: * ``IdentityMapper`` keeps keys unchanged. -* Temporal mappers such as ``HourlyMapper``, ``DailyMapper``, and - ``YearlyMapper`` normalize time keys to a chosen grain. For input key +* Temporal mappers such as ``StartOfHourMapper``, ``StartOfDayMapper``, and + ``StartOfYearMapper`` normalize time keys to a chosen grain. For input key ``2026-03-10T09:37:51``, the default outputs are: - * ``HourlyMapper`` -> ``2026-03-10T09`` - * ``DailyMapper`` -> ``2026-03-10`` - * ``YearlyMapper`` -> ``2026`` + * ``StartOfHourMapper`` -> ``2026-03-10T09`` + * ``StartOfDayMapper`` -> ``2026-03-10`` + * ``StartOfYearMapper`` -> ``2026`` * ``ProductMapper`` maps composite keys segment-by-segment. It applies one mapper per segment and then rejoins the mapped segments. For example, with key ``us|2026-03-10T09:00:00``, - ``ProductMapper(IdentityMapper(), DailyMapper())`` produces + ``ProductMapper(IdentityMapper(), StartOfDayMapper())`` produces ``us|2026-03-10``. * ``AllowedKeyMapper`` validates that keys are in a fixed allow-list and passes the key through unchanged if valid. @@ -481,10 +481,10 @@ Example of per-asset mapper configuration and composite-key mapping: from airflow.sdk import ( Asset, - DailyMapper, IdentityMapper, PartitionedAssetTimetable, ProductMapper, + StartOfDayMapper, ) regional_sales = Asset(uri="file://incoming/sales/regional.csv", name="regional_sales") @@ -493,7 +493,7 @@ Example of per-asset mapper configuration and composite-key mapping: dag_id="aggregate_regional_sales", schedule=PartitionedAssetTimetable( assets=regional_sales, - default_partition_mapper=ProductMapper(IdentityMapper(), DailyMapper()), + default_partition_mapper=ProductMapper(IdentityMapper(), StartOfDayMapper()), ), ): ... @@ -503,7 +503,7 @@ You can also override mappers for specific upstream assets with .. code-block:: python - from airflow.sdk import Asset, DAG, DailyMapper, IdentityMapper, PartitionedAssetTimetable + from airflow.sdk import Asset, DAG, StartOfDayMapper, IdentityMapper, PartitionedAssetTimetable hourly_sales = Asset(uri="file://incoming/sales/hourly.csv", name="hourly_sales") daily_targets = Asset(uri="file://incoming/sales/targets.csv", name="daily_targets") @@ -513,7 +513,7 @@ You can also override mappers for specific upstream assets with schedule=PartitionedAssetTimetable( assets=hourly_sales & daily_targets, # Default behavior: map timestamp-like keys to daily keys. - default_partition_mapper=DailyMapper(), + default_partition_mapper=StartOfDayMapper(), # Override for assets that already emit daily partition keys. partition_mapper_config={ daily_targets: IdentityMapper(), diff --git a/airflow-core/src/airflow/example_dags/example_asset_partition.py b/airflow-core/src/airflow/example_dags/example_asset_partition.py index 853d0453d26..75d582f6cad 100644 --- a/airflow-core/src/airflow/example_dags/example_asset_partition.py +++ b/airflow-core/src/airflow/example_dags/example_asset_partition.py @@ -27,9 +27,9 @@ from airflow.sdk import ( IdentityMapper, PartitionedAssetTimetable, ProductMapper, - ToDailyMapper, - ToHourlyMapper, - ToYearlyMapper, + StartOfDayMapper, + StartOfHourMapper, + StartOfYearMapper, asset, task, ) @@ -77,7 +77,7 @@ with DAG( dag_id="clean_and_combine_player_stats", schedule=PartitionedAssetTimetable( assets=team_a_player_stats & team_b_player_stats & team_c_player_stats, - default_partition_mapper=ToHourlyMapper(), + default_partition_mapper=StartOfHourMapper(), ), catchup=False, tags=["player-stats", "cleanup"], @@ -85,7 +85,7 @@ with DAG( """ Combine hourly partitions from Team A, B and C into a single curated dataset. - This Dag demonstrates multi-asset partition alignment using ToHourlyMapper. + This Dag demonstrates multi-asset partition alignment using StartOfHourMapper. """ @task(outlets=[combined_player_stats]) @@ -101,7 +101,7 @@ with DAG( @asset( uri="file://analytics/player-stats/computed-player-odds.csv", # Fallback to IdentityMapper if no partition_mapper is specified. - # If we want to other temporal mapper (e.g., ToHourlyMapper) here, + # If we want to other temporal mapper (e.g., StartOfHourMapper) here, # make sure the input_format is changed since the partition_key is now in "%Y-%m-%dT%H" format # instead of a valid timestamp schedule=PartitionedAssetTimetable(assets=combined_player_stats), @@ -121,9 +121,9 @@ with DAG( schedule=PartitionedAssetTimetable( assets=(combined_player_stats & team_a_player_stats & Asset.ref(name="team_b_player_stats")), partition_mapper_config={ - combined_player_stats: ToYearlyMapper(), # incompatible on purpose - team_a_player_stats: ToHourlyMapper(), - Asset.ref(name="team_b_player_stats"): ToHourlyMapper(), + combined_player_stats: StartOfYearMapper(), # incompatible on purpose + team_a_player_stats: StartOfHourMapper(), + Asset.ref(name="team_b_player_stats"): StartOfHourMapper(), }, ), catchup=False, @@ -164,7 +164,7 @@ with DAG( dag_id="aggregate_regional_sales", schedule=PartitionedAssetTimetable( assets=regional_sales, - default_partition_mapper=ProductMapper(IdentityMapper(), ToDailyMapper()), + default_partition_mapper=ProductMapper(IdentityMapper(), StartOfDayMapper()), ), catchup=False, tags=["sales", "aggregation"], @@ -173,7 +173,7 @@ with DAG( Aggregate regional sales using ProductMapper. The ProductMapper splits the composite key "region|timestamp" and applies - IdentityMapper to the region segment and ToDailyMapper to the timestamp segment, + IdentityMapper to the region segment and StartOfDayMapper to the timestamp segment, aligning hourly partitions to daily granularity per region. """ diff --git a/airflow-core/src/airflow/partition_mappers/temporal.py b/airflow-core/src/airflow/partition_mappers/temporal.py index b30e1879d53..9c86bace56b 100644 --- a/airflow-core/src/airflow/partition_mappers/temporal.py +++ b/airflow-core/src/airflow/partition_mappers/temporal.py @@ -63,7 +63,7 @@ class _BaseTemporalMapper(PartitionMapper, ABC): ) -class ToHourlyMapper(_BaseTemporalMapper): +class StartOfHourMapper(_BaseTemporalMapper): """Map a time-based partition key to hour.""" default_output_format = "%Y-%m-%dT%H" @@ -72,7 +72,7 @@ class ToHourlyMapper(_BaseTemporalMapper): return dt.replace(minute=0, second=0, microsecond=0) -class ToDailyMapper(_BaseTemporalMapper): +class StartOfDayMapper(_BaseTemporalMapper): """Map a time-based partition key to day.""" default_output_format = "%Y-%m-%d" @@ -81,7 +81,7 @@ class ToDailyMapper(_BaseTemporalMapper): return dt.replace(hour=0, minute=0, second=0, microsecond=0) -class ToWeeklyMapper(_BaseTemporalMapper): +class StartOfWeekMapper(_BaseTemporalMapper): """Map a time-based partition key to week.""" default_output_format = "%Y-%m-%d (W%V)" @@ -91,7 +91,7 @@ class ToWeeklyMapper(_BaseTemporalMapper): return start.replace(hour=0, minute=0, second=0, microsecond=0) -class ToMonthlyMapper(_BaseTemporalMapper): +class StartOfMonthMapper(_BaseTemporalMapper): """Map a time-based partition key to month.""" default_output_format = "%Y-%m" @@ -106,7 +106,7 @@ class ToMonthlyMapper(_BaseTemporalMapper): ) -class ToQuarterlyMapper(_BaseTemporalMapper): +class StartOfQuarterMapper(_BaseTemporalMapper): """Map a time-based partition key to quarter.""" default_output_format = "%Y-Q{quarter}" @@ -128,7 +128,7 @@ class ToQuarterlyMapper(_BaseTemporalMapper): return dt.strftime(self.output_format).format(quarter=quarter) -class ToYearlyMapper(_BaseTemporalMapper): +class StartOfYearMapper(_BaseTemporalMapper): """Map a time-based partition key to year.""" default_output_format = "%Y" diff --git a/airflow-core/src/airflow/serialization/encoders.py b/airflow-core/src/airflow/serialization/encoders.py index eca84065888..dcb064dcde0 100644 --- a/airflow-core/src/airflow/serialization/encoders.py +++ b/airflow-core/src/airflow/serialization/encoders.py @@ -44,15 +44,15 @@ from airflow.sdk import ( MultipleCronTriggerTimetable, PartitionMapper, ProductMapper, - ToDailyMapper, - ToMonthlyMapper, - ToQuarterlyMapper, - ToWeeklyMapper, - ToYearlyMapper, + StartOfDayMapper, + StartOfMonthMapper, + StartOfQuarterMapper, + StartOfWeekMapper, + StartOfYearMapper, ) from airflow.sdk.bases.timetable import BaseTimetable from airflow.sdk.definitions.asset import AssetRef -from airflow.sdk.definitions.partition_mappers.temporal import ToHourlyMapper +from airflow.sdk.definitions.partition_mappers.temporal import StartOfHourMapper from airflow.sdk.definitions.timetables.assets import ( AssetTriggeredTimetable, PartitionedAssetTimetable, @@ -393,16 +393,16 @@ class _Serializer: } BUILTIN_PARTITION_MAPPERS: dict[type, str] = { + AllowedKeyMapper: "airflow.partition_mappers.allowed_key.AllowedKeyMapper", ChainMapper: "airflow.partition_mappers.chain.ChainMapper", IdentityMapper: "airflow.partition_mappers.identity.IdentityMapper", - ToHourlyMapper: "airflow.partition_mappers.temporal.ToHourlyMapper", - ToDailyMapper: "airflow.partition_mappers.temporal.ToDailyMapper", - ToWeeklyMapper: "airflow.partition_mappers.temporal.ToWeeklyMapper", - ToMonthlyMapper: "airflow.partition_mappers.temporal.ToMonthlyMapper", - ToQuarterlyMapper: "airflow.partition_mappers.temporal.ToQuarterlyMapper", - ToYearlyMapper: "airflow.partition_mappers.temporal.ToYearlyMapper", ProductMapper: "airflow.partition_mappers.product.ProductMapper", - AllowedKeyMapper: "airflow.partition_mappers.allowed_key.AllowedKeyMapper", + StartOfDayMapper: "airflow.partition_mappers.temporal.StartOfDayMapper", + StartOfHourMapper: "airflow.partition_mappers.temporal.StartOfHourMapper", + StartOfMonthMapper: "airflow.partition_mappers.temporal.StartOfMonthMapper", + StartOfQuarterMapper: "airflow.partition_mappers.temporal.StartOfQuarterMapper", + StartOfWeekMapper: "airflow.partition_mappers.temporal.StartOfWeekMapper", + StartOfYearMapper: "airflow.partition_mappers.temporal.StartOfYearMapper", } @functools.singledispatchmethod @@ -421,20 +421,20 @@ class _Serializer: def _(self, partition_mapper: IdentityMapper) -> dict[str, Any]: return {} - @serialize_partition_mapper.register(ToHourlyMapper) - @serialize_partition_mapper.register(ToDailyMapper) - @serialize_partition_mapper.register(ToWeeklyMapper) - @serialize_partition_mapper.register(ToMonthlyMapper) - @serialize_partition_mapper.register(ToQuarterlyMapper) - @serialize_partition_mapper.register(ToYearlyMapper) + @serialize_partition_mapper.register(StartOfHourMapper) + @serialize_partition_mapper.register(StartOfDayMapper) + @serialize_partition_mapper.register(StartOfWeekMapper) + @serialize_partition_mapper.register(StartOfMonthMapper) + @serialize_partition_mapper.register(StartOfQuarterMapper) + @serialize_partition_mapper.register(StartOfYearMapper) def _( self, - partition_mapper: ToHourlyMapper - | ToDailyMapper - | ToWeeklyMapper - | ToMonthlyMapper - | ToQuarterlyMapper - | ToYearlyMapper, + partition_mapper: StartOfHourMapper + | StartOfDayMapper + | StartOfWeekMapper + | StartOfMonthMapper + | StartOfQuarterMapper + | StartOfYearMapper, ) -> dict[str, Any]: return { "input_format": partition_mapper.input_format, diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index f9ae60a9c35..fd77644bc44 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -92,7 +92,7 @@ from airflow.sdk import ( AssetWatcher, CronPartitionTimetable, IdentityMapper, - ToHourlyMapper, + StartOfHourMapper, task, ) from airflow.sdk.definitions.callback import AsyncCallback, SyncCallback @@ -8982,7 +8982,7 @@ def test_partitioned_dag_run_with_invalid_mapping(dag_maker: DagMaker, session: dag_id="asset-event-consumer", schedule=PartitionedAssetTimetable( assets=asset_1, - default_partition_mapper=ToHourlyMapper(), + default_partition_mapper=StartOfHourMapper(), ), session=session, ): diff --git a/airflow-core/tests/unit/partition_mappers/test_chain.py b/airflow-core/tests/unit/partition_mappers/test_chain.py index cdce301deb0..79f888af75f 100644 --- a/airflow-core/tests/unit/partition_mappers/test_chain.py +++ b/airflow-core/tests/unit/partition_mappers/test_chain.py @@ -22,7 +22,7 @@ import pytest from airflow.partition_mappers.base import PartitionMapper from airflow.partition_mappers.chain import ChainMapper from airflow.partition_mappers.identity import IdentityMapper -from airflow.partition_mappers.temporal import ToDailyMapper, ToHourlyMapper +from airflow.partition_mappers.temporal import StartOfDayMapper, StartOfHourMapper class _InvalidReturnMapper(PartitionMapper): @@ -37,7 +37,7 @@ class _InvalidIterableMapper(PartitionMapper): class TestChainMapper: def test_to_downstream(self): - sm = ChainMapper(ToHourlyMapper(), ToDailyMapper(input_format="%Y-%m-%dT%H")) + sm = ChainMapper(StartOfHourMapper(), StartOfDayMapper(input_format="%Y-%m-%dT%H")) assert sm.to_downstream("2024-01-15T10:30:00") == "2024-01-15" def test_to_downstream_invalid_non_iterable_return(self): @@ -53,17 +53,17 @@ class TestChainMapper: def test_serialize(self): from airflow.serialization.encoders import encode_partition_mapper - sm = ChainMapper(ToHourlyMapper(), ToDailyMapper(input_format="%Y-%m-%dT%H")) + sm = ChainMapper(StartOfHourMapper(), StartOfDayMapper(input_format="%Y-%m-%dT%H")) result = sm.serialize() assert result == { "mappers": [ - encode_partition_mapper(ToHourlyMapper()), - encode_partition_mapper(ToDailyMapper(input_format="%Y-%m-%dT%H")), + encode_partition_mapper(StartOfHourMapper()), + encode_partition_mapper(StartOfDayMapper(input_format="%Y-%m-%dT%H")), ], } def test_deserialize(self): - sm = ChainMapper(ToHourlyMapper(), ToDailyMapper(input_format="%Y-%m-%dT%H")) + sm = ChainMapper(StartOfHourMapper(), StartOfDayMapper(input_format="%Y-%m-%dT%H")) serialized = sm.serialize() restored = ChainMapper.deserialize(serialized) assert isinstance(restored, ChainMapper) diff --git a/airflow-core/tests/unit/partition_mappers/test_product.py b/airflow-core/tests/unit/partition_mappers/test_product.py index fba3ae868a8..17269f261ea 100644 --- a/airflow-core/tests/unit/partition_mappers/test_product.py +++ b/airflow-core/tests/unit/partition_mappers/test_product.py @@ -21,61 +21,61 @@ import pytest from airflow.partition_mappers.identity import IdentityMapper from airflow.partition_mappers.product import ProductMapper -from airflow.partition_mappers.temporal import ToDailyMapper, ToHourlyMapper +from airflow.partition_mappers.temporal import StartOfDayMapper, StartOfHourMapper class TestProductMapper: def test_to_downstream(self): - pm = ProductMapper(ToHourlyMapper(), ToDailyMapper()) + pm = ProductMapper(StartOfHourMapper(), StartOfDayMapper()) assert pm.to_downstream("2024-01-15T10:30:00|2024-01-15T10:30:00") == "2024-01-15T10|2024-01-15" def test_to_downstream_wrong_segment_count(self): - pm = ProductMapper(ToHourlyMapper(), ToDailyMapper()) + pm = ProductMapper(StartOfHourMapper(), StartOfDayMapper()) with pytest.raises(ValueError, match="Expected 2 segments"): pm.to_downstream("2024-01-15T10:30:00|2024-01-15T10:30:00|extra") def test_to_downstream_single_segment_for_two_mappers(self): - pm = ProductMapper(ToHourlyMapper(), ToDailyMapper()) + pm = ProductMapper(StartOfHourMapper(), StartOfDayMapper()) with pytest.raises(ValueError, match="Expected 2 segments"): pm.to_downstream("2024-01-15T10:30:00") def test_custom_delimiter(self): - pm = ProductMapper(ToHourlyMapper(), ToDailyMapper(), delimiter="::") + pm = ProductMapper(StartOfHourMapper(), StartOfDayMapper(), delimiter="::") assert pm.to_downstream("2024-01-15T10:30:00::2024-01-15T10:30:00") == "2024-01-15T10::2024-01-15" def test_custom_delimiter_wrong_segment_count(self): - pm = ProductMapper(ToHourlyMapper(), ToDailyMapper(), delimiter="::") + pm = ProductMapper(StartOfHourMapper(), StartOfDayMapper(), delimiter="::") with pytest.raises(ValueError, match="Expected 2 segments"): pm.to_downstream("2024-01-15T10:30:00::2024-01-15T10:30:00::extra") def test_serialize(self): from airflow.serialization.encoders import encode_partition_mapper - pm = ProductMapper(ToHourlyMapper(), ToDailyMapper()) + pm = ProductMapper(StartOfHourMapper(), StartOfDayMapper()) result = pm.serialize() assert result == { "delimiter": "|", "mappers": [ - encode_partition_mapper(ToHourlyMapper()), - encode_partition_mapper(ToDailyMapper()), + encode_partition_mapper(StartOfHourMapper()), + encode_partition_mapper(StartOfDayMapper()), ], } def test_serialize_custom_delimiter(self): from airflow.serialization.encoders import encode_partition_mapper - pm = ProductMapper(ToHourlyMapper(), ToDailyMapper(), delimiter="::") + pm = ProductMapper(StartOfHourMapper(), StartOfDayMapper(), delimiter="::") result = pm.serialize() assert result == { "delimiter": "::", "mappers": [ - encode_partition_mapper(ToHourlyMapper()), - encode_partition_mapper(ToDailyMapper()), + encode_partition_mapper(StartOfHourMapper()), + encode_partition_mapper(StartOfDayMapper()), ], } def test_deserialize(self): - pm = ProductMapper(ToHourlyMapper(), ToDailyMapper()) + pm = ProductMapper(StartOfHourMapper(), StartOfDayMapper()) serialized = pm.serialize() restored = ProductMapper.deserialize(serialized) assert isinstance(restored, ProductMapper) @@ -84,7 +84,7 @@ class TestProductMapper: assert restored.to_downstream("2024-01-15T10:30:00|2024-01-15T10:30:00") == "2024-01-15T10|2024-01-15" def test_deserialize_custom_delimiter(self): - pm = ProductMapper(ToHourlyMapper(), ToDailyMapper(), delimiter="::") + pm = ProductMapper(StartOfHourMapper(), StartOfDayMapper(), delimiter="::") serialized = pm.serialize() restored = ProductMapper.deserialize(serialized) assert isinstance(restored, ProductMapper) @@ -99,15 +99,15 @@ class TestProductMapper: data = { "mappers": [ - encode_partition_mapper(ToHourlyMapper()), - encode_partition_mapper(ToDailyMapper()), + encode_partition_mapper(StartOfHourMapper()), + encode_partition_mapper(StartOfDayMapper()), ], } restored = ProductMapper.deserialize(data) assert restored.delimiter == "|" def test_three_mappers(self): - pm = ProductMapper(ToHourlyMapper(), ToDailyMapper(), IdentityMapper()) + pm = ProductMapper(StartOfHourMapper(), StartOfDayMapper(), IdentityMapper()) assert ( pm.to_downstream("2024-01-15T10:30:00|2024-01-15T10:30:00|raw") == "2024-01-15T10|2024-01-15|raw" ) diff --git a/airflow-core/tests/unit/partition_mappers/test_temporal.py b/airflow-core/tests/unit/partition_mappers/test_temporal.py index 9bcb9679ea8..89ad98cebbc 100644 --- a/airflow-core/tests/unit/partition_mappers/test_temporal.py +++ b/airflow-core/tests/unit/partition_mappers/test_temporal.py @@ -19,12 +19,12 @@ from __future__ import annotations import pytest from airflow.partition_mappers.temporal import ( - ToDailyMapper, - ToHourlyMapper, - ToMonthlyMapper, - ToQuarterlyMapper, - ToWeeklyMapper, - ToYearlyMapper, + StartOfDayMapper, + StartOfHourMapper, + StartOfMonthMapper, + StartOfQuarterMapper, + StartOfWeekMapper, + StartOfYearMapper, _BaseTemporalMapper, ) @@ -33,12 +33,12 @@ class TestTemporalMappers: @pytest.mark.parametrize( ("mapper_cls", "expected_downstream_key"), [ - (ToHourlyMapper, "2026-02-10T14"), - (ToDailyMapper, "2026-02-10"), - (ToWeeklyMapper, "2026-02-09 (W07)"), - (ToMonthlyMapper, "2026-02"), - (ToQuarterlyMapper, "2026-Q1"), - (ToYearlyMapper, "2026"), + (StartOfHourMapper, "2026-02-10T14"), + (StartOfDayMapper, "2026-02-10"), + (StartOfWeekMapper, "2026-02-09 (W07)"), + (StartOfMonthMapper, "2026-02"), + (StartOfQuarterMapper, "2026-Q1"), + (StartOfYearMapper, "2026"), ], ) def test_to_downstream( @@ -52,12 +52,12 @@ class TestTemporalMappers: @pytest.mark.parametrize( ("mapper_cls", "expected_outut_format"), [ - (ToHourlyMapper, "%Y-%m-%dT%H"), - (ToDailyMapper, "%Y-%m-%d"), - (ToWeeklyMapper, "%Y-%m-%d (W%V)"), - (ToMonthlyMapper, "%Y-%m"), - (ToQuarterlyMapper, "%Y-Q{quarter}"), - (ToYearlyMapper, "%Y"), + (StartOfHourMapper, "%Y-%m-%dT%H"), + (StartOfDayMapper, "%Y-%m-%d"), + (StartOfWeekMapper, "%Y-%m-%d (W%V)"), + (StartOfMonthMapper, "%Y-%m"), + (StartOfQuarterMapper, "%Y-Q{quarter}"), + (StartOfYearMapper, "%Y"), ], ) def test_serialize(self, mapper_cls: type[_BaseTemporalMapper], expected_outut_format: str): @@ -69,7 +69,14 @@ class TestTemporalMappers: @pytest.mark.parametrize( "mapper_cls", - [ToHourlyMapper, ToDailyMapper, ToWeeklyMapper, ToMonthlyMapper, ToQuarterlyMapper, ToYearlyMapper], + [ + StartOfHourMapper, + StartOfDayMapper, + StartOfWeekMapper, + StartOfMonthMapper, + StartOfQuarterMapper, + StartOfYearMapper, + ], ) def test_deserialize(self, mapper_cls): pm = mapper_cls.deserialize( diff --git a/airflow-core/tests/unit/serialization/test_serialized_objects.py b/airflow-core/tests/unit/serialization/test_serialized_objects.py index 7be81c2fe3b..f59939be9b2 100644 --- a/airflow-core/tests/unit/serialization/test_serialized_objects.py +++ b/airflow-core/tests/unit/serialization/test_serialized_objects.py @@ -47,12 +47,12 @@ from airflow.models.taskinstance import TaskInstance from airflow.models.xcom_arg import XComArg from airflow.partition_mappers.identity import IdentityMapper as CoreIdentityMapper from airflow.partition_mappers.temporal import ( - ToDailyMapper as CoreToDailyMapper, - ToHourlyMapper as CoreToHourlyMapper, - ToMonthlyMapper as CoreToMonthlyMapper, - ToQuarterlyMapper as CoreToQuarterlyMapper, - ToWeeklyMapper as CoreToWeeklyMapper, - ToYearlyMapper as CoreToYearlyMapper, + StartOfDayMapper as CoreStartOfDayMapper, + StartOfHourMapper as CoreStartOfHourMapper, + StartOfMonthMapper as CoreStartOfMonthMapper, + StartOfQuarterMapper as CoreStartOfQuarterMapper, + StartOfWeekMapper as CoreStartOfWeekMapper, + StartOfYearMapper as CoreStartOfYearMapper, ) from airflow.providers.standard.operators.bash import BashOperator from airflow.providers.standard.operators.empty import EmptyOperator @@ -61,12 +61,12 @@ from airflow.providers.standard.triggers.file import FileDeleteTrigger from airflow.sdk import ( BaseOperator, IdentityMapper, - ToDailyMapper, - ToHourlyMapper, - ToMonthlyMapper, - ToQuarterlyMapper, - ToWeeklyMapper, - ToYearlyMapper, + StartOfDayMapper, + StartOfHourMapper, + StartOfMonthMapper, + StartOfQuarterMapper, + StartOfWeekMapper, + StartOfYearMapper, ) from airflow.sdk.definitions.asset import ( Asset, @@ -768,39 +768,39 @@ def test_encode_timezone(): [ (IdentityMapper, [], "airflow.partition_mappers.identity.IdentityMapper", {}), ( - ToHourlyMapper, + StartOfHourMapper, [], - "airflow.partition_mappers.temporal.ToHourlyMapper", + "airflow.partition_mappers.temporal.StartOfHourMapper", {"input_format": "%Y-%m-%dT%H:%M:%S", "output_format": "%Y-%m-%dT%H"}, ), ( - ToDailyMapper, + StartOfDayMapper, [], - "airflow.partition_mappers.temporal.ToDailyMapper", + "airflow.partition_mappers.temporal.StartOfDayMapper", {"input_format": "%Y-%m-%dT%H:%M:%S", "output_format": "%Y-%m-%d"}, ), ( - ToWeeklyMapper, + StartOfWeekMapper, [], - "airflow.partition_mappers.temporal.ToWeeklyMapper", + "airflow.partition_mappers.temporal.StartOfWeekMapper", {"input_format": "%Y-%m-%dT%H:%M:%S", "output_format": "%Y-%m-%d (W%V)"}, ), ( - ToMonthlyMapper, + StartOfMonthMapper, [], - "airflow.partition_mappers.temporal.ToMonthlyMapper", + "airflow.partition_mappers.temporal.StartOfMonthMapper", {"input_format": "%Y-%m-%dT%H:%M:%S", "output_format": "%Y-%m"}, ), ( - ToQuarterlyMapper, + StartOfQuarterMapper, [], - "airflow.partition_mappers.temporal.ToQuarterlyMapper", + "airflow.partition_mappers.temporal.StartOfQuarterMapper", {"input_format": "%Y-%m-%dT%H:%M:%S", "output_format": "%Y-Q{quarter}"}, ), ( - ToYearlyMapper, + StartOfYearMapper, [], - "airflow.partition_mappers.temporal.ToYearlyMapper", + "airflow.partition_mappers.temporal.StartOfYearMapper", {"input_format": "%Y-%m-%dT%H:%M:%S", "output_format": "%Y"}, ), ], @@ -819,12 +819,12 @@ def test_encode_partition_mapper(cls, args, encode_type, encode_var): ("sdk_cls", "core_cls"), [ (IdentityMapper, CoreIdentityMapper), - (ToHourlyMapper, CoreToHourlyMapper), - (ToDailyMapper, CoreToDailyMapper), - (ToWeeklyMapper, CoreToWeeklyMapper), - (ToMonthlyMapper, CoreToMonthlyMapper), - (ToQuarterlyMapper, CoreToQuarterlyMapper), - (ToYearlyMapper, CoreToYearlyMapper), + (StartOfHourMapper, CoreStartOfHourMapper), + (StartOfDayMapper, CoreStartOfDayMapper), + (StartOfWeekMapper, CoreStartOfWeekMapper), + (StartOfMonthMapper, CoreStartOfMonthMapper), + (StartOfQuarterMapper, CoreStartOfQuarterMapper), + (StartOfYearMapper, CoreStartOfYearMapper), ], ) def test_decode_partition_mapper(sdk_cls, core_cls): @@ -853,10 +853,10 @@ def test_decode_partition_mapper_not_exists(): def test_encode_product_mapper(): - from airflow.sdk import IdentityMapper, ProductMapper, ToHourlyMapper + from airflow.sdk import IdentityMapper, ProductMapper, StartOfHourMapper from airflow.serialization.encoders import encode_partition_mapper - partition_mapper = ProductMapper(IdentityMapper(), ToHourlyMapper()) + partition_mapper = ProductMapper(IdentityMapper(), StartOfHourMapper()) assert encode_partition_mapper(partition_mapper) == { Encoding.TYPE: "airflow.partition_mappers.product.ProductMapper", Encoding.VAR: { @@ -867,7 +867,7 @@ def test_encode_product_mapper(): Encoding.VAR: {}, }, { - Encoding.TYPE: "airflow.partition_mappers.temporal.ToHourlyMapper", + Encoding.TYPE: "airflow.partition_mappers.temporal.StartOfHourMapper", Encoding.VAR: { "input_format": "%Y-%m-%dT%H:%M:%S", "output_format": "%Y-%m-%dT%H", @@ -880,11 +880,11 @@ def test_encode_product_mapper(): def test_decode_product_mapper(): from airflow.partition_mappers.product import ProductMapper as CoreProductMapper - from airflow.sdk import ProductMapper, ToDailyMapper, ToHourlyMapper + from airflow.sdk import ProductMapper, StartOfDayMapper, StartOfHourMapper from airflow.serialization.decoders import decode_partition_mapper from airflow.serialization.encoders import encode_partition_mapper - partition_mapper = ProductMapper(ToHourlyMapper(), ToDailyMapper()) + partition_mapper = ProductMapper(StartOfHourMapper(), StartOfDayMapper()) encoded_pm = encode_partition_mapper(partition_mapper) core_pm = decode_partition_mapper(encoded_pm) @@ -896,23 +896,23 @@ def test_decode_product_mapper(): def test_encode_chain_mapper(): - from airflow.sdk import ChainMapper, ToDailyMapper, ToHourlyMapper + from airflow.sdk import ChainMapper, StartOfDayMapper, StartOfHourMapper from airflow.serialization.encoders import encode_partition_mapper - partition_mapper = ChainMapper(ToHourlyMapper(), ToDailyMapper(input_format="%Y-%m-%dT%H")) + partition_mapper = ChainMapper(StartOfHourMapper(), StartOfDayMapper(input_format="%Y-%m-%dT%H")) assert encode_partition_mapper(partition_mapper) == { Encoding.TYPE: "airflow.partition_mappers.chain.ChainMapper", Encoding.VAR: { "mappers": [ { - Encoding.TYPE: "airflow.partition_mappers.temporal.ToHourlyMapper", + Encoding.TYPE: "airflow.partition_mappers.temporal.StartOfHourMapper", Encoding.VAR: { "input_format": "%Y-%m-%dT%H:%M:%S", "output_format": "%Y-%m-%dT%H", }, }, { - Encoding.TYPE: "airflow.partition_mappers.temporal.ToDailyMapper", + Encoding.TYPE: "airflow.partition_mappers.temporal.StartOfDayMapper", Encoding.VAR: { "input_format": "%Y-%m-%dT%H", "output_format": "%Y-%m-%d", @@ -925,11 +925,11 @@ def test_encode_chain_mapper(): def test_decode_chain_mapper(): from airflow.partition_mappers.chain import ChainMapper as CoreChainMapper - from airflow.sdk import ChainMapper, ToDailyMapper, ToHourlyMapper + from airflow.sdk import ChainMapper, StartOfDayMapper, StartOfHourMapper from airflow.serialization.decoders import decode_partition_mapper from airflow.serialization.encoders import encode_partition_mapper - partition_mapper = ChainMapper(ToHourlyMapper(), ToDailyMapper(input_format="%Y-%m-%dT%H")) + partition_mapper = ChainMapper(StartOfHourMapper(), StartOfDayMapper(input_format="%Y-%m-%dT%H")) encoded_pm = encode_partition_mapper(partition_mapper) core_pm = decode_partition_mapper(encoded_pm) diff --git a/task-sdk/docs/api.rst b/task-sdk/docs/api.rst index 03b4d3999ba..9aa11c5f94d 100644 --- a/task-sdk/docs/api.rst +++ b/task-sdk/docs/api.rst @@ -205,17 +205,17 @@ Partition Mapper .. autoapiclass:: airflow.sdk.IdentityMapper -.. autoapiclass:: airflow.sdk.ToHourlyMapper +.. autoapiclass:: airflow.sdk.StartOfHourMapper -.. autoapiclass:: airflow.sdk.ToDailyMapper +.. autoapiclass:: airflow.sdk.StartOfDayMapper -.. autoapiclass:: airflow.sdk.ToWeeklyMapper +.. autoapiclass:: airflow.sdk.StartOfWeekMapper -.. autoapiclass:: airflow.sdk.ToMonthlyMapper +.. autoapiclass:: airflow.sdk.StartOfMonthMapper -.. autoapiclass:: airflow.sdk.ToQuarterlyMapper +.. autoapiclass:: airflow.sdk.StartOfQuarterMapper -.. autoapiclass:: airflow.sdk.ToYearlyMapper +.. autoapiclass:: airflow.sdk.StartOfYearMapper .. autoapiclass:: airflow.sdk.ProductMapper diff --git a/task-sdk/src/airflow/sdk/__init__.py b/task-sdk/src/airflow/sdk/__init__.py index 8aa55a0e623..d84d327a89c 100644 --- a/task-sdk/src/airflow/sdk/__init__.py +++ b/task-sdk/src/airflow/sdk/__init__.py @@ -64,15 +64,15 @@ __all__ = [ "ProductMapper", "SkipMixin", "SyncCallback", + "StartOfDayMapper", + "StartOfHourMapper", + "StartOfMonthMapper", + "StartOfQuarterMapper", + "StartOfWeekMapper", + "StartOfYearMapper", "TaskGroup", "TaskInstance", "TaskInstanceState", - "ToDailyMapper", - "ToHourlyMapper", - "ToMonthlyMapper", - "ToQuarterlyMapper", - "ToWeeklyMapper", - "ToYearlyMapper", "TriggerRule", "Variable", "WeightRule", @@ -130,12 +130,12 @@ if TYPE_CHECKING: from airflow.sdk.definitions.partition_mappers.identity import IdentityMapper from airflow.sdk.definitions.partition_mappers.product import ProductMapper from airflow.sdk.definitions.partition_mappers.temporal import ( - ToDailyMapper, - ToHourlyMapper, - ToMonthlyMapper, - ToQuarterlyMapper, - ToWeeklyMapper, - ToYearlyMapper, + StartOfDayMapper, + StartOfHourMapper, + StartOfMonthMapper, + StartOfQuarterMapper, + StartOfWeekMapper, + StartOfYearMapper, ) from airflow.sdk.definitions.taskgroup import TaskGroup from airflow.sdk.definitions.template import literal @@ -208,15 +208,15 @@ __lazy_imports: dict[str, str] = { "SecretCache": ".execution_time.cache", "SkipMixin": ".bases.skipmixin", "SyncCallback": ".definitions.callback", + "StartOfDayMapper": ".definitions.partition_mappers.temporal", + "StartOfHourMapper": ".definitions.partition_mappers.temporal", + "StartOfMonthMapper": ".definitions.partition_mappers.temporal", + "StartOfQuarterMapper": ".definitions.partition_mappers.temporal", + "StartOfWeekMapper": ".definitions.partition_mappers.temporal", + "StartOfYearMapper": ".definitions.partition_mappers.temporal", "TaskGroup": ".definitions.taskgroup", "TaskInstance": ".types", "TaskInstanceState": ".api.datamodels._generated", - "ToDailyMapper": ".definitions.partition_mappers.temporal", - "ToHourlyMapper": ".definitions.partition_mappers.temporal", - "ToMonthlyMapper": ".definitions.partition_mappers.temporal", - "ToQuarterlyMapper": ".definitions.partition_mappers.temporal", - "ToWeeklyMapper": ".definitions.partition_mappers.temporal", - "ToYearlyMapper": ".definitions.partition_mappers.temporal", "TriggerRule": ".api.datamodels._generated", "Variable": ".definitions.variable", "WeightRule": ".api.datamodels._generated", diff --git a/task-sdk/src/airflow/sdk/__init__.pyi b/task-sdk/src/airflow/sdk/__init__.pyi index a898ff27a70..4f5177d85cb 100644 --- a/task-sdk/src/airflow/sdk/__init__.pyi +++ b/task-sdk/src/airflow/sdk/__init__.pyi @@ -68,12 +68,12 @@ from airflow.sdk.definitions.partition_mappers.chain import ChainMapper from airflow.sdk.definitions.partition_mappers.identity import IdentityMapper from airflow.sdk.definitions.partition_mappers.product import ProductMapper from airflow.sdk.definitions.partition_mappers.temporal import ( - ToDailyMapper, - ToHourlyMapper, - ToMonthlyMapper, - ToQuarterlyMapper, - ToWeeklyMapper, - ToYearlyMapper, + StartOfDayMapper, + StartOfHourMapper, + StartOfMonthMapper, + StartOfQuarterMapper, + StartOfWeekMapper, + StartOfYearMapper, ) from airflow.sdk.definitions.taskgroup import TaskGroup as TaskGroup from airflow.sdk.definitions.template import literal as literal @@ -142,14 +142,14 @@ __all__ = [ "ProductMapper", "SecretCache", "SkipMixin", + "StartOfDayMapper", + "StartOfHourMapper", + "StartOfMonthMapper", + "StartOfQuarterMapper", + "StartOfWeekMapper", + "StartOfYearMapper", "TaskGroup", "TaskInstanceState", - "ToDailyMapper", - "ToHourlyMapper", - "ToMonthlyMapper", - "ToQuarterlyMapper", - "ToWeeklyMapper", - "ToYearlyMapper", "TriggerRule", "Variable", "WeightRule", diff --git a/task-sdk/src/airflow/sdk/definitions/partition_mappers/temporal.py b/task-sdk/src/airflow/sdk/definitions/partition_mappers/temporal.py index 4e6fc4647b1..60ca18f5044 100644 --- a/task-sdk/src/airflow/sdk/definitions/partition_mappers/temporal.py +++ b/task-sdk/src/airflow/sdk/definitions/partition_mappers/temporal.py @@ -31,37 +31,37 @@ class _BaseTemporalMapper(PartitionMapper): self.output_format = output_format or self.default_output_format -class ToHourlyMapper(_BaseTemporalMapper): +class StartOfHourMapper(_BaseTemporalMapper): """Map a time-based partition key to hour.""" default_output_format = "%Y-%m-%dT%H" -class ToDailyMapper(_BaseTemporalMapper): +class StartOfDayMapper(_BaseTemporalMapper): """Map a time-based partition key to day.""" default_output_format = "%Y-%m-%d" -class ToWeeklyMapper(_BaseTemporalMapper): +class StartOfWeekMapper(_BaseTemporalMapper): """Map a time-based partition key to week.""" default_output_format = "%Y-%m-%d (W%V)" -class ToMonthlyMapper(_BaseTemporalMapper): +class StartOfMonthMapper(_BaseTemporalMapper): """Map a time-based partition key to month.""" default_output_format = "%Y-%m" -class ToQuarterlyMapper(_BaseTemporalMapper): +class StartOfQuarterMapper(_BaseTemporalMapper): """Map a time-based partition key to quarter.""" default_output_format = "%Y-Q{quarter}" -class ToYearlyMapper(_BaseTemporalMapper): +class StartOfYearMapper(_BaseTemporalMapper): """Map a time-based partition key to year.""" default_output_format = "%Y"
