This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e3c10eae08c refactor: rename temporal partition mappers (#63783)
e3c10eae08c is described below
commit e3c10eae08cd8c0d28f1edaab24e11bd25606d6c
Author: Wei Lee <[email protected]>
AuthorDate: Thu Mar 19 16:02:29 2026 +0800
refactor: rename temporal partition mappers (#63783)
Co-authored-by: Tzu-ping Chung <[email protected]>
---
.../example_dags/example_asset_partition.py | 22 +++----
.../src/airflow/partition_mappers/temporal.py | 12 ++--
airflow-core/src/airflow/serialization/encoders.py | 48 +++++++--------
airflow-core/tests/unit/jobs/test_scheduler_job.py | 4 +-
.../tests/unit/partition_mappers/test_product.py | 34 +++++------
.../tests/unit/partition_mappers/test_temporal.py | 38 ++++++------
.../unit/serialization/test_serialized_objects.py | 70 +++++++++++-----------
task-sdk/docs/api.rst | 12 ++--
task-sdk/src/airflow/sdk/__init__.py | 36 +++++------
task-sdk/src/airflow/sdk/__init__.pyi | 24 ++++----
.../sdk/definitions/partition_mappers/temporal.py | 12 ++--
11 files changed, 156 insertions(+), 156 deletions(-)
diff --git a/airflow-core/src/airflow/example_dags/example_asset_partition.py
b/airflow-core/src/airflow/example_dags/example_asset_partition.py
index ecc9d7427af..853d0453d26 100644
--- a/airflow-core/src/airflow/example_dags/example_asset_partition.py
+++ b/airflow-core/src/airflow/example_dags/example_asset_partition.py
@@ -24,12 +24,12 @@ from airflow.sdk import (
AllowedKeyMapper,
Asset,
CronPartitionTimetable,
- DailyMapper,
- HourlyMapper,
IdentityMapper,
PartitionedAssetTimetable,
ProductMapper,
- YearlyMapper,
+ ToDailyMapper,
+ ToHourlyMapper,
+ ToYearlyMapper,
asset,
task,
)
@@ -77,7 +77,7 @@ with DAG(
dag_id="clean_and_combine_player_stats",
schedule=PartitionedAssetTimetable(
assets=team_a_player_stats & team_b_player_stats & team_c_player_stats,
- default_partition_mapper=HourlyMapper(),
+ default_partition_mapper=ToHourlyMapper(),
),
catchup=False,
tags=["player-stats", "cleanup"],
@@ -85,7 +85,7 @@ with DAG(
"""
Combine hourly partitions from Team A, B and C into a single curated
dataset.
- This Dag demonstrates multi-asset partition alignment using HourlyMapper.
+ This Dag demonstrates multi-asset partition alignment using ToHourlyMapper.
"""
@task(outlets=[combined_player_stats])
@@ -101,7 +101,7 @@ with DAG(
@asset(
uri="file://analytics/player-stats/computed-player-odds.csv",
# Fallback to IdentityMapper if no partition_mapper is specified.
- # If we want to other temporal mapper (e.g., HourlyMapper) here,
+ # If we want to other temporal mapper (e.g., ToHourlyMapper) here,
# make sure the input_format is changed since the partition_key is now in "%Y-%m-%dT%H" format
# instead of a valid timestamp
schedule=PartitionedAssetTimetable(assets=combined_player_stats),
@@ -121,9 +121,9 @@ with DAG(
schedule=PartitionedAssetTimetable(
assets=(combined_player_stats & team_a_player_stats &
Asset.ref(name="team_b_player_stats")),
partition_mapper_config={
- combined_player_stats: YearlyMapper(), # incompatible on purpose
- team_a_player_stats: HourlyMapper(),
- Asset.ref(name="team_b_player_stats"): HourlyMapper(),
+ combined_player_stats: ToYearlyMapper(), # incompatible on purpose
+ team_a_player_stats: ToHourlyMapper(),
+ Asset.ref(name="team_b_player_stats"): ToHourlyMapper(),
},
),
catchup=False,
@@ -164,7 +164,7 @@ with DAG(
dag_id="aggregate_regional_sales",
schedule=PartitionedAssetTimetable(
assets=regional_sales,
- default_partition_mapper=ProductMapper(IdentityMapper(), DailyMapper()),
+ default_partition_mapper=ProductMapper(IdentityMapper(), ToDailyMapper()),
),
catchup=False,
tags=["sales", "aggregation"],
@@ -173,7 +173,7 @@ with DAG(
Aggregate regional sales using ProductMapper.
The ProductMapper splits the composite key "region|timestamp" and applies
- IdentityMapper to the region segment and DailyMapper to the timestamp segment,
+ IdentityMapper to the region segment and ToDailyMapper to the timestamp segment,
aligning hourly partitions to daily granularity per region.
"""
diff --git a/airflow-core/src/airflow/partition_mappers/temporal.py
b/airflow-core/src/airflow/partition_mappers/temporal.py
index f40c037247f..b30e1879d53 100644
--- a/airflow-core/src/airflow/partition_mappers/temporal.py
+++ b/airflow-core/src/airflow/partition_mappers/temporal.py
@@ -63,7 +63,7 @@ class _BaseTemporalMapper(PartitionMapper, ABC):
)
-class HourlyMapper(_BaseTemporalMapper):
+class ToHourlyMapper(_BaseTemporalMapper):
"""Map a time-based partition key to hour."""
default_output_format = "%Y-%m-%dT%H"
@@ -72,7 +72,7 @@ class HourlyMapper(_BaseTemporalMapper):
return dt.replace(minute=0, second=0, microsecond=0)
-class DailyMapper(_BaseTemporalMapper):
+class ToDailyMapper(_BaseTemporalMapper):
"""Map a time-based partition key to day."""
default_output_format = "%Y-%m-%d"
@@ -81,7 +81,7 @@ class DailyMapper(_BaseTemporalMapper):
return dt.replace(hour=0, minute=0, second=0, microsecond=0)
-class WeeklyMapper(_BaseTemporalMapper):
+class ToWeeklyMapper(_BaseTemporalMapper):
"""Map a time-based partition key to week."""
default_output_format = "%Y-%m-%d (W%V)"
@@ -91,7 +91,7 @@ class WeeklyMapper(_BaseTemporalMapper):
return start.replace(hour=0, minute=0, second=0, microsecond=0)
-class MonthlyMapper(_BaseTemporalMapper):
+class ToMonthlyMapper(_BaseTemporalMapper):
"""Map a time-based partition key to month."""
default_output_format = "%Y-%m"
@@ -106,7 +106,7 @@ class MonthlyMapper(_BaseTemporalMapper):
)
-class QuarterlyMapper(_BaseTemporalMapper):
+class ToQuarterlyMapper(_BaseTemporalMapper):
"""Map a time-based partition key to quarter."""
default_output_format = "%Y-Q{quarter}"
@@ -128,7 +128,7 @@ class QuarterlyMapper(_BaseTemporalMapper):
return dt.strftime(self.output_format).format(quarter=quarter)
-class YearlyMapper(_BaseTemporalMapper):
+class ToYearlyMapper(_BaseTemporalMapper):
"""Map a time-based partition key to year."""
default_output_format = "%Y"
diff --git a/airflow-core/src/airflow/serialization/encoders.py
b/airflow-core/src/airflow/serialization/encoders.py
index 239b2b1b97b..506cdb07cdb 100644
--- a/airflow-core/src/airflow/serialization/encoders.py
+++ b/airflow-core/src/airflow/serialization/encoders.py
@@ -36,22 +36,22 @@ from airflow.sdk import (
AssetOrTimeSchedule,
CronDataIntervalTimetable,
CronTriggerTimetable,
- DailyMapper,
DeltaDataIntervalTimetable,
DeltaTriggerTimetable,
EventsTimetable,
IdentityMapper,
- MonthlyMapper,
MultipleCronTriggerTimetable,
PartitionMapper,
ProductMapper,
- QuarterlyMapper,
- WeeklyMapper,
- YearlyMapper,
+ ToDailyMapper,
+ ToMonthlyMapper,
+ ToQuarterlyMapper,
+ ToWeeklyMapper,
+ ToYearlyMapper,
)
from airflow.sdk.bases.timetable import BaseTimetable
from airflow.sdk.definitions.asset import AssetRef
-from airflow.sdk.definitions.partition_mappers.temporal import HourlyMapper
+from airflow.sdk.definitions.partition_mappers.temporal import ToHourlyMapper
from airflow.sdk.definitions.timetables.assets import (
AssetTriggeredTimetable,
PartitionedAssetTimetable,
@@ -393,12 +393,12 @@ class _Serializer:
BUILTIN_PARTITION_MAPPERS: dict[type, str] = {
IdentityMapper: "airflow.partition_mappers.identity.IdentityMapper",
- HourlyMapper: "airflow.partition_mappers.temporal.HourlyMapper",
- DailyMapper: "airflow.partition_mappers.temporal.DailyMapper",
- WeeklyMapper: "airflow.partition_mappers.temporal.WeeklyMapper",
- MonthlyMapper: "airflow.partition_mappers.temporal.MonthlyMapper",
- QuarterlyMapper: "airflow.partition_mappers.temporal.QuarterlyMapper",
- YearlyMapper: "airflow.partition_mappers.temporal.YearlyMapper",
+ ToHourlyMapper: "airflow.partition_mappers.temporal.ToHourlyMapper",
+ ToDailyMapper: "airflow.partition_mappers.temporal.ToDailyMapper",
+ ToWeeklyMapper: "airflow.partition_mappers.temporal.ToWeeklyMapper",
+ ToMonthlyMapper: "airflow.partition_mappers.temporal.ToMonthlyMapper",
+ ToQuarterlyMapper: "airflow.partition_mappers.temporal.ToQuarterlyMapper",
+ ToYearlyMapper: "airflow.partition_mappers.temporal.ToYearlyMapper",
ProductMapper: "airflow.partition_mappers.product.ProductMapper",
AllowedKeyMapper:
"airflow.partition_mappers.allowed_key.AllowedKeyMapper",
}
@@ -415,20 +415,20 @@ class _Serializer:
def _(self, partition_mapper: IdentityMapper) -> dict[str, Any]:
return {}
- @serialize_partition_mapper.register(HourlyMapper)
- @serialize_partition_mapper.register(DailyMapper)
- @serialize_partition_mapper.register(WeeklyMapper)
- @serialize_partition_mapper.register(MonthlyMapper)
- @serialize_partition_mapper.register(QuarterlyMapper)
- @serialize_partition_mapper.register(YearlyMapper)
+ @serialize_partition_mapper.register(ToHourlyMapper)
+ @serialize_partition_mapper.register(ToDailyMapper)
+ @serialize_partition_mapper.register(ToWeeklyMapper)
+ @serialize_partition_mapper.register(ToMonthlyMapper)
+ @serialize_partition_mapper.register(ToQuarterlyMapper)
+ @serialize_partition_mapper.register(ToYearlyMapper)
def _(
self,
- partition_mapper: HourlyMapper
- | DailyMapper
- | WeeklyMapper
- | MonthlyMapper
- | QuarterlyMapper
- | YearlyMapper,
+ partition_mapper: ToHourlyMapper
+ | ToDailyMapper
+ | ToWeeklyMapper
+ | ToMonthlyMapper
+ | ToQuarterlyMapper
+ | ToYearlyMapper,
) -> dict[str, Any]:
return {
"input_format": partition_mapper.input_format,
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 386eeedb608..a6719409900 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -91,8 +91,8 @@ from airflow.sdk import (
AssetAlias,
AssetWatcher,
CronPartitionTimetable,
- HourlyMapper,
IdentityMapper,
+ ToHourlyMapper,
task,
)
from airflow.sdk.definitions.callback import AsyncCallback, SyncCallback
@@ -8982,7 +8982,7 @@ def test_partitioned_dag_run_with_invalid_mapping(dag_maker: DagMaker, session:
dag_id="asset-event-consumer",
schedule=PartitionedAssetTimetable(
assets=asset_1,
- default_partition_mapper=HourlyMapper(),
+ default_partition_mapper=ToHourlyMapper(),
),
session=session,
):
diff --git a/airflow-core/tests/unit/partition_mappers/test_product.py
b/airflow-core/tests/unit/partition_mappers/test_product.py
index a1c46abc629..fba3ae868a8 100644
--- a/airflow-core/tests/unit/partition_mappers/test_product.py
+++ b/airflow-core/tests/unit/partition_mappers/test_product.py
@@ -21,61 +21,61 @@ import pytest
from airflow.partition_mappers.identity import IdentityMapper
from airflow.partition_mappers.product import ProductMapper
-from airflow.partition_mappers.temporal import DailyMapper, HourlyMapper
+from airflow.partition_mappers.temporal import ToDailyMapper, ToHourlyMapper
class TestProductMapper:
def test_to_downstream(self):
- pm = ProductMapper(HourlyMapper(), DailyMapper())
+ pm = ProductMapper(ToHourlyMapper(), ToDailyMapper())
assert pm.to_downstream("2024-01-15T10:30:00|2024-01-15T10:30:00") ==
"2024-01-15T10|2024-01-15"
def test_to_downstream_wrong_segment_count(self):
- pm = ProductMapper(HourlyMapper(), DailyMapper())
+ pm = ProductMapper(ToHourlyMapper(), ToDailyMapper())
with pytest.raises(ValueError, match="Expected 2 segments"):
pm.to_downstream("2024-01-15T10:30:00|2024-01-15T10:30:00|extra")
def test_to_downstream_single_segment_for_two_mappers(self):
- pm = ProductMapper(HourlyMapper(), DailyMapper())
+ pm = ProductMapper(ToHourlyMapper(), ToDailyMapper())
with pytest.raises(ValueError, match="Expected 2 segments"):
pm.to_downstream("2024-01-15T10:30:00")
def test_custom_delimiter(self):
- pm = ProductMapper(HourlyMapper(), DailyMapper(), delimiter="::")
+ pm = ProductMapper(ToHourlyMapper(), ToDailyMapper(), delimiter="::")
assert pm.to_downstream("2024-01-15T10:30:00::2024-01-15T10:30:00") ==
"2024-01-15T10::2024-01-15"
def test_custom_delimiter_wrong_segment_count(self):
- pm = ProductMapper(HourlyMapper(), DailyMapper(), delimiter="::")
+ pm = ProductMapper(ToHourlyMapper(), ToDailyMapper(), delimiter="::")
with pytest.raises(ValueError, match="Expected 2 segments"):
pm.to_downstream("2024-01-15T10:30:00::2024-01-15T10:30:00::extra")
def test_serialize(self):
from airflow.serialization.encoders import encode_partition_mapper
- pm = ProductMapper(HourlyMapper(), DailyMapper())
+ pm = ProductMapper(ToHourlyMapper(), ToDailyMapper())
result = pm.serialize()
assert result == {
"delimiter": "|",
"mappers": [
- encode_partition_mapper(HourlyMapper()),
- encode_partition_mapper(DailyMapper()),
+ encode_partition_mapper(ToHourlyMapper()),
+ encode_partition_mapper(ToDailyMapper()),
],
}
def test_serialize_custom_delimiter(self):
from airflow.serialization.encoders import encode_partition_mapper
- pm = ProductMapper(HourlyMapper(), DailyMapper(), delimiter="::")
+ pm = ProductMapper(ToHourlyMapper(), ToDailyMapper(), delimiter="::")
result = pm.serialize()
assert result == {
"delimiter": "::",
"mappers": [
- encode_partition_mapper(HourlyMapper()),
- encode_partition_mapper(DailyMapper()),
+ encode_partition_mapper(ToHourlyMapper()),
+ encode_partition_mapper(ToDailyMapper()),
],
}
def test_deserialize(self):
- pm = ProductMapper(HourlyMapper(), DailyMapper())
+ pm = ProductMapper(ToHourlyMapper(), ToDailyMapper())
serialized = pm.serialize()
restored = ProductMapper.deserialize(serialized)
assert isinstance(restored, ProductMapper)
@@ -84,7 +84,7 @@ class TestProductMapper:
assert
restored.to_downstream("2024-01-15T10:30:00|2024-01-15T10:30:00") ==
"2024-01-15T10|2024-01-15"
def test_deserialize_custom_delimiter(self):
- pm = ProductMapper(HourlyMapper(), DailyMapper(), delimiter="::")
+ pm = ProductMapper(ToHourlyMapper(), ToDailyMapper(), delimiter="::")
serialized = pm.serialize()
restored = ProductMapper.deserialize(serialized)
assert isinstance(restored, ProductMapper)
@@ -99,15 +99,15 @@ class TestProductMapper:
data = {
"mappers": [
- encode_partition_mapper(HourlyMapper()),
- encode_partition_mapper(DailyMapper()),
+ encode_partition_mapper(ToHourlyMapper()),
+ encode_partition_mapper(ToDailyMapper()),
],
}
restored = ProductMapper.deserialize(data)
assert restored.delimiter == "|"
def test_three_mappers(self):
- pm = ProductMapper(HourlyMapper(), DailyMapper(), IdentityMapper())
+ pm = ProductMapper(ToHourlyMapper(), ToDailyMapper(), IdentityMapper())
assert (
pm.to_downstream("2024-01-15T10:30:00|2024-01-15T10:30:00|raw") ==
"2024-01-15T10|2024-01-15|raw"
)
diff --git a/airflow-core/tests/unit/partition_mappers/test_temporal.py
b/airflow-core/tests/unit/partition_mappers/test_temporal.py
index 94c04f08d91..9bcb9679ea8 100644
--- a/airflow-core/tests/unit/partition_mappers/test_temporal.py
+++ b/airflow-core/tests/unit/partition_mappers/test_temporal.py
@@ -19,12 +19,12 @@ from __future__ import annotations
import pytest
from airflow.partition_mappers.temporal import (
- DailyMapper,
- HourlyMapper,
- MonthlyMapper,
- QuarterlyMapper,
- WeeklyMapper,
- YearlyMapper,
+ ToDailyMapper,
+ ToHourlyMapper,
+ ToMonthlyMapper,
+ ToQuarterlyMapper,
+ ToWeeklyMapper,
+ ToYearlyMapper,
_BaseTemporalMapper,
)
@@ -33,12 +33,12 @@ class TestTemporalMappers:
@pytest.mark.parametrize(
("mapper_cls", "expected_downstream_key"),
[
- (HourlyMapper, "2026-02-10T14"),
- (DailyMapper, "2026-02-10"),
- (WeeklyMapper, "2026-02-09 (W07)"),
- (MonthlyMapper, "2026-02"),
- (QuarterlyMapper, "2026-Q1"),
- (YearlyMapper, "2026"),
+ (ToHourlyMapper, "2026-02-10T14"),
+ (ToDailyMapper, "2026-02-10"),
+ (ToWeeklyMapper, "2026-02-09 (W07)"),
+ (ToMonthlyMapper, "2026-02"),
+ (ToQuarterlyMapper, "2026-Q1"),
+ (ToYearlyMapper, "2026"),
],
)
def test_to_downstream(
@@ -52,12 +52,12 @@ class TestTemporalMappers:
@pytest.mark.parametrize(
("mapper_cls", "expected_outut_format"),
[
- (HourlyMapper, "%Y-%m-%dT%H"),
- (DailyMapper, "%Y-%m-%d"),
- (WeeklyMapper, "%Y-%m-%d (W%V)"),
- (MonthlyMapper, "%Y-%m"),
- (QuarterlyMapper, "%Y-Q{quarter}"),
- (YearlyMapper, "%Y"),
+ (ToHourlyMapper, "%Y-%m-%dT%H"),
+ (ToDailyMapper, "%Y-%m-%d"),
+ (ToWeeklyMapper, "%Y-%m-%d (W%V)"),
+ (ToMonthlyMapper, "%Y-%m"),
+ (ToQuarterlyMapper, "%Y-Q{quarter}"),
+ (ToYearlyMapper, "%Y"),
],
)
def test_serialize(self, mapper_cls: type[_BaseTemporalMapper],
expected_outut_format: str):
@@ -69,7 +69,7 @@ class TestTemporalMappers:
@pytest.mark.parametrize(
"mapper_cls",
- [HourlyMapper, DailyMapper, WeeklyMapper, MonthlyMapper, QuarterlyMapper, YearlyMapper],
+ [ToHourlyMapper, ToDailyMapper, ToWeeklyMapper, ToMonthlyMapper, ToQuarterlyMapper, ToYearlyMapper],
)
def test_deserialize(self, mapper_cls):
pm = mapper_cls.deserialize(
diff --git a/airflow-core/tests/unit/serialization/test_serialized_objects.py
b/airflow-core/tests/unit/serialization/test_serialized_objects.py
index 50e1621c4af..514abb618aa 100644
--- a/airflow-core/tests/unit/serialization/test_serialized_objects.py
+++ b/airflow-core/tests/unit/serialization/test_serialized_objects.py
@@ -47,12 +47,12 @@ from airflow.models.taskinstance import TaskInstance
from airflow.models.xcom_arg import XComArg
from airflow.partition_mappers.identity import IdentityMapper as
CoreIdentityMapper
from airflow.partition_mappers.temporal import (
- DailyMapper as CoreDailyMapper,
- HourlyMapper as CoureHourlyMapper,
- MonthlyMapper as CoreMonthlyMapper,
- QuarterlyMapper as CoreQuarterlyMapper,
- WeeklyMapper as CoreWeeklyMapper,
- YearlyMapper as CoreYearlyMapper,
+ ToDailyMapper as CoreToDailyMapper,
+ ToHourlyMapper as CoreToHourlyMapper,
+ ToMonthlyMapper as CoreToMonthlyMapper,
+ ToQuarterlyMapper as CoreToQuarterlyMapper,
+ ToWeeklyMapper as CoreToWeeklyMapper,
+ ToYearlyMapper as CoreToYearlyMapper,
)
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.operators.empty import EmptyOperator
@@ -60,13 +60,13 @@ from airflow.providers.standard.operators.python import
PythonOperator
from airflow.providers.standard.triggers.file import FileDeleteTrigger
from airflow.sdk import (
BaseOperator,
- DailyMapper,
- HourlyMapper,
IdentityMapper,
- MonthlyMapper,
- QuarterlyMapper,
- WeeklyMapper,
- YearlyMapper,
+ ToDailyMapper,
+ ToHourlyMapper,
+ ToMonthlyMapper,
+ ToQuarterlyMapper,
+ ToWeeklyMapper,
+ ToYearlyMapper,
)
from airflow.sdk.definitions.asset import (
Asset,
@@ -768,39 +768,39 @@ def test_encode_timezone():
[
(IdentityMapper, [],
"airflow.partition_mappers.identity.IdentityMapper", {}),
(
- HourlyMapper,
+ ToHourlyMapper,
[],
- "airflow.partition_mappers.temporal.HourlyMapper",
+ "airflow.partition_mappers.temporal.ToHourlyMapper",
{"input_format": "%Y-%m-%dT%H:%M:%S", "output_format":
"%Y-%m-%dT%H"},
),
(
- DailyMapper,
+ ToDailyMapper,
[],
- "airflow.partition_mappers.temporal.DailyMapper",
+ "airflow.partition_mappers.temporal.ToDailyMapper",
{"input_format": "%Y-%m-%dT%H:%M:%S", "output_format": "%Y-%m-%d"},
),
(
- WeeklyMapper,
+ ToWeeklyMapper,
[],
- "airflow.partition_mappers.temporal.WeeklyMapper",
+ "airflow.partition_mappers.temporal.ToWeeklyMapper",
{"input_format": "%Y-%m-%dT%H:%M:%S", "output_format": "%Y-%m-%d
(W%V)"},
),
(
- MonthlyMapper,
+ ToMonthlyMapper,
[],
- "airflow.partition_mappers.temporal.MonthlyMapper",
+ "airflow.partition_mappers.temporal.ToMonthlyMapper",
{"input_format": "%Y-%m-%dT%H:%M:%S", "output_format": "%Y-%m"},
),
(
- QuarterlyMapper,
+ ToQuarterlyMapper,
[],
- "airflow.partition_mappers.temporal.QuarterlyMapper",
+ "airflow.partition_mappers.temporal.ToQuarterlyMapper",
{"input_format": "%Y-%m-%dT%H:%M:%S", "output_format":
"%Y-Q{quarter}"},
),
(
- YearlyMapper,
+ ToYearlyMapper,
[],
- "airflow.partition_mappers.temporal.YearlyMapper",
+ "airflow.partition_mappers.temporal.ToYearlyMapper",
{"input_format": "%Y-%m-%dT%H:%M:%S", "output_format": "%Y"},
),
],
@@ -819,12 +819,12 @@ def test_encode_partition_mapper(cls, args, encode_type,
encode_var):
("sdk_cls", "core_cls"),
[
(IdentityMapper, CoreIdentityMapper),
- (HourlyMapper, CoureHourlyMapper),
- (DailyMapper, CoreDailyMapper),
- (WeeklyMapper, CoreWeeklyMapper),
- (MonthlyMapper, CoreMonthlyMapper),
- (QuarterlyMapper, CoreQuarterlyMapper),
- (YearlyMapper, CoreYearlyMapper),
+ (ToHourlyMapper, CoreToHourlyMapper),
+ (ToDailyMapper, CoreToDailyMapper),
+ (ToWeeklyMapper, CoreToWeeklyMapper),
+ (ToMonthlyMapper, CoreToMonthlyMapper),
+ (ToQuarterlyMapper, CoreToQuarterlyMapper),
+ (ToYearlyMapper, CoreToYearlyMapper),
],
)
def test_decode_partition_mapper(sdk_cls, core_cls):
@@ -853,10 +853,10 @@ def test_decode_partition_mapper_not_exists():
def test_encode_product_mapper():
- from airflow.sdk import HourlyMapper, IdentityMapper, ProductMapper
+ from airflow.sdk import IdentityMapper, ProductMapper, ToHourlyMapper
from airflow.serialization.encoders import encode_partition_mapper
- partition_mapper = ProductMapper(IdentityMapper(), HourlyMapper())
+ partition_mapper = ProductMapper(IdentityMapper(), ToHourlyMapper())
assert encode_partition_mapper(partition_mapper) == {
Encoding.TYPE: "airflow.partition_mappers.product.ProductMapper",
Encoding.VAR: {
@@ -867,7 +867,7 @@ def test_encode_product_mapper():
Encoding.VAR: {},
},
{
- Encoding.TYPE: "airflow.partition_mappers.temporal.HourlyMapper",
+ Encoding.TYPE: "airflow.partition_mappers.temporal.ToHourlyMapper",
Encoding.VAR: {
"input_format": "%Y-%m-%dT%H:%M:%S",
"output_format": "%Y-%m-%dT%H",
@@ -880,11 +880,11 @@ def test_encode_product_mapper():
def test_decode_product_mapper():
from airflow.partition_mappers.product import ProductMapper as
CoreProductMapper
- from airflow.sdk import DailyMapper, HourlyMapper, ProductMapper
+ from airflow.sdk import ProductMapper, ToDailyMapper, ToHourlyMapper
from airflow.serialization.decoders import decode_partition_mapper
from airflow.serialization.encoders import encode_partition_mapper
- partition_mapper = ProductMapper(HourlyMapper(), DailyMapper())
+ partition_mapper = ProductMapper(ToHourlyMapper(), ToDailyMapper())
encoded_pm = encode_partition_mapper(partition_mapper)
core_pm = decode_partition_mapper(encoded_pm)
diff --git a/task-sdk/docs/api.rst b/task-sdk/docs/api.rst
index 0565eeca0fb..5c0e6b07632 100644
--- a/task-sdk/docs/api.rst
+++ b/task-sdk/docs/api.rst
@@ -203,17 +203,17 @@ Partition Mapper
.. autoapiclass:: airflow.sdk.IdentityMapper
-.. autoapiclass:: airflow.sdk.HourlyMapper
+.. autoapiclass:: airflow.sdk.ToHourlyMapper
-.. autoapiclass:: airflow.sdk.DailyMapper
+.. autoapiclass:: airflow.sdk.ToDailyMapper
-.. autoapiclass:: airflow.sdk.WeeklyMapper
+.. autoapiclass:: airflow.sdk.ToWeeklyMapper
-.. autoapiclass:: airflow.sdk.MonthlyMapper
+.. autoapiclass:: airflow.sdk.ToMonthlyMapper
-.. autoapiclass:: airflow.sdk.QuarterlyMapper
+.. autoapiclass:: airflow.sdk.ToQuarterlyMapper
-.. autoapiclass:: airflow.sdk.YearlyMapper
+.. autoapiclass:: airflow.sdk.ToYearlyMapper
.. autoapiclass:: airflow.sdk.ProductMapper
diff --git a/task-sdk/src/airflow/sdk/__init__.py
b/task-sdk/src/airflow/sdk/__init__.py
index 1044dfa2a44..70521464908 100644
--- a/task-sdk/src/airflow/sdk/__init__.py
+++ b/task-sdk/src/airflow/sdk/__init__.py
@@ -44,18 +44,15 @@ __all__ = [
"CronPartitionTimetable",
"DAG",
"DagRunState",
- "DailyMapper",
"DeadlineAlert",
"DeadlineReference",
"DeltaDataIntervalTimetable",
"DeltaTriggerTimetable",
"EdgeModifier",
"EventsTimetable",
- "HourlyMapper",
"IdentityMapper",
"Label",
"Metadata",
- "MonthlyMapper",
"MultipleCronTriggerTimetable",
"ObjectStoragePath",
"Param",
@@ -64,18 +61,21 @@ __all__ = [
"PartitionMapper",
"PokeReturnValue",
"ProductMapper",
- "QuarterlyMapper",
"SkipMixin",
"SyncCallback",
"TaskGroup",
"TaskInstance",
"TaskInstanceState",
+ "ToDailyMapper",
+ "ToHourlyMapper",
+ "ToMonthlyMapper",
+ "ToQuarterlyMapper",
+ "ToWeeklyMapper",
+ "ToYearlyMapper",
"TriggerRule",
"Variable",
- "WeeklyMapper",
"WeightRule",
"XComArg",
- "YearlyMapper",
"asset",
"chain",
"chain_linear",
@@ -128,12 +128,12 @@ if TYPE_CHECKING:
from airflow.sdk.definitions.partition_mappers.identity import
IdentityMapper
from airflow.sdk.definitions.partition_mappers.product import ProductMapper
from airflow.sdk.definitions.partition_mappers.temporal import (
- DailyMapper,
- HourlyMapper,
- MonthlyMapper,
- QuarterlyMapper,
- WeeklyMapper,
- YearlyMapper,
+ ToDailyMapper,
+ ToHourlyMapper,
+ ToMonthlyMapper,
+ ToQuarterlyMapper,
+ ToWeeklyMapper,
+ ToYearlyMapper,
)
from airflow.sdk.definitions.taskgroup import TaskGroup
from airflow.sdk.definitions.template import literal
@@ -185,18 +185,15 @@ __lazy_imports: dict[str, str] = {
"CronPartitionTimetable": ".definitions.timetables.trigger",
"DAG": ".definitions.dag",
"DagRunState": ".api.datamodels._generated",
- "DailyMapper": ".definitions.partition_mappers.temporal",
"DeadlineAlert": ".definitions.deadline",
"DeadlineReference": ".definitions.deadline",
"DeltaDataIntervalTimetable": ".definitions.timetables.interval",
"DeltaTriggerTimetable": ".definitions.timetables.trigger",
"EdgeModifier": ".definitions.edges",
"EventsTimetable": ".definitions.timetables.events",
- "HourlyMapper": ".definitions.partition_mappers.temporal",
"IdentityMapper": ".definitions.partition_mappers.identity",
"Label": ".definitions.edges",
"Metadata": ".definitions.asset.metadata",
- "MonthlyMapper": ".definitions.partition_mappers.temporal",
"MultipleCronTriggerTimetable": ".definitions.timetables.trigger",
"ObjectStoragePath": ".io.path",
"Param": ".definitions.param",
@@ -205,19 +202,22 @@ __lazy_imports: dict[str, str] = {
"PartitionMapper": ".definitions.partition_mappers.base",
"PokeReturnValue": ".bases.sensor",
"ProductMapper": ".definitions.partition_mappers.product",
- "QuarterlyMapper": ".definitions.partition_mappers.temporal",
"SecretCache": ".execution_time.cache",
"SkipMixin": ".bases.skipmixin",
"SyncCallback": ".definitions.callback",
"TaskGroup": ".definitions.taskgroup",
"TaskInstance": ".types",
"TaskInstanceState": ".api.datamodels._generated",
+ "ToDailyMapper": ".definitions.partition_mappers.temporal",
+ "ToHourlyMapper": ".definitions.partition_mappers.temporal",
+ "ToMonthlyMapper": ".definitions.partition_mappers.temporal",
+ "ToQuarterlyMapper": ".definitions.partition_mappers.temporal",
+ "ToWeeklyMapper": ".definitions.partition_mappers.temporal",
+ "ToYearlyMapper": ".definitions.partition_mappers.temporal",
"TriggerRule": ".api.datamodels._generated",
"Variable": ".definitions.variable",
- "WeeklyMapper": ".definitions.partition_mappers.temporal",
"WeightRule": ".api.datamodels._generated",
"XComArg": ".definitions.xcom_arg",
- "YearlyMapper": ".definitions.partition_mappers.temporal",
"asset": ".definitions.asset.decorators",
"chain": ".bases.operator",
"chain_linear": ".bases.operator",
diff --git a/task-sdk/src/airflow/sdk/__init__.pyi
b/task-sdk/src/airflow/sdk/__init__.pyi
index 2670163899b..905500c6164 100644
--- a/task-sdk/src/airflow/sdk/__init__.pyi
+++ b/task-sdk/src/airflow/sdk/__init__.pyi
@@ -67,12 +67,12 @@ from airflow.sdk.definitions.partition_mappers.base import
PartitionMapper
from airflow.sdk.definitions.partition_mappers.identity import IdentityMapper
from airflow.sdk.definitions.partition_mappers.product import ProductMapper
from airflow.sdk.definitions.partition_mappers.temporal import (
- DailyMapper,
- HourlyMapper,
- MonthlyMapper,
- QuarterlyMapper,
- WeeklyMapper,
- YearlyMapper,
+ ToDailyMapper,
+ ToHourlyMapper,
+ ToMonthlyMapper,
+ ToQuarterlyMapper,
+ ToWeeklyMapper,
+ ToYearlyMapper,
)
from airflow.sdk.definitions.taskgroup import TaskGroup as TaskGroup
from airflow.sdk.definitions.template import literal as literal
@@ -124,16 +124,13 @@ __all__ = [
"CronPartitionTimetable",
"DAG",
"DagRunState",
- "DailyMapper",
"DeltaDataIntervalTimetable",
"DeltaTriggerTimetable",
"EdgeModifier",
"EventsTimetable",
- "HourlyMapper",
"IdentityMapper",
"Label",
"Metadata",
- "MonthlyMapper",
"MultipleCronTriggerTimetable",
"ObjectStoragePath",
"Param",
@@ -141,17 +138,20 @@ __all__ = [
"PartitionedAssetTimetable",
"PartitionMapper",
"ProductMapper",
- "QuarterlyMapper",
"SecretCache",
"SkipMixin",
"TaskGroup",
"TaskInstanceState",
+ "ToDailyMapper",
+ "ToHourlyMapper",
+ "ToMonthlyMapper",
+ "ToQuarterlyMapper",
+ "ToWeeklyMapper",
+ "ToYearlyMapper",
"TriggerRule",
"Variable",
- "WeeklyMapper",
"WeightRule",
"XComArg",
- "YearlyMapper",
"asset",
"chain",
"chain_linear",
diff --git a/task-sdk/src/airflow/sdk/definitions/partition_mappers/temporal.py
b/task-sdk/src/airflow/sdk/definitions/partition_mappers/temporal.py
index 0d225155c7b..4e6fc4647b1 100644
--- a/task-sdk/src/airflow/sdk/definitions/partition_mappers/temporal.py
+++ b/task-sdk/src/airflow/sdk/definitions/partition_mappers/temporal.py
@@ -31,37 +31,37 @@ class _BaseTemporalMapper(PartitionMapper):
self.output_format = output_format or self.default_output_format
-class HourlyMapper(_BaseTemporalMapper):
+class ToHourlyMapper(_BaseTemporalMapper):
"""Map a time-based partition key to hour."""
default_output_format = "%Y-%m-%dT%H"
-class DailyMapper(_BaseTemporalMapper):
+class ToDailyMapper(_BaseTemporalMapper):
"""Map a time-based partition key to day."""
default_output_format = "%Y-%m-%d"
-class WeeklyMapper(_BaseTemporalMapper):
+class ToWeeklyMapper(_BaseTemporalMapper):
"""Map a time-based partition key to week."""
default_output_format = "%Y-%m-%d (W%V)"
-class MonthlyMapper(_BaseTemporalMapper):
+class ToMonthlyMapper(_BaseTemporalMapper):
"""Map a time-based partition key to month."""
default_output_format = "%Y-%m"
-class QuarterlyMapper(_BaseTemporalMapper):
+class ToQuarterlyMapper(_BaseTemporalMapper):
"""Map a time-based partition key to quarter."""
default_output_format = "%Y-Q{quarter}"
-class YearlyMapper(_BaseTemporalMapper):
+class ToYearlyMapper(_BaseTemporalMapper):
"""Map a time-based partition key to year."""
default_output_format = "%Y"