pan3793 commented on code in PR #3177:
URL: https://github.com/apache/celeborn/pull/3177#discussion_r2024356421


##########
common/src/main/scala/org/apache/celeborn/common/serializer/JavaSerializer.scala:
##########
@@ -98,19 +99,49 @@ private[celeborn] class JavaSerializerInstance(
 
   override def serialize[T: ClassTag](t: T): ByteBuffer = {
     val bos = new ByteBufferOutputStream()
+    val msg = Utils.toTransportMessage(t)
+    msg match {
+      case transMsg: TransportMessage =>
+        // Check if the msg is a TransportMessage with language-agnostic V2 serdeVersion.
+        // If so, write the marker and the body explicitly.
+        if (transMsg.getSerdeVersion == SerdeVersion.V2) {

Review Comment:
   I understand this is the minimal change, though it's a little weird to write 
that logic inside `JavaSerializer*`. If you don't want to touch much Java/Scala 
code, I'm OK with the current state.



##########
common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala:
##########
@@ -504,6 +504,20 @@ private[celeborn] class RequestMessage(
       writeRpcAddress(out, senderAddress)
       writeRpcAddress(out, receiver.address)
       out.writeUTF(receiver.name)
+      val msg = Utils.toTransportMessage(content)
+      msg match {
+        case transMsg: TransportMessage =>
+          // Check if the msg is a TransportMessage with language-agnostic V2 serdeVersion.
+          // If so, write the marker and the body explicitly.
+          if (transMsg.getSerdeVersion == SerdeVersion.V2) {
+            val out = new DataOutputStream(bos)
+            out.writeByte(SerdeVersion.V2.getMarker)
+            out.write(transMsg.toByteBuffer.array)
+            out.close()
+            return bos.toByteBuffer

Review Comment:
   The memory copy can be reduced if we touch `transMsg.toByteBuffer`, but that 
optimization can be deferred to a future change.
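
   For illustration only, a minimal sketch of the copy in question, using plain 
JDK types rather than Celeborn's classes. `MarkerFraming`, `MarkerV2`, and both 
method names are hypothetical; the real marker comes from 
`SerdeVersion.V2.getMarker`, and how `transMsg.toByteBuffer` would actually be 
changed is left open.

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}
import java.nio.ByteBuffer

object MarkerFraming {
  // Assumed marker value for illustration; not taken from Celeborn.
  val MarkerV2: Byte = 2

  // Same shape as the diff: the payload is copied into the stream's internal
  // buffer, and toByteArray then copies it a second time.
  def frameViaStream(payload: ByteBuffer): ByteBuffer = {
    val bos = new ByteArrayOutputStream()
    val out = new DataOutputStream(bos)
    out.writeByte(MarkerV2)
    // Offset-aware write, so only the readable region of the buffer is emitted.
    out.write(payload.array(), payload.arrayOffset() + payload.position(), payload.remaining())
    out.close()
    ByteBuffer.wrap(bos.toByteArray)
  }

  // Single copy: allocate the exact size once, then put marker and payload.
  def frameDirect(payload: ByteBuffer): ByteBuffer = {
    val framed = ByteBuffer.allocate(1 + payload.remaining())
    framed.put(MarkerV2)
    // duplicate() shares the bytes but keeps the caller's position untouched.
    framed.put(payload.duplicate())
    framed.flip()
    framed
  }
}
```

   Both methods produce the same bytes; the second only avoids the intermediate 
buffers.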



##########
common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala:
##########
@@ -504,6 +504,20 @@ private[celeborn] class RequestMessage(
       writeRpcAddress(out, senderAddress)
       writeRpcAddress(out, receiver.address)
       out.writeUTF(receiver.name)
+      val msg = Utils.toTransportMessage(content)
+      msg match {
+        case transMsg: TransportMessage =>
+          // Check if the msg is a TransportMessage with language-agnostic V2 serdeVersion.
+          // If so, write the marker and the body explicitly.
+          if (transMsg.getSerdeVersion == SerdeVersion.V2) {
+            val out = new DataOutputStream(bos)
+            out.writeByte(SerdeVersion.V2.getMarker)
+            out.write(transMsg.toByteBuffer.array)
+            out.close()
+            return bos.toByteBuffer

Review Comment:
   Is it possible to move this into `JavaSerializationStream`, to narrow the 
scope of the change to the serde layer?
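
   A rough sketch of what that could look like, assuming a stream-level 
`writeObject` that picks the V2 path itself. `V2Message` and 
`MarkerAwareSerializationStream` are hypothetical stand-ins, not the existing 
Celeborn classes, and the marker value is assumed.

```scala
import java.io.{DataOutputStream, ObjectOutputStream, OutputStream}

// Hypothetical stand-in for a TransportMessage whose serdeVersion is V2.
final case class V2Message(body: Array[Byte])

// Hypothetical stream that keeps the marker logic inside the serde layer,
// so callers such as RequestMessage would not need to know about it.
class MarkerAwareSerializationStream(out: OutputStream) {
  private val MarkerV2: Byte = 2 // assumed value, for illustration only

  def writeObject(t: AnyRef): this.type = {
    t match {
      case V2Message(body) =>
        // Language-agnostic path: marker byte followed by the raw body.
        val data = new DataOutputStream(out)
        data.writeByte(MarkerV2)
        data.write(body)
        data.flush()
      case other =>
        // Fallback: regular Java serialization.
        val oos = new ObjectOutputStream(out)
        oos.writeObject(other)
        oos.flush()
    }
    this
  }
}
```

   With something along these lines, the call sites would only call 
`writeObject`, and the marker handling would live in one place.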



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
