This is an automated email from the ASF dual-hosted git repository.

ephraimbuddy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e7619316995 Fix KubernetesExecutor scheduler crash from unpicklable pod_override (#68831)
e7619316995 is described below

commit e76193169953de0d4b7c24ebb617e4392d470a5f
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Tue Jun 23 13:34:14 2026 +0100

    Fix KubernetesExecutor scheduler crash from unpicklable pod_override (#68831)
    
    * Fix KubernetesExecutor scheduler crash from unpicklable pod_override
    
    When the scheduler runs in-cluster, the kubernetes client installs a
    process-global default Configuration whose refresh_api_key_hook is a local
    closure. Under kubernetes-client v36, deserializing a V1Pod copies that
    Configuration onto the pod and every nested model. A task that sets a V1Pod
    pod_override therefore produces a pod that pickle cannot serialize, and the
    KubernetesExecutor pickles each task onto a multiprocessing queue
    synchronously in the scheduler loop, so the scheduler crashes in a loop and no
    task can be launched.
    
    Deserialize pods through an ApiClient built with a fresh Configuration so that
    neither the pod nor any nested model captures the in-cluster global. This is
    applied where the config gets attached -- airflow-core's serialization and
    unpickling repair paths -- so every consumer of a deserialized pod stays
    picklable.
    
    * Address review feedback on pod deserialization fix
    
    Move the deserialize_pod_dict import to module top-level, hoist pickle and
    Configuration to top-level imports in the serialization test, and drop the
    redundant _has_kubernetes cache_clear calls that were copied from a sibling
    test.
---
 .../airflow/serialization/serialized_objects.py    |  5 ++-
 airflow-core/src/airflow/utils/sqlalchemy.py       | 26 +++++++++++++---
 .../unit/serialization/test_serialized_objects.py  | 36 +++++++++++++++++++++-
 airflow-core/tests/unit/utils/test_sqlalchemy.py   | 36 +++++++++++++++++++++-
 4 files changed, 94 insertions(+), 9 deletions(-)

diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py
index 14bbd343359..4ba0bbefdce 100644
--- a/airflow-core/src/airflow/serialization/serialized_objects.py
+++ b/airflow-core/src/airflow/serialization/serialized_objects.py
@@ -109,6 +109,7 @@ from airflow.timetables.base import DagRunInfo, Timetable
 from airflow.triggers.base import StartTriggerArgs
 from airflow.utils.code_utils import get_python_source
 from airflow.utils.db import LazySelectSequence
+from airflow.utils.sqlalchemy import deserialize_pod_dict
 
 if TYPE_CHECKING:
     from inspect import Parameter
@@ -643,9 +644,7 @@ class BaseSerialization:
                     "Cannot deserialize POD objects without kubernetes 
libraries. "
                     "Please install the `kubernetes` package."
                 )
-            # kubernetes-client does not expose a public dict->model API; see 
https://github.com/kubernetes-client/python/issues/977.
-            pod = ApiClient()._ApiClient__deserialize_model(var, k8s.V1Pod)
-            return pod
+            return deserialize_pod_dict(var)
         elif type_ == DAT.TIMEDELTA:
             return datetime.timedelta(seconds=var)
         elif type_ == DAT.TIMEZONE:
diff --git a/airflow-core/src/airflow/utils/sqlalchemy.py b/airflow-core/src/airflow/utils/sqlalchemy.py
index bd4781adfe9..c47d8fd4796 100644
--- a/airflow-core/src/airflow/utils/sqlalchemy.py
+++ b/airflow-core/src/airflow/utils/sqlalchemy.py
@@ -271,6 +271,27 @@ def sanitize_for_serialization(obj: V1Pod):
     return {key: sanitize_for_serialization(val) for key, val in 
obj_dict.items()}
 
 
+def deserialize_pod_dict(pod_dict: dict) -> V1Pod:
+    """
+    Deserialize a serialized pod dict back into a ``V1Pod``.
+
+    kubernetes-client exposes no public dict->model API; see
+    https://github.com/kubernetes-client/python/issues/977.
+
+    A fresh ``Configuration`` is passed so that neither the pod nor any nested 
model captures the
+    process-global in-cluster ``Configuration``. In-cluster, that global 
carries a
+    ``refresh_api_key_hook`` local closure which ``pickle`` cannot serialize, 
and which would
+    otherwise break pickling a ``pod_override`` onto the KubernetesExecutor 
multiprocessing queue.
+
+    :meta private:
+    """
+    from kubernetes.client import Configuration
+    from kubernetes.client.api_client import ApiClient
+    from kubernetes.client.models.v1_pod import V1Pod
+
+    return 
ApiClient(configuration=Configuration())._ApiClient__deserialize_model(pod_dict,
 V1Pod)
+
+
 def ensure_pod_is_valid_after_unpickling(pod: V1Pod) -> V1Pod | None:
     """
     Convert pod to json and back so that pod is safe.
@@ -299,12 +320,9 @@ def ensure_pod_is_valid_after_unpickling(pod: V1Pod) -> V1Pod | None:
     if not isinstance(pod, V1Pod):
         return None
     try:
-        from kubernetes.client.api_client import ApiClient
-
         # now we actually reserialize / deserialize the pod
         pod_dict = sanitize_for_serialization(pod)
-        # kubernetes-client does not expose a public dict->model API; see 
https://github.com/kubernetes-client/python/issues/977.
-        return ApiClient()._ApiClient__deserialize_model(pod_dict, V1Pod)
+        return deserialize_pod_dict(pod_dict)
     except Exception:
         return None
 
diff --git a/airflow-core/tests/unit/serialization/test_serialized_objects.py b/airflow-core/tests/unit/serialization/test_serialized_objects.py
index 2d2119095d3..7de4c87148a 100644
--- a/airflow-core/tests/unit/serialization/test_serialized_objects.py
+++ b/airflow-core/tests/unit/serialization/test_serialized_objects.py
@@ -19,6 +19,7 @@ from __future__ import annotations
 
 import json
 import math
+import pickle
 import sys
 from collections.abc import Iterator
 from datetime import datetime, timedelta
@@ -27,7 +28,7 @@ from typing import TYPE_CHECKING
 import pendulum
 import pytest
 from dateutil import relativedelta
-from kubernetes.client import models as k8s
+from kubernetes.client import Configuration, models as k8s
 from pendulum.tz.timezone import FixedTimezone, Timezone
 from uuid6 import uuid7
 
@@ -1389,6 +1390,39 @@ class TestKubernetesImportAvoidance:
 
         _has_kubernetes.cache_clear()
 
+    def test_deserialized_v1pod_does_not_capture_unpicklable_config(self, 
monkeypatch):
+        """A deserialized V1Pod must not capture the in-cluster Configuration.
+
+        In-cluster, the kubernetes client installs a process-global default 
``Configuration`` whose
+        ``refresh_api_key_hook`` is an unpicklable local closure. Under 
kubernetes-client v36,
+        ``ApiClient.__deserialize_model`` copies that config onto the pod and 
every nested model, so a
+        naively deserialized ``pod_override`` cannot be pickled onto the 
KubernetesExecutor queue and
+        crashes the scheduler. Deserializing through a fresh ``Configuration`` 
keeps the pod picklable.
+        """
+        k8s = pytest.importorskip("kubernetes.client.models")
+
+        def _make_unpicklable_hook():
+            def _refresh_api_key(config):
+                return None
+
+            return _refresh_api_key
+
+        dirty = Configuration()
+        dirty.refresh_api_key_hook = _make_unpicklable_hook()
+        monkeypatch.setattr(Configuration, "_default", dirty, raising=False)
+
+        pod = k8s.V1Pod(
+            metadata=k8s.V1ObjectMeta(name="test-pod"),
+            spec=k8s.V1PodSpec(containers=[k8s.V1Container(name="base", 
image="airflow:3")]),
+        )
+        decoded = 
BaseSerialization.deserialize(BaseSerialization.serialize(pod))
+
+        assert isinstance(decoded, k8s.V1Pod)
+        # The top-level pod and every nested model must carry a clean, 
picklable Configuration.
+        pickle.dumps(decoded)
+        assert decoded.local_vars_configuration.refresh_api_key_hook is None
+        assert 
decoded.spec.containers[0].local_vars_configuration.refresh_api_key_hook is None
+
 
 @pytest.mark.db_test
 def test_serialized_dag_getitem_returns_task(dag_maker):
diff --git a/airflow-core/tests/unit/utils/test_sqlalchemy.py b/airflow-core/tests/unit/utils/test_sqlalchemy.py
index 43ead81f46d..1ef7b42c587 100644
--- a/airflow-core/tests/unit/utils/test_sqlalchemy.py
+++ b/airflow-core/tests/unit/utils/test_sqlalchemy.py
@@ -23,7 +23,7 @@ from copy import deepcopy
 from unittest import mock
 
 import pytest
-from kubernetes.client import models as k8s
+from kubernetes.client import Configuration, models as k8s
 from sqlalchemy import text
 from sqlalchemy.exc import StatementError
 
@@ -346,3 +346,37 @@ class TestExecutorConfigType:
         # show that the pickled (bad) pod is now a good pod, and same as the 
copy made
         # before making it bad
         assert result["pod_override"].to_dict() == copy_of_test_pod.to_dict()
+
+    def 
test_ensure_pod_is_valid_after_unpickling_is_picklable_in_cluster(self, 
monkeypatch):
+        """The repaired pod must not capture the unpicklable in-cluster 
Configuration.
+
+        In-cluster, the kubernetes client installs a process-global default 
``Configuration`` whose
+        ``refresh_api_key_hook`` is an unpicklable local closure. When the 
repair branch re-deserializes
+        the pod it must round-trip through a fresh ``Configuration`` so it 
stays picklable onto the
+        KubernetesExecutor queue.
+        """
+
+        def _make_unpicklable_hook():
+            def _refresh_api_key(config):
+                return None
+
+            return _refresh_api_key
+
+        dirty = Configuration()
+        dirty.refresh_api_key_hook = _make_unpicklable_hook()
+        monkeypatch.setattr(Configuration, "_default", dirty, raising=False)
+
+        container = k8s.V1Container(name="base")
+        pod = k8s.V1Pod(spec=k8s.V1PodSpec(containers=[container]))
+        # Force the repair (re-deserialize) branch the way real version-skew 
does: drop a protected
+        # attr so ``to_dict()`` raises and 
``ensure_pod_is_valid_after_unpickling`` reserializes.
+        del container._tty
+        with pytest.raises(AttributeError):
+            pod.to_dict()
+
+        fixed_pod = ensure_pod_is_valid_after_unpickling(pod)
+
+        assert fixed_pod is not None
+        pickle.dumps(fixed_pod)
+        assert fixed_pod.local_vars_configuration.refresh_api_key_hook is None
+        assert 
fixed_pod.spec.containers[0].local_vars_configuration.refresh_api_key_hook is 
None

Reply via email to