ashb commented on code in PR #48949:
URL: https://github.com/apache/airflow/pull/48949#discussion_r2035334063


##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -128,32 +124,26 @@
     TerminalTIState.SUCCESS,
 ]
 
-
-@overload
-def mkpipe() -> tuple[socket, socket]: ...
-
-
-@overload
-def mkpipe(remote_read: Literal[True]) -> tuple[socket, BinaryIO]: ...
+# Setting a fair buffer size here to handle most message sizes. Intention is 
to enforce a buffer size
+# that is big enough to handle small to medium messages while not enforcing 
hard latency issues
+BUFFER_SIZE = 4096
 
 
 def mkpipe(
     remote_read: bool = False,
-) -> tuple[socket, socket | BinaryIO]:
+) -> tuple[socket, socket]:
     """Create a pair of connected sockets."""
     rsock, wsock = socketpair()
     local, remote = (wsock, rsock) if remote_read else (rsock, wsock)
 
-    local.setblocking(False)
-
-    io: BinaryIO | socket
     if remote_read:
-        # If _we_ are writing, we don't want to buffer
-        io = cast("BinaryIO", local.makefile("wb", buffering=0))
-    else:
-        io = local
+        # Setting a 4KB buffer here if possible, if not, it still works, so we 
will suppress all exceptions
+        with suppress(Exception):
+            local.setsockopt(SO_SNDBUF, SOL_SOCKET, BUFFER_SIZE)
+        # set nonblocking to True so that send or sendall waits till all data 
is sent
+        local.setblocking(True)

Review Comment:
   Nope, it should be fine as it's in the supervisor, and we can't proceed 
before we've written the entire message anyway. (And client can't do anything 
else on the socket until it's read the response)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to