This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a2651f17d43 Fix lingering task supervisors when ``EOF`` is missed (#51180)
a2651f17d43 is described below

commit a2651f17d43fd1fc771370e25bd8d12915777117
Author: Kaxil Naik <[email protected]>
AuthorDate: Thu May 29 15:04:40 2025 +0530

    Fix lingering task supervisors when ``EOF`` is missed (#51180)
    
    closes https://github.com/apache/airflow/issues/50500
    
    Adds a new safeguard for cases where the task subprocess closes
    before all pipe sockets send EOF.
    
    The supervisor now records the process exit time and forcibly closes
    any sockets still open after `workers.socket_cleanup_timeout`. This
    stops the supervisor loop from hanging indefinitely and allows the
    process to exit cleanly.
---
 airflow-core/docs/troubleshooting.rst              | 10 ++++
 .../src/airflow/config_templates/config.yml        |  9 ++++
 .../src/airflow/sdk/execution_time/supervisor.py   | 53 ++++++++++++++++++++++
 .../src/airflow/sdk/execution_time/task_runner.py  |  7 +++
 .../task_sdk/execution_time/test_supervisor.py     | 38 ++++++++++++++++
 5 files changed, 117 insertions(+)

diff --git a/airflow-core/docs/troubleshooting.rst b/airflow-core/docs/troubleshooting.rst
index f636b87a42c..f354ea1a2ff 100644
--- a/airflow-core/docs/troubleshooting.rst
+++ b/airflow-core/docs/troubleshooting.rst
@@ -46,3 +46,13 @@ Here are some examples that could cause such an event:
 - A DAG run timeout, specified by ``dagrun_timeout`` in the DAG's definition.
 - An Airflow worker running out of memory
   - Usually, Airflow workers that run out of memory receive a SIGKILL, and the scheduler will fail the corresponding task instance for not having a heartbeat. However, in some scenarios, Airflow kills the task before that happens.
+
+Lingering task supervisor processes
+-----------------------------------
+
+Under very high concurrency the socket handlers inside the task supervisor may
+miss the final EOF events from the task process. When this occurs the supervisor
+believes sockets are still open and will not exit. The
+:ref:`workers.socket_cleanup_timeout <config:workers__socket_cleanup_timeout>` option controls how long the supervisor
+waits after the task finishes before force-closing any remaining sockets. If you
+observe leftover ``supervisor`` processes, consider increasing this delay.
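
The new option follows Airflow's usual configuration conventions; a sketch of
raising the delay (the value shown is illustrative, not a recommendation):

    # airflow.cfg
    [workers]
    socket_cleanup_timeout = 120.0

    # or equivalently via the environment:
    # export AIRFLOW__WORKERS__SOCKET_CLEANUP_TIMEOUT=120.0
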
diff --git a/airflow-core/src/airflow/config_templates/config.yml b/airflow-core/src/airflow/config_templates/config.yml
index 950844bcfee..bda494eed5d 100644
--- a/airflow-core/src/airflow/config_templates/config.yml
+++ b/airflow-core/src/airflow/config_templates/config.yml
@@ -1547,6 +1547,15 @@ workers:
       type: float
       example: ~
       default: "90.0"
+    socket_cleanup_timeout:
+      description: |
+        Number of seconds to wait after a task process exits before forcibly closing any
+        remaining communication sockets. This helps prevent the task supervisor from hanging
+        indefinitely due to missed EOF signals.
+      version_added: 3.0.2
+      type: float
+      example: ~
+      default: "60.0"
 api_auth:
   description: Settings relating to authentication on the Airflow APIs
   options:
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index 81fd945cdbc..1006b861378 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -124,6 +124,7 @@ HEARTBEAT_TIMEOUT: int = conf.getint("scheduler", "task_instance_heartbeat_timeo
 MIN_HEARTBEAT_INTERVAL: int = conf.getint("workers", "min_heartbeat_interval")
 MAX_FAILED_HEARTBEATS: int = conf.getint("workers", "max_failed_heartbeats")
 
+SOCKET_CLEANUP_TIMEOUT: float = conf.getfloat("workers", "socket_cleanup_timeout")
 
 SERVER_TERMINATED = "SERVER_TERMINATED"
 
@@ -351,6 +352,13 @@ def _fork_main(
             sys.stderr.flush()
         with suppress(ValueError, OSError):
             last_chance_stderr.flush()
+
+        # Explicitly close the child-end of our supervisor sockets so
+        # the parent sees EOF on both "requests" and "logs" channels.
+        with suppress(OSError):
+            os.close(log_fd)
+        with suppress(OSError):
+            os.close(child_stdin.fileno())
         os._exit(n)
 
     if hasattr(atexit, "_clear"):
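
The explicit close() calls above matter because a socketpair only delivers EOF
once every copy of the child's end is closed; a minimal standalone sketch of
that behaviour (plain Python, not Airflow code, POSIX only):

    import os
    import socket

    parent_end, child_end = socket.socketpair()

    if os.fork() == 0:
        # Child: drop our copy of the parent's end, write, then close our end.
        parent_end.close()
        child_end.sendall(b"hello")
        child_end.close()  # this close is what lets the parent observe EOF
        os._exit(0)

    # Parent: drop our copy of the child's end so the child holds the only one.
    child_end.close()
    while chunk := parent_end.recv(1024):
        print("received:", chunk)
    # recv() returning b"" signals EOF: the socket can be unregistered safely.
    os.wait()
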
@@ -423,6 +431,8 @@ class WatchedSubprocess:
 
     _num_open_sockets: int = 4
     _exit_code: int | None = attrs.field(default=None, init=False)
+    _process_exit_monotonic: float | None = attrs.field(default=None, init=False)
+    _fd_to_socket_type: dict[int, str] = attrs.field(factory=dict, init=False)
 
     selector: selectors.BaseSelector = attrs.field(factory=selectors.DefaultSelector, repr=False)
 
@@ -507,6 +517,14 @@ class WatchedSubprocess:
         # alternatives are used automatically) -- this is a way of having "event-based" code, but without
         # needing full async, to read and process output from each socket as it is received.
 
+        # Track socket types for debugging
+        self._fd_to_socket_type = {
+            stdout.fileno(): "stdout",
+            stderr.fileno(): "stderr",
+            requests.fileno(): "requests",
+            logs.fileno(): "logs",
+        }
+
         target_loggers: tuple[FilteringBoundLogger, ...] = (self.process_log,)
         if self.subprocess_logs_to_stdout:
             target_loggers += (log,)
@@ -593,6 +611,28 @@ class WatchedSubprocess:
                 sock._sock.close()
             sock.close()
 
+    def _cleanup_open_sockets(self):
+        """Force-close any sockets that never reported EOF."""
+        # In extremely busy environments the selector can fail to deliver a
+        # final read event before the subprocess exits. Without closing these
+        # sockets the supervisor would wait forever thinking they are still
+        # active. This cleanup ensures we always release resources and exit.
+        stuck_sockets = []
+        for key in list(self.selector.get_map().values()):
+            socket_type = self._fd_to_socket_type.get(key.fd, f"unknown-{key.fd}")
+            stuck_sockets.append(f"{socket_type}({key.fd})")
+            with suppress(Exception):
+                self.selector.unregister(key.fileobj)
+            with suppress(Exception):
+                key.fileobj.close()  # type: ignore[union-attr]
+
+        if stuck_sockets:
+            log.warning("Force-closed stuck sockets", pid=self.pid, sockets=stuck_sockets)
+
+        self.selector.close()
+        self._close_unused_sockets(self.stdin)
+        self._num_open_sockets = 0
+
     def kill(
         self,
         signal_to_send: signal.Signals = signal.SIGINT,
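
The _cleanup_open_sockets() method leans on the stdlib selectors API; a
self-contained illustration of the get_map()/unregister() pattern it uses
(the "requests" label is just demo data):

    import selectors
    import socket

    sel = selectors.DefaultSelector()
    local, remote = socket.socketpair()
    sel.register(local, selectors.EVENT_READ, data="requests")

    # get_map() exposes fd -> SelectorKey for everything still registered;
    # copying it into a list lets us unregister entries while iterating.
    for key in list(sel.get_map().values()):
        print(f"force-closing {key.data}({key.fd})")
        sel.unregister(key.fileobj)
        key.fileobj.close()

    sel.close()
    remote.close()
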
@@ -726,6 +766,7 @@ class WatchedSubprocess:
             if raise_on_timeout:
                 raise
         else:
+            self._process_exit_monotonic = time.monotonic()
             self._close_unused_sockets(self.stdin)
             # Put a message in the viewable task logs
 
@@ -899,6 +940,18 @@ class ActivitySubprocess(WatchedSubprocess):
             # This listens for activity (e.g., subprocess output) on registered file objects
             alive = self._service_subprocess(max_wait_time=max_wait_time) is None
 
+            if self._exit_code is not None and self._num_open_sockets > 0:
+                if (
+                    self._process_exit_monotonic
+                    and time.monotonic() - self._process_exit_monotonic > SOCKET_CLEANUP_TIMEOUT
+                ):
+                    log.debug(
+                        "Forcefully closing remaining sockets",
+                        open_sockets=self._num_open_sockets,
+                        pid=self.pid,
+                    )
+                    self._cleanup_open_sockets()
+
             if alive:
                 # We don't need to heartbeat if the process has shutdown, as we are just finishing of reading the
                 # logs
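
The guard above is a plain monotonic-clock deadline; reduced to its essentials
(the constant value and function name here are invented for illustration):

    import time

    SOCKET_CLEANUP_TIMEOUT = 60.0  # seconds, mirroring the new workers option

    exit_at = time.monotonic()  # stamped when the subprocess exit is reaped

    def cleanup_overdue(open_sockets: int) -> bool:
        # time.monotonic() is unaffected by wall-clock adjustments, which
        # makes it the right clock for measuring an elapsed grace period.
        return open_sockets > 0 and time.monotonic() - exit_at > SOCKET_CLEANUP_TIMEOUT
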
diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index cb8040278c4..7d2e6b65fe9 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -26,6 +26,7 @@ import os
 import sys
 import time
 from collections.abc import Callable, Iterable, Iterator, Mapping
+from contextlib import suppress
 from datetime import datetime, timezone
 from io import FileIO
 from itertools import product
@@ -1297,6 +1298,12 @@ def main():
         log = structlog.get_logger(logger_name="task")
         log.exception("Top level error")
         exit(1)
+    finally:
+        # Ensure the request socket is closed on the child side in all circumstances
+        # before the process fully terminates.
+        if SUPERVISOR_COMMS and SUPERVISOR_COMMS.request_socket:
+            with suppress(Exception):
+                SUPERVISOR_COMMS.request_socket.close()
 
 
 if __name__ == "__main__":
diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
index 7861a9d5997..4696908bae7 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
@@ -768,6 +768,44 @@ class TestWatchedSubprocess:
         } in cap_structlog
         assert rc == -signal_to_raise
 
+    @pytest.mark.execution_timeout(3)
+    def test_cleanup_sockets_after_delay(self, monkeypatch, mocker, time_machine):
+        """Supervisor should close sockets if EOF events are missed."""
+
+        monkeypatch.setattr("airflow.sdk.execution_time.supervisor.SOCKET_CLEANUP_TIMEOUT", 1.0)
+
+        mock_process = mocker.Mock(pid=12345)
+
+        time_machine.move_to(time.monotonic(), tick=False)
+
+        proc = ActivitySubprocess(
+            process_log=mocker.MagicMock(),
+            id=TI_ID,
+            pid=mock_process.pid,
+            stdin=mocker.MagicMock(),
+            client=mocker.MagicMock(),
+            process=mock_process,
+            requests_fd=-1,
+        )
+
+        proc.selector = mocker.MagicMock()
+        proc.selector.select.return_value = []
+
+        proc._exit_code = 0
+        proc._num_open_sockets = 1
+        proc._process_exit_monotonic = time.monotonic()
+
+        mocker.patch.object(
+            ActivitySubprocess,
+            "_cleanup_open_sockets",
+            side_effect=lambda: setattr(proc, "_num_open_sockets", 0),
+        )
+
+        time_machine.shift(2)
+
+        proc._monitor_subprocess()
+        assert proc._num_open_sockets == 0
+
 
 class TestWatchedSubprocessKill:
     @pytest.fixture
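
The test depends on the time_machine pytest fixture also covering the
monotonic clock; a stripped-down version of the same pattern (assuming a
time-machine release that patches time.monotonic, as the test above does):

    import time

    def test_monotonic_deadline(time_machine):
        time_machine.move_to(time.monotonic(), tick=False)  # freeze the clock
        deadline = time.monotonic() + 1.0
        time_machine.shift(2)  # jump two "seconds" ahead without sleeping
        assert time.monotonic() > deadline
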
