jason810496 commented on code in PR #65958:
URL: https://github.com/apache/airflow/pull/65958#discussion_r3271856268


##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -730,35 +733,51 @@ def _create_log_forwarder(self, loggers, name, 
log_level=logging.INFO) -> Callab
 
     def _on_socket_closed(self, sock: socket):
         # We want to keep servicing this process until we've read up to EOF 
from all the sockets.
-
         with suppress(KeyError):
             self.selector.unregister(sock)
             del self._open_sockets[sock]
 
+    def _serialize_response(self, msg: BaseModel | ErrorResponse, **dump_opts) 
-> dict[str, Any]:
+        if self._subprocess_schema_version is not None:
+            migrator = get_schema_version_migrator()
+            msg = migrator.downgrade(msg, self._subprocess_schema_version, 
dump_kwargs=dump_opts)
+        return msg.model_dump(**dump_opts)
+
     def send_msg(
-        self, msg: BaseModel | None, request_id: int, error: ErrorResponse | 
None = None, **dump_opts
+        self,
+        msg: BaseModel | None,
+        request_id: int,
+        error: ErrorResponse | None = None,
+        **dump_opts,
     ):
         """
         Send the msg as a length-prefixed response frame.
 
-        ``request_id`` is the ID that the client sent in it's request, and has 
no meaning to the server
-
+        :param request_id: The ID sent in the request by the client. This has 
no
+            meaning to the server, and is only included in the response frame
+            for the client to identify what the response is for.
         """
         if msg:
-            frame = _ResponseFrame(id=request_id, 
body=msg.model_dump(**dump_opts))
+            frame = _ResponseFrame(id=request_id, 
body=self._serialize_response(msg, **dump_opts))
         else:
-            err_resp = error.model_dump() if error else None
+            err_resp = self._serialize_response(error) if error else None
             frame = _ResponseFrame(id=request_id, error=err_resp)
-
         self.stdin.sendall(frame.as_bytes())
 
+    def _deserialize_request(self, body: dict[str, Any] | None) -> dict[str, 
Any] | None:

Review Comment:
   For `_deserialize_request`, `_serialize_response` naming. Would it be better 
to name as something with migrate, compat, upgrade or downgrade? Since, those 
methods are really migrating the schema, using de/ serialize is a bit too 
ambiguous.



##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -730,35 +733,51 @@ def _create_log_forwarder(self, loggers, name, 
log_level=logging.INFO) -> Callab
 
     def _on_socket_closed(self, sock: socket):
         # We want to keep servicing this process until we've read up to EOF 
from all the sockets.
-
         with suppress(KeyError):
             self.selector.unregister(sock)
             del self._open_sockets[sock]
 
+    def _serialize_response(self, msg: BaseModel | ErrorResponse, **dump_opts) 
-> dict[str, Any]:
+        if self._subprocess_schema_version is not None:
+            migrator = get_schema_version_migrator()
+            msg = migrator.downgrade(msg, self._subprocess_schema_version, 
dump_kwargs=dump_opts)
+        return msg.model_dump(**dump_opts)
+
     def send_msg(
-        self, msg: BaseModel | None, request_id: int, error: ErrorResponse | 
None = None, **dump_opts
+        self,
+        msg: BaseModel | None,
+        request_id: int,
+        error: ErrorResponse | None = None,
+        **dump_opts,
     ):
         """
         Send the msg as a length-prefixed response frame.
 
-        ``request_id`` is the ID that the client sent in it's request, and has 
no meaning to the server
-
+        :param request_id: The ID sent in the request by the client. This has 
no
+            meaning to the server, and is only included in the response frame
+            for the client to identify what the response is for.
         """
         if msg:
-            frame = _ResponseFrame(id=request_id, 
body=msg.model_dump(**dump_opts))
+            frame = _ResponseFrame(id=request_id, 
body=self._serialize_response(msg, **dump_opts))
         else:
-            err_resp = error.model_dump() if error else None
+            err_resp = self._serialize_response(error) if error else None
             frame = _ResponseFrame(id=request_id, error=err_resp)
-
         self.stdin.sendall(frame.as_bytes())
 
+    def _deserialize_request(self, body: dict[str, Any] | None) -> dict[str, 
Any] | None:

Review Comment:
   For `_deserialize_request`, `_serialize_response` naming. Would it be better 
to name as something with migrate, compat, upgrade or downgrade? Since, those 
methods are really migrating the schema, using de/ serialize is a bit ambiguous.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to