kaxil commented on code in PR #59087:
URL: https://github.com/apache/airflow/pull/59087#discussion_r2672627083


##########
task-sdk/src/airflow/sdk/execution_time/comms.py:
##########
@@ -185,31 +187,69 @@ class CommsDecoder(Generic[ReceiveMsgType, SendMsgType]):
 
     err_decoder: TypeAdapter[ErrorResponse] = attrs.field(factory=lambda: 
TypeAdapter(ToTask), repr=False)
 
+    # Threading lock for sync operations
+    _thread_lock: threading.Lock = attrs.field(factory=threading.Lock, 
repr=False)
+    # Async lock for async operations
+    _async_lock: asyncio.Lock = attrs.field(factory=asyncio.Lock, repr=False)
+
     def send(self, msg: SendMsgType) -> ReceiveMsgType | None:
         """Send a request to the parent and block until the response is 
received."""
         frame = _RequestFrame(id=next(self.id_counter), body=msg.model_dump())
         frame_bytes = frame.as_bytes()
 
-        self.socket.sendall(frame_bytes)
-        if isinstance(msg, ResendLoggingFD):
-            if recv_fds is None:
-                return None
-            # We need special handling here! The server can't send us the fd 
number, as the number on the
-            # supervisor will be different to in this process, so we have to 
mutate the message ourselves here.
-            frame, fds = self._read_frame(maxfds=1)
-            resp = self._from_frame(frame)
-            if TYPE_CHECKING:
-                assert isinstance(resp, SentFDs)
-            resp.fds = fds
-            # Since we know this is an expliclt SendFDs, and since this class 
is generic SendFDs might not
-            # always be in the return type union
-            return resp  # type: ignore[return-value]
+        # We must make sure sockets aren't intermixed between sync and async 
calls,
+        # thus we need a dual locking mechanism to ensure that.
+        with self._thread_lock:
+            self.socket.sendall(frame_bytes)
+            if isinstance(msg, ResendLoggingFD):
+                if recv_fds is None:
+                    return None
+                # We need special handling here! The server can't send us the 
fd number, as the number on the
+                # supervisor will be different to in this process, so we have 
to mutate the message ourselves here.
+                frame, fds = self._read_frame(maxfds=1)
+                resp = self._from_frame(frame)
+                if TYPE_CHECKING:
+                    assert isinstance(resp, SentFDs)
+                resp.fds = fds
+                # Since we know this is an expliclt SendFDs, and since this 
class is generic SendFDs might not
+                # always be in the return type union
+                return resp  # type: ignore[return-value]
 
         return self._get_response()

Review Comment:
   If a sync send() and an async asend() are called concurrently (which is now 
possible with mixed sync/async code), could the response reads interleave? The 
async version keeps the read inside the lock, but the sync version calls 
`_get_response()` after releasing `_thread_lock`, so I'm curious whether that 
is intentional.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to