This is an automated email from the ASF dual-hosted git repository.
weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 8dd358add2e AIP-76: Add ProductMapper for multi-dimensional partition keys (#61937)
8dd358add2e is described below
commit 8dd358add2e595f8610d44dcfc91a1143b8ad9c4
Author: Anish Giri <[email protected]>
AuthorDate: Thu Mar 5 04:42:43 2026 -0600
AIP-76: Add ProductMapper for multi-dimensional partition keys (#61937)
---
.../example_dags/example_asset_partition.py | 47 +++++++++
.../src/airflow/partition_mappers/product.py | 67 ++++++++++++
airflow-core/src/airflow/serialization/encoders.py | 9 ++
.../tests/unit/partition_mappers/test_product.py | 113 +++++++++++++++++++++
.../unit/serialization/test_serialized_objects.py | 43 ++++++++
task-sdk/docs/api.rst | 2 +
task-sdk/src/airflow/sdk/__init__.py | 3 +
task-sdk/src/airflow/sdk/__init__.pyi | 2 +
.../sdk/definitions/partition_mappers/product.py | 34 +++++++
9 files changed, 320 insertions(+)
diff --git a/airflow-core/src/airflow/example_dags/example_asset_partition.py b/airflow-core/src/airflow/example_dags/example_asset_partition.py
index e744a2dc1ce..69c9d883e63 100644
--- a/airflow-core/src/airflow/example_dags/example_asset_partition.py
+++ b/airflow-core/src/airflow/example_dags/example_asset_partition.py
@@ -23,8 +23,11 @@ from airflow.sdk import (
DAG,
Asset,
CronPartitionTimetable,
+ DailyMapper,
HourlyMapper,
+ IdentityMapper,
PartitionedAssetTimetable,
+ ProductMapper,
YearlyMapper,
asset,
task,
@@ -137,3 +140,47 @@ with DAG(
pass
check_partition_alignment()
+
+
+regional_sales = Asset(uri="file://incoming/sales/regional.csv", name="regional_sales")
+
+with DAG(
+ dag_id="ingest_regional_sales",
+ schedule=CronPartitionTimetable("0 * * * *", timezone="UTC"),
+ tags=["sales", "ingestion"],
+):
+ """Produce hourly regional sales data with composite partition keys."""
+
+ @task(outlets=[regional_sales])
+ def ingest_sales():
+ """Ingest regional sales data partitioned by region and time."""
+ pass
+
+ ingest_sales()
+
+
+with DAG(
+ dag_id="aggregate_regional_sales",
+ schedule=PartitionedAssetTimetable(
+ assets=regional_sales,
+ default_partition_mapper=ProductMapper(IdentityMapper(), DailyMapper()),
+ ),
+ catchup=False,
+ tags=["sales", "aggregation"],
+):
+ """
+ Aggregate regional sales using ProductMapper.
+
+ The ProductMapper splits the composite key "region|timestamp" and applies
+ IdentityMapper to the region segment and DailyMapper to the timestamp segment,
+ aligning hourly partitions to daily granularity per region.
+ """
+
+ @task
+ def aggregate_sales(dag_run=None):
+ """Aggregate sales data for the matched region-day partition."""
+ if TYPE_CHECKING:
+ assert dag_run
+ print(dag_run.partition_key)
+
+ aggregate_sales()
diff --git a/airflow-core/src/airflow/partition_mappers/product.py b/airflow-core/src/airflow/partition_mappers/product.py
new file mode 100644
index 00000000000..5d8882bbbcc
--- /dev/null
+++ b/airflow-core/src/airflow/partition_mappers/product.py
@@ -0,0 +1,67 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from typing import Any
+
+from airflow.partition_mappers.base import PartitionMapper
+
+
+class ProductMapper(PartitionMapper):
+ """Partition mapper that combines multiple mappers into a multi-dimensional key."""
+
+ def __init__(
+ self,
+ mapper0: PartitionMapper,
+ mapper1: PartitionMapper,
+ /,
+ *mappers: PartitionMapper,
+ delimiter: str = "|",
+ ) -> None:
+ self.mappers = [mapper0, mapper1, *mappers]
+ self.delimiter = delimiter
+
+ def to_downstream(self, key: str) -> str:
+ segments = key.split(self.delimiter)
+ if len(segments) != len(self.mappers):
+ raise ValueError(f"Expected {len(self.mappers)} segments in key, got {len(segments)}")
+ results: list[str] = []
+ for mapper, segment in zip(self.mappers, segments):
+ result = mapper.to_downstream(segment)
+ if not isinstance(result, str):
+ raise TypeError(
+ f"ProductMapper child mappers must return a single key, "
+ f"but {type(mapper).__name__} returned multiple keys"
+ )
+ results.append(result)
+ return self.delimiter.join(results)
+
+ def serialize(self) -> dict[str, Any]:
+ from airflow.serialization.encoders import encode_partition_mapper
+
+ return {
+ "delimiter": self.delimiter,
+ "mappers": [encode_partition_mapper(m) for m in self.mappers],
+ }
+
+ @classmethod
+ def deserialize(cls, data: dict[str, Any]) -> PartitionMapper:
+ from airflow.serialization.decoders import decode_partition_mapper
+
+ mappers = [decode_partition_mapper(m) for m in data["mappers"]]
+ return cls(*mappers, delimiter=data.get("delimiter", "|"))
diff --git a/airflow-core/src/airflow/serialization/encoders.py b/airflow-core/src/airflow/serialization/encoders.py
index f82af80c70c..0242b6ee61d 100644
--- a/airflow-core/src/airflow/serialization/encoders.py
+++ b/airflow-core/src/airflow/serialization/encoders.py
@@ -43,6 +43,7 @@ from airflow.sdk import (
MonthlyMapper,
MultipleCronTriggerTimetable,
PartitionMapper,
+ ProductMapper,
QuarterlyMapper,
WeeklyMapper,
YearlyMapper,
@@ -373,6 +374,7 @@ class _Serializer:
MonthlyMapper: "airflow.partition_mappers.temporal.MonthlyMapper",
QuarterlyMapper: "airflow.partition_mappers.temporal.QuarterlyMapper",
YearlyMapper: "airflow.partition_mappers.temporal.YearlyMapper",
+ ProductMapper: "airflow.partition_mappers.product.ProductMapper",
}
@functools.singledispatchmethod
@@ -407,6 +409,13 @@ class _Serializer:
"output_format": partition_mapper.output_format,
}
+ @serialize_partition_mapper.register
+ def _(self, partition_mapper: ProductMapper) -> dict[str, Any]:
+ return {
+ "delimiter": partition_mapper.delimiter,
+ "mappers": [encode_partition_mapper(m) for m in partition_mapper.mappers],
+ }
+
_serializer = _Serializer()
diff --git a/airflow-core/tests/unit/partition_mappers/test_product.py b/airflow-core/tests/unit/partition_mappers/test_product.py
new file mode 100644
index 00000000000..a1c46abc629
--- /dev/null
+++ b/airflow-core/tests/unit/partition_mappers/test_product.py
@@ -0,0 +1,113 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import pytest
+
+from airflow.partition_mappers.identity import IdentityMapper
+from airflow.partition_mappers.product import ProductMapper
+from airflow.partition_mappers.temporal import DailyMapper, HourlyMapper
+
+
+class TestProductMapper:
+ def test_to_downstream(self):
+ pm = ProductMapper(HourlyMapper(), DailyMapper())
+ assert pm.to_downstream("2024-01-15T10:30:00|2024-01-15T10:30:00") == "2024-01-15T10|2024-01-15"
+
+ def test_to_downstream_wrong_segment_count(self):
+ pm = ProductMapper(HourlyMapper(), DailyMapper())
+ with pytest.raises(ValueError, match="Expected 2 segments"):
+ pm.to_downstream("2024-01-15T10:30:00|2024-01-15T10:30:00|extra")
+
+ def test_to_downstream_single_segment_for_two_mappers(self):
+ pm = ProductMapper(HourlyMapper(), DailyMapper())
+ with pytest.raises(ValueError, match="Expected 2 segments"):
+ pm.to_downstream("2024-01-15T10:30:00")
+
+ def test_custom_delimiter(self):
+ pm = ProductMapper(HourlyMapper(), DailyMapper(), delimiter="::")
+ assert pm.to_downstream("2024-01-15T10:30:00::2024-01-15T10:30:00") == "2024-01-15T10::2024-01-15"
+
+ def test_custom_delimiter_wrong_segment_count(self):
+ pm = ProductMapper(HourlyMapper(), DailyMapper(), delimiter="::")
+ with pytest.raises(ValueError, match="Expected 2 segments"):
+ pm.to_downstream("2024-01-15T10:30:00::2024-01-15T10:30:00::extra")
+
+ def test_serialize(self):
+ from airflow.serialization.encoders import encode_partition_mapper
+
+ pm = ProductMapper(HourlyMapper(), DailyMapper())
+ result = pm.serialize()
+ assert result == {
+ "delimiter": "|",
+ "mappers": [
+ encode_partition_mapper(HourlyMapper()),
+ encode_partition_mapper(DailyMapper()),
+ ],
+ }
+
+ def test_serialize_custom_delimiter(self):
+ from airflow.serialization.encoders import encode_partition_mapper
+
+ pm = ProductMapper(HourlyMapper(), DailyMapper(), delimiter="::")
+ result = pm.serialize()
+ assert result == {
+ "delimiter": "::",
+ "mappers": [
+ encode_partition_mapper(HourlyMapper()),
+ encode_partition_mapper(DailyMapper()),
+ ],
+ }
+
+ def test_deserialize(self):
+ pm = ProductMapper(HourlyMapper(), DailyMapper())
+ serialized = pm.serialize()
+ restored = ProductMapper.deserialize(serialized)
+ assert isinstance(restored, ProductMapper)
+ assert len(restored.mappers) == 2
+ assert restored.delimiter == "|"
+ assert restored.to_downstream("2024-01-15T10:30:00|2024-01-15T10:30:00") == "2024-01-15T10|2024-01-15"
+
+ def test_deserialize_custom_delimiter(self):
+ pm = ProductMapper(HourlyMapper(), DailyMapper(), delimiter="::")
+ serialized = pm.serialize()
+ restored = ProductMapper.deserialize(serialized)
+ assert isinstance(restored, ProductMapper)
+ assert restored.delimiter == "::"
+ assert (
+ restored.to_downstream("2024-01-15T10:30:00::2024-01-15T10:30:00") == "2024-01-15T10::2024-01-15"
+ )
+
+ def test_deserialize_backward_compat(self):
+ """Deserializing data without delimiter field defaults to '|'."""
+ from airflow.serialization.encoders import encode_partition_mapper
+
+ data = {
+ "mappers": [
+ encode_partition_mapper(HourlyMapper()),
+ encode_partition_mapper(DailyMapper()),
+ ],
+ }
+ restored = ProductMapper.deserialize(data)
+ assert restored.delimiter == "|"
+
+ def test_three_mappers(self):
+ pm = ProductMapper(HourlyMapper(), DailyMapper(), IdentityMapper())
+ assert (
+ pm.to_downstream("2024-01-15T10:30:00|2024-01-15T10:30:00|raw") == "2024-01-15T10|2024-01-15|raw"
+ )
diff --git a/airflow-core/tests/unit/serialization/test_serialized_objects.py b/airflow-core/tests/unit/serialization/test_serialized_objects.py
index d4f549f6df1..e001def29a9 100644
--- a/airflow-core/tests/unit/serialization/test_serialized_objects.py
+++ b/airflow-core/tests/unit/serialization/test_serialized_objects.py
@@ -834,6 +834,49 @@ def test_decode_partition_mapper_not_exists():
decode_partition_mapper({Encoding.TYPE: "not_exists", Encoding.VAR: {}})
+def test_encode_product_mapper():
+ from airflow.sdk import HourlyMapper, IdentityMapper, ProductMapper
+ from airflow.serialization.encoders import encode_partition_mapper
+
+ partition_mapper = ProductMapper(IdentityMapper(), HourlyMapper())
+ assert encode_partition_mapper(partition_mapper) == {
+ Encoding.TYPE: "airflow.partition_mappers.product.ProductMapper",
+ Encoding.VAR: {
+ "delimiter": "|",
+ "mappers": [
+ {
+ Encoding.TYPE: "airflow.partition_mappers.identity.IdentityMapper",
+ Encoding.VAR: {},
+ },
+ {
+ Encoding.TYPE: "airflow.partition_mappers.temporal.HourlyMapper",
+ Encoding.VAR: {
+ "input_format": "%Y-%m-%dT%H:%M:%S",
+ "output_format": "%Y-%m-%dT%H",
+ },
+ },
+ ],
+ },
+ }
+
+
+def test_decode_product_mapper():
+ from airflow.partition_mappers.product import ProductMapper as CoreProductMapper
+ from airflow.sdk import DailyMapper, HourlyMapper, ProductMapper
+ from airflow.serialization.decoders import decode_partition_mapper
+ from airflow.serialization.encoders import encode_partition_mapper
+
+ partition_mapper = ProductMapper(HourlyMapper(), DailyMapper())
+ encoded_pm = encode_partition_mapper(partition_mapper)
+
+ core_pm = decode_partition_mapper(encoded_pm)
+
+ assert isinstance(core_pm, CoreProductMapper)
+ assert len(core_pm.mappers) == 2
+ assert core_pm.delimiter == "|"
+ assert core_pm.to_downstream("2024-06-15T10:30:00|2024-06-15T10:30:00") == "2024-06-15T10|2024-06-15"
+
+
class TestSerializedBaseOperator:
# ensure the default logging config is used for this test, no matter what ran before
@pytest.mark.usefixtures("reset_logging_config")
diff --git a/task-sdk/docs/api.rst b/task-sdk/docs/api.rst
index 6b400e50448..88cc4205374 100644
--- a/task-sdk/docs/api.rst
+++ b/task-sdk/docs/api.rst
@@ -215,6 +215,8 @@ Partition Mapper
.. autoapiclass:: airflow.sdk.YearlyMapper
+.. autoapiclass:: airflow.sdk.ProductMapper
+
I/O Helpers
-----------
.. autoapiclass:: airflow.sdk.ObjectStoragePath
diff --git a/task-sdk/src/airflow/sdk/__init__.py b/task-sdk/src/airflow/sdk/__init__.py
index 1259870bdbf..669c87e019a 100644
--- a/task-sdk/src/airflow/sdk/__init__.py
+++ b/task-sdk/src/airflow/sdk/__init__.py
@@ -61,6 +61,7 @@ __all__ = [
"PartitionedAssetTimetable",
"PartitionMapper",
"PokeReturnValue",
+ "ProductMapper",
"QuarterlyMapper",
"SkipMixin",
"SyncCallback",
@@ -122,6 +123,7 @@ if TYPE_CHECKING:
from airflow.sdk.definitions.param import Param, ParamsDict
from airflow.sdk.definitions.partition_mappers.base import PartitionMapper
from airflow.sdk.definitions.partition_mappers.identity import IdentityMapper
+ from airflow.sdk.definitions.partition_mappers.product import ProductMapper
from airflow.sdk.definitions.partition_mappers.temporal import (
DailyMapper,
HourlyMapper,
@@ -198,6 +200,7 @@ __lazy_imports: dict[str, str] = {
"PartitionedAssetTimetable": ".definitions.timetables.assets",
"PartitionMapper": ".definitions.partition_mappers.base",
"PokeReturnValue": ".bases.sensor",
+ "ProductMapper": ".definitions.partition_mappers.product",
"QuarterlyMapper": ".definitions.partition_mappers.temporal",
"SecretCache": ".execution_time.cache",
"SkipMixin": ".bases.skipmixin",
diff --git a/task-sdk/src/airflow/sdk/__init__.pyi b/task-sdk/src/airflow/sdk/__init__.pyi
index 60b87aeec9a..ed9943700b5 100644
--- a/task-sdk/src/airflow/sdk/__init__.pyi
+++ b/task-sdk/src/airflow/sdk/__init__.pyi
@@ -63,6 +63,7 @@ from airflow.sdk.definitions.edges import EdgeModifier as EdgeModifier, Label as
from airflow.sdk.definitions.param import Param as Param
from airflow.sdk.definitions.partition_mappers.base import PartitionMapper
from airflow.sdk.definitions.partition_mappers.identity import IdentityMapper
+from airflow.sdk.definitions.partition_mappers.product import ProductMapper
from airflow.sdk.definitions.partition_mappers.temporal import (
DailyMapper,
HourlyMapper,
@@ -135,6 +136,7 @@ __all__ = [
"PokeReturnValue",
"PartitionedAssetTimetable",
"PartitionMapper",
+ "ProductMapper",
"QuarterlyMapper",
"SecretCache",
"SkipMixin",
diff --git a/task-sdk/src/airflow/sdk/definitions/partition_mappers/product.py b/task-sdk/src/airflow/sdk/definitions/partition_mappers/product.py
new file mode 100644
index 00000000000..ffc744ff71e
--- /dev/null
+++ b/task-sdk/src/airflow/sdk/definitions/partition_mappers/product.py
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from airflow.sdk.definitions.partition_mappers.base import PartitionMapper
+
+
+class ProductMapper(PartitionMapper):
+ """Partition mapper that combines multiple mappers into a multi-dimensional key."""
+
+ def __init__(
+ self,
+ mapper0: PartitionMapper,
+ mapper1: PartitionMapper,
+ /,
+ *mappers: PartitionMapper,
+ delimiter: str = "|",
+ ) -> None:
+ self.mappers = [mapper0, mapper1, *mappers]
+ self.delimiter = delimiter