This is an automated email from the ASF dual-hosted git repository.

vatsrahul1001 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9bb5ff3535a Fix ValueError when supervisor force-closes stuck sockets after timeout (#67115)
9bb5ff3535a is described below

commit 9bb5ff3535a46020ae22a61557aef7e806aa3e91
Author: AutomationDev85 <[email protected]>
AuthorDate: Tue May 19 12:02:59 2026 +0200

    Fix ValueError when supervisor force-closes stuck sockets after timeout (#67115)
    
    * Fix ValueError when supervisor force-closes stuck sockets after timeout
    
    * Improve mock socket spec
    
    ---------
    
    Co-authored-by: AutomationDev85 <AutomationDev85>
---
 .../src/airflow/sdk/execution_time/supervisor.py     |  2 ++
 .../tests/task_sdk/execution_time/test_supervisor.py | 20 +++++++-------------
 2 files changed, 9 insertions(+), 13 deletions(-)

diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index 5e46b6bc864..27dcbb44346 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -823,6 +823,7 @@ class WatchedSubprocess:
         if stuck_sockets:
             log.warning("Force-closed stuck sockets", pid=self.pid, sockets=stuck_sockets)
 
+        self._open_sockets.clear()
         self.selector.close()
         self.stdin.close()
 
@@ -1312,6 +1313,7 @@ class ActivitySubprocess(WatchedSubprocess):
                         timeout_seconds=SOCKET_CLEANUP_TIMEOUT,
                     )
                     self._cleanup_open_sockets()
+                    break
 
             if alive:
                 # We don't need to heartbeat if the process has shutdown, as we are just finishing of reading the
diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
index b4ba8de42e2..881f5ddaad1 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
@@ -1165,15 +1165,13 @@ class TestWatchedSubprocess:
         assert rc == -signal_to_raise
 
     @pytest.mark.execution_timeout(3)
-    def test_cleanup_sockets_after_delay(self, monkeypatch, mocker, time_machine):
+    def test_cleanup_sockets_after_delay(self, monkeypatch, mocker):
         """Supervisor should close sockets if EOF events are missed."""
 
         monkeypatch.setattr("airflow.sdk.execution_time.supervisor.SOCKET_CLEANUP_TIMEOUT", 1.0)
 
         mock_process = mocker.Mock(pid=12345)
 
-        time_machine.move_to(time.time(), tick=False)
-
         proc = ActivitySubprocess(
             process_log=mocker.MagicMock(),
             id=TI_ID,
@@ -1188,19 +1186,15 @@ class TestWatchedSubprocess:
 
         proc._exit_code = 0
         # Create a fake placeholder in the open socket weakref
-        proc._open_sockets[mocker.MagicMock()] = "test placeholder"
-        proc._process_exit_monotonic = time.time()
-
-        mocker.patch.object(
-            ActivitySubprocess,
-            "_cleanup_open_sockets",
-            side_effect=lambda: setattr(proc, "_open_sockets", {}),
-        )
-
-        time_machine.shift(2)
+        mock_socket = mocker.MagicMock(spec=socket.socket)
+        proc._open_sockets[mock_socket] = "test placeholder"
+        proc._process_exit_monotonic = time.monotonic() - 2
 
         proc._monitor_subprocess()
         assert len(proc._open_sockets) == 0
+        mock_socket.close.assert_called_once()
+        proc.selector.close.assert_called_once()
+        proc.stdin.close.assert_called_once()
 
 
 class TestWatchedSubprocessKill:

Reply via email to