This is an automated email from the ASF dual-hosted git repository.

shahar1 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new f5b1161577f Fix flaky test_send_thread_safety in Task SDK comms tests (#67811)
f5b1161577f is described below

commit f5b1161577fe31e739f924119672b2485b8e2954
Author: Jarek Potiuk <[email protected]>
AuthorDate: Mon Jun 1 07:30:57 2026 +0200

    Fix flaky test_send_thread_safety in Task SDK comms tests (#67811)
    
    The test assumed concurrent send() calls would be answered in
    thread-index order, but the order in which the requests reach the socket
    is not deterministic, so the parent must not hard-code a per-thread
    response order. The parent now reads each request off the socket and
    echoes a response built from that request's own body, so every thread
    reliably receives the response to its own message regardless of
    scheduling. Production code is unchanged.
---
 .../tests/task_sdk/execution_time/test_comms.py    | 28 +++++++++++++++-------
 1 file changed, 20 insertions(+), 8 deletions(-)

diff --git a/task-sdk/tests/task_sdk/execution_time/test_comms.py b/task-sdk/tests/task_sdk/execution_time/test_comms.py
index 37a91dd0ecc..51782796b82 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_comms.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_comms.py
@@ -32,6 +32,7 @@ from airflow.sdk.execution_time.comms import (
     MaskSecret,
     StartupDetails,
     VariableResult,
+    _RequestFrame,
     _ResponseFrame,
 )
 
@@ -165,10 +166,8 @@ class TestCommsDecoder:
         num_threads = 5
         results = [None] * num_threads
         errors = [None] * num_threads
-        request_sent = [threading.Event() for _ in range(num_threads)]
 
         def send_and_store(idx):
-            request_sent[idx].set()  # Signal that this thread is about to send
             try:
                 msg = VariableResult(key=f"key{idx}", value=f"value{idx}", 
type="VariableResult")
                 results[idx] = decoder.send(msg)
@@ -179,12 +178,25 @@ class TestCommsDecoder:
         for t in threads:
             t.start()
 
-        # For each thread, wait until it signals it's ready, then send the 
response
-        for idx in range(num_threads):
-            request_sent[idx].wait()
-            resp = {"type": "VariableResult", "key": f"key{idx}", "value": 
f"value{idx}"}
-            frame = _ResponseFrame(idx, resp, None)
-            data = msgspec.msgpack.encode(frame)
+        def _recv_exactly(sock, n):
+            buffer = bytearray()
+            while len(buffer) < n:
+                chunk = sock.recv(n - len(buffer))
+                if not chunk:
+                    raise EOFError("socket closed before a full frame was 
received")
+                buffer.extend(chunk)
+            return bytes(buffer)
+
+        # The order in which the concurrent ``send`` calls reach the socket is 
not
+        # deterministic, so the parent must not assume requests arrive in 
thread-index
+        # order. Read each request as it arrives and echo a response built 
from that
+        # request's own body, so every thread reliably gets the response to 
its own
+        # message regardless of thread scheduling.
+        for _ in range(num_threads):
+            length = int.from_bytes(_recv_exactly(w, 4), byteorder="big")
+            request = msgspec.msgpack.decode(_recv_exactly(w, length), 
type=_RequestFrame)
+            resp = {"type": "VariableResult", "key": request.body["key"], 
"value": request.body["value"]}
+            data = msgspec.msgpack.encode(_ResponseFrame(request.id, resp, 
None))
             w.sendall(len(data).to_bytes(4, byteorder="big") + data)
 
         for t in threads:

Reply via email to