This is an automated email from the ASF dual-hosted git repository.
vatsrahul1001 pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-2-test by this push:
new bd425efc1b1 Fix pod_override serialization in DAG details and executor path (#65407) (#66898)
bd425efc1b1 is described below
commit bd425efc1b1d2aace12c2ff6f2ff0c8a51a7cdf0
Author: Rahul Vats <[email protected]>
AuthorDate: Fri May 15 10:49:04 2026 +0530
Fix pod_override serialization in DAG details and executor path (#65407) (#66898)
Sanitize default_args["executor_config"]["pod_override"] in DAG
details responses without changing other default_args values,
and make V1Pod serialization force the Kubernetes import path
so executor config no longer falls back to stringification.
This fixes two related Kubernetes serialization bugs: one where
the DAG details API could fail when pod_override was present in
default_args, and another where V1Pod objects could be flattened
into strings before reaching the Kubernetes executor. The updated
tests cover both the API response behavior and the serializer
regression with real Kubernetes pod objects.
(cherry picked from commit ff15983262376fb375a67e23ecbc5c2a32786fa0)
Co-authored-by: Ephraim Anierobi <[email protected]>
---
.../api_fastapi/core_api/datamodels/dags.py | 37 +++++++++
.../airflow/serialization/serialized_objects.py | 6 +-
.../api_fastapi/core_api/datamodels/test_dags.py | 95 ++++++++++++++++++++++
.../unit/serialization/test_serialized_objects.py | 23 ++++++
4 files changed, 160 insertions(+), 1 deletion(-)
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dags.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dags.py
index ea73f1fca13..f98ced89f02 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dags.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dags.py
@@ -33,6 +33,7 @@ from pydantic import (
field_validator,
)
+from airflow._shared.module_loading import qualname
from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel,
make_partial_model
from airflow.api_fastapi.core_api.datamodels.dag_tags import DagTagResponse
from airflow.api_fastapi.core_api.datamodels.dag_versions import
DagVersionResponse
@@ -44,6 +45,11 @@ if TYPE_CHECKING:
from airflow.serialization.definitions.param import SerializedParamsDict
+def _is_response_safe_pod_override(value: Any) -> bool:
+    """Whether a pod_override value is already safe to preserve in the response."""
+    return value is None or isinstance(value, str | int | float | Mapping | list)
+
+
@cache
def _get_file_token_serializer() -> URLSafeSerializer:
"""
@@ -204,6 +210,37 @@ class DAGDetailsResponse(DAGResponse):
return None
return inspect.cleandoc(doc_md)
+    @field_validator("default_args", mode="before")
+    @classmethod
+    def get_default_args(cls, default_args: Mapping | None) -> Mapping | None:
+        """
+        Sanitize default_args for the API response.
+
+        Targets the common case where ``executor_config["pod_override"]`` is a
+        Kubernetes ``V1Pod``: when the value is not a JSON primitive
+        (``None``/``str``/``int``/``float``) or a ``Mapping``/``list``, it is
+        rewritten to a fully-qualified type-name string so the response stays
+        valid JSON. The container check is shallow — a ``Mapping`` or ``list``
+        whose contents are themselves non-serializable (e.g. nested ``V1Pod``)
+        will still raise during response serialization, as will any other
+        non-JSON values elsewhere in ``default_args``.
+        """
+        if default_args is None:
+            return None
+        executor_config = default_args.get("executor_config")
+        if not (isinstance(executor_config, Mapping) and "pod_override" in executor_config):
+            return default_args
+
+        pod_override = executor_config["pod_override"]
+        if _is_response_safe_pod_override(pod_override):
+            return default_args
+
+        sanitized_executor_config = dict(executor_config)
+        sanitized_executor_config["pod_override"] = qualname(pod_override)
+        result = dict(default_args)
+        result["executor_config"] = sanitized_executor_config
+        return result
+
@field_validator("params", mode="before")
@classmethod
def get_params(cls, params: SerializedParamsDict | None) -> dict | None:
diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py
index 9a0c7ab11d7..a0295dc2370 100644
--- a/airflow-core/src/airflow/serialization/serialized_objects.py
+++ b/airflow-core/src/airflow/serialization/serialized_objects.py
@@ -491,7 +491,11 @@ class BaseSerialization:
)
elif isinstance(var, list):
return [cls.serialize(v, strict=strict) for v in var]
- elif var.__class__.__name__ == "V1Pod" and _has_kubernetes() and
isinstance(var, k8s.V1Pod):
+ elif (
+ var.__class__.__name__ == "V1Pod"
+ and _has_kubernetes(attempt_import=True)
+ and isinstance(var, k8s.V1Pod)
+ ):
json_pod = PodGenerator.serialize_pod(var)
return cls._encode(json_pod, type_=DAT.POD)
elif isinstance(var, OutletEventAccessors):
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/datamodels/test_dags.py b/airflow-core/tests/unit/api_fastapi/core_api/datamodels/test_dags.py
new file mode 100644
index 00000000000..5e9a226d7c4
--- /dev/null
+++ b/airflow-core/tests/unit/api_fastapi/core_api/datamodels/test_dags.py
@@ -0,0 +1,95 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from datetime import datetime, timedelta, timezone
+
+import pytest
+
+from airflow._shared.module_loading import qualname
+from airflow.api_fastapi.core_api.datamodels.dags import DAGDetailsResponse
+
+
+class TestGetDefaultArgsValidator:
+    """Test the get_default_args field_validator on DAGDetailsResponse."""
+
+    def _call_validator(self, value):
+        """Invoke the classmethod validator directly."""
+        return DAGDetailsResponse.get_default_args(value)
+
+    def test_none_returns_none(self):
+        assert self._call_validator(None) is None
+
+    def test_plain_dict_is_preserved(self):
+        result = self._call_validator({"retries": 3, "depends_on_past": False})
+        assert result == {"retries": 3, "depends_on_past": False}
+
+    def test_timedelta_values_are_preserved(self):
+        td = timedelta(minutes=5)
+        result = self._call_validator({"retry_delay": td})
+        assert result == {"retry_delay": td}
+
+    def test_datetime_values_are_preserved(self):
+        start_date = datetime(2024, 1, 1, tzinfo=timezone.utc)
+        result = self._call_validator({"start_date": start_date})
+        assert result == {"start_date": start_date}
+
+    def test_pod_override_is_replaced_with_type_name(self):
+        k8s = pytest.importorskip("kubernetes.client.models")
+        pod = k8s.V1Pod(metadata=k8s.V1ObjectMeta(name="test-pod"))
+        result = self._call_validator({"executor_config": {"pod_override": pod, "namespace": "custom"}})
+        assert result == {"executor_config": {"pod_override": qualname(pod), "namespace": "custom"}}
+
+    @pytest.mark.parametrize(
+        "pod_override",
+        [
+            pytest.param(None, id="none"),
+            pytest.param("already-serialized", id="string"),
+            pytest.param({"metadata": {"name": "pod"}}, id="dict"),
+            pytest.param([{"metadata": {"name": "pod"}}], id="list"),
+        ],
+    )
+    def test_serialized_pod_override_values_are_preserved(self, pod_override):
+        result = self._call_validator({"executor_config": {"pod_override": pod_override}})
+        assert result == {"executor_config": {"pod_override": pod_override}}
+
+    def test_serialized_pod_override_preserves_other_executor_config_keys(self):
+        executor_config = {
+            "pod_override": {"metadata": {"name": "pod"}},
+            "KubernetesExecutor": {"image": "custom-image"},
+        }
+
+        result = self._call_validator({"executor_config": executor_config})
+
+        assert result == {"executor_config": executor_config}
+
+    def test_non_serialized_pod_override_object_is_replaced_with_type_name(self):
+        class Opaque:
+            pass
+
+        value = Opaque()
+        result = self._call_validator({"executor_config": {"pod_override": value}})
+        assert result == {"executor_config": {"pod_override": qualname(value)}}
+
+    def test_non_pod_override_objects_are_left_unchanged(self):
+        class Opaque:
+            def to_dict(self):
+                return {"password": "secret"}
+
+        value = Opaque()
+        result = self._call_validator({"connection": value})
+        assert result["connection"] is value
diff --git a/airflow-core/tests/unit/serialization/test_serialized_objects.py b/airflow-core/tests/unit/serialization/test_serialized_objects.py
index f59939be9b2..c117561273b 100644
--- a/airflow-core/tests/unit/serialization/test_serialized_objects.py
+++ b/airflow-core/tests/unit/serialization/test_serialized_objects.py
@@ -86,6 +86,7 @@ from airflow.sdk.definitions.operator_resources import Resources
from airflow.sdk.definitions.param import Param
from airflow.sdk.definitions.taskgroup import TaskGroup
from airflow.sdk.execution_time.context import OutletEventAccessor,
OutletEventAccessors
+from airflow.serialization import serialized_objects
from airflow.serialization.definitions.assets import (
SerializedAsset,
SerializedAssetAlias,
@@ -1049,3 +1050,25 @@ class TestKubernetesImportAvoidance:
result = _has_kubernetes()
assert result is True
+
+    def test_serialize_v1pod_attempts_import_before_serializing(self, monkeypatch):
+        """Regression test: V1Pod serialization must call _has_kubernetes(attempt_import=True)."""
+        k8s = pytest.importorskip("kubernetes.client.models")
+        from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator
+
+        calls = []
+
+        def fake_has_kubernetes(*, attempt_import=False):
+            calls.append(attempt_import)
+            return True
+
+        monkeypatch.setattr(serialized_objects, "_has_kubernetes", fake_has_kubernetes)
+        monkeypatch.setattr(serialized_objects, "k8s", k8s, raising=False)
+        monkeypatch.setattr(serialized_objects, "PodGenerator", PodGenerator, raising=False)
+
+        pod = k8s.V1Pod(metadata=k8s.V1ObjectMeta(name="test-pod"))
+        result = BaseSerialization.serialize(pod)
+
+        assert isinstance(result, dict), "V1Pod should serialize to a dict, not a string"
+        assert result.get(Encoding.TYPE) == DAT.POD, "V1Pod should have type DAT.POD"
+        assert True in calls