This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 84270cbc234 Fix Java SDK tasks rejected by coordinator
connection-ownership check (#68147)
84270cbc234 is described below
commit 84270cbc2348cd635c9d19573b15969dfdb8dbd4
Author: Jarek Potiuk <[email protected]>
AuthorDate: Sun Jun 7 10:26:24 2026 +0200
Fix Java SDK tasks rejected by coordinator connection-ownership check
(#68147)
The TCP connection-ownership verification added in #67781 only accepted
the supervisor channel when the connecting peer belonged to the spawned
process's exact PID. In the real Java SDK PROD end-to-end (e2e) tests, the
JVM's loopback connection is not found under that single PID, so both the
`comm` and `logs` channels are rejected. The task subprocess then dies with
"process exited with 1 before connecting", and every Java task fails. The
Java SDK e2e suite is canary-only, so it did not run on #67781 and the
breakage only surfaced in the nightly main runs.
Trust the connecting peer when it belongs to the child process *or any of
its descendants* — a launcher (JVM, shell wrapper, or any runtime that
forks a worker) legitimately connects back from a descendant rather than
the launched PID itself. A process outside the spawned subtree is still
rejected, so the security property #67781 added is preserved. The lookup
is also retried briefly (for up to 1 second by default, polling every
50 ms) to absorb the race where a freshly established connection is not
yet visible in `/proc`.
---
.../src/airflow/sdk/coordinators/_subprocess.py | 63 ++++++++++++++---
.../tests/task_sdk/coordinators/test_subprocess.py | 78 +++++++++++++++++++++-
2 files changed, 129 insertions(+), 12 deletions(-)
diff --git a/task-sdk/src/airflow/sdk/coordinators/_subprocess.py
b/task-sdk/src/airflow/sdk/coordinators/_subprocess.py
index c223243eb6f..89dfe5f71d7 100644
--- a/task-sdk/src/airflow/sdk/coordinators/_subprocess.py
+++ b/task-sdk/src/airflow/sdk/coordinators/_subprocess.py
@@ -73,22 +73,63 @@ def _socket_address(value: tuple | str) -> tuple[str, int]
| None:
return str(host), int(port)
-def _is_connection_from_process(conn: socket.socket, proc: subprocess.Popen)
-> bool:
- """Return whether the accepted TCP connection belongs to the child
process."""
+def _connection_owned_by_process_tree(
+ peer: tuple[str, int], local: tuple[str, int], proc: subprocess.Popen
+) -> bool:
+ """
+ Return whether ``peer`` <-> ``local`` is an established connection in the
child's process tree.
+
+ The launched child may itself spawn the process that connects back to the
+ supervisor — a JVM launcher, a shell wrapper, or any runtime that forks a
+ worker — so the connecting peer can legitimately belong to a *descendant*
of
+ ``proc.pid`` rather than ``proc.pid`` itself. Every process in the subtree
+ rooted at ``proc.pid`` is part of the task and is trusted; a process
outside
+ that subtree (e.g. an unrelated local process racing for the port) is not.
+ """
+ try:
+ root = psutil.Process(proc.pid)
+ processes = [root, *root.children(recursive=True)]
+ except (psutil.AccessDenied, psutil.NoSuchProcess, psutil.ZombieProcess,
OSError):
+ return False
+ for process in processes:
+ try:
+ connections = process.net_connections(kind="tcp")
+ except (psutil.AccessDenied, psutil.NoSuchProcess,
psutil.ZombieProcess, OSError):
+ # A descendant may exit between enumeration and inspection — skip
it
+ # rather than failing verification for the whole tree.
+ continue
+ for connection in connections:
+ if _socket_address(connection.laddr) == peer and
_socket_address(connection.raddr) == local:
+ return True
+ return False
+
+
+def _is_connection_from_process(
+ conn: socket.socket,
+ proc: subprocess.Popen,
+ *,
+ verify_timeout: float = 1.0,
+ poll_interval: float = 0.05,
+) -> bool:
+ """
+ Return whether the accepted TCP connection originates from the child
process tree.
+
+ The connection is trusted only if it belongs to ``proc.pid`` or one of its
+ descendants. A freshly established connection is not always visible in
+ ``/proc`` the instant it is accepted, so the lookup is retried for up to
+ *verify_timeout* seconds before the connection is rejected.
+ """
peer = _socket_address(conn.getpeername())
local = _socket_address(conn.getsockname())
if peer is None or local is None:
return False
- try:
- process = psutil.Process(proc.pid)
- connections = process.net_connections(kind="tcp")
- except (psutil.AccessDenied, psutil.NoSuchProcess, psutil.ZombieProcess,
OSError):
- log.warning("Unable to verify child process connection", pid=proc.pid,
exc_info=True)
- return False
- for connection in connections:
- if _socket_address(connection.laddr) == peer and
_socket_address(connection.raddr) == local:
+ deadline = time.monotonic() + verify_timeout
+ while True:
+ if _connection_owned_by_process_tree(peer, local, proc):
return True
- return False
+ if time.monotonic() >= deadline:
+ return False
+ time.sleep(poll_interval)
def _accept_connections(
diff --git a/task-sdk/tests/task_sdk/coordinators/test_subprocess.py
b/task-sdk/tests/task_sdk/coordinators/test_subprocess.py
index 95a41e6282b..8f91e729532 100644
--- a/task-sdk/tests/task_sdk/coordinators/test_subprocess.py
+++ b/task-sdk/tests/task_sdk/coordinators/test_subprocess.py
@@ -27,6 +27,7 @@ import time
from unittest.mock import ANY, MagicMock, call, patch
import attrs
+import psutil
import pytest
from uuid6 import uuid7
@@ -34,6 +35,7 @@ from airflow.sdk.api.client import Client,
TaskInstanceOperations
from airflow.sdk.coordinators._subprocess import (
SubprocessCoordinator,
_accept_connections,
+ _connection_owned_by_process_tree,
_is_connection_from_process,
_PopenActivitySubprocess,
_ResourceTracker,
@@ -350,13 +352,87 @@ class TestConnectionFromProcess:
try:
with patch("airflow.sdk.coordinators._subprocess.psutil.Process")
as mock_process:
+ mock_process.return_value.children.return_value = []
mock_process.return_value.net_connections.return_value = []
- assert _is_connection_from_process(conn, mock_proc) is False
+ assert _is_connection_from_process(conn, mock_proc,
verify_timeout=0.0) is False
finally:
conn.close()
client.close()
server.close()
+ def test_matches_descendant_process_tcp_connection(self):
+ """A connection owned by a *descendant* of the child process is
accepted.
+
+ Regression test for the Java coordinator (#67781): the launched process
+ may itself spawn the runtime that connects back, so the peer can belong
+ to a descendant of ``proc.pid`` rather than ``proc.pid`` directly.
+ """
+ server = _start_server()
+ host, port = server.getsockname()
+ # A real subprocess — a descendant of this test process — opens the
connection.
+ connector = subprocess.Popen(
+ [
+ sys.executable,
+ "-c",
+ "import socket, sys, time; s = socket.socket(); "
+ "s.connect((sys.argv[1], int(sys.argv[2]))); time.sleep(30)",
+ host,
+ str(port),
+ ],
+ )
+ mock_proc = MagicMock(spec=subprocess.Popen)
+ mock_proc.pid = os.getpid() # connector is a descendant of this
process
+
+ try:
+ conn, _ = server.accept()
+ try:
+ assert _is_connection_from_process(conn, mock_proc) is True
+ finally:
+ conn.close()
+ finally:
+ connector.terminate()
+ connector.wait(timeout=5)
+ server.close()
+
+ def test_retries_until_ownership_is_confirmed(self):
+ """The lookup is retried while the connection is not yet visible in
/proc."""
+ conn = MagicMock()
+ conn.getpeername.return_value = ("127.0.0.1", 5000)
+ conn.getsockname.return_value = ("127.0.0.1", 6000)
+ mock_proc = MagicMock(spec=subprocess.Popen)
+ mock_proc.pid = 999
+
+ with patch(
+
"airflow.sdk.coordinators._subprocess._connection_owned_by_process_tree",
+ side_effect=[False, False, True],
+ ) as mock_owned:
+ assert _is_connection_from_process(conn, mock_proc,
poll_interval=0.0) is True
+ assert mock_owned.call_count == 3
+
+ def test_rejects_when_ownership_never_confirmed(self):
+ conn = MagicMock()
+ conn.getpeername.return_value = ("127.0.0.1", 5000)
+ conn.getsockname.return_value = ("127.0.0.1", 6000)
+ mock_proc = MagicMock(spec=subprocess.Popen)
+ mock_proc.pid = 999
+
+ with patch(
+
"airflow.sdk.coordinators._subprocess._connection_owned_by_process_tree",
+ return_value=False,
+ ):
+ assert (
+ _is_connection_from_process(conn, mock_proc,
verify_timeout=0.0, poll_interval=0.0) is False
+ )
+
+ def test_owned_by_tree_returns_false_when_process_gone(self):
+ mock_proc = MagicMock(spec=subprocess.Popen)
+ mock_proc.pid = 999999
+ with patch(
+ "airflow.sdk.coordinators._subprocess.psutil.Process",
+ side_effect=psutil.NoSuchProcess(999999),
+ ):
+ assert _connection_owned_by_process_tree(("127.0.0.1", 1),
("127.0.0.1", 2), mock_proc) is False
+
class TestResourceTracker:
"""