dabla commented on code in PR #59087:
URL: https://github.com/apache/airflow/pull/59087#discussion_r2672685995
##########
task-sdk/src/airflow/sdk/execution_time/comms.py:
##########
@@ -185,31 +187,69 @@ class CommsDecoder(Generic[ReceiveMsgType, SendMsgType]):
err_decoder: TypeAdapter[ErrorResponse] = attrs.field(factory=lambda:
TypeAdapter(ToTask), repr=False)
+ # Threading lock for sync operations
+ _thread_lock: threading.Lock = attrs.field(factory=threading.Lock,
repr=False)
+ # Async lock for async operations
+ _async_lock: asyncio.Lock = attrs.field(factory=asyncio.Lock, repr=False)
+
def send(self, msg: SendMsgType) -> ReceiveMsgType | None:
"""Send a request to the parent and block until the response is
received."""
frame = _RequestFrame(id=next(self.id_counter), body=msg.model_dump())
frame_bytes = frame.as_bytes()
- self.socket.sendall(frame_bytes)
- if isinstance(msg, ResendLoggingFD):
- if recv_fds is None:
- return None
- # We need special handling here! The server can't send us the fd
number, as the number on the
- # supervisor will be different to in this process, so we have to
mutate the message ourselves here.
- frame, fds = self._read_frame(maxfds=1)
- resp = self._from_frame(frame)
- if TYPE_CHECKING:
- assert isinstance(resp, SentFDs)
- resp.fds = fds
- # Since we know this is an expliclt SendFDs, and since this class
is generic SendFDs might not
- # always be in the return type union
- return resp # type: ignore[return-value]
+ # We must make sure sockets aren't intermixed between sync and async
calls,
+ # thus we need a dual locking mechanism to ensure that.
+ with self._thread_lock:
+ self.socket.sendall(frame_bytes)
+ if isinstance(msg, ResendLoggingFD):
+ if recv_fds is None:
+ return None
+ # We need special handling here! The server can't send us the
fd number, as the number on the
+ # supervisor will be different to in this process, so we have
to mutate the message ourselves here.
+ frame, fds = self._read_frame(maxfds=1)
+ resp = self._from_frame(frame)
+ if TYPE_CHECKING:
+ assert isinstance(resp, SentFDs)
+ resp.fds = fds
+ # Since we know this is an expliclt SendFDs, and since this
class is generic SendFDs might not
+ # always be in the return type union
+ return resp # type: ignore[return-value]
return self._get_response()
Review Comment:
Unless I missed something, it should behave correctly: the async version
acquires both the async lock and the sync lock, while the sync version only
acquires the sync lock. We have been running the async PythonOperator since
right after the Summit and have not experienced any issues with it so far. I
did have issues in the beginning, which is how I came up with this solution;
without those locks, requests and responses could indeed get mixed up very
quickly.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]