This is an automated email from the ASF dual-hosted git repository.
Lee-W pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 88a1b329a0e Validate partition keys are non-empty and within column length (#68443)
88a1b329a0e is described below
commit 88a1b329a0ee9c240bd4c9f3facbc5564498de65
Author: Wei Lee <[email protected]>
AuthorDate: Mon Jun 15 20:35:22 2026 +0800
Validate partition keys are non-empty and within column length (#68443)
---
.../execution_api/routes/task_instances.py | 35 +++++++++++++-
airflow-core/src/airflow/exceptions.py | 4 ++
.../src/airflow/serialization/definitions/dag.py | 34 +++++++++----
.../core_api/routes/public/test_dag_run.py | 56 ++++++++++++++++++++++
.../versions/head/test_task_instances.py | 40 ++++++++++++++++
airflow-core/tests/unit/models/test_dag.py | 34 ++++++++++++-
task-sdk/src/airflow/sdk/execution_time/context.py | 19 +++++++-
.../tests/task_sdk/execution_time/test_context.py | 35 ++++++++++++++
8 files changed, 246 insertions(+), 11 deletions(-)
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
index 7a53a9ddf59..3deccf35be6 100644
---
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
+++
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
@@ -76,8 +76,9 @@ from airflow.api_fastapi.execution_api.security import (
require_auth,
)
from airflow.configuration import conf
-from airflow.exceptions import TaskNotFound
+from airflow.exceptions import InvalidPartitionKeyError, TaskNotFound
from airflow.models.asset import AssetActive
+from airflow.models.base import ID_LEN
from airflow.models.dag import DagModel
from airflow.models.dagrun import DagRun as DR
from airflow.models.hitl import HITLDetail
@@ -423,6 +424,17 @@ def ti_update_state(
},
)
+ # Validate outlet event partition keys early, before entering the catch-all
+ # except block that would otherwise swallow the HTTPException and mark the
TI failed.
+ if isinstance(ti_patch_payload, TISuccessStatePayload):
+ try:
+
_validate_outlet_event_partition_keys(ti_patch_payload.outlet_events)
+ except InvalidPartitionKeyError as e:
+ raise HTTPException(
+ status_code=HTTP_422_UNPROCESSABLE_CONTENT,
+ detail={"reason": "invalid_partition_key", "message": str(e)},
+ ) from e
+
# We exclude_unset to avoid updating fields that are not set in the payload
data = ti_patch_payload.model_dump(
exclude={"task_outlets", "outlet_events", "retry_delay_seconds",
"retry_reason"},
@@ -571,6 +583,33 @@ def _handle_fail_fast_for_dag(ti: TI, dag_id: str, session: SessionDep, dag_bag:
     _stop_remaining_tasks(task_instance=ti, task_teardown_map=task_teardown_map, session=session)
 
 
+def _validate_outlet_event_partition_keys(outlet_events: list[dict[str, Any]]) -> None:
+    """
+    Validate partition_key values embedded in outlet events.
+
+    Raises ``InvalidPartitionKeyError`` (which the caller translates to HTTP 422)
+    if any per-emission partition key is not a string, is empty/whitespace-only,
+    or exceeds the ``ID_LEN`` column width used in the metadata database.
+    """
+    for event in outlet_events:
+        if (pk := event.get("partition_key")) is None:
+            continue
+        # ``outlet_events`` is only typed as ``dict[str, Any]``, so the value may be any
+        # JSON type; without this check ``pk.strip()`` raises AttributeError (HTTP 500).
+        if not isinstance(pk, str):
+            raise InvalidPartitionKeyError(
+                f"partition_key in outlet event must be a str; got {type(pk).__name__}."
+            )
+        if not pk.strip():
+            raise InvalidPartitionKeyError(
+                f"partition_key in outlet event must not be empty or whitespace-only; got {pk!r}."
+            )
+        if len(pk) > ID_LEN:
+            raise InvalidPartitionKeyError(
+                f"partition_key in outlet event must be at most {ID_LEN} characters; got {len(pk)}."
+            )
+
+
 def _create_ti_state_update_query_and_update_state(
     *,
     ti_patch_payload: TIStateUpdate,
diff --git a/airflow-core/src/airflow/exceptions.py
b/airflow-core/src/airflow/exceptions.py
index 2ac186bebd4..ff45daebc02 100644
--- a/airflow-core/src/airflow/exceptions.py
+++ b/airflow-core/src/airflow/exceptions.py
@@ -145,6 +145,10 @@ class DagNotPartitionedError(ValueError):
"""Raise when a partition_key is supplied for a Dag that is not
partitioned."""
+class InvalidPartitionKeyError(ValueError):
+ """Raise when a partition_key value is not a string, is empty or whitespace-only,
or exceeds the maximum allowed length."""
+
+
class DagRunAlreadyExists(AirflowBadRequest):
"""Raise when creating a DAG run for DAG which already has DAG run
entry."""
diff --git a/airflow-core/src/airflow/serialization/definitions/dag.py
b/airflow-core/src/airflow/serialization/definitions/dag.py
index d4de2a2c479..77f440987f5 100644
--- a/airflow-core/src/airflow/serialization/definitions/dag.py
+++ b/airflow-core/src/airflow/serialization/definitions/dag.py
@@ -33,7 +33,14 @@ from sqlalchemy import func, or_, select, tuple_
from airflow._shared.observability.metrics import stats
from airflow._shared.timezones.timezone import coerce_datetime
from airflow.configuration import conf as airflow_conf
-from airflow.exceptions import AirflowException, DagNotPartitionedError,
NodeNotFound, TaskNotFound
+from airflow.exceptions import (
+ AirflowException,
+ DagNotPartitionedError,
+ InvalidPartitionKeyError,
+ NodeNotFound,
+ TaskNotFound,
+)
+from airflow.models.base import ID_LEN
from airflow.models.dag import DagModel
from airflow.models.dag_version import DagVersion
from airflow.models.dagbundle import DagBundleModel
@@ -526,23 +533,34 @@ class SerializedDAG:
def validate_partition_key(self, partition_key: str | None) -> None:
"""
- Raise ``DagNotPartitionedError`` if a partition key is supplied for a
non-partitioned Dag.
+ Validate a partition key against Dag partitioning state and content
constraints.
- A ``None`` value is always accepted. A non-``None`` value is accepted
only when the
+ A ``None`` value is always accepted. A non-``None`` value must be a
``str``,
+ non-empty (and not whitespace-only), at most ``ID_LEN`` characters long, and may only be
supplied when the
Dag's timetable sets ``partitioned=True`` or
``partitioned_at_runtime=True``.
:param partition_key: The partition key to validate, or ``None`` to
skip validation.
:raises DagNotPartitionedError: When ``partition_key`` is not ``None``
and the Dag's
timetable is neither ``partitioned`` nor
``partitioned_at_runtime``.
+ :raises InvalidPartitionKeyError: When ``partition_key`` is not a
``str``, is an empty or whitespace-only
+ string, or exceeds ``ID_LEN`` characters.
"""
- if (
- partition_key is not None
- and not self.timetable.partitioned
- and not self.timetable.partitioned_at_runtime
- ):
+ if partition_key is None:
+ return
+ if not self.timetable.partitioned and not
self.timetable.partitioned_at_runtime:
raise DagNotPartitionedError(
f"Dag '{self.dag_id}' is not a partitioned Dag and does not
accept a partition_key."
)
+ if not isinstance(partition_key, str):
+ raise InvalidPartitionKeyError(
+ f"Expected partition_key to be a `str` or `None` but got
`{type(partition_key).__name__}`"
+ )
+ if not partition_key.strip():
+ raise InvalidPartitionKeyError("partition_key must not be empty or
whitespace-only.")
+ if len(partition_key) > ID_LEN:
+ raise InvalidPartitionKeyError(
+ f"partition_key must be at most {ID_LEN} characters; got
{len(partition_key)}."
+ )
@provide_session
def create_dagrun(
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
index 653d7ba8d8f..b975353163c 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
@@ -2804,6 +2804,62 @@ class TestTriggerDagRun:
)
assert response.status_code == 200
+ def test_should_respond_400_when_empty_partition_key(self, test_client):
+ """An empty partition_key on a non-partitioned Dag hits the not-partitioned check: 400, not 500."""
+ now = timezone.utcnow().isoformat()
+ response = test_client.post(
+ f"/dags/{DAG1_ID}/dagRuns",
+ json={"logical_date": now, "partition_key": ""},
+ )
+ assert response.status_code == 400
+ assert (
+ response.json()["detail"]
+ == "Dag 'test_dag1' is not a partitioned Dag and does not accept a
partition_key."
+ )
+
+ @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
+ def test_should_respond_400_when_over_length_partition_key(self,
dag_maker, test_client, session):
+ """A partition_key exceeding 250 characters must return 400, not
500."""
+ partitioned_dag_id = "test_over_length_partition_key"
+ with dag_maker(
+ dag_id=partitioned_dag_id,
+ schedule=CronPartitionTimetable("0 * * * *", timezone="UTC"),
+ start_date=START_DATE1,
+ session=session,
+ serialized=True,
+ ):
+ EmptyOperator(task_id="task")
+
+ session.commit()
+
+ response = test_client.post(
+ f"/dags/{partitioned_dag_id}/dagRuns",
+ json={"logical_date": None, "partition_key": "a" * 251},
+ )
+ assert response.status_code == 400
+ assert "at most 250 characters" in response.json()["detail"]
+
+ @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
+ def test_should_respond_200_when_exactly_max_length_partition_key(self,
dag_maker, test_client, session):
+ """A partition_key of exactly 250 characters must be accepted."""
+ partitioned_dag_id = "test_max_length_partition_key"
+ with dag_maker(
+ dag_id=partitioned_dag_id,
+ schedule=CronPartitionTimetable("0 * * * *", timezone="UTC"),
+ start_date=START_DATE1,
+ session=session,
+ serialized=True,
+ ):
+ EmptyOperator(task_id="task")
+
+ session.commit()
+
+ response = test_client.post(
+ f"/dags/{partitioned_dag_id}/dagRuns",
+ json={"logical_date": None, "partition_key": "a" * 250},
+ )
+ assert response.status_code == 200
+
class TestResolveRunOnLatestVersion:
@pytest.mark.parametrize("explicit_value", [True, False])
diff --git
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
index 0b7e4cd77a2..f990b7008ab 100644
---
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
+++
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
@@ -1331,6 +1331,46 @@ class TestTIUpdateState:
assert events[0].asset == AssetModel(name="my-task",
uri="s3://bucket/my-task", extra={})
assert events[0].extra == expected_extra
+ @pytest.mark.parametrize(
+ ("partition_key", "expected_status"),
+ [
+ pytest.param("a" * 250, 204, id="at_limit_accepted"),
+ pytest.param("a" * 251, 422, id="over_limit_rejected"),
+ pytest.param("", 422, id="empty_rejected"),
+ pytest.param(" ", 422, id="whitespace_rejected"),
+ ],
+ )
+ def test_ti_update_state_to_success_outlet_event_partition_key_validation(
+ self, client, session, create_task_instance, partition_key,
expected_status
+ ):
+ """Outlet event partition_key content is validated server-side;
invalid keys return 422."""
+ ti = create_task_instance(
+ task_id="test_outlet_event_partition_key_validation",
+ start_date=DEFAULT_START_DATE,
+ state=State.RUNNING,
+ )
+ session.commit()
+
+ response = client.patch(
+ f"/execution/task-instances/{ti.id}/state",
+ json={
+ "state": "success",
+ "end_date": DEFAULT_END_DATE.isoformat(),
+ "task_outlets": [],
+ "outlet_events": [
+ {
+ "dest_asset_key": {"name": "my-asset", "uri":
"s3://bucket/my-asset"},
+ "extra": {},
+ "partition_key": partition_key,
+ }
+ ],
+ },
+ )
+ assert response.status_code == expected_status
+ if expected_status == 422:
+ detail = response.json()["detail"]
+ assert detail["reason"] == "invalid_partition_key"
+
def test_ti_update_state_not_found(self, client, session):
"""
Test that a 404 error is returned when the Task Instance does not
exist.
diff --git a/airflow-core/tests/unit/models/test_dag.py
b/airflow-core/tests/unit/models/test_dag.py
index 73628b9919c..f09ab836b6b 100644
--- a/airflow-core/tests/unit/models/test_dag.py
+++ b/airflow-core/tests/unit/models/test_dag.py
@@ -41,7 +41,7 @@ from airflow._shared.timezones import timezone
from airflow._shared.timezones.timezone import datetime as datetime_tz
from airflow.configuration import conf
from airflow.dag_processing.dagbag import BundleDagBag, DagBag
-from airflow.exceptions import AirflowException, DagNotPartitionedError
+from airflow.exceptions import AirflowException, DagNotPartitionedError,
InvalidPartitionKeyError
from airflow.models.asset import (
AssetAliasModel,
AssetDagRunQueue,
@@ -1602,6 +1602,38 @@ class TestDag:
else:
sdag.validate_partition_key(partition_key) # must not raise
+ @pytest.mark.need_serialized_dag
+ @pytest.mark.parametrize(
+ ("partition_key", "exc_type", "match"),
+ [
+ ("", InvalidPartitionKeyError, "empty or whitespace-only"),
+ (" ", InvalidPartitionKeyError, "empty or whitespace-only"),
+ ("a" * 251, InvalidPartitionKeyError, "at most 250 characters"),
+ ],
+ ids=["empty", "whitespace", "over_limit"],
+ )
+ def test_validate_partition_key_rejects_invalid_content(self,
partition_key, exc_type, match, dag_maker):
+ """validate_partition_key raises InvalidPartitionKeyError for empty,
whitespace-only or over-length keys."""
+ with dag_maker(
+ "test_validate_partition_key_content",
+ schedule=CronPartitionTimetable("@daily", timezone="UTC"),
+ ):
+ ...
+ sdag = dag_maker.serialized_dag
+ with pytest.raises(exc_type, match=match):
+ sdag.validate_partition_key(partition_key)
+
+ @pytest.mark.need_serialized_dag
+ def test_validate_partition_key_accepts_exactly_max_length(self,
dag_maker):
+ """validate_partition_key accepts a key of exactly 250 characters."""
+ with dag_maker(
+ "test_validate_partition_key_at_limit",
+ schedule=CronPartitionTimetable("@daily", timezone="UTC"),
+ ):
+ ...
+ sdag = dag_maker.serialized_dag
+ sdag.validate_partition_key("a" * 250) # must not raise
+
def test_dag_add_task_sets_default_task_group(self):
dag = DAG(dag_id="test_dag_add_task_sets_default_task_group",
schedule=None, start_date=DEFAULT_DATE)
task_without_task_group =
EmptyOperator(task_id="task_without_group_id")
diff --git a/task-sdk/src/airflow/sdk/execution_time/context.py b/task-sdk/src/airflow/sdk/execution_time/context.py
index 92dfa47dd5c..6cbf3dacadb 100644
--- a/task-sdk/src/airflow/sdk/execution_time/context.py
+++ b/task-sdk/src/airflow/sdk/execution_time/context.py
@@ -965,10 +965,37 @@ class OutletEventAccessor(_AssetRefResolutionMixin):
     asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list)
     partition_keys: set[str] = attrs.field(factory=set)
 
+    # Maximum length mirrors the StringID column width used in the metadata database
+    # (airflow.models.base.ID_LEN = 250). Deliberately left unannotated: on an
+    # ``attrs.define`` class an annotated assignment becomes an instance field (an
+    # ``__init__`` argument that also appears in ``repr``/``==``), not a class constant.
+    _PARTITION_KEY_MAX_LENGTH = 250
+
     def add_partitions(self, keys: str | list[str]) -> None:
-        """Add one or more partition keys to :attr:`partition_keys`."""
+        """
+        Add one or more partition keys to :attr:`partition_keys`.
+
+        Every key is validated before any is added, so a call that raises leaves
+        :attr:`partition_keys` unchanged.
+
+        :param keys: A single key, or a list (or other iterable) of keys.
+        :raises TypeError: If any key is not a ``str``.
+        :raises ValueError: If any key is empty/whitespace-only or longer than
+            ``_PARTITION_KEY_MAX_LENGTH`` characters.
+        """
-        if isinstance(keys, str):
-            keys = [keys]
+        # Materialize once: the keys are iterated twice (validation, then update),
+        # so a one-shot iterable such as a generator must not be consumed early.
+        keys = [keys] if isinstance(keys, str) else list(keys)
+        for key in keys:
+            if not isinstance(key, str):
+                raise TypeError(f"partition_key must be a str; got {type(key).__name__}.")
+            if not key.strip():
+                raise ValueError(f"partition_key must not be empty or whitespace-only; got {key!r}.")
+            if len(key) > self._PARTITION_KEY_MAX_LENGTH:
+                raise ValueError(
+                    f"partition_key must be at most {self._PARTITION_KEY_MAX_LENGTH} characters; "
+                    f"got {len(key)}."
+                )
         self.partition_keys.update(keys)
 
     def add(self, asset: Asset | AssetRef, extra: dict[str, JsonValue] | None = None) -> None:
diff --git a/task-sdk/tests/task_sdk/execution_time/test_context.py
b/task-sdk/tests/task_sdk/execution_time/test_context.py
index c3bb678b039..6f09489a88d 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_context.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_context.py
@@ -501,6 +501,41 @@ class TestOutletEventAccessorPartitionKeys:
accessor.add_partitions(["us", "eu"])
assert accessor.partition_keys == {"us", "eu"}
+ @pytest.mark.parametrize(
+ "key",
+ [
+ "",
+ " ",
+ "\t",
+ ],
+ ids=["empty", "spaces", "tab"],
+ )
+ def test_add_partitions_rejects_empty_key(self, accessor, key):
+ with pytest.raises(ValueError, match="must not be empty or
whitespace-only"):
+ accessor.add_partitions(key)
+
+ @pytest.mark.parametrize(
+ "key",
+ [
+ "a" * 250,
+ "a" * 251,
+ ],
+ ids=["at_limit_accepted", "over_limit_rejected"],
+ )
+ def test_add_partitions_length_boundary(self, accessor, key):
+ if len(key) <= accessor._PARTITION_KEY_MAX_LENGTH:
+ accessor.add_partitions(key)
+ assert key in accessor.partition_keys
+ else:
+ with pytest.raises(ValueError, match="at most 250 characters"):
+ accessor.add_partitions(key)
+
+ def test_add_partitions_rejects_any_invalid_in_list(self, accessor):
+ """A list with a mix of valid and invalid keys fails before any are
added."""
+ with pytest.raises(ValueError, match="must not be empty or
whitespace-only"):
+ accessor.add_partitions(["us", ""])
+ assert accessor.partition_keys == set()
+
class TestTriggeringAssetEventsAccessor:
@pytest.fixture(autouse=True)