This is an automated email from the ASF dual-hosted git repository.

Lee-W pushed a commit to branch v3-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-3-test by this push:
     new 2102562d8ed [v3-3-test] Populate partition_date when manually triggering partitioned Dags (#68458) (#68690)
2102562d8ed is described below

commit 2102562d8ed6b6412e28e812e951b7029d8a44e2
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Jun 18 20:47:54 2026 +0800

    [v3-3-test] Populate partition_date when manually triggering partitioned Dags (#68458) (#68690)
    
    Co-authored-by: Wei Lee <[email protected]>
---
 airflow-core/src/airflow/api/common/trigger_dag.py |  3 +
 .../api_fastapi/core_api/datamodels/dag_run.py     |  4 ++
 .../api_fastapi/core_api/routes/public/dag_run.py  |  1 +
 .../api_fastapi/execution_api/routes/dag_runs.py   |  7 +-
 airflow-core/src/airflow/exceptions.py             |  7 +-
 airflow-core/src/airflow/timetables/base.py        | 39 ++++++++++
 airflow-core/src/airflow/timetables/simple.py      | 42 +++++++++++
 airflow-core/src/airflow/timetables/trigger.py     | 23 +++++-
 .../tests/unit/api/common/test_trigger_dag.py      | 68 ++++++++++++++++++
 .../core_api/routes/public/test_dag_run.py         | 84 +++++++++++++++++++++-
 .../execution_api/versions/head/test_dag_runs.py   | 27 ++++++-
 .../unit/timetables/test_partitioned_timetable.py  | 23 ++++++
 12 files changed, 322 insertions(+), 6 deletions(-)

diff --git a/airflow-core/src/airflow/api/common/trigger_dag.py 
b/airflow-core/src/airflow/api/common/trigger_dag.py
index 4f8ca4c25e7..8dfd06c1ac9 100644
--- a/airflow-core/src/airflow/api/common/trigger_dag.py
+++ b/airflow-core/src/airflow/api/common/trigger_dag.py
@@ -118,6 +118,8 @@ def _trigger_dag(
     if dag_run := DagRun.find_duplicate(dag_id=dag_id, run_id=run_id):
         raise DagRunAlreadyExists(dag_run)
 
+    partition_date = dag.timetable.resolve_partition_date(partition_key)
+
     run_conf = None
     if is_arg_set(conf):
         run_conf = _normalize_conf(conf)
@@ -133,6 +135,7 @@ def _trigger_dag(
         note=note,
         state=DagRunState.QUEUED,
         partition_key=partition_key,
+        partition_date=partition_date,
         session=session,
     )
 
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py 
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
index 88ffceca6d7..033437e2ad9 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
@@ -191,6 +191,9 @@ class TriggerDAGRunPostBody(StrictBaseModel):
             run_after=timezone.coerce_datetime(run_after),
             data_interval=data_interval,
         )
+
+        partition_date = 
dag.timetable.resolve_partition_date(self.partition_key)
+
         return {
             "run_id": run_id,
             "logical_date": coerced_logical_date,
@@ -199,6 +202,7 @@ class TriggerDAGRunPostBody(StrictBaseModel):
             "conf": self.conf,
             "note": self.note,
             "partition_key": self.partition_key,
+            "partition_date": partition_date,
         }
 
 
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
index 0860b29da94..30ae090289c 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
@@ -657,6 +657,7 @@ def trigger_dag_run(
             triggering_user_name=user.get_name(),
             state=DagRunState.QUEUED,
             partition_key=params["partition_key"],
+            partition_date=params["partition_date"],
             session=session,
         )
     except (ParamValidationError, ValueError) as e:
diff --git 
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py 
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
index 872c7495752..1092369e913 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
@@ -34,7 +34,7 @@ from airflow.api_fastapi.execution_api.datamodels.dagrun 
import DagRunStateRespo
 from airflow.api_fastapi.execution_api.datamodels.taskinstance import DagRun
 from airflow.api_fastapi.execution_api.datamodels.token import TIToken
 from airflow.api_fastapi.execution_api.security import CurrentTIToken
-from airflow.exceptions import DagNotPartitionedError, DagRunAlreadyExists
+from airflow.exceptions import DagNotPartitionedError, DagRunAlreadyExists, 
InvalidPartitionKeyError
 from airflow.models.dag import DagModel
 from airflow.models.dagrun import DagRun as DagRunModel
 from airflow.models.taskinstance import TaskInstance
@@ -158,6 +158,11 @@ def trigger_dag_run(
             status.HTTP_400_BAD_REQUEST,
             detail={"reason": "not_partitioned", "message": str(e)},
         )
+    except InvalidPartitionKeyError as e:
+        raise HTTPException(
+            status.HTTP_400_BAD_REQUEST,
+            detail={"reason": "invalid_partition_key", "message": str(e)},
+        ) from e
 
 
 @router.post(
diff --git a/airflow-core/src/airflow/exceptions.py 
b/airflow-core/src/airflow/exceptions.py
index ff45daebc02..addd4f8f2a5 100644
--- a/airflow-core/src/airflow/exceptions.py
+++ b/airflow-core/src/airflow/exceptions.py
@@ -146,7 +146,12 @@ class DagNotPartitionedError(ValueError):
 
 
 class InvalidPartitionKeyError(ValueError):
-    """Raise when a partition_key value is empty or exceeds the maximum 
allowed length."""
+    """
+    Raise when a partition_key value is invalid.
+
+    1. empty or exceeds the maximum allowed length
+    2. cannot be decoded to a partition_date by the timetable
+    """
 
 
 class DagRunAlreadyExists(AirflowBadRequest):
diff --git a/airflow-core/src/airflow/timetables/base.py 
b/airflow-core/src/airflow/timetables/base.py
index b07e5c98c93..400b63f3d70 100644
--- a/airflow-core/src/airflow/timetables/base.py
+++ b/airflow-core/src/airflow/timetables/base.py
@@ -292,6 +292,45 @@ class Timetable(Protocol):
             datetime.datetime(day.year, day.month, day.day, 
tzinfo=datetime.timezone.utc)
         )
 
+    def resolve_partition_date(self, partition_key: str | None) -> 
datetime.datetime | None:
+        """
+        Decode *partition_key* into the period-start datetime it represents.
+
+        Returns the timezone-aware datetime that was used to format 
*partition_key*
+        when the timetable originally created the run, or ``None`` when no 
temporal
+        meaning can be derived from the key. ``None`` is returned without 
decoding
+        when *partition_key* is ``None``, when this timetable is not 
``partitioned``,
+        or when it defers partition selection to runtime 
(``partitioned_at_runtime``).
+
+        Partitioned timetables whose keys carry a temporal structure override
+        :meth:`_decode_partition_date`:
+
+        - :class:`~airflow.timetables.trigger.CronPartitionTimetable` parses 
the
+          key with ``strptime`` using its ``key_format`` and localizes with its
+          timezone.
+        - :class:`~airflow.timetables.simple.PartitionedAssetTimetable` 
delegates
+          to each asset's partition mapper; when the mappers agree on the same
+          instant it is returned, otherwise ``None`` is returned.
+
+        :param partition_key: The partition key string to decode, or ``None``.
+        :returns: The period-start datetime, or ``None`` if not resolvable.
+        :raises InvalidPartitionKeyError: When *partition_key* is syntactically
+            invalid for this timetable's key format (e.g. ``strptime`` fails).
+        """
+        if partition_key is None or not self.partitioned or 
self.partitioned_at_runtime:
+            return None
+        return self._decode_partition_date(partition_key)
+
+    def _decode_partition_date(self, partition_key: str) -> datetime.datetime 
| None:
+        """
+        Decode a non-empty *partition_key* into its period-start datetime.
+
+        Called by :meth:`resolve_partition_date` only after the 
partitioned-state
+        guards pass. The default returns ``None``; partitioned timetables whose
+        keys carry temporal structure override this.
+        """
+        return None
+
     @property
     def partition_mapper_info(self) -> list[PartitionMapperInfo]:
         """
diff --git a/airflow-core/src/airflow/timetables/simple.py 
b/airflow-core/src/airflow/timetables/simple.py
index 496ba9da156..a8b499dbe50 100644
--- a/airflow-core/src/airflow/timetables/simple.py
+++ b/airflow-core/src/airflow/timetables/simple.py
@@ -17,11 +17,13 @@
 from __future__ import annotations
 
 from contextlib import suppress
+from datetime import datetime
 from typing import TYPE_CHECKING, Any, TypeAlias
 
 import structlog
 
 from airflow._shared.timezones import timezone
+from airflow.exceptions import InvalidPartitionKeyError
 from airflow.partition_mappers.identity import IdentityMapper
 from airflow.serialization.definitions.assets import (
     SerializedAsset,
@@ -355,6 +357,46 @@ class PartitionedAssetTimetable(AssetTriggeredTimetable):
                 entries.append(PartitionMapperInfo(uri=s_asset_ref.uri, 
is_rollup=mapper.is_rollup))
         return entries
 
+    def _decode_partition_date(self, partition_key: str) -> datetime | None:
+        """
+        Decode *partition_key* into the period-start datetime shared by all 
asset mappers.
+
+        Iterates every asset (and asset ref) reachable from the asset 
condition, asks
+        each mapper for the temporal anchor of *partition_key*, and returns it 
when all
+        temporal mappers agree. Returns ``None`` when no mapper is temporal or 
when the
+        mappers disagree — consistent with how the scheduler resolves 
``partition_date``
+        for asset-triggered runs.
+        """
+        anchors: set[datetime] = set()
+        for unique_key, _ in self.asset_condition.iter_assets():
+            mapper = self.get_partition_mapper(name=unique_key.name, 
uri=unique_key.uri)
+            try:
+                anchor = mapper.to_partition_date(partition_key)
+            except ValueError as exc:
+                raise InvalidPartitionKeyError(
+                    f"Partition key {partition_key!r} is invalid for this 
timetable's mappers: {exc}"
+                ) from exc
+            if anchor is not None:
+                anchors.add(anchor)
+        for s_asset_ref in self.asset_condition.iter_asset_refs():
+            if isinstance(s_asset_ref, SerializedAssetNameRef):
+                mapper = self.get_partition_mapper(name=s_asset_ref.name)
+            elif isinstance(s_asset_ref, SerializedAssetUriRef):
+                mapper = self.get_partition_mapper(uri=s_asset_ref.uri)
+            else:
+                continue
+            try:
+                anchor = mapper.to_partition_date(partition_key)
+            except ValueError as exc:
+                raise InvalidPartitionKeyError(
+                    f"Partition key {partition_key!r} is invalid for this 
timetable's mappers: {exc}"
+                ) from exc
+            if anchor is not None:
+                anchors.add(anchor)
+        if len(anchors) == 1:
+            return anchors.pop()
+        return None
+
     def serialize(self) -> dict[str, Any]:
         from airflow.serialization.serialized_objects import encode_asset_like
 
diff --git a/airflow-core/src/airflow/timetables/trigger.py 
b/airflow-core/src/airflow/timetables/trigger.py
index b3ac7cbf2e0..a1de05ac76e 100644
--- a/airflow-core/src/airflow/timetables/trigger.py
+++ b/airflow-core/src/airflow/timetables/trigger.py
@@ -27,7 +27,8 @@ from typing import TYPE_CHECKING, Any
 
 import structlog
 
-from airflow._shared.timezones.timezone import coerce_datetime, 
parse_timezone, utcnow
+from airflow._shared.timezones.timezone import coerce_datetime, make_aware, 
parse_timezone, utcnow
+from airflow.exceptions import InvalidPartitionKeyError
 from airflow.timetables._cron import CronMixin
 from airflow.timetables._delta import DeltaMixin
 from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
@@ -518,6 +519,26 @@ class CronPartitionTimetable(CronTriggerTimetable):
         # midnight partition keys as "...T00:00:00", not the prior UTC day's 
"...T16:00:00").
         return 
partition_date.in_timezone(self._timezone).strftime(self._key_format)
 
+    def _decode_partition_date(self, partition_key: str) -> datetime.datetime:
+        """
+        Decode *partition_key* back to the period-start datetime.
+
+        Parses the key with ``strptime`` using this timetable's ``key_format``
+        and localizes with the timetable's timezone, mirroring the forward
+        direction in :meth:`_format_key`.
+
+        :raises InvalidPartitionKeyError: When *partition_key* does not match
+            the timetable's ``key_format``.
+        """
+        try:
+            naive = datetime.datetime.strptime(partition_key, self._key_format)
+        except ValueError as exc:
+            raise InvalidPartitionKeyError(
+                f"Partition key {partition_key!r} does not match the 
timetable's "
+                f"key_format {self._key_format!r}: {exc}"
+            ) from exc
+        return make_aware(naive, self._timezone)
+
     def next_dagrun_info_v2(
         self,
         *,
diff --git a/airflow-core/tests/unit/api/common/test_trigger_dag.py 
b/airflow-core/tests/unit/api/common/test_trigger_dag.py
index 49a385865f4..6fc48bd0814 100644
--- a/airflow-core/tests/unit/api/common/test_trigger_dag.py
+++ b/airflow-core/tests/unit/api/common/test_trigger_dag.py
@@ -16,12 +16,17 @@
 # under the License.
 from __future__ import annotations
 
+from datetime import datetime, timezone
+
 import pytest
 from sqlalchemy import select
 
 from airflow.api.common.trigger_dag import trigger_dag
+from airflow.exceptions import InvalidPartitionKeyError
 from airflow.models import DagModel
 from airflow.providers.standard.operators.empty import EmptyOperator
+from airflow.timetables.simple import PartitionAtRuntime
+from airflow.timetables.trigger import CronPartitionTimetable
 from airflow.utils.types import DagRunTriggeredByType, DagRunType
 
 from tests_common.test_utils.db import (
@@ -83,6 +88,69 @@ def test_trigger_dag_with_custom_run_type(dag_maker, 
session):
     assert dag_run.run_type == DagRunType.ASSET_MATERIALIZATION
 
 
+def 
test_trigger_dag_populates_partition_date_for_cron_partition_timetable(dag_maker,
 session):
+    """Manually triggering a CronPartitionTimetable Dag with a partition_key 
populates partition_date."""
+    with dag_maker(
+        session=session,
+        dag_id="TEST_CRON_PARTITION_DAG",
+        schedule=CronPartitionTimetable("0 0 * * *", timezone="UTC"),
+    ):
+        EmptyOperator(task_id="mytask")
+    session.commit()
+
+    dag_run = trigger_dag(
+        dag_id="TEST_CRON_PARTITION_DAG",
+        triggered_by=DagRunTriggeredByType.REST_API,
+        partition_key="2025-06-01T00:00:00",
+        session=session,
+    )
+
+    assert dag_run is not None
+    assert dag_run.partition_key == "2025-06-01T00:00:00"
+    assert dag_run.partition_date == datetime(2025, 6, 1, 0, 0, 0, 
tzinfo=timezone.utc)
+
+
+def 
test_trigger_dag_raises_invalid_partition_key_for_cron_partition_timetable(dag_maker,
 session):
+    """A malformed partition_key for CronPartitionTimetable raises 
InvalidPartitionKeyError."""
+    with dag_maker(
+        session=session,
+        dag_id="TEST_CRON_PARTITION_BAD_KEY",
+        schedule=CronPartitionTimetable("0 0 * * *", timezone="UTC"),
+    ):
+        EmptyOperator(task_id="mytask")
+    session.commit()
+
+    with pytest.raises(InvalidPartitionKeyError, match="does not match"):
+        trigger_dag(
+            dag_id="TEST_CRON_PARTITION_BAD_KEY",
+            triggered_by=DagRunTriggeredByType.REST_API,
+            partition_key="not-a-valid-date",
+            session=session,
+        )
+
+
+def 
test_trigger_dag_partition_at_runtime_leaves_partition_date_none(dag_maker, 
session):
+    """PartitionAtRuntime Dags accept arbitrary keys; partition_date stays 
None."""
+    with dag_maker(
+        session=session,
+        dag_id="TEST_PARTITION_AT_RUNTIME",
+        schedule=PartitionAtRuntime(),
+    ):
+        EmptyOperator(task_id="mytask")
+    session.commit()
+
+    dag_run = trigger_dag(
+        dag_id="TEST_PARTITION_AT_RUNTIME",
+        triggered_by=DagRunTriggeredByType.REST_API,
+        partition_key="arbitrary-runtime-key",
+        session=session,
+    )
+
+    assert dag_run is not None
+    assert dag_run.partition_key == "arbitrary-runtime-key"
+    assert dag_run.partition_date is None
+
+
 def test_trigger_dag_operator_denied_when_only_manual_allowed(dag_maker, 
session):
     with dag_maker(session=session, dag_id="TEST_DAG_1", schedule="0 * * * *"):
         EmptyOperator(task_id="mytask")
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
index 0d0478f8d01..ad11f764d48 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
@@ -46,7 +46,7 @@ from airflow.sdk.definitions.asset import Asset
 from airflow.sdk.definitions.param import Param
 from airflow.settings import _configure_async_session
 from airflow.timetables.interval import CronDataIntervalTimetable
-from airflow.timetables.simple import PartitionAtRuntime
+from airflow.timetables.simple import PartitionAtRuntime, 
PartitionedAssetTimetable
 from airflow.timetables.trigger import CronPartitionTimetable
 from airflow.utils.session import provide_session
 from airflow.utils.state import DagRunState, State
@@ -2874,7 +2874,7 @@ class TestTriggerDagRun:
         partitioned_dag_id = "test_max_length_partition_key"
         with dag_maker(
             dag_id=partitioned_dag_id,
-            schedule=CronPartitionTimetable("0 * * * *", timezone="UTC"),
+            schedule=PartitionedAssetTimetable(assets=Asset("test")),
             start_date=START_DATE1,
             session=session,
             serialized=True,
@@ -2889,6 +2889,86 @@ class TestTriggerDagRun:
         )
         assert response.status_code == 200
 
+    @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
+    def test_trigger_partitioned_dag_populates_partition_date(self, dag_maker, 
test_client, session):
+        """Triggering a CronPartitionTimetable Dag with a valid key populates 
partition_date on the run.
+
+        Regression guard: before this fix partition_date was NULL for manually 
triggered runs even
+        when partition_key was supplied, making partition-date-based filtering 
(e.g.
+        ``airflow dags clear --partition-date-*``) silently skip those runs.
+        """
+        partitioned_dag_id = "test_trigger_populates_partition_date"
+        with dag_maker(
+            dag_id=partitioned_dag_id,
+            schedule=CronPartitionTimetable("0 0 * * *", timezone="UTC"),
+            start_date=START_DATE1,
+            session=session,
+            serialized=True,
+        ):
+            EmptyOperator(task_id="task")
+
+        session.commit()
+
+        response = test_client.post(
+            f"/dags/{partitioned_dag_id}/dagRuns",
+            json={"logical_date": None, "partition_key": 
"2025-06-01T00:00:00"},
+        )
+        assert response.status_code == 200
+
+        dag_run = session.scalar(select(DagRun).where(DagRun.dag_id == 
partitioned_dag_id))
+        assert dag_run is not None
+        assert dag_run.partition_key == "2025-06-01T00:00:00"
+        assert dag_run.partition_date == datetime(2025, 6, 1, 0, 0, 0, 
tzinfo=timezone.utc)
+
+    @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
+    def test_trigger_partitioned_dag_invalid_key_returns_400(self, dag_maker, 
test_client, session):
+        """An invalid partition_key for a CronPartitionTimetable Dag must 
return HTTP 400."""
+        partitioned_dag_id = "test_trigger_invalid_partition_key"
+        with dag_maker(
+            dag_id=partitioned_dag_id,
+            schedule=CronPartitionTimetable("0 0 * * *", timezone="UTC"),
+            start_date=START_DATE1,
+            session=session,
+            serialized=True,
+        ):
+            EmptyOperator(task_id="task")
+
+        session.commit()
+
+        response = test_client.post(
+            f"/dags/{partitioned_dag_id}/dagRuns",
+            json={"logical_date": None, "partition_key": "not-a-valid-date"},
+        )
+        assert response.status_code == 400
+
+    @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
+    def test_trigger_partition_at_runtime_dag_leaves_partition_date_none(
+        self, dag_maker, test_client, session
+    ):
+        """PartitionAtRuntime Dag with an arbitrary key must produce 
partition_date=None."""
+        runtime_dag_id = "test_trigger_partition_at_runtime_none_date"
+        with dag_maker(
+            dag_id=runtime_dag_id,
+            schedule=PartitionAtRuntime(),
+            start_date=START_DATE1,
+            session=session,
+            serialized=True,
+        ):
+            EmptyOperator(task_id="task")
+
+        session.commit()
+
+        response = test_client.post(
+            f"/dags/{runtime_dag_id}/dagRuns",
+            json={"logical_date": None, "partition_key": "arbitrary-key"},
+        )
+        assert response.status_code == 200
+
+        dag_run = session.scalar(select(DagRun).where(DagRun.dag_id == 
runtime_dag_id))
+        assert dag_run is not None
+        assert dag_run.partition_key == "arbitrary-key"
+        assert dag_run.partition_date is None
+
 
 class TestResolveRunOnLatestVersion:
     @pytest.mark.parametrize("explicit_value", [True, False])
diff --git 
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py
 
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py
index 500e1ba4de4..5d3c37c9b52 100644
--- 
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py
+++ 
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py
@@ -71,7 +71,7 @@ class TestDagRunTrigger:
         dag_id = "test_trigger_dag_run_partition_key"
         run_id = "test_run_id"
         logical_date = timezone.datetime(2025, 2, 20)
-        partition_key = "2025-02-20"
+        partition_key = "2025-02-20T00:00:00"
 
         with dag_maker(
             dag_id=dag_id,
@@ -126,6 +126,31 @@ class TestDagRunTrigger:
             }
         }
 
+    def test_trigger_dag_run_invalid_partition_key(self, client, session, 
dag_maker):
+        """partition_key that the timetable cannot decode must return 400."""
+        dag_id = "test_trigger_dag_run_invalid_partition_key"
+        run_id = "test_run_id_invalid_pk"
+        logical_date = timezone.datetime(2025, 2, 20)
+
+        with dag_maker(
+            dag_id=dag_id,
+            schedule=CronPartitionTimetable("0 * * * *", timezone="UTC"),
+            session=session,
+            serialized=True,
+        ):
+            EmptyOperator(task_id="test_task")
+        session.commit()
+
+        response = client.post(
+            f"/execution/dag-runs/{dag_id}/{run_id}",
+            json={"logical_date": logical_date.isoformat(), "partition_key": 
"not-a-date"},
+        )
+
+        assert response.status_code == 400
+        detail = response.json()["detail"]
+        assert detail["reason"] == "invalid_partition_key"
+        assert "does not match the timetable's key_format" in detail["message"]
+
     def test_trigger_dag_run_dag_not_found(self, client):
         """Test that a DAG that does not exist cannot be triggered."""
         dag_id = "dag_not_found"
diff --git a/airflow-core/tests/unit/timetables/test_partitioned_timetable.py 
b/airflow-core/tests/unit/timetables/test_partitioned_timetable.py
index 23caf06e79d..cb5d8821794 100644
--- a/airflow-core/tests/unit/timetables/test_partitioned_timetable.py
+++ b/airflow-core/tests/unit/timetables/test_partitioned_timetable.py
@@ -27,6 +27,7 @@ import pendulum
 import pytest
 
 from airflow._shared.module_loading import qualname
+from airflow.exceptions import InvalidPartitionKeyError
 from airflow.partition_mappers.base import RollupMapper
 from airflow.partition_mappers.identity import IdentityMapper as IdentityMapper
 from airflow.partition_mappers.temporal import StartOfDayMapper
@@ -258,6 +259,28 @@ class TestPartitionedAssetTimetable:
         assert isinstance(timetable.default_partition_mapper, IdentityMapper)
         assert isinstance(timetable.partition_mapper_config[ser_asset], 
IdentityMapper)
 
+    @pytest.mark.parametrize(
+        "asset_like",
+        [
+            Asset(name="daily", uri="s3://bucket/daily"),
+            Asset.ref(name="daily"),
+        ],
+    )
+    def test_decode_partition_date_raises_on_invalid_key(self, asset_like):
+        timetable = PartitionedAssetTimetable(
+            assets=asset_like,
+            default_partition_mapper=StartOfDayMapper(),
+        )
+        with pytest.raises(InvalidPartitionKeyError, match="is invalid for 
this timetable's mappers"):
+            timetable._decode_partition_date("not-a-date")
+
+    def test_decode_partition_date_returns_period_start_for_valid_key(self):
+        timetable = PartitionedAssetTimetable(
+            assets=Asset(name="daily", uri="s3://bucket/daily"),
+            default_partition_mapper=StartOfDayMapper(),
+        )
+        assert timetable._decode_partition_date("2025-01-01") == 
pendulum.datetime(2025, 1, 1, tz="UTC")
+
     def 
test_partitioned_asset_timetable_resolve_day_bound_returns_midnight_utc(self):
         """PartitionedAssetTimetable has no local timezone; resolve_day_bound 
uses the base default.
 

Reply via email to