ashb commented on code in PR #67811:
URL: https://github.com/apache/airflow/pull/67811#discussion_r3330951528
##########
task-sdk/tests/task_sdk/execution_time/test_comms.py:
##########
@@ -179,12 +178,25 @@ def send_and_store(idx):
for t in threads:
t.start()
- # For each thread, wait until it signals it's ready, then send the
response
- for idx in range(num_threads):
- request_sent[idx].wait()
- resp = {"type": "VariableResult", "key": f"key{idx}", "value":
f"value{idx}"}
- frame = _ResponseFrame(idx, resp, None)
- data = msgspec.msgpack.encode(frame)
+ def _recv_exactly(sock, n):
+ buffer = bytearray()
+ while len(buffer) < n:
+ chunk = sock.recv(n - len(buffer))
+ if not chunk:
+ raise EOFError("socket closed before a full frame was
received")
+ buffer.extend(chunk)
+ return bytes(buffer)
+
+ # ``send`` serializes the write + read under a lock and correlates
responses FIFO,
Review Comment:
This isn't true - the lock was removed entirely
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]