This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch v3-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-0-test by this push:
new d34c1f5c5c9 [v3-0-test] Fix lingering task supervisors when ``EOF`` is
missed (#51180) (#51970)
d34c1f5c5c9 is described below
commit d34c1f5c5c9464946aeb1bd69c8484e73003c46a
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Jun 20 14:39:05 2025 -0600
[v3-0-test] Fix lingering task supervisors when ``EOF`` is missed (#51180)
(#51970)
closes https://github.com/apache/airflow/issues/50500
Adds a new safeguard for cases where the task subprocess exits
before EOF has been received on all of its pipe sockets.
The supervisor now records the process exit time and forcibly closes
any sockets still open once `workers.socket_cleanup_timeout` seconds
have elapsed. This stops the supervisor loop from hanging indefinitely
and allows the supervisor process to exit cleanly.
(cherry picked from commit a2651f17d43fd1fc771370e25bd8d12915777117)
Co-authored-by: Kaxil Naik <[email protected]>
---
airflow-core/docs/troubleshooting.rst | 10 ++++
.../src/airflow/config_templates/config.yml | 9 ++++
.../src/airflow/sdk/execution_time/supervisor.py | 53 ++++++++++++++++++++++
.../src/airflow/sdk/execution_time/task_runner.py | 7 +++
.../task_sdk/execution_time/test_supervisor.py | 38 ++++++++++++++++
5 files changed, 117 insertions(+)
diff --git a/airflow-core/docs/troubleshooting.rst
b/airflow-core/docs/troubleshooting.rst
index f636b87a42c..f354ea1a2ff 100644
--- a/airflow-core/docs/troubleshooting.rst
+++ b/airflow-core/docs/troubleshooting.rst
@@ -46,3 +46,13 @@ Here are some examples that could cause such an event:
- A DAG run timeout, specified by ``dagrun_timeout`` in the DAG's definition.
- An Airflow worker running out of memory
- Usually, Airflow workers that run out of memory receive a SIGKILL, and the
scheduler will fail the corresponding task instance for not having a heartbeat.
However, in some scenarios, Airflow kills the task before that happens.
+
+Lingering task supervisor processes
+-----------------------------------
+
+Under very high concurrency the socket handlers inside the task supervisor may
+miss the final EOF events from the task process. When this occurs the supervisor
+believes sockets are still open and does not exit. The
+:ref:`workers.socket_cleanup_timeout <config:workers__socket_cleanup_timeout>` option controls how long the supervisor
+waits after the task process exits before force-closing any remaining sockets. If
+leftover ``supervisor`` processes linger too long, consider decreasing this delay.
diff --git a/airflow-core/src/airflow/config_templates/config.yml
b/airflow-core/src/airflow/config_templates/config.yml
index 437b04e771e..6190f7f306e 100644
--- a/airflow-core/src/airflow/config_templates/config.yml
+++ b/airflow-core/src/airflow/config_templates/config.yml
@@ -1525,6 +1525,15 @@ workers:
type: float
example: ~
default: "90.0"
+ socket_cleanup_timeout:
+ description: |
+ Number of seconds to wait after a task process exits before forcibly closing any
+ remaining communication sockets. This helps prevent the task supervisor from hanging
+ indefinitely due to missed EOF signals.
+ version_added: 3.0.2
+ type: float
+ example: ~
+ default: "60.0"
api_auth:
description: Settings relating to authentication on the Airflow APIs
options:
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index 4c3ed654ec1..dc63daeef7b 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -130,6 +130,7 @@ HEARTBEAT_TIMEOUT: int = conf.getint("scheduler",
"task_instance_heartbeat_timeo
MIN_HEARTBEAT_INTERVAL: int = conf.getint("workers", "min_heartbeat_interval")
MAX_FAILED_HEARTBEATS: int = conf.getint("workers", "max_failed_heartbeats")
+SOCKET_CLEANUP_TIMEOUT: float = conf.getfloat("workers", "socket_cleanup_timeout")
SERVER_TERMINATED = "SERVER_TERMINATED"
@@ -357,6 +358,13 @@ def _fork_main(
sys.stderr.flush()
with suppress(ValueError, OSError):
last_chance_stderr.flush()
+
+ # Explicitly close the child-end of our supervisor sockets so
+ # the parent sees EOF on both "requests" and "logs" channels.
+ with suppress(OSError):
+ os.close(log_fd)
+ with suppress(OSError):
+ os.close(child_stdin.fileno())
os._exit(n)
if hasattr(atexit, "_clear"):
@@ -429,6 +437,8 @@ class WatchedSubprocess:
_num_open_sockets: int = 4
_exit_code: int | None = attrs.field(default=None, init=False)
+ _process_exit_monotonic: float | None = attrs.field(default=None,
init=False)
+ _fd_to_socket_type: dict[int, str] = attrs.field(factory=dict, init=False)
selector: selectors.BaseSelector =
attrs.field(factory=selectors.DefaultSelector, repr=False)
@@ -513,6 +523,14 @@ class WatchedSubprocess:
# alternatives are used automatically) -- this is a way of having
"event-based" code, but without
# needing full async, to read and process output from each socket as
it is received.
+ # Track socket types for debugging
+ self._fd_to_socket_type = {
+ stdout.fileno(): "stdout",
+ stderr.fileno(): "stderr",
+ requests.fileno(): "requests",
+ logs.fileno(): "logs",
+ }
+
target_loggers: tuple[FilteringBoundLogger, ...] = (self.process_log,)
if self.subprocess_logs_to_stdout:
target_loggers += (log,)
@@ -599,6 +617,28 @@ class WatchedSubprocess:
sock._sock.close()
sock.close()
+ def _cleanup_open_sockets(self):
+ """Force-close every socket still registered with the selector, i.e. those that never reported EOF."""
+ # In extremely busy environments the selector can fail to deliver a
+ # final read event before the subprocess exits. Without closing these
+ # sockets the supervisor would wait forever thinking they are still
+ # active. This cleanup ensures we always release resources and exit.
+ stuck_sockets = [] # human-readable "type(fd)" labels, only used for the warning below
+ for key in list(self.selector.get_map().values()): # copy: unregister() mutates the map
+ socket_type = self._fd_to_socket_type.get(key.fd, f"unknown-{key.fd}")
+ stuck_sockets.append(f"{socket_type}({key.fd})")
+ with suppress(Exception):
+ self.selector.unregister(key.fileobj)
+ with suppress(Exception):
+ key.fileobj.close() # type: ignore[union-attr]
+
+ if stuck_sockets:
+ log.warning("Force-closed stuck sockets", pid=self.pid, sockets=stuck_sockets)
+
+ self.selector.close()
+ self._close_unused_sockets(self.stdin)
+ self._num_open_sockets = 0 # nothing left open, so the caller's "_num_open_sockets > 0" check stops firing
+
def kill(
self,
signal_to_send: signal.Signals = signal.SIGINT,
@@ -732,6 +772,7 @@ class WatchedSubprocess:
if raise_on_timeout:
raise
else:
+ self._process_exit_monotonic = time.monotonic()
self._close_unused_sockets(self.stdin)
# Put a message in the viewable task logs
@@ -905,6 +946,18 @@ class ActivitySubprocess(WatchedSubprocess):
# This listens for activity (e.g., subprocess output) on
registered file objects
alive = self._service_subprocess(max_wait_time=max_wait_time) is
None
+ if self._exit_code is not None and self._num_open_sockets > 0:
+ if (
+ self._process_exit_monotonic
+ and time.monotonic() - self._process_exit_monotonic >
SOCKET_CLEANUP_TIMEOUT
+ ):
+ log.debug(
+ "Forcefully closing remaining sockets",
+ open_sockets=self._num_open_sockets,
+ pid=self.pid,
+ )
+ self._cleanup_open_sockets()
+
if alive:
# We don't need to heartbeat if the process has shutdown, as
we are just finishing of reading the
# logs
diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index 9d75a7250e4..33583ffe9c7 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -26,6 +26,7 @@ import os
import sys
import time
from collections.abc import Callable, Iterable, Iterator, Mapping
+from contextlib import suppress
from datetime import datetime, timezone
from io import FileIO
from itertools import product
@@ -1301,6 +1302,12 @@ def main():
log = structlog.get_logger(logger_name="task")
log.exception("Top level error")
exit(1)
+ finally:
+ # Ensure the request socket is closed on the child side in all
circumstances
+ # before the process fully terminates.
+ if SUPERVISOR_COMMS and SUPERVISOR_COMMS.request_socket:
+ with suppress(Exception):
+ SUPERVISOR_COMMS.request_socket.close()
if __name__ == "__main__":
diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
index 4f6341c89da..4f5e4cfc7ac 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
@@ -774,6 +774,44 @@ class TestWatchedSubprocess:
} in cap_structlog
assert rc == -signal_to_raise
+ @pytest.mark.execution_timeout(3)
+ def test_cleanup_sockets_after_delay(self, monkeypatch, mocker, time_machine):
+ """Once SOCKET_CLEANUP_TIMEOUT has passed since process exit, _monitor_subprocess should call _cleanup_open_sockets."""
+
+ monkeypatch.setattr("airflow.sdk.execution_time.supervisor.SOCKET_CLEANUP_TIMEOUT", 1.0)
+
+ mock_process = mocker.Mock(pid=12345)
+
+ time_machine.move_to(time.monotonic(), tick=False) # NOTE(review): relies on time-machine also mocking time.monotonic — confirm
+
+ proc = ActivitySubprocess(
+ process_log=mocker.MagicMock(),
+ id=TI_ID,
+ pid=mock_process.pid,
+ stdin=mocker.MagicMock(),
+ client=mocker.MagicMock(),
+ process=mock_process,
+ requests_fd=-1,
+ )
+
+ proc.selector = mocker.MagicMock()
+ proc.selector.select.return_value = [] # no events: the final EOF is never observed
+
+ proc._exit_code = 0 # the task process has already exited...
+ proc._num_open_sockets = 1 # ...but one socket still looks open
+ proc._process_exit_monotonic = time.monotonic()
+
+ mocker.patch.object(
+ ActivitySubprocess,
+ "_cleanup_open_sockets",
+ side_effect=lambda: setattr(proc, "_num_open_sockets", 0),
+ )
+
+ time_machine.shift(2) # move past the 1.0s cleanup timeout patched in above
+
+ proc._monitor_subprocess()
+ assert proc._num_open_sockets == 0
+
class TestWatchedSubprocessKill:
@pytest.fixture