kaxil commented on code in PR #59087:
URL: https://github.com/apache/airflow/pull/59087#discussion_r2672627083
##########
task-sdk/src/airflow/sdk/execution_time/comms.py:
##########
@@ -185,31 +187,69 @@ class CommsDecoder(Generic[ReceiveMsgType, SendMsgType]):
err_decoder: TypeAdapter[ErrorResponse] = attrs.field(factory=lambda:
TypeAdapter(ToTask), repr=False)
+ # Threading lock for sync operations
+ _thread_lock: threading.Lock = attrs.field(factory=threading.Lock,
repr=False)
+ # Async lock for async operations
+ _async_lock: asyncio.Lock = attrs.field(factory=asyncio.Lock, repr=False)
+
def send(self, msg: SendMsgType) -> ReceiveMsgType | None:
"""Send a request to the parent and block until the response is
received."""
frame = _RequestFrame(id=next(self.id_counter), body=msg.model_dump())
frame_bytes = frame.as_bytes()
- self.socket.sendall(frame_bytes)
- if isinstance(msg, ResendLoggingFD):
- if recv_fds is None:
- return None
- # We need special handling here! The server can't send us the fd
number, as the number on the
- # supervisor will be different to in this process, so we have to
mutate the message ourselves here.
- frame, fds = self._read_frame(maxfds=1)
- resp = self._from_frame(frame)
- if TYPE_CHECKING:
- assert isinstance(resp, SentFDs)
- resp.fds = fds
- # Since we know this is an expliclt SendFDs, and since this class
is generic SendFDs might not
- # always be in the return type union
- return resp # type: ignore[return-value]
+ # We must make sure sockets aren't intermixed between sync and async
calls,
+ # thus we need a dual locking mechanism to ensure that.
+ with self._thread_lock:
+ self.socket.sendall(frame_bytes)
+ if isinstance(msg, ResendLoggingFD):
+ if recv_fds is None:
+ return None
+ # We need special handling here! The server can't send us the
fd number, as the number on the
+ # supervisor will be different to in this process, so we have
to mutate the message ourselves here.
+ frame, fds = self._read_frame(maxfds=1)
+ resp = self._from_frame(frame)
+ if TYPE_CHECKING:
+ assert isinstance(resp, SentFDs)
+ resp.fds = fds
+ # Since we know this is an expliclt SendFDs, and since this
class is generic SendFDs might not
+ # always be in the return type union
+ return resp # type: ignore[return-value]
return self._get_response()
Review Comment:
If a sync send() and async asend() are called concurrently (which is now
possible with mixed sync/async code), could the response reads interleave? The
async version you have keeps the read inside the lock, but the sync version
doesn't so curious.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]