This is an automated email from the ASF dual-hosted git repository.
Lee-W pushed a commit to branch v3-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-3-test by this push:
new 2102562d8ed [v3-3-test] Populate partition_date when manually
triggering partitioned Dags (#68458) (#68690)
2102562d8ed is described below
commit 2102562d8ed6b6412e28e812e951b7029d8a44e2
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Jun 18 20:47:54 2026 +0800
[v3-3-test] Populate partition_date when manually triggering partitioned
Dags (#68458) (#68690)
Co-authored-by: Wei Lee <[email protected]>
---
airflow-core/src/airflow/api/common/trigger_dag.py | 3 +
.../api_fastapi/core_api/datamodels/dag_run.py | 4 ++
.../api_fastapi/core_api/routes/public/dag_run.py | 1 +
.../api_fastapi/execution_api/routes/dag_runs.py | 7 +-
airflow-core/src/airflow/exceptions.py | 7 +-
airflow-core/src/airflow/timetables/base.py | 39 ++++++++++
airflow-core/src/airflow/timetables/simple.py | 42 +++++++++++
airflow-core/src/airflow/timetables/trigger.py | 23 +++++-
.../tests/unit/api/common/test_trigger_dag.py | 68 ++++++++++++++++++
.../core_api/routes/public/test_dag_run.py | 84 +++++++++++++++++++++-
.../execution_api/versions/head/test_dag_runs.py | 27 ++++++-
.../unit/timetables/test_partitioned_timetable.py | 23 ++++++
12 files changed, 322 insertions(+), 6 deletions(-)
diff --git a/airflow-core/src/airflow/api/common/trigger_dag.py
b/airflow-core/src/airflow/api/common/trigger_dag.py
index 4f8ca4c25e7..8dfd06c1ac9 100644
--- a/airflow-core/src/airflow/api/common/trigger_dag.py
+++ b/airflow-core/src/airflow/api/common/trigger_dag.py
@@ -118,6 +118,8 @@ def _trigger_dag(
if dag_run := DagRun.find_duplicate(dag_id=dag_id, run_id=run_id):
raise DagRunAlreadyExists(dag_run)
+ partition_date = dag.timetable.resolve_partition_date(partition_key)
+
run_conf = None
if is_arg_set(conf):
run_conf = _normalize_conf(conf)
@@ -133,6 +135,7 @@ def _trigger_dag(
note=note,
state=DagRunState.QUEUED,
partition_key=partition_key,
+ partition_date=partition_date,
session=session,
)
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
index 88ffceca6d7..033437e2ad9 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
@@ -191,6 +191,9 @@ class TriggerDAGRunPostBody(StrictBaseModel):
run_after=timezone.coerce_datetime(run_after),
data_interval=data_interval,
)
+
+ partition_date =
dag.timetable.resolve_partition_date(self.partition_key)
+
return {
"run_id": run_id,
"logical_date": coerced_logical_date,
@@ -199,6 +202,7 @@ class TriggerDAGRunPostBody(StrictBaseModel):
"conf": self.conf,
"note": self.note,
"partition_key": self.partition_key,
+ "partition_date": partition_date,
}
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
index 0860b29da94..30ae090289c 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
@@ -657,6 +657,7 @@ def trigger_dag_run(
triggering_user_name=user.get_name(),
state=DagRunState.QUEUED,
partition_key=params["partition_key"],
+ partition_date=params["partition_date"],
session=session,
)
except (ParamValidationError, ValueError) as e:
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
index 872c7495752..1092369e913 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
@@ -34,7 +34,7 @@ from airflow.api_fastapi.execution_api.datamodels.dagrun
import DagRunStateRespo
from airflow.api_fastapi.execution_api.datamodels.taskinstance import DagRun
from airflow.api_fastapi.execution_api.datamodels.token import TIToken
from airflow.api_fastapi.execution_api.security import CurrentTIToken
-from airflow.exceptions import DagNotPartitionedError, DagRunAlreadyExists
+from airflow.exceptions import DagNotPartitionedError, DagRunAlreadyExists,
InvalidPartitionKeyError
from airflow.models.dag import DagModel
from airflow.models.dagrun import DagRun as DagRunModel
from airflow.models.taskinstance import TaskInstance
@@ -158,6 +158,11 @@ def trigger_dag_run(
status.HTTP_400_BAD_REQUEST,
detail={"reason": "not_partitioned", "message": str(e)},
)
+ except InvalidPartitionKeyError as e:
+ raise HTTPException(
+ status.HTTP_400_BAD_REQUEST,
+ detail={"reason": "invalid_partition_key", "message": str(e)},
+ ) from e
@router.post(
diff --git a/airflow-core/src/airflow/exceptions.py
b/airflow-core/src/airflow/exceptions.py
index ff45daebc02..addd4f8f2a5 100644
--- a/airflow-core/src/airflow/exceptions.py
+++ b/airflow-core/src/airflow/exceptions.py
@@ -146,7 +146,12 @@ class DagNotPartitionedError(ValueError):
class InvalidPartitionKeyError(ValueError):
- """Raise when a partition_key value is empty or exceeds the maximum
allowed length."""
+ """
+ Raise when a partition_key value is invalid.
+
+ 1. empty or exceeds the maximum allowed length
+ 2. cannot be decoded to a partition_date by the timetable
+ """
class DagRunAlreadyExists(AirflowBadRequest):
diff --git a/airflow-core/src/airflow/timetables/base.py
b/airflow-core/src/airflow/timetables/base.py
index b07e5c98c93..400b63f3d70 100644
--- a/airflow-core/src/airflow/timetables/base.py
+++ b/airflow-core/src/airflow/timetables/base.py
@@ -292,6 +292,45 @@ class Timetable(Protocol):
datetime.datetime(day.year, day.month, day.day,
tzinfo=datetime.timezone.utc)
)
+ def resolve_partition_date(self, partition_key: str | None) ->
datetime.datetime | None:
+ """
+ Decode *partition_key* into the period-start datetime it represents.
+
+ Returns the timezone-aware datetime that was used to format
*partition_key*
+ when the timetable originally created the run, or ``None`` when no
temporal
+ meaning can be derived from the key. ``None`` is returned without
decoding
+ when *partition_key* is ``None``, when this timetable is not
``partitioned``,
+ or when it defers partition selection to runtime
(``partitioned_at_runtime``).
+
+ Partitioned timetables whose keys carry a temporal structure override
+ :meth:`_decode_partition_date`:
+
+ - :class:`~airflow.timetables.trigger.CronPartitionTimetable` parses
the
+ key with ``strptime`` using its ``key_format`` and localizes with its
+ timezone.
+ - :class:`~airflow.timetables.simple.PartitionedAssetTimetable`
delegates
+ to each asset's partition mapper; when the mappers agree on the same
+ instant it is returned, otherwise ``None`` is returned.
+
+ :param partition_key: The partition key string to decode, or ``None``.
+ :returns: The period-start datetime, or ``None`` if not resolvable.
+ :raises InvalidPartitionKeyError: When *partition_key* is syntactically
+ invalid for this timetable's key format (e.g. ``strptime`` fails).
+ """
+ if partition_key is None or not self.partitioned or
self.partitioned_at_runtime:
+ return None
+ return self._decode_partition_date(partition_key)
+
+ def _decode_partition_date(self, partition_key: str) -> datetime.datetime
| None:
+ """
+ Decode a non-empty *partition_key* into its period-start datetime.
+
+ Called by :meth:`resolve_partition_date` only after the
partitioned-state
+ guards pass. The default returns ``None``; partitioned timetables whose
+ keys carry temporal structure override this.
+ """
+ return None
+
@property
def partition_mapper_info(self) -> list[PartitionMapperInfo]:
"""
diff --git a/airflow-core/src/airflow/timetables/simple.py
b/airflow-core/src/airflow/timetables/simple.py
index 496ba9da156..a8b499dbe50 100644
--- a/airflow-core/src/airflow/timetables/simple.py
+++ b/airflow-core/src/airflow/timetables/simple.py
@@ -17,11 +17,13 @@
from __future__ import annotations
from contextlib import suppress
+from datetime import datetime
from typing import TYPE_CHECKING, Any, TypeAlias
import structlog
from airflow._shared.timezones import timezone
+from airflow.exceptions import InvalidPartitionKeyError
from airflow.partition_mappers.identity import IdentityMapper
from airflow.serialization.definitions.assets import (
SerializedAsset,
@@ -355,6 +357,46 @@ class PartitionedAssetTimetable(AssetTriggeredTimetable):
entries.append(PartitionMapperInfo(uri=s_asset_ref.uri,
is_rollup=mapper.is_rollup))
return entries
+ def _decode_partition_date(self, partition_key: str) -> datetime | None:
+ """
+ Decode *partition_key* into the period-start datetime shared by all
asset mappers.
+
+ Iterates every asset (and asset ref) reachable from the asset
condition, asks
+ each mapper for the temporal anchor of *partition_key*, and returns it
when all
+ temporal mappers agree. Returns ``None`` when no mapper is temporal or
when the
+ mappers disagree — consistent with how the scheduler resolves
``partition_date``
+ for asset-triggered runs.
+ """
+ anchors: set[datetime] = set()
+ for unique_key, _ in self.asset_condition.iter_assets():
+ mapper = self.get_partition_mapper(name=unique_key.name,
uri=unique_key.uri)
+ try:
+ anchor = mapper.to_partition_date(partition_key)
+ except ValueError as exc:
+ raise InvalidPartitionKeyError(
+ f"Partition key {partition_key!r} is invalid for this
timetable's mappers: {exc}"
+ ) from exc
+ if anchor is not None:
+ anchors.add(anchor)
+ for s_asset_ref in self.asset_condition.iter_asset_refs():
+ if isinstance(s_asset_ref, SerializedAssetNameRef):
+ mapper = self.get_partition_mapper(name=s_asset_ref.name)
+ elif isinstance(s_asset_ref, SerializedAssetUriRef):
+ mapper = self.get_partition_mapper(uri=s_asset_ref.uri)
+ else:
+ continue
+ try:
+ anchor = mapper.to_partition_date(partition_key)
+ except ValueError as exc:
+ raise InvalidPartitionKeyError(
+ f"Partition key {partition_key!r} is invalid for this
timetable's mappers: {exc}"
+ ) from exc
+ if anchor is not None:
+ anchors.add(anchor)
+ if len(anchors) == 1:
+ return anchors.pop()
+ return None
+
def serialize(self) -> dict[str, Any]:
from airflow.serialization.serialized_objects import encode_asset_like
diff --git a/airflow-core/src/airflow/timetables/trigger.py
b/airflow-core/src/airflow/timetables/trigger.py
index b3ac7cbf2e0..a1de05ac76e 100644
--- a/airflow-core/src/airflow/timetables/trigger.py
+++ b/airflow-core/src/airflow/timetables/trigger.py
@@ -27,7 +27,8 @@ from typing import TYPE_CHECKING, Any
import structlog
-from airflow._shared.timezones.timezone import coerce_datetime,
parse_timezone, utcnow
+from airflow._shared.timezones.timezone import coerce_datetime, make_aware,
parse_timezone, utcnow
+from airflow.exceptions import InvalidPartitionKeyError
from airflow.timetables._cron import CronMixin
from airflow.timetables._delta import DeltaMixin
from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
@@ -518,6 +519,26 @@ class CronPartitionTimetable(CronTriggerTimetable):
# midnight partition keys as "...T00:00:00", not the prior UTC day's
"...T16:00:00").
return
partition_date.in_timezone(self._timezone).strftime(self._key_format)
+ def _decode_partition_date(self, partition_key: str) -> datetime.datetime:
+ """
+ Decode *partition_key* back to the period-start datetime.
+
+ Parses the key with ``strptime`` using this timetable's ``key_format``
+ and localizes with the timetable's timezone, mirroring the forward
+ direction in :meth:`_format_key`.
+
+ :raises InvalidPartitionKeyError: When *partition_key* does not match
+ the timetable's ``key_format``.
+ """
+ try:
+ naive = datetime.datetime.strptime(partition_key, self._key_format)
+ except ValueError as exc:
+ raise InvalidPartitionKeyError(
+ f"Partition key {partition_key!r} does not match the
timetable's "
+ f"key_format {self._key_format!r}: {exc}"
+ ) from exc
+ return make_aware(naive, self._timezone)
+
def next_dagrun_info_v2(
self,
*,
diff --git a/airflow-core/tests/unit/api/common/test_trigger_dag.py
b/airflow-core/tests/unit/api/common/test_trigger_dag.py
index 49a385865f4..6fc48bd0814 100644
--- a/airflow-core/tests/unit/api/common/test_trigger_dag.py
+++ b/airflow-core/tests/unit/api/common/test_trigger_dag.py
@@ -16,12 +16,17 @@
# under the License.
from __future__ import annotations
+from datetime import datetime, timezone
+
import pytest
from sqlalchemy import select
from airflow.api.common.trigger_dag import trigger_dag
+from airflow.exceptions import InvalidPartitionKeyError
from airflow.models import DagModel
from airflow.providers.standard.operators.empty import EmptyOperator
+from airflow.timetables.simple import PartitionAtRuntime
+from airflow.timetables.trigger import CronPartitionTimetable
from airflow.utils.types import DagRunTriggeredByType, DagRunType
from tests_common.test_utils.db import (
@@ -83,6 +88,69 @@ def test_trigger_dag_with_custom_run_type(dag_maker,
session):
assert dag_run.run_type == DagRunType.ASSET_MATERIALIZATION
+def
test_trigger_dag_populates_partition_date_for_cron_partition_timetable(dag_maker,
session):
+ """Manually triggering a CronPartitionTimetable Dag with a partition_key
populates partition_date."""
+ with dag_maker(
+ session=session,
+ dag_id="TEST_CRON_PARTITION_DAG",
+ schedule=CronPartitionTimetable("0 0 * * *", timezone="UTC"),
+ ):
+ EmptyOperator(task_id="mytask")
+ session.commit()
+
+ dag_run = trigger_dag(
+ dag_id="TEST_CRON_PARTITION_DAG",
+ triggered_by=DagRunTriggeredByType.REST_API,
+ partition_key="2025-06-01T00:00:00",
+ session=session,
+ )
+
+ assert dag_run is not None
+ assert dag_run.partition_key == "2025-06-01T00:00:00"
+ assert dag_run.partition_date == datetime(2025, 6, 1, 0, 0, 0,
tzinfo=timezone.utc)
+
+
+def
test_trigger_dag_raises_invalid_partition_key_for_cron_partition_timetable(dag_maker,
session):
+ """A malformed partition_key for CronPartitionTimetable raises
InvalidPartitionKeyError."""
+ with dag_maker(
+ session=session,
+ dag_id="TEST_CRON_PARTITION_BAD_KEY",
+ schedule=CronPartitionTimetable("0 0 * * *", timezone="UTC"),
+ ):
+ EmptyOperator(task_id="mytask")
+ session.commit()
+
+ with pytest.raises(InvalidPartitionKeyError, match="does not match"):
+ trigger_dag(
+ dag_id="TEST_CRON_PARTITION_BAD_KEY",
+ triggered_by=DagRunTriggeredByType.REST_API,
+ partition_key="not-a-valid-date",
+ session=session,
+ )
+
+
+def
test_trigger_dag_partition_at_runtime_leaves_partition_date_none(dag_maker,
session):
+ """PartitionAtRuntime Dags accept arbitrary keys; partition_date stays
None."""
+ with dag_maker(
+ session=session,
+ dag_id="TEST_PARTITION_AT_RUNTIME",
+ schedule=PartitionAtRuntime(),
+ ):
+ EmptyOperator(task_id="mytask")
+ session.commit()
+
+ dag_run = trigger_dag(
+ dag_id="TEST_PARTITION_AT_RUNTIME",
+ triggered_by=DagRunTriggeredByType.REST_API,
+ partition_key="arbitrary-runtime-key",
+ session=session,
+ )
+
+ assert dag_run is not None
+ assert dag_run.partition_key == "arbitrary-runtime-key"
+ assert dag_run.partition_date is None
+
+
def test_trigger_dag_operator_denied_when_only_manual_allowed(dag_maker,
session):
with dag_maker(session=session, dag_id="TEST_DAG_1", schedule="0 * * * *"):
EmptyOperator(task_id="mytask")
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
index 0d0478f8d01..ad11f764d48 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
@@ -46,7 +46,7 @@ from airflow.sdk.definitions.asset import Asset
from airflow.sdk.definitions.param import Param
from airflow.settings import _configure_async_session
from airflow.timetables.interval import CronDataIntervalTimetable
-from airflow.timetables.simple import PartitionAtRuntime
+from airflow.timetables.simple import PartitionAtRuntime,
PartitionedAssetTimetable
from airflow.timetables.trigger import CronPartitionTimetable
from airflow.utils.session import provide_session
from airflow.utils.state import DagRunState, State
@@ -2874,7 +2874,7 @@ class TestTriggerDagRun:
partitioned_dag_id = "test_max_length_partition_key"
with dag_maker(
dag_id=partitioned_dag_id,
- schedule=CronPartitionTimetable("0 * * * *", timezone="UTC"),
+ schedule=PartitionedAssetTimetable(assets=Asset("test")),
start_date=START_DATE1,
session=session,
serialized=True,
@@ -2889,6 +2889,86 @@ class TestTriggerDagRun:
)
assert response.status_code == 200
+ @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
+ def test_trigger_partitioned_dag_populates_partition_date(self, dag_maker,
test_client, session):
+ """Triggering a CronPartitionTimetable Dag with a valid key populates
partition_date on the run.
+
+ Regression guard: before this fix partition_date was NULL for manually
triggered runs even
+ when partition_key was supplied, making partition-date-based filtering
(e.g.
+ ``airflow dags clear --partition-date-*``) silently skip those runs.
+ """
+ partitioned_dag_id = "test_trigger_populates_partition_date"
+ with dag_maker(
+ dag_id=partitioned_dag_id,
+ schedule=CronPartitionTimetable("0 0 * * *", timezone="UTC"),
+ start_date=START_DATE1,
+ session=session,
+ serialized=True,
+ ):
+ EmptyOperator(task_id="task")
+
+ session.commit()
+
+ response = test_client.post(
+ f"/dags/{partitioned_dag_id}/dagRuns",
+ json={"logical_date": None, "partition_key":
"2025-06-01T00:00:00"},
+ )
+ assert response.status_code == 200
+
+ dag_run = session.scalar(select(DagRun).where(DagRun.dag_id ==
partitioned_dag_id))
+ assert dag_run is not None
+ assert dag_run.partition_key == "2025-06-01T00:00:00"
+ assert dag_run.partition_date == datetime(2025, 6, 1, 0, 0, 0,
tzinfo=timezone.utc)
+
+ @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
+ def test_trigger_partitioned_dag_invalid_key_returns_400(self, dag_maker,
test_client, session):
+ """An invalid partition_key for a CronPartitionTimetable Dag must
return HTTP 400."""
+ partitioned_dag_id = "test_trigger_invalid_partition_key"
+ with dag_maker(
+ dag_id=partitioned_dag_id,
+ schedule=CronPartitionTimetable("0 0 * * *", timezone="UTC"),
+ start_date=START_DATE1,
+ session=session,
+ serialized=True,
+ ):
+ EmptyOperator(task_id="task")
+
+ session.commit()
+
+ response = test_client.post(
+ f"/dags/{partitioned_dag_id}/dagRuns",
+ json={"logical_date": None, "partition_key": "not-a-valid-date"},
+ )
+ assert response.status_code == 400
+
+ @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
+ def test_trigger_partition_at_runtime_dag_leaves_partition_date_none(
+ self, dag_maker, test_client, session
+ ):
+ """PartitionAtRuntime Dag with an arbitrary key must produce
partition_date=None."""
+ runtime_dag_id = "test_trigger_partition_at_runtime_none_date"
+ with dag_maker(
+ dag_id=runtime_dag_id,
+ schedule=PartitionAtRuntime(),
+ start_date=START_DATE1,
+ session=session,
+ serialized=True,
+ ):
+ EmptyOperator(task_id="task")
+
+ session.commit()
+
+ response = test_client.post(
+ f"/dags/{runtime_dag_id}/dagRuns",
+ json={"logical_date": None, "partition_key": "arbitrary-key"},
+ )
+ assert response.status_code == 200
+
+ dag_run = session.scalar(select(DagRun).where(DagRun.dag_id ==
runtime_dag_id))
+ assert dag_run is not None
+ assert dag_run.partition_key == "arbitrary-key"
+ assert dag_run.partition_date is None
+
class TestResolveRunOnLatestVersion:
@pytest.mark.parametrize("explicit_value", [True, False])
diff --git
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py
index 500e1ba4de4..5d3c37c9b52 100644
---
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py
+++
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py
@@ -71,7 +71,7 @@ class TestDagRunTrigger:
dag_id = "test_trigger_dag_run_partition_key"
run_id = "test_run_id"
logical_date = timezone.datetime(2025, 2, 20)
- partition_key = "2025-02-20"
+ partition_key = "2025-02-20T00:00:00"
with dag_maker(
dag_id=dag_id,
@@ -126,6 +126,31 @@ class TestDagRunTrigger:
}
}
+ def test_trigger_dag_run_invalid_partition_key(self, client, session,
dag_maker):
+ """partition_key that the timetable cannot decode must return 400."""
+ dag_id = "test_trigger_dag_run_invalid_partition_key"
+ run_id = "test_run_id_invalid_pk"
+ logical_date = timezone.datetime(2025, 2, 20)
+
+ with dag_maker(
+ dag_id=dag_id,
+ schedule=CronPartitionTimetable("0 * * * *", timezone="UTC"),
+ session=session,
+ serialized=True,
+ ):
+ EmptyOperator(task_id="test_task")
+ session.commit()
+
+ response = client.post(
+ f"/execution/dag-runs/{dag_id}/{run_id}",
+ json={"logical_date": logical_date.isoformat(), "partition_key":
"not-a-date"},
+ )
+
+ assert response.status_code == 400
+ detail = response.json()["detail"]
+ assert detail["reason"] == "invalid_partition_key"
+ assert "does not match the timetable's key_format" in detail["message"]
+
def test_trigger_dag_run_dag_not_found(self, client):
"""Test that a DAG that does not exist cannot be triggered."""
dag_id = "dag_not_found"
diff --git a/airflow-core/tests/unit/timetables/test_partitioned_timetable.py
b/airflow-core/tests/unit/timetables/test_partitioned_timetable.py
index 23caf06e79d..cb5d8821794 100644
--- a/airflow-core/tests/unit/timetables/test_partitioned_timetable.py
+++ b/airflow-core/tests/unit/timetables/test_partitioned_timetable.py
@@ -27,6 +27,7 @@ import pendulum
import pytest
from airflow._shared.module_loading import qualname
+from airflow.exceptions import InvalidPartitionKeyError
from airflow.partition_mappers.base import RollupMapper
from airflow.partition_mappers.identity import IdentityMapper as IdentityMapper
from airflow.partition_mappers.temporal import StartOfDayMapper
@@ -258,6 +259,28 @@ class TestPartitionedAssetTimetable:
assert isinstance(timetable.default_partition_mapper, IdentityMapper)
assert isinstance(timetable.partition_mapper_config[ser_asset],
IdentityMapper)
+ @pytest.mark.parametrize(
+ "asset_like",
+ [
+ Asset(name="daily", uri="s3://bucket/daily"),
+ Asset.ref(name="daily"),
+ ],
+ )
+ def test_decode_partition_date_raises_on_invalid_key(self, asset_like):
+ timetable = PartitionedAssetTimetable(
+ assets=asset_like,
+ default_partition_mapper=StartOfDayMapper(),
+ )
+ with pytest.raises(InvalidPartitionKeyError, match="is invalid for
this timetable's mappers"):
+ timetable._decode_partition_date("not-a-date")
+
+ def test_decode_partition_date_returns_period_start_for_valid_key(self):
+ timetable = PartitionedAssetTimetable(
+ assets=Asset(name="daily", uri="s3://bucket/daily"),
+ default_partition_mapper=StartOfDayMapper(),
+ )
+ assert timetable._decode_partition_date("2025-01-01") ==
pendulum.datetime(2025, 1, 1, tz="UTC")
+
def
test_partitioned_asset_timetable_resolve_day_bound_returns_midnight_utc(self):
"""PartitionedAssetTimetable has no local timezone; resolve_day_bound
uses the base default.