This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a2651f17d43 Fix lingering task supervisors when ``EOF`` is missed (#51180)
a2651f17d43 is described below
commit a2651f17d43fd1fc771370e25bd8d12915777117
Author: Kaxil Naik <[email protected]>
AuthorDate: Thu May 29 15:04:40 2025 +0530
Fix lingering task supervisors when ``EOF`` is missed (#51180)
closes https://github.com/apache/airflow/issues/50500
Adds a new safeguard for cases where the task subprocess closes before all
pipe sockets send EOF. The supervisor now records the process exit time and
forcibly closes any sockets still open after `workers.socket_cleanup_timeout`.
This stops the supervisor loop from hanging indefinitely and allows the
process to exit cleanly.
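
In simplified form the safeguard behaves like the sketch below (illustrative
only; the names mirror the patch, but the real logic lives in
WatchedSubprocess._cleanup_open_sockets and the ActivitySubprocess monitor
loop shown in the diff):

    # Sketch of the timeout-based socket cleanup added in this commit.
    # SOCKET_CLEANUP_TIMEOUT mirrors the new [workers] socket_cleanup_timeout option.
    import selectors
    import time
    from contextlib import suppress

    SOCKET_CLEANUP_TIMEOUT = 60.0

    def force_close_stuck_sockets(selector: selectors.BaseSelector) -> list[int]:
        """Unregister and close every file object still registered with the selector."""
        closed = []
        for key in list(selector.get_map().values()):
            closed.append(key.fd)
            with suppress(Exception):
                selector.unregister(key.fileobj)
            with suppress(Exception):
                key.fileobj.close()
        return closed

    def maybe_cleanup(selector, exit_code, process_exit_monotonic):
        """Once the child has exited and the grace period has elapsed, close any
        socket that never delivered EOF so the monitor loop can finish."""
        if exit_code is None or process_exit_monotonic is None:
            return
        if time.monotonic() - process_exit_monotonic > SOCKET_CLEANUP_TIMEOUT:
            force_close_stuck_sockets(selector)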
---
airflow-core/docs/troubleshooting.rst | 10 ++++
.../src/airflow/config_templates/config.yml | 9 ++++
.../src/airflow/sdk/execution_time/supervisor.py | 53 ++++++++++++++++++++++
.../src/airflow/sdk/execution_time/task_runner.py | 7 +++
.../task_sdk/execution_time/test_supervisor.py | 38 ++++++++++++++++
5 files changed, 117 insertions(+)
diff --git a/airflow-core/docs/troubleshooting.rst b/airflow-core/docs/troubleshooting.rst
index f636b87a42c..f354ea1a2ff 100644
--- a/airflow-core/docs/troubleshooting.rst
+++ b/airflow-core/docs/troubleshooting.rst
@@ -46,3 +46,13 @@ Here are some examples that could cause such an event:
- A DAG run timeout, specified by ``dagrun_timeout`` in the DAG's definition.
- An Airflow worker running out of memory
- Usually, Airflow workers that run out of memory receive a SIGKILL, and the
scheduler will fail the corresponding task instance for not having a heartbeat.
However, in some scenarios, Airflow kills the task before that happens.
+
+Lingering task supervisor processes
+-----------------------------------
+
+Under very high concurrency the socket handlers inside the task supervisor may
+miss the final EOF events from the task process. When this occurs the supervisor
+believes sockets are still open and will not exit. The
+:ref:`workers.socket_cleanup_timeout <config:workers__socket_cleanup_timeout>` option controls how long the supervisor
+waits after the task finishes before force-closing any remaining sockets. If you
+observe leftover ``supervisor`` processes, consider increasing this delay.
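
For example, to give the supervisor more time before force-closing (the value
below is illustrative; the shipped default is 60 seconds), set in airflow.cfg:

    [workers]
    socket_cleanup_timeout = 120.0

or export the equivalent environment variable
AIRFLOW__WORKERS__SOCKET_CLEANUP_TIMEOUT.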
diff --git a/airflow-core/src/airflow/config_templates/config.yml b/airflow-core/src/airflow/config_templates/config.yml
index 950844bcfee..bda494eed5d 100644
--- a/airflow-core/src/airflow/config_templates/config.yml
+++ b/airflow-core/src/airflow/config_templates/config.yml
@@ -1547,6 +1547,15 @@ workers:
type: float
example: ~
default: "90.0"
+ socket_cleanup_timeout:
+ description: |
+ Number of seconds to wait after a task process exits before forcibly closing any
+ remaining communication sockets. This helps prevent the task supervisor from hanging
+ indefinitely due to missed EOF signals.
+ version_added: 3.0.2
+ type: float
+ example: ~
+ default: "60.0"
api_auth:
description: Settings relating to authentication on the Airflow APIs
options:
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index 81fd945cdbc..1006b861378 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -124,6 +124,7 @@ HEARTBEAT_TIMEOUT: int = conf.getint("scheduler", "task_instance_heartbeat_timeo
MIN_HEARTBEAT_INTERVAL: int = conf.getint("workers", "min_heartbeat_interval")
MAX_FAILED_HEARTBEATS: int = conf.getint("workers", "max_failed_heartbeats")
+SOCKET_CLEANUP_TIMEOUT: float = conf.getfloat("workers", "socket_cleanup_timeout")
SERVER_TERMINATED = "SERVER_TERMINATED"
@@ -351,6 +352,13 @@ def _fork_main(
sys.stderr.flush()
with suppress(ValueError, OSError):
last_chance_stderr.flush()
+
+ # Explicitly close the child-end of our supervisor sockets so
+ # the parent sees EOF on both "requests" and "logs" channels.
+ with suppress(OSError):
+ os.close(log_fd)
+ with suppress(OSError):
+ os.close(child_stdin.fileno())
os._exit(n)
if hasattr(atexit, "_clear"):
@@ -423,6 +431,8 @@ class WatchedSubprocess:
_num_open_sockets: int = 4
_exit_code: int | None = attrs.field(default=None, init=False)
+ _process_exit_monotonic: float | None = attrs.field(default=None, init=False)
+ _fd_to_socket_type: dict[int, str] = attrs.field(factory=dict, init=False)
selector: selectors.BaseSelector = attrs.field(factory=selectors.DefaultSelector, repr=False)
@@ -507,6 +517,14 @@ class WatchedSubprocess:
# alternatives are used automatically) -- this is a way of having "event-based" code, but without
# needing full async, to read and process output from each socket as it is received.
+ # Track socket types for debugging
+ self._fd_to_socket_type = {
+ stdout.fileno(): "stdout",
+ stderr.fileno(): "stderr",
+ requests.fileno(): "requests",
+ logs.fileno(): "logs",
+ }
+
target_loggers: tuple[FilteringBoundLogger, ...] = (self.process_log,)
if self.subprocess_logs_to_stdout:
target_loggers += (log,)
@@ -593,6 +611,28 @@ class WatchedSubprocess:
sock._sock.close()
sock.close()
+ def _cleanup_open_sockets(self):
+ """Force-close any sockets that never reported EOF."""
+ # In extremely busy environments the selector can fail to deliver a
+ # final read event before the subprocess exits. Without closing these
+ # sockets the supervisor would wait forever thinking they are still
+ # active. This cleanup ensures we always release resources and exit.
+ stuck_sockets = []
+ for key in list(self.selector.get_map().values()):
+ socket_type = self._fd_to_socket_type.get(key.fd, f"unknown-{key.fd}")
+ stuck_sockets.append(f"{socket_type}({key.fd})")
+ with suppress(Exception):
+ self.selector.unregister(key.fileobj)
+ with suppress(Exception):
+ key.fileobj.close() # type: ignore[union-attr]
+
+ if stuck_sockets:
+ log.warning("Force-closed stuck sockets", pid=self.pid,
sockets=stuck_sockets)
+
+ self.selector.close()
+ self._close_unused_sockets(self.stdin)
+ self._num_open_sockets = 0
+
def kill(
self,
signal_to_send: signal.Signals = signal.SIGINT,
@@ -726,6 +766,7 @@ class WatchedSubprocess:
if raise_on_timeout:
raise
else:
+ self._process_exit_monotonic = time.monotonic()
self._close_unused_sockets(self.stdin)
# Put a message in the viewable task logs
@@ -899,6 +940,18 @@ class ActivitySubprocess(WatchedSubprocess):
# This listens for activity (e.g., subprocess output) on registered file objects
alive = self._service_subprocess(max_wait_time=max_wait_time) is None
+ if self._exit_code is not None and self._num_open_sockets > 0:
+ if (
+ self._process_exit_monotonic
+ and time.monotonic() - self._process_exit_monotonic > SOCKET_CLEANUP_TIMEOUT
+ ):
+ log.debug(
+ "Forcefully closing remaining sockets",
+ open_sockets=self._num_open_sockets,
+ pid=self.pid,
+ )
+ self._cleanup_open_sockets()
+
if alive:
# We don't need to heartbeat if the process has shutdown, as we are just finishing of reading the
# logs
diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index cb8040278c4..7d2e6b65fe9 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -26,6 +26,7 @@ import os
import sys
import time
from collections.abc import Callable, Iterable, Iterator, Mapping
+from contextlib import suppress
from datetime import datetime, timezone
from io import FileIO
from itertools import product
@@ -1297,6 +1298,12 @@ def main():
log = structlog.get_logger(logger_name="task")
log.exception("Top level error")
exit(1)
+ finally:
+ # Ensure the request socket is closed on the child side in all circumstances
+ # before the process fully terminates.
+ if SUPERVISOR_COMMS and SUPERVISOR_COMMS.request_socket:
+ with suppress(Exception):
+ SUPERVISOR_COMMS.request_socket.close()
if __name__ == "__main__":
diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
index 7861a9d5997..4696908bae7 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
@@ -768,6 +768,44 @@ class TestWatchedSubprocess:
} in cap_structlog
assert rc == -signal_to_raise
+ @pytest.mark.execution_timeout(3)
+ def test_cleanup_sockets_after_delay(self, monkeypatch, mocker, time_machine):
+ """Supervisor should close sockets if EOF events are missed."""
+
+ monkeypatch.setattr("airflow.sdk.execution_time.supervisor.SOCKET_CLEANUP_TIMEOUT", 1.0)
+
+ mock_process = mocker.Mock(pid=12345)
+
+ time_machine.move_to(time.monotonic(), tick=False)
+
+ proc = ActivitySubprocess(
+ process_log=mocker.MagicMock(),
+ id=TI_ID,
+ pid=mock_process.pid,
+ stdin=mocker.MagicMock(),
+ client=mocker.MagicMock(),
+ process=mock_process,
+ requests_fd=-1,
+ )
+
+ proc.selector = mocker.MagicMock()
+ proc.selector.select.return_value = []
+
+ proc._exit_code = 0
+ proc._num_open_sockets = 1
+ proc._process_exit_monotonic = time.monotonic()
+
+ mocker.patch.object(
+ ActivitySubprocess,
+ "_cleanup_open_sockets",
+ side_effect=lambda: setattr(proc, "_num_open_sockets", 0),
+ )
+
+ time_machine.shift(2)
+
+ proc._monitor_subprocess()
+ assert proc._num_open_sockets == 0
+
class TestWatchedSubprocessKill:
@pytest.fixture