This is an automated email from the ASF dual-hosted git repository.

kaxil pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 990372e256a Fix scheduler crashloop from KubernetesExecutor 
completed-pod adoption (#67850)
990372e256a is described below

commit 990372e256afe48eb10befefff7db38957b07a83
Author: Kaxil Naik <[email protected]>
AuthorDate: Mon Jun 1 21:17:29 2026 +0100

    Fix scheduler crashloop from KubernetesExecutor completed-pod adoption (#67850)
    
    The completed-pod adoption helper _alive_other_scheduler_job_ids (added in
    #66400) opened a scoped create_session(). try_adopt_task_instances runs
    inside the scheduler thread, so a scoped session resolves to the scheduler's
    own in-flight session from adopt_or_reset_orphaned_tasks. The context
    manager's commit()/close() on exit then committed the scheduler's
    transaction early (releasing its FOR UPDATE SKIP LOCKED row locks) and
    detached the orphaned TaskInstances it still held, so the reset path
    crashed with DetachedInstanceError and the scheduler crashlooped (#67813).
    
    Use an independent create_session(scoped=False) so the helper never touches
    the caller's transaction.
---
 .../kubernetes/executors/kubernetes_executor.py    |  8 +++++-
 .../executors/test_kubernetes_executor.py          | 31 ++++++++++++++++++++++
 2 files changed, 38 insertions(+), 1 deletion(-)

diff --git 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
index 481960b71d2..0dcc01537cb 100644
--- 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
+++ 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
@@ -722,7 +722,13 @@ class KubernetesExecutor(BaseExecutor):
 
             timeout = conf.getint("scheduler", 
"scheduler_health_check_threshold")
             cutoff = timezone.utcnow() - timedelta(seconds=timeout)
-            with create_session() as session:
+            # Must be an *independent* (non-scoped) session. 
try_adopt_task_instances runs
+            # inside the scheduler's own transaction 
(adopt_or_reset_orphaned_tasks); a scoped
+            # session here would resolve to that same thread-local session, 
and the context
+            # manager's commit()/close() on exit would commit the scheduler's 
in-flight work
+            # early (releasing its FOR UPDATE SKIP LOCKED row locks) and 
detach the orphaned
+            # TaskInstances it still holds, crashing the reset path (#67813).
+            with create_session(scoped=False) as session:
                 # Iterate the scalar cursor straight into the set so we never
                 # materialize an intermediate list — keeps the memory
                 # footprint flat regardless of how many sibling schedulers
diff --git 
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
 
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
index 2930eb7f2c4..bc1c2a97f55 100644
--- 
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
+++ 
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
@@ -27,8 +27,10 @@ import pytest
 import yaml
 from kubernetes.client import models as k8s
 from kubernetes.client.rest import ApiException
+from sqlalchemy import inspect
 from urllib3 import HTTPResponse
 
+from airflow.jobs.job import Job
 from airflow.models.taskinstancekey import TaskInstanceKey
 from airflow.providers.cncf.kubernetes import pod_generator
 from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import (
@@ -1505,6 +1507,35 @@ class TestKubernetesExecutor:
             header_params={"Accept": 
"application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"},
         )
 
+    @pytest.mark.db_test
+    def 
test_alive_other_scheduler_job_ids_does_not_detach_caller_session(self, 
session):
+        """``_alive_other_scheduler_job_ids`` must use an independent 
(non-scoped) session.
+
+        Regression test for #67813. ``try_adopt_task_instances`` runs inside 
the scheduler's
+        own scoped transaction (``adopt_or_reset_orphaned_tasks``). If this 
helper opens a
+        *scoped* ``create_session()``, the context manager's 
``commit()``/``close()`` on exit
+        tears down that shared transaction: it commits the scheduler's 
in-flight work early
+        (releasing its ``FOR UPDATE SKIP LOCKED`` row locks) and detaches the 
orphaned
+        TaskInstances it still holds, crashing the reset path with 
``DetachedInstanceError``.
+        """
+        # An object attached to the caller's scoped session, standing in for 
the orphaned
+        # TaskInstances the scheduler holds while adopting.
+        job = Job()
+        session.add(job)
+        session.flush()
+        assert inspect(job).session is not None
+
+        executor = self.kubernetes_executor
+        executor.scheduler_job_id = "5"
+
+        executor._alive_other_scheduler_job_ids()
+
+        # A scoped create_session() inside the helper would have closed this 
session and
+        # detached ``job``; an independent session leaves the caller's session 
untouched.
+        assert inspect(job).session is not None, (
+            "_alive_other_scheduler_job_ids closed/detached the caller's 
scoped session"
+        )
+
     
@mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
     def test_not_adopt_unassigned_task(self, mock_kube_client):
         """

Reply via email to