This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7212829ee59 Fix Java SDK tasks rejected by coordinator on IPv4-mapped
IPv6 connections (#68169)
7212829ee59 is described below
commit 7212829ee593033f603c6a0d1776ba28cb7280b0
Author: Dev-iL <[email protected]>
AuthorDate: Sun Jun 7 10:51:57 2026 -0400
Fix Java SDK tasks rejected by coordinator on IPv4-mapped IPv6 connections
(#68169)
The coordinator's TCP connection-ownership check compared the plain-IPv4
addresses returned by getpeername()/getsockname() on the AF_INET
supervisor socket against the connecting process's socket addresses as
reported by psutil. On an IPv6-enabled host the JVM connects back over a
dual-stack socket, so the kernel records its loopback connection with the
IPv4-mapped address "::ffff:127.0.0.1" in /proc/net/tcp6. The mapped and
plain forms never compared equal, so both the comm and logs channels were
rejected and every Java task died with "process exited with 1 before
connecting" (Python tasks, which connect over plain IPv4, were unaffected).
Canonicalize IPv4-mapped IPv6 addresses to their IPv4 form in
_socket_address so both sides of the comparison match. The ownership
property added in #67781 is preserved: the connection must still be on
loopback and owned by the child's process tree.
Re-enable the two Java SDK e2e tests that had been temporarily marked
xfail as a CI workaround for this bug.
---
.../java_sdk_tests/test_java_sdk_dag.py | 6 ----
.../src/airflow/sdk/coordinators/_subprocess.py | 14 +++++++++-
.../tests/task_sdk/coordinators/test_subprocess.py | 32 ++++++++++++++++++++++
3 files changed, 45 insertions(+), 7 deletions(-)
diff --git
a/airflow-e2e-tests/tests/airflow_e2e_tests/java_sdk_tests/test_java_sdk_dag.py
b/airflow-e2e-tests/tests/airflow_e2e_tests/java_sdk_tests/test_java_sdk_dag.py
index 523658d95d3..0e99a399f2d 100644
---
a/airflow-e2e-tests/tests/airflow_e2e_tests/java_sdk_tests/test_java_sdk_dag.py
+++
b/airflow-e2e-tests/tests/airflow_e2e_tests/java_sdk_tests/test_java_sdk_dag.py
@@ -64,9 +64,6 @@ class TestJavaSDKAnnotationExample:
airflow_client = AirflowClient()
@pytest.mark.parametrize("dag_id", ["java_annotation_example"])
- @pytest.mark.xfail(
- reason="Test is instable and needs fixing:
https://github.com/apache/airflow/issues/68160"
- )
def test_java_tasks_execute_successfully(self, dag_id: str):
"""Both Java stubs in the annotation example must succeed."""
resp = self.airflow_client.trigger_dag(
@@ -102,9 +99,6 @@ class TestJavaSDKAnnotationExample:
f" all tasks : { {k: v.get('state') for k, v in ti_map.items()}
}"
)
- @pytest.mark.xfail(
- reason="Test is instable and needs fixing:
https://github.com/apache/airflow/issues/68160"
- )
def test_transform_xcom_is_numeric_timestamp(self):
"""The value returned by the Java 'transform' task must be a positive
integer."""
resp = self.airflow_client.trigger_dag(
diff --git a/task-sdk/src/airflow/sdk/coordinators/_subprocess.py
b/task-sdk/src/airflow/sdk/coordinators/_subprocess.py
index 89dfe5f71d7..49bea634ac5 100644
--- a/task-sdk/src/airflow/sdk/coordinators/_subprocess.py
+++ b/task-sdk/src/airflow/sdk/coordinators/_subprocess.py
@@ -27,6 +27,7 @@ draining machinery in this module rather than re-implementing
it.
from __future__ import annotations
+import ipaddress
import itertools
import os
import selectors
@@ -70,7 +71,18 @@ def _socket_address(value: tuple | str) -> tuple[str, int] |
None:
if not isinstance(value, tuple) or len(value) < 2:
return None
host, port = value[:2]
- return str(host), int(port)
+ host = str(host)
+ # Canonicalize IPv4-mapped IPv6 ("::ffff:127.0.0.1" -> "127.0.0.1") so a
dual-stack
+ # client (e.g. the JVM, shown v4-mapped in /proc/net/tcp6) matches the
AF_INET
+ # supervisor socket's plain-IPv4 address in the ownership check below.
+ try:
+ parsed = ipaddress.ip_address(host)
+ except ValueError:
+ pass
+ else:
+ if isinstance(parsed, ipaddress.IPv6Address) and parsed.ipv4_mapped is
not None:
+ host = str(parsed.ipv4_mapped)
+ return host, int(port)
def _connection_owned_by_process_tree(
diff --git a/task-sdk/tests/task_sdk/coordinators/test_subprocess.py
b/task-sdk/tests/task_sdk/coordinators/test_subprocess.py
index 8f91e729532..2c043526a4e 100644
--- a/task-sdk/tests/task_sdk/coordinators/test_subprocess.py
+++ b/task-sdk/tests/task_sdk/coordinators/test_subprocess.py
@@ -341,6 +341,38 @@ class TestConnectionFromProcess:
client.close()
server.close()
+ def test_matches_dual_stack_ipv4_mapped_connection(self):
+ """A dual-stack (AF_INET6) client connecting to the IPv4 server is
accepted.
+
+ Regression test for the Java coordinator (#67781 / #68147): on an
+ IPv6-enabled host the JVM connects back over a dual-stack socket, so
the
+ kernel records its loopback connection as the IPv4-mapped
+ ``::ffff:127.0.0.1`` in ``/proc/net/tcp6``. The AF_INET supervisor
socket's
+ ``getpeername()`` reports plain ``127.0.0.1``, so the ownership check
must
+ treat the mapped and plain forms as the same address -- otherwise every
+ Java task is rejected with "process exited with 1 before connecting".
+ """
+ server = _start_server()
+ _, port = server.getsockname()
+ try:
+ client = socket.socket(socket.AF_INET6)
+ client.connect(("::ffff:127.0.0.1", port))
+ except OSError as e:
+ server.close()
+ pytest.skip(f"IPv6 loopback unavailable: {e}")
+ conn, _ = server.accept()
+ # Sanity: the client really is using the IPv4-mapped form the JVM
exhibits.
+ assert client.getsockname()[0] == "::ffff:127.0.0.1"
+ mock_proc = MagicMock(spec=subprocess.Popen)
+ mock_proc.pid = os.getpid()
+
+ try:
+ assert _is_connection_from_process(conn, mock_proc) is True
+ finally:
+ conn.close()
+ client.close()
+ server.close()
+
def test_rejects_tcp_connection_not_owned_by_child_process(self):
server = _start_server()
_, port = server.getsockname()