gopidesupavan commented on code in PR #51699:
URL: https://github.com/apache/airflow/pull/51699#discussion_r2159634181


##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -552,37 +542,52 @@ def _register_pipe_readers(self, stdout: socket, stderr: 
socket, requests: socke
         self.selector.register(
             requests,
             selectors.EVENT_READ,
-            make_buffered_socket_reader(self.handle_requests(log), 
on_close=self._on_socket_closed),
+            length_prefixed_frame_reader(self.handle_requests(log), 
on_close=self._on_socket_closed),
         )
 
-    def _create_socket_handler(self, loggers, channel, log_level=logging.INFO) 
-> Callable[[socket], bool]:
+    def _create_log_forwarder(self, loggers, channel, log_level=logging.INFO) 
-> Callable[[socket], bool]:
         """Create a socket handler that forwards logs to a logger."""
         return make_buffered_socket_reader(
             forward_to_log(loggers, chan=channel, level=log_level), 
on_close=self._on_socket_closed
         )
 
-    def _on_socket_closed(self):
+    def _on_socket_closed(self, sock: socket):
         # We want to keep servicing this process until we've read up to EOF 
from all the sockets.
-        self._num_open_sockets -= 1
 
-    def send_msg(self, msg: BaseModel, **dump_opts):
-        """Send the given pydantic message to the subprocess at once by 
encoding it and adding a line break."""
-        b = msg.model_dump_json(**dump_opts).encode() + b"\n"
-        self.stdin.sendall(b)
+        with suppress(KeyError):
+            self.selector.unregister(sock)
+            del self._open_sockets[sock]
+
+    def send_msg(
+        self, msg: BaseModel | None, request_id: int, error: ErrorResponse | 
None = None, **dump_opts
+    ):
+        """
+        Send the msg as a length-prefixed response frame.
+
+        ``request_id`` is the ID that the client sent in it's request, and has 
no meaning to the server
+
+        """
+        if msg:
+            frame = _ResponseFrame(id=request_id, 
body=msg.model_dump(**dump_opts))
+        else:
+            err_resp = error.model_dump() if error else None
+            frame = _ResponseFrame(id=request_id, error=err_resp)
+
+        self.stdin.sendall(frame.as_bytes())
 
-    def handle_requests(self, log: FilteringBoundLogger) -> Generator[None, 
bytes, None]:
+    def handle_requests(self, log: FilteringBoundLogger) -> Generator[None, 
_RequestFrame, None]:
         """Handle incoming requests from the task process, respond with the 
appropriate data."""
         while True:
-            line = yield
+            request = yield
 
             try:
-                msg = self.decoder.validate_json(line)
+                msg = self.decoder.validate_python(request.body)
             except Exception:
-                log.exception("Unable to decode message", line=line)
+                log.exception("Unable to decode message", body=request.body)
                 continue

Review Comment:
   Why are we continuing here? I think that if we are unable to decode the message, we should still send a response.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to