This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7212829ee59 Fix Java SDK tasks rejected by coordinator on IPv4-mapped 
IPv6 connections (#68169)
7212829ee59 is described below

commit 7212829ee593033f603c6a0d1776ba28cb7280b0
Author: Dev-iL <[email protected]>
AuthorDate: Sun Jun 7 10:51:57 2026 -0400

    Fix Java SDK tasks rejected by coordinator on IPv4-mapped IPv6 connections 
(#68169)
    
    The coordinator's TCP connection-ownership check compared the AF_INET
    supervisor socket's plain-IPv4 getpeername()/getsockname() against the
    connecting process's socket address as reported by psutil. On an
    IPv6-enabled host the JVM connects back over a dual-stack socket, so the
    kernel records its loopback connection as the IPv4-mapped
    "::ffff:127.0.0.1" in /proc/net/tcp6. The mapped and plain forms never
    compared equal, so both the comm and logs channels were rejected and
    every Java task died with "process exited with 1 before connecting"
    (Python tasks, which connect over plain IPv4, were unaffected).
    
    Canonicalize IPv4-mapped IPv6 addresses to their IPv4 form in
    _socket_address so both sides of the comparison match. The ownership
    property added in #67781 is preserved: the connection must still be on
    loopback and owned by the child's process tree.
    
    Re-enable the two Java SDK e2e tests that were temporarily xfail'd as a
    CI workaround for this bug.
---
 .../java_sdk_tests/test_java_sdk_dag.py            |  6 ----
 .../src/airflow/sdk/coordinators/_subprocess.py    | 14 +++++++++-
 .../tests/task_sdk/coordinators/test_subprocess.py | 32 ++++++++++++++++++++++
 3 files changed, 45 insertions(+), 7 deletions(-)

diff --git 
a/airflow-e2e-tests/tests/airflow_e2e_tests/java_sdk_tests/test_java_sdk_dag.py 
b/airflow-e2e-tests/tests/airflow_e2e_tests/java_sdk_tests/test_java_sdk_dag.py
index 523658d95d3..0e99a399f2d 100644
--- 
a/airflow-e2e-tests/tests/airflow_e2e_tests/java_sdk_tests/test_java_sdk_dag.py
+++ 
b/airflow-e2e-tests/tests/airflow_e2e_tests/java_sdk_tests/test_java_sdk_dag.py
@@ -64,9 +64,6 @@ class TestJavaSDKAnnotationExample:
     airflow_client = AirflowClient()
 
     @pytest.mark.parametrize("dag_id", ["java_annotation_example"])
-    @pytest.mark.xfail(
-        reason="Test is instable and needs fixing: 
https://github.com/apache/airflow/issues/68160"
-    )
     def test_java_tasks_execute_successfully(self, dag_id: str):
         """Both Java stubs in the annotation example must succeed."""
         resp = self.airflow_client.trigger_dag(
@@ -102,9 +99,6 @@ class TestJavaSDKAnnotationExample:
             f"  all tasks  : { {k: v.get('state') for k, v in ti_map.items()} 
}"
         )
 
-    @pytest.mark.xfail(
-        reason="Test is instable and needs fixing: 
https://github.com/apache/airflow/issues/68160"
-    )
     def test_transform_xcom_is_numeric_timestamp(self):
         """The value returned by the Java 'transform' task must be a positive 
integer."""
         resp = self.airflow_client.trigger_dag(
diff --git a/task-sdk/src/airflow/sdk/coordinators/_subprocess.py 
b/task-sdk/src/airflow/sdk/coordinators/_subprocess.py
index 89dfe5f71d7..49bea634ac5 100644
--- a/task-sdk/src/airflow/sdk/coordinators/_subprocess.py
+++ b/task-sdk/src/airflow/sdk/coordinators/_subprocess.py
@@ -27,6 +27,7 @@ draining machinery in this module rather than re-implementing 
it.
 
 from __future__ import annotations
 
+import ipaddress
 import itertools
 import os
 import selectors
@@ -70,7 +71,18 @@ def _socket_address(value: tuple | str) -> tuple[str, int] | 
None:
     if not isinstance(value, tuple) or len(value) < 2:
         return None
     host, port = value[:2]
-    return str(host), int(port)
+    host = str(host)
+    # Canonicalize IPv4-mapped IPv6 ("::ffff:127.0.0.1" -> "127.0.0.1") so a 
dual-stack
+    # client (e.g. the JVM, shown v4-mapped in /proc/net/tcp6) matches the 
AF_INET
+    # supervisor socket's plain-IPv4 address in the ownership check below.
+    try:
+        parsed = ipaddress.ip_address(host)
+    except ValueError:
+        pass
+    else:
+        if isinstance(parsed, ipaddress.IPv6Address) and parsed.ipv4_mapped is 
not None:
+            host = str(parsed.ipv4_mapped)
+    return host, int(port)
 
 
 def _connection_owned_by_process_tree(
diff --git a/task-sdk/tests/task_sdk/coordinators/test_subprocess.py 
b/task-sdk/tests/task_sdk/coordinators/test_subprocess.py
index 8f91e729532..2c043526a4e 100644
--- a/task-sdk/tests/task_sdk/coordinators/test_subprocess.py
+++ b/task-sdk/tests/task_sdk/coordinators/test_subprocess.py
@@ -341,6 +341,38 @@ class TestConnectionFromProcess:
             client.close()
             server.close()
 
+    def test_matches_dual_stack_ipv4_mapped_connection(self):
+        """A dual-stack (AF_INET6) client connecting to the IPv4 server is 
accepted.
+
+        Regression test for the Java coordinator (#67781 / #68147): on an
+        IPv6-enabled host the JVM connects back over a dual-stack socket, so 
the
+        kernel records its loopback connection as the IPv4-mapped
+        ``::ffff:127.0.0.1`` in ``/proc/net/tcp6``. The AF_INET supervisor 
socket's
+        ``getpeername()`` reports plain ``127.0.0.1``, so the ownership check 
must
+        treat the mapped and plain forms as the same address -- otherwise every
+        Java task is rejected with "process exited with 1 before connecting".
+        """
+        server = _start_server()
+        _, port = server.getsockname()
+        try:
+            client = socket.socket(socket.AF_INET6)
+            client.connect(("::ffff:127.0.0.1", port))
+        except OSError as e:
+            server.close()
+            pytest.skip(f"IPv6 loopback unavailable: {e}")
+        conn, _ = server.accept()
+        # Sanity: the client really is using the IPv4-mapped form the JVM 
exhibits.
+        assert client.getsockname()[0] == "::ffff:127.0.0.1"
+        mock_proc = MagicMock(spec=subprocess.Popen)
+        mock_proc.pid = os.getpid()
+
+        try:
+            assert _is_connection_from_process(conn, mock_proc) is True
+        finally:
+            conn.close()
+            client.close()
+            server.close()
+
     def test_rejects_tcp_connection_not_owned_by_child_process(self):
         server = _start_server()
         _, port = server.getsockname()

Reply via email to