This is an automated email from the ASF dual-hosted git repository.
Lee-W pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 2963064c06f Derive partition_date for composite keys with one temporal dimension (#68442)
2963064c06f is described below
commit 2963064c06f948d67c22056360b2f5045aa932d1
Author: Wei Lee <[email protected]>
AuthorDate: Wed Jun 17 14:52:57 2026 +0800
Derive partition_date for composite keys with one temporal dimension (#68442)
---
airflow-core/src/airflow/models/backfill.py | 1 +
.../src/airflow/partition_mappers/product.py | 31 +++++++++++-
airflow-core/tests/unit/models/test_backfill.py | 58 +++++++++++++++++++++-
.../tests/unit/partition_mappers/test_product.py | 40 +++++++++++++++
4 files changed, 128 insertions(+), 2 deletions(-)
diff --git a/airflow-core/src/airflow/models/backfill.py
b/airflow-core/src/airflow/models/backfill.py
index f27f236e171..52019fa5d98 100644
--- a/airflow-core/src/airflow/models/backfill.py
+++ b/airflow-core/src/airflow/models/backfill.py
@@ -602,6 +602,7 @@ def _handle_clear_run(
backfill_id=backfill_id,
dag_run_id=dr.id,
logical_date=info.logical_date,
+ partition_key=info.partition_key,
sort_ordinal=sort_ordinal,
)
)
diff --git a/airflow-core/src/airflow/partition_mappers/product.py
b/airflow-core/src/airflow/partition_mappers/product.py
index e7c91aad082..367ac4fda84 100644
--- a/airflow-core/src/airflow/partition_mappers/product.py
+++ b/airflow-core/src/airflow/partition_mappers/product.py
@@ -17,10 +17,13 @@
from __future__ import annotations
-from typing import Any
+from typing import TYPE_CHECKING, Any
from airflow.partition_mappers.base import PartitionMapper
+if TYPE_CHECKING:
+ from datetime import datetime
+
class ProductMapper(PartitionMapper):
"""Partition mapper that combines multiple mappers into a
multi-dimensional key."""
@@ -53,6 +56,32 @@ class ProductMapper(PartitionMapper):
results.append(result)
return self.delimiter.join(results)
+ def to_partition_date(self, downstream_key: str) -> datetime | None:
+ """
+ Return the temporal anchor for *downstream_key*, or ``None`` when
ambiguous.
+
+ Splits *downstream_key* by :attr:`delimiter` and calls each child
mapper's
+ ``to_partition_date`` on the corresponding segment. Exactly one
non-``None``
+ result is returned as the anchor. Zero temporal children (all
categorical)
+ and two or more temporal children (ambiguous) both return ``None`` —
+ the convention is at most one time-based dimension per product key.
+
+ An unexpected segment count (key does not match the number of child
mappers)
+ returns ``None`` rather than raising, matching the scheduler's rule of
leaving
+ ``partition_date`` unset when the input is ambiguous.
+ """
+ segments = downstream_key.split(self.delimiter)
+ if len(segments) != len(self.mappers):
+ return None
+ anchors = [
+ anchor
+ for mapper, segment in zip(self.mappers, segments)
+ if (anchor := mapper.to_partition_date(segment)) is not None
+ ]
+ if len(anchors) == 1:
+ return anchors[0]
+ return None
+
def serialize(self) -> dict[str, Any]:
from airflow.serialization.encoders import encode_partition_mapper
diff --git a/airflow-core/tests/unit/models/test_backfill.py
b/airflow-core/tests/unit/models/test_backfill.py
index 531d0ef01ab..538c33a2461 100644
--- a/airflow-core/tests/unit/models/test_backfill.py
+++ b/airflow-core/tests/unit/models/test_backfill.py
@@ -41,11 +41,12 @@ from airflow.models.backfill import (
_create_backfill,
_do_dry_run,
_get_latest_dag_run_row_query,
+ _handle_clear_run,
)
from airflow.providers.standard.operators.python import PythonOperator
from airflow.sdk import Asset, CronPartitionTimetable,
PartitionedAssetTimetable
from airflow.ti_deps.dep_context import DepContext
-from airflow.timetables.base import DagRunInfo
+from airflow.timetables.base import DagRunInfo, DataInterval
from airflow.utils.state import DagRunState, TaskInstanceState
from airflow.utils.strings import get_random_string
from airflow.utils.types import DagRunTriggeredByType, DagRunType
@@ -1319,3 +1320,58 @@ def
test_partitioned_backfill_reprocess_failed(dag_maker, session):
assert bdr is not None
assert bdr.dag_run_id == backfill_run.id
assert bdr.partition_key == info.partition_key
+
+
+def test_handle_clear_run_preserves_partition_key(dag_maker, session):
+    """
+    Check that ``_handle_clear_run`` copies ``info.partition_key`` onto the BackfillDagRun it creates.
+
+    Drives the clear/reprocess path directly: an existing failed DagRun is
+    passed to ``_handle_clear_run`` together with a DagRunInfo that carries a
+    partition key. The resulting BackfillDagRun row is then read back from the
+    session.
+    """
+
+    with dag_maker(schedule="@daily") as dag:
+        PythonOperator(task_id="hi", python_callable=print)
+
+    logical_date = timezone.parse("2026-01-10")
+    # Pre-existing run left in the "failed" state; presumably the kind of run a
+    # ReprocessBehavior.FAILED backfill revisits (verify against _handle_clear_run).
+    dr = dag_maker.create_dagrun(
+        run_id="scheduled_2026-01-10",
+        logical_date=logical_date,
+        session=session,
+        state="failed",
+    )
+    session.commit()
+
+    # Create a Backfill row so the foreign-key constraint is satisfied.
+    backfill = Backfill(
+        dag_id=dag.dag_id,
+        from_date=logical_date,
+        to_date=logical_date,
+        dag_run_conf=None,
+        max_active_runs=1,
+        reprocess_behavior=ReprocessBehavior.FAILED,
+    )
+    session.add(backfill)
+    # Flush so the database-generated backfill.id is available for the call below.
+    session.flush()
+
+    # The partition key under test; partition_date is deliberately left unset.
+    partition_key = "2026-01-10T00:00:00"
+    info = DagRunInfo(
+        run_after=logical_date,
+        data_interval=DataInterval(logical_date, logical_date),
+        partition_key=partition_key,
+        partition_date=None,
+    )
+
+    _handle_clear_run(
+        session=session,
+        dag=dag,
+        dr=dr,
+        info=info,
+        backfill_id=backfill.id,
+        sort_ordinal=1,
+    )
+    # Flush pending inserts so the select below can see the new BackfillDagRun.
+    session.flush()
+
+    bdr = session.scalar(
+        select(BackfillDagRun).where(
+            BackfillDagRun.backfill_id == backfill.id,
+            BackfillDagRun.dag_run_id == dr.id,
+        )
+    )
+    assert bdr is not None
+    assert bdr.partition_key == partition_key
diff --git a/airflow-core/tests/unit/partition_mappers/test_product.py
b/airflow-core/tests/unit/partition_mappers/test_product.py
index 9d6eba1eb21..ce77441ef40 100644
--- a/airflow-core/tests/unit/partition_mappers/test_product.py
+++ b/airflow-core/tests/unit/partition_mappers/test_product.py
@@ -17,6 +17,8 @@
from __future__ import annotations
+from datetime import datetime, timezone
+
import pytest
from airflow.partition_mappers.identity import IdentityMapper
@@ -120,3 +122,41 @@ class TestProductMapper:
mapper = ProductMapper(StartOfHourMapper(), StartOfDayMapper())
encoded_var = encode_partition_mapper(mapper)[Encoding.VAR]
assert "max_downstream_keys" not in encoded_var
+
+ @pytest.mark.parametrize(
+ ("mapper", "downstream_key", "expected"),
+ [
+ pytest.param(
+ ProductMapper(StartOfDayMapper(), IdentityMapper()),
+ "2024-01-15|us-east-1",
+ datetime(2024, 1, 15, 0, 0, tzinfo=timezone.utc),
+ id="one-temporal-one-categorical-returns-temporal-anchor",
+ ),
+ pytest.param(
+ ProductMapper(IdentityMapper(), StartOfDayMapper()),
+ "us-east-1|2024-01-15",
+ datetime(2024, 1, 15, 0, 0, tzinfo=timezone.utc),
+ id="categorical-first-temporal-second-returns-temporal-anchor",
+ ),
+ pytest.param(
+ ProductMapper(StartOfDayMapper(), StartOfHourMapper()),
+ "2024-01-15|2024-01-15T10",
+ None,
+ id="two-temporal-children-returns-none",
+ ),
+ pytest.param(
+ ProductMapper(IdentityMapper(), IdentityMapper()),
+ "us-east-1|batch-42",
+ None,
+ id="all-categorical-returns-none",
+ ),
+ pytest.param(
+ ProductMapper(StartOfDayMapper(), IdentityMapper()),
+ "2024-01-15",
+ None,
+ id="wrong-segment-count-returns-none",
+ ),
+ ],
+ )
+ def test_to_partition_date(self, mapper, downstream_key, expected):
+ assert mapper.to_partition_date(downstream_key) == expected