pan3793 commented on code in PR #3177:
URL: https://github.com/apache/celeborn/pull/3177#discussion_r2024356421
##########
common/src/main/scala/org/apache/celeborn/common/serializer/JavaSerializer.scala:
##########
@@ -98,19 +99,49 @@ private[celeborn] class JavaSerializerInstance(
override def serialize[T: ClassTag](t: T): ByteBuffer = {
val bos = new ByteBufferOutputStream()
+ val msg = Utils.toTransportMessage(t)
+ msg match {
+ case transMsg: TransportMessage =>
+ // Check if the msg is a TransportMessage with language-agnostic V2
serdeVersion.
+ // If so, write the marker and the body explicitly.
+ if (transMsg.getSerdeVersion == SerdeVersion.V2) {
Review Comment:
I understand this is the minimal change, though it's a little weird to write
that logic inside `JavaSerializer*`. If you don't want to touch much Java/Scala
code, I'm OK with the current state.
##########
common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala:
##########
@@ -504,6 +504,20 @@ private[celeborn] class RequestMessage(
writeRpcAddress(out, senderAddress)
writeRpcAddress(out, receiver.address)
out.writeUTF(receiver.name)
+ val msg = Utils.toTransportMessage(content)
+ msg match {
+ case transMsg: TransportMessage =>
+ // Check if the msg is a TransportMessage with language-agnostic V2
serdeVersion.
+ // If so, write the marker and the body explicitly.
+ if (transMsg.getSerdeVersion == SerdeVersion.V2) {
+ val out = new DataOutputStream(bos)
+ out.writeByte(SerdeVersion.V2.getMarker)
+ out.write(transMsg.toByteBuffer.array)
+ out.close()
+ return bos.toByteBuffer
Review Comment:
the memory copy can be reduced if we touch `transMsg.toByteBuffer`, but the
optimization can be deferred to future changes.
##########
common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala:
##########
@@ -504,6 +504,20 @@ private[celeborn] class RequestMessage(
writeRpcAddress(out, senderAddress)
writeRpcAddress(out, receiver.address)
out.writeUTF(receiver.name)
+ val msg = Utils.toTransportMessage(content)
+ msg match {
+ case transMsg: TransportMessage =>
+ // Check if the msg is a TransportMessage with language-agnostic V2
serdeVersion.
+ // If so, write the marker and the body explicitly.
+ if (transMsg.getSerdeVersion == SerdeVersion.V2) {
+ val out = new DataOutputStream(bos)
+ out.writeByte(SerdeVersion.V2.getMarker)
+ out.write(transMsg.toByteBuffer.array)
+ out.close()
+ return bos.toByteBuffer
Review Comment:
possible to move it `JavaSerializationStream`? to narrow the change scope to
serde layer
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]