This is an automated email from the ASF dual-hosted git repository.

Lee-W pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 88a1b329a0e Validate partition keys are non-empty and within column length (#68443)
88a1b329a0e is described below

commit 88a1b329a0ee9c240bd4c9f3facbc5564498de65
Author: Wei Lee <[email protected]>
AuthorDate: Mon Jun 15 20:35:22 2026 +0800

    Validate partition keys are non-empty and within column length (#68443)
---
 .../execution_api/routes/task_instances.py         | 35 +++++++++++++-
 airflow-core/src/airflow/exceptions.py             |  4 ++
 .../src/airflow/serialization/definitions/dag.py   | 34 +++++++++----
 .../core_api/routes/public/test_dag_run.py         | 56 ++++++++++++++++++++++
 .../versions/head/test_task_instances.py           | 40 ++++++++++++++++
 airflow-core/tests/unit/models/test_dag.py         | 34 ++++++++++++-
 task-sdk/src/airflow/sdk/execution_time/context.py | 19 +++++++-
 .../tests/task_sdk/execution_time/test_context.py  | 35 ++++++++++++++
 8 files changed, 246 insertions(+), 11 deletions(-)

diff --git 
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py 
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
index 7a53a9ddf59..3deccf35be6 100644
--- 
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
+++ 
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
@@ -76,8 +76,9 @@ from airflow.api_fastapi.execution_api.security import (
     require_auth,
 )
 from airflow.configuration import conf
-from airflow.exceptions import TaskNotFound
+from airflow.exceptions import InvalidPartitionKeyError, TaskNotFound
 from airflow.models.asset import AssetActive
+from airflow.models.base import ID_LEN
 from airflow.models.dag import DagModel
 from airflow.models.dagrun import DagRun as DR
 from airflow.models.hitl import HITLDetail
@@ -423,6 +424,17 @@ def ti_update_state(
             },
         )
 
+    # Validate outlet-event partition keys up front, outside the catch-all ``except`` below;
+    # if raised inside it, this HTTPException would be swallowed and the TI marked failed.
+    if isinstance(ti_patch_payload, TISuccessStatePayload):
+        try:
+            
_validate_outlet_event_partition_keys(ti_patch_payload.outlet_events)
+        except InvalidPartitionKeyError as e:
+            raise HTTPException(
+                status_code=HTTP_422_UNPROCESSABLE_CONTENT,
+                detail={"reason": "invalid_partition_key", "message": str(e)},
+            ) from e
+
     # We exclude_unset to avoid updating fields that are not set in the payload
     data = ti_patch_payload.model_dump(
         exclude={"task_outlets", "outlet_events", "retry_delay_seconds", 
"retry_reason"},
@@ -571,6 +583,33 @@ def _handle_fail_fast_for_dag(ti: TI, dag_id: str, session: SessionDep, dag_bag:
         _stop_remaining_tasks(task_instance=ti, task_teardown_map=task_teardown_map, session=session)


+def _validate_outlet_event_partition_keys(outlet_events: list[dict[str, Any]]) -> None:
+    """
+    Validate partition_key values embedded in outlet events.
+
+    Raises ``InvalidPartitionKeyError`` (which the caller translates to HTTP 422)
+    if any per-emission partition key is not a string, is empty/whitespace-only,
+    or exceeds the ``ID_LEN`` column width used in the metadata database.
+    """
+    for event in outlet_events:
+        if (pk := event.get("partition_key")) is None:
+            continue
+        # outlet_events is only typed as dict[str, Any], so a malformed payload can carry a
+        # non-string here; reject it explicitly instead of failing later on ``pk.strip()``.
+        if not isinstance(pk, str):
+            raise InvalidPartitionKeyError(
+                f"partition_key in outlet event must be a string; got {type(pk).__name__}."
+            )
+        if not pk.strip():
+            raise InvalidPartitionKeyError(
+                f"partition_key in outlet event must not be empty or whitespace-only; got {pk!r}."
+            )
+        if len(pk) > ID_LEN:
+            raise InvalidPartitionKeyError(
+                f"partition_key in outlet event must be at most {ID_LEN} characters; got {len(pk)}."
+            )
+
+
 def _create_ti_state_update_query_and_update_state(
     *,
     ti_patch_payload: TIStateUpdate,
diff --git a/airflow-core/src/airflow/exceptions.py 
b/airflow-core/src/airflow/exceptions.py
index 2ac186bebd4..ff45daebc02 100644
--- a/airflow-core/src/airflow/exceptions.py
+++ b/airflow-core/src/airflow/exceptions.py
@@ -145,6 +145,10 @@ class DagNotPartitionedError(ValueError):
     """Raise when a partition_key is supplied for a Dag that is not 
partitioned."""
 
 
+class InvalidPartitionKeyError(ValueError):
+    """Raise when a partition_key value is not a str, is empty/whitespace-only, or exceeds the max length."""
+
+
 class DagRunAlreadyExists(AirflowBadRequest):
     """Raise when creating a DAG run for DAG which already has DAG run 
entry."""
 
diff --git a/airflow-core/src/airflow/serialization/definitions/dag.py 
b/airflow-core/src/airflow/serialization/definitions/dag.py
index d4de2a2c479..77f440987f5 100644
--- a/airflow-core/src/airflow/serialization/definitions/dag.py
+++ b/airflow-core/src/airflow/serialization/definitions/dag.py
@@ -33,7 +33,14 @@ from sqlalchemy import func, or_, select, tuple_
 from airflow._shared.observability.metrics import stats
 from airflow._shared.timezones.timezone import coerce_datetime
 from airflow.configuration import conf as airflow_conf
-from airflow.exceptions import AirflowException, DagNotPartitionedError, 
NodeNotFound, TaskNotFound
+from airflow.exceptions import (
+    AirflowException,
+    DagNotPartitionedError,
+    InvalidPartitionKeyError,
+    NodeNotFound,
+    TaskNotFound,
+)
+from airflow.models.base import ID_LEN
 from airflow.models.dag import DagModel
 from airflow.models.dag_version import DagVersion
 from airflow.models.dagbundle import DagBundleModel
@@ -526,23 +533,34 @@ class SerializedDAG:
 
     def validate_partition_key(self, partition_key: str | None) -> None:
         """
-        Raise ``DagNotPartitionedError`` if a partition key is supplied for a 
non-partitioned Dag.
+        Validate a partition key against Dag partitioning state and content 
constraints.
 
-        A ``None`` value is always accepted. A non-``None`` value is accepted 
only when the
+        A ``None`` value is always accepted. A non-``None`` value must be a ``str``,
+        not empty or whitespace-only, at most ``ID_LEN`` characters long, and may only be supplied when the
         Dag's timetable sets ``partitioned=True`` or 
``partitioned_at_runtime=True``.
 
         :param partition_key: The partition key to validate, or ``None`` to 
skip validation.
         :raises DagNotPartitionedError: When ``partition_key`` is not ``None`` 
and the Dag's
             timetable is neither ``partitioned`` nor 
``partitioned_at_runtime``.
+        :raises InvalidPartitionKeyError: When ``partition_key`` is not a ``str``, is empty or
+            whitespace-only, or exceeds ``ID_LEN`` characters (checked after the Dag check above).
         """
-        if (
-            partition_key is not None
-            and not self.timetable.partitioned
-            and not self.timetable.partitioned_at_runtime
-        ):
+        if partition_key is None:
+            return
+        if not self.timetable.partitioned and not 
self.timetable.partitioned_at_runtime:
             raise DagNotPartitionedError(
                 f"Dag '{self.dag_id}' is not a partitioned Dag and does not 
accept a partition_key."
             )
+        if not isinstance(partition_key, str):
+            raise InvalidPartitionKeyError(
+                f"Expected partition_key to be a `str` or `None` but got 
`{type(partition_key).__name__}`"
+            )
+        if not partition_key.strip():
+            raise InvalidPartitionKeyError("partition_key must not be empty or 
whitespace-only.")
+        if len(partition_key) > ID_LEN:
+            raise InvalidPartitionKeyError(
+                f"partition_key must be at most {ID_LEN} characters; got 
{len(partition_key)}."
+            )
 
     @provide_session
     def create_dagrun(
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
index 653d7ba8d8f..b975353163c 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
@@ -2804,6 +2804,88 @@ class TestTriggerDagRun:
         )
         assert response.status_code == 200
 
+    def test_should_respond_400_when_empty_partition_key(self, test_client):
+        """A non-partitioned Dag rejects any partition_key, even an empty one, with 400 rather than 500."""
+        now = timezone.utcnow().isoformat()
+        response = test_client.post(
+            f"/dags/{DAG1_ID}/dagRuns",
+            json={"logical_date": now, "partition_key": ""},
+        )
+        assert response.status_code == 400
+        assert (
+            response.json()["detail"]
+            == "Dag 'test_dag1' is not a partitioned Dag and does not accept a partition_key."
+        )
+
+    @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
+    def test_should_respond_400_when_empty_partition_key_on_partitioned_dag(
+        self, dag_maker, test_client, session
+    ):
+        """An empty partition_key on a partitioned Dag must fail content validation with 400, not 500."""
+        # Use a partitioned Dag: on a non-partitioned one the partitioning check rejects the key
+        # before the empty/whitespace content check is ever reached.
+        partitioned_dag_id = "test_empty_partition_key"
+        with dag_maker(
+            dag_id=partitioned_dag_id,
+            schedule=CronPartitionTimetable("0 * * * *", timezone="UTC"),
+            start_date=START_DATE1,
+            session=session,
+            serialized=True,
+        ):
+            EmptyOperator(task_id="task")
+
+        session.commit()
+
+        response = test_client.post(
+            f"/dags/{partitioned_dag_id}/dagRuns",
+            json={"logical_date": None, "partition_key": ""},
+        )
+        assert response.status_code == 400
+        assert "empty or whitespace-only" in response.json()["detail"]
+
+    @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
+    def test_should_respond_400_when_over_length_partition_key(self, dag_maker, test_client, session):
+        """A partition_key exceeding 250 characters must return 400, not 500."""
+        partitioned_dag_id = "test_over_length_partition_key"
+        with dag_maker(
+            dag_id=partitioned_dag_id,
+            schedule=CronPartitionTimetable("0 * * * *", timezone="UTC"),
+            start_date=START_DATE1,
+            session=session,
+            serialized=True,
+        ):
+            EmptyOperator(task_id="task")
+
+        session.commit()
+
+        response = test_client.post(
+            f"/dags/{partitioned_dag_id}/dagRuns",
+            json={"logical_date": None, "partition_key": "a" * 251},
+        )
+        assert response.status_code == 400
+        assert "at most 250 characters" in response.json()["detail"]
+
+    @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
+    def test_should_respond_200_when_exactly_max_length_partition_key(self, dag_maker, test_client, session):
+        """A partition_key of exactly 250 characters must be accepted."""
+        partitioned_dag_id = "test_max_length_partition_key"
+        with dag_maker(
+            dag_id=partitioned_dag_id,
+            schedule=CronPartitionTimetable("0 * * * *", timezone="UTC"),
+            start_date=START_DATE1,
+            session=session,
+            serialized=True,
+        ):
+            EmptyOperator(task_id="task")
+
+        session.commit()
+
+        response = test_client.post(
+            f"/dags/{partitioned_dag_id}/dagRuns",
+            json={"logical_date": None, "partition_key": "a" * 250},
+        )
+        assert response.status_code == 200
+
 
 class TestResolveRunOnLatestVersion:
     @pytest.mark.parametrize("explicit_value", [True, False])
diff --git 
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
 
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
index 0b7e4cd77a2..f990b7008ab 100644
--- 
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
+++ 
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
@@ -1331,6 +1331,46 @@ class TestTIUpdateState:
             assert events[0].asset == AssetModel(name="my-task", 
uri="s3://bucket/my-task", extra={})
             assert events[0].extra == expected_extra
 
+    @pytest.mark.parametrize(
+        ("partition_key", "expected_status"),
+        [
+            pytest.param("a" * 250, 204, id="at_limit_accepted"),
+            pytest.param("a" * 251, 422, id="over_limit_rejected"),
+            pytest.param("", 422, id="empty_rejected"),
+            pytest.param("   ", 422, id="whitespace_rejected"),
+        ],
+    )
+    def test_ti_update_state_to_success_outlet_event_partition_key_validation(
+        self, client, session, create_task_instance, partition_key, 
expected_status
+    ):
+        """Outlet event partition_key is validated server-side: a 250-char key is accepted, invalid keys get 422."""
+        ti = create_task_instance(
+            task_id="test_outlet_event_partition_key_validation",
+            start_date=DEFAULT_START_DATE,
+            state=State.RUNNING,
+        )
+        session.commit()
+
+        response = client.patch(
+            f"/execution/task-instances/{ti.id}/state",
+            json={
+                "state": "success",
+                "end_date": DEFAULT_END_DATE.isoformat(),
+                "task_outlets": [],
+                "outlet_events": [
+                    {
+                        "dest_asset_key": {"name": "my-asset", "uri": 
"s3://bucket/my-asset"},
+                        "extra": {},
+                        "partition_key": partition_key,
+                    }
+                ],
+            },
+        )
+        assert response.status_code == expected_status
+        if expected_status == 422:
+            detail = response.json()["detail"]
+            assert detail["reason"] == "invalid_partition_key"
+
     def test_ti_update_state_not_found(self, client, session):
         """
         Test that a 404 error is returned when the Task Instance does not 
exist.
diff --git a/airflow-core/tests/unit/models/test_dag.py 
b/airflow-core/tests/unit/models/test_dag.py
index 73628b9919c..f09ab836b6b 100644
--- a/airflow-core/tests/unit/models/test_dag.py
+++ b/airflow-core/tests/unit/models/test_dag.py
@@ -41,7 +41,7 @@ from airflow._shared.timezones import timezone
 from airflow._shared.timezones.timezone import datetime as datetime_tz
 from airflow.configuration import conf
 from airflow.dag_processing.dagbag import BundleDagBag, DagBag
-from airflow.exceptions import AirflowException, DagNotPartitionedError
+from airflow.exceptions import AirflowException, DagNotPartitionedError, 
InvalidPartitionKeyError
 from airflow.models.asset import (
     AssetAliasModel,
     AssetDagRunQueue,
@@ -1602,6 +1602,38 @@ class TestDag:
         else:
             sdag.validate_partition_key(partition_key)  # must not raise
 
+    @pytest.mark.need_serialized_dag
+    @pytest.mark.parametrize(
+        ("partition_key", "exc_type", "match"),
+        [
+            ("", InvalidPartitionKeyError, "empty or whitespace-only"),
+            ("   ", InvalidPartitionKeyError, "empty or whitespace-only"),
+            ("a" * 251, InvalidPartitionKeyError, "at most 250 characters"),
+        ],
+        ids=["empty", "whitespace", "over_limit"],
+    )
+    def test_validate_partition_key_rejects_invalid_content(self, 
partition_key, exc_type, match, dag_maker):
+        """validate_partition_key rejects empty, whitespace-only and over-length keys on a partitioned Dag."""
+        with dag_maker(
+            "test_validate_partition_key_content",
+            schedule=CronPartitionTimetable("@daily", timezone="UTC"),
+        ):
+            ...
+        sdag = dag_maker.serialized_dag
+        with pytest.raises(exc_type, match=match):
+            sdag.validate_partition_key(partition_key)
+
+    @pytest.mark.need_serialized_dag
+    def test_validate_partition_key_accepts_exactly_max_length(self, 
dag_maker):
+        """validate_partition_key accepts a key of exactly 250 characters."""
+        with dag_maker(
+            "test_validate_partition_key_at_limit",
+            schedule=CronPartitionTimetable("@daily", timezone="UTC"),
+        ):
+            ...
+        sdag = dag_maker.serialized_dag
+        sdag.validate_partition_key("a" * 250)  # must not raise
+
     def test_dag_add_task_sets_default_task_group(self):
         dag = DAG(dag_id="test_dag_add_task_sets_default_task_group", 
schedule=None, start_date=DEFAULT_DATE)
         task_without_task_group = 
EmptyOperator(task_id="task_without_group_id")
diff --git a/task-sdk/src/airflow/sdk/execution_time/context.py 
b/task-sdk/src/airflow/sdk/execution_time/context.py
index 92dfa47dd5c..6cbf3dacadb 100644
--- a/task-sdk/src/airflow/sdk/execution_time/context.py
+++ b/task-sdk/src/airflow/sdk/execution_time/context.py
@@ -965,10 +965,37 @@ class OutletEventAccessor(_AssetRefResolutionMixin):
     asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list)
     partition_keys: set[str] = attrs.field(factory=set)
 
+    # Maximum length mirrors the StringID column width used in the metadata database
+    # (airflow.models.base.ID_LEN = 250). Intentionally unannotated: if this class uses attrs
+    # auto_attribs (as the annotated attrs.field() defaults above suggest), an annotated class
+    # attribute would become an instance field in __init__/__repr__/__eq__, not a constant.
+    _PARTITION_KEY_MAX_LENGTH = 250
+
     def add_partitions(self, keys: str | list[str]) -> None:
-        """Add one or more partition keys to :attr:`partition_keys`."""
+        """
+        Add one or more partition keys to :attr:`partition_keys`.
+
+        Every key is validated before any is added, so a failing call leaves
+        :attr:`partition_keys` unchanged.
+
+        :raises TypeError: If any key is not a ``str``.
+        :raises ValueError: If any key is empty/whitespace-only or longer than
+            ``_PARTITION_KEY_MAX_LENGTH`` characters.
+        """
         if isinstance(keys, str):
             keys = [keys]
+        # Materialize once so validating a one-shot iterable does not exhaust it before update().
+        keys = list(keys)
+        for key in keys:
+            if not isinstance(key, str):
+                raise TypeError(f"partition_key must be a str; got {type(key).__name__}.")
+            if not key.strip():
+                raise ValueError(f"partition_key must not be empty or whitespace-only; got {key!r}.")
+            if len(key) > self._PARTITION_KEY_MAX_LENGTH:
+                raise ValueError(
+                    f"partition_key must be at most {self._PARTITION_KEY_MAX_LENGTH} characters; "
+                    f"got {len(key)}."
+                )
         self.partition_keys.update(keys)
 
     def add(self, asset: Asset | AssetRef, extra: dict[str, JsonValue] | None = None) -> None:
diff --git a/task-sdk/tests/task_sdk/execution_time/test_context.py 
b/task-sdk/tests/task_sdk/execution_time/test_context.py
index c3bb678b039..6f09489a88d 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_context.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_context.py
@@ -501,6 +501,41 @@ class TestOutletEventAccessorPartitionKeys:
         accessor.add_partitions(["us", "eu"])
         assert accessor.partition_keys == {"us", "eu"}
 
+    @pytest.mark.parametrize(
+        "key",
+        [
+            "",
+            "   ",
+            "\t",
+        ],
+        ids=["empty", "spaces", "tab"],
+    )
+    def test_add_partitions_rejects_empty_key(self, accessor, key):
+        with pytest.raises(ValueError, match="must not be empty or 
whitespace-only"):
+            accessor.add_partitions(key)
+
+    @pytest.mark.parametrize(
+        "key",
+        [
+            "a" * 250,
+            "a" * 251,
+        ],
+        ids=["at_limit_accepted", "over_limit_rejected"],
+    )
+    def test_add_partitions_length_boundary(self, accessor, key):
+        if len(key) <= accessor._PARTITION_KEY_MAX_LENGTH:
+            accessor.add_partitions(key)
+            assert key in accessor.partition_keys
+        else:
+            with pytest.raises(ValueError, match="at most 250 characters"):
+                accessor.add_partitions(key)
+
+    def test_add_partitions_rejects_any_invalid_in_list(self, accessor):
+        """A mixed valid/invalid list raises before any key is added, so partition_keys stays empty."""
+        with pytest.raises(ValueError, match="must not be empty or 
whitespace-only"):
+            accessor.add_partitions(["us", ""])
+        assert accessor.partition_keys == set()
+
 
 class TestTriggeringAssetEventsAccessor:
     @pytest.fixture(autouse=True)

Reply via email to