This is an automated email from the ASF dual-hosted git repository.
shahar1 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new f5b1161577f Fix flaky test_send_thread_safety in Task SDK comms tests (#67811)
f5b1161577f is described below
commit f5b1161577fe31e739f924119672b2485b8e2954
Author: Jarek Potiuk <[email protected]>
AuthorDate: Mon Jun 1 07:30:57 2026 +0200
Fix flaky test_send_thread_safety in Task SDK comms tests (#67811)
The test assumed concurrent send() calls would be answered in thread-index
order, but the order in which the requests reach the socket is not
deterministic, so the parent must not hard-code a per-thread response order.
The parent now reads each request off the socket and echoes a response built
from that request's own body, so every thread reliably receives the response to
its own message regardless of scheduling. Production code is unchanged.
---
.../tests/task_sdk/execution_time/test_comms.py | 28 +++++++++++++++-------
1 file changed, 20 insertions(+), 8 deletions(-)
diff --git a/task-sdk/tests/task_sdk/execution_time/test_comms.py b/task-sdk/tests/task_sdk/execution_time/test_comms.py
index 37a91dd0ecc..51782796b82 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_comms.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_comms.py
@@ -32,6 +32,7 @@ from airflow.sdk.execution_time.comms import (
MaskSecret,
StartupDetails,
VariableResult,
+ _RequestFrame,
_ResponseFrame,
)
@@ -165,10 +166,8 @@ class TestCommsDecoder:
num_threads = 5
results = [None] * num_threads
errors = [None] * num_threads
- request_sent = [threading.Event() for _ in range(num_threads)]
def send_and_store(idx):
- request_sent[idx].set() # Signal that this thread is about to send
try:
msg = VariableResult(key=f"key{idx}", value=f"value{idx}", type="VariableResult")
results[idx] = decoder.send(msg)
@@ -179,12 +178,25 @@ class TestCommsDecoder:
for t in threads:
t.start()
- # For each thread, wait until it signals it's ready, then send the response
- for idx in range(num_threads):
- request_sent[idx].wait()
- resp = {"type": "VariableResult", "key": f"key{idx}", "value": f"value{idx}"}
- frame = _ResponseFrame(idx, resp, None)
- data = msgspec.msgpack.encode(frame)
+ def _recv_exactly(sock, n):
+ buffer = bytearray()
+ while len(buffer) < n:
+ chunk = sock.recv(n - len(buffer))
+ if not chunk:
+ raise EOFError("socket closed before a full frame was received")
+ buffer.extend(chunk)
+ return bytes(buffer)
+
+ # The order in which the concurrent ``send`` calls reach the socket is not
+ # deterministic, so the parent must not assume requests arrive in thread-index
+ # order. Read each request as it arrives and echo a response built from that
+ # request's own body, so every thread reliably gets the response to its own
+ # message regardless of thread scheduling.
+ for _ in range(num_threads):
+ length = int.from_bytes(_recv_exactly(w, 4), byteorder="big")
+ request = msgspec.msgpack.decode(_recv_exactly(w, length), type=_RequestFrame)
+ resp = {"type": "VariableResult", "key": request.body["key"], "value": request.body["value"]}
+ data = msgspec.msgpack.encode(_ResponseFrame(request.id, resp, None))
w.sendall(len(data).to_bytes(4, byteorder="big") + data)
for t in threads: