reiabreu commented on code in PR #8707:
URL: https://github.com/apache/storm/pull/8707#discussion_r3307281266


##########
storm-client/src/jvm/org/apache/storm/serialization/KryoTupleSerializer.java:
##########
@@ -38,7 +51,16 @@ public byte[] serialize(Tuple tuple) {
             kryoOut.writeInt(ids.getStreamId(tuple.getSourceComponent(), tuple.getSourceStreamId()), true);
             tuple.getMessageId().serialize(kryoOut);
             kryo.serializeInto(tuple.getValues(), kryoOut);
-            return kryoOut.toBytes();
+
+            byte[] rawBytes = kryoOut.getBuffer();
+            int dataLength = kryoOut.position();
+
+            if (this.isCompressionEnabled && dataLength > this.compressionThreshold) {

Review Comment:
   I ran this by an AI assistant, and it proposed an interesting low-level optimization:
   
   
   There is a minor opportunity to reduce memory allocations and garbage collection overhead. In the current compression path, we perform two allocations and two copy operations before the final payload is returned:

     1. Arrays.copyOf(rawBytes, dataLength) inside KryoTupleSerializer, to extract the active slice.
     2. bos.toByteArray() inside Utils.ZstdUtils.compress, to extract the compressed data.
     
     We can cut this in half by introducing an offset-aware signature for Utils.ZstdUtils.compress. This lets us stream the active slice of the pre-allocated Kryo buffer directly into Zstd, avoiding the first copy entirely.
     
     Here is the proposed change:
     
     #### 1. In  Utils.java  ( ZstdUtils  class)
     
     Add a helper signature that accepts an  offset  and  length :
     
       public static byte[] compress(byte[] data, int offset, int length, int compressionLevel) {
           if (data == null || length == 0) {
               return new byte[0];
           }
     
           try (ByteArrayOutputStream bos = new ByteArrayOutputStream(length)) {
               try (ZstdCompressorOutputStream zstdOut = ZstdCompressorOutputStream.builder()
                       .setOutputStream(bos)
                       .setBufferSize(BUFFER_SIZE)
                       .setLevel(compressionLevel)
                       .get()) {
                   zstdOut.write(data, offset, length); // Write the slice directly
                   zstdOut.finish();
               }
               return bos.toByteArray();
           } catch (IOException e) {
               throw new RuntimeException("Zstd compression failed", e);
           }
       }
     
       // Keep the existing signature for backwards compatibility
       public static byte[] compress(byte[] data, int compressionLevel) {
           return compress(data, 0, data == null ? 0 : data.length, compressionLevel);
       }
     
     #### 2. In  KryoTupleSerializer.java 
     
     Update the serialization call site to pass the raw buffer directly, along with its offset and length:
     
       -            if (this.isCompressionEnabled && dataLength > this.compressionThreshold) {
       -                byte[] bytesToCompress = Arrays.copyOf(rawBytes, dataLength);
       -                return Utils.ZstdUtils.compress(bytesToCompress, this.zstdCompressionLevel);
       -            } else {
       -                return Arrays.copyOf(rawBytes, dataLength);
       -            }
       +            if (this.isCompressionEnabled && dataLength > this.compressionThreshold) {
       +                return Utils.ZstdUtils.compress(rawBytes, 0, dataLength, this.zstdCompressionLevel);
       +            } else {
       +                return Arrays.copyOf(rawBytes, dataLength);
       +            }
     
     This simple optimization removes one of the two allocations and copies listed above, reducing that overhead by 50% for every compressed tuple on the serialization side.
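
One way to sanity-check the idea is to confirm that compressing a slice of an oversized buffer gives the same bytes as copying the slice first. The sketch below is only an illustration under stated assumptions: it uses the JDK's DeflaterOutputStream as a stand-in for Zstd so that it runs without extra dependencies, the class name SliceCompressSketch and both method names are hypothetical, and the Objects.checkFromIndexSize bounds check (Java 9+) is an addition that is not part of the proposed change.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Objects;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

public class SliceCompressSketch {

    // Stand-in for an offset-aware compress(): Deflate is used here, NOT Zstd.
    public static byte[] compress(byte[] data, int offset, int length) {
        if (data == null || length == 0) {
            return new byte[0];
        }
        // Assumption: reject slices that fall outside the buffer.
        Objects.checkFromIndexSize(offset, length, data.length);
        ByteArrayOutputStream bos = new ByteArrayOutputStream(length);
        try (DeflaterOutputStream out = new DeflaterOutputStream(bos)) {
            out.write(data, offset, length); // write the slice, no intermediate copy
        } catch (IOException e) {
            throw new UncheckedIOException("compression failed", e);
        }
        // The stream is closed (and therefore finished) before the bytes are read.
        return bos.toByteArray();
    }

    // Inverse of compress(), used only to verify the round trip.
    public static byte[] decompress(byte[] compressed) {
        try (InflaterInputStream in =
                 new InflaterInputStream(new ByteArrayInputStream(compressed))) {
            return in.readAllBytes();
        } catch (IOException e) {
            throw new UncheckedIOException("decompression failed", e);
        }
    }

    public static void main(String[] args) {
        // Simulate a serializer buffer: 4096 bytes of capacity, 1000 bytes written.
        byte[] rawBytes = new byte[4096];
        int dataLength = 1000;
        for (int i = 0; i < dataLength; i++) {
            rawBytes[i] = (byte) (i % 17);
        }
        // Stale bytes past the write position must not leak into the output.
        Arrays.fill(rawBytes, dataLength, rawBytes.length, (byte) 0x7F);

        byte[] expected = Arrays.copyOf(rawBytes, dataLength);
        byte[] viaCopy = compress(expected, 0, dataLength);
        byte[] viaSlice = compress(rawBytes, 0, dataLength);

        System.out.println("same compressed bytes: " + Arrays.equals(viaCopy, viaSlice));
        System.out.println("round trip ok: " + Arrays.equals(decompress(viaSlice), expected));
    }
}
```

The same comparison could be turned into a unit test against the real Utils.ZstdUtils.compress overloads once the offset-aware signature exists.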



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
