reiabreu commented on code in PR #8707:
URL: https://github.com/apache/storm/pull/8707#discussion_r3307281266
##########
storm-client/src/jvm/org/apache/storm/serialization/KryoTupleSerializer.java:
##########
@@ -38,7 +51,16 @@ public byte[] serialize(Tuple tuple) {
kryoOut.writeInt(ids.getStreamId(tuple.getSourceComponent(), tuple.getSourceStreamId()), true);
tuple.getMessageId().serialize(kryoOut);
kryo.serializeInto(tuple.getValues(), kryoOut);
- return kryoOut.toBytes();
+
+ byte[] rawBytes = kryoOut.getBuffer();
+ int dataLength = kryoOut.position();
+
+ if (this.isCompressionEnabled && dataLength > this.compressionThreshold) {
Review Comment:
I ran this by an AI assistant, and it suggested a low-level optimization that is very interesting.
This is a minor area where we can reduce memory allocation and garbage-collection overhead. In the current compression path, we perform two allocations and two copies before the final payload is returned:
1. `Arrays.copyOf(rawBytes, dataLength)` inside `KryoTupleSerializer`, to extract the active slice.
2. `bos.toByteArray()` inside `Utils.ZstdUtils.compress`, to extract the compressed data.
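We can cut this in half by introducing an offset-aware overload of `Utils.ZstdUtils.compress`. This lets us stream the active slice of the pre-allocated Kryo buffer directly into Zstd, avoiding the first copy entirely. Passing the live buffer is safe because `compress` consumes the slice and returns a new array before `kryoOut` is reused for the next tuple.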
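A minimal sketch of why skipping the copy is safe, assuming the JDK's `java.util.zip.DeflaterOutputStream` as a stand-in for the Zstd stream (so it runs without Commons Compress). Writing `(buffer, 0, length)` to a compressor stream consumes exactly the active slice, so it produces the same bytes as compressing an `Arrays.copyOf` of that slice. The class `SliceDemo` and the method `compressSlice` are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.zip.DeflaterOutputStream;

// Hypothetical demo class; Deflate stands in for Zstd so the example needs only the JDK.
public class SliceDemo {

    // Compresses data[offset, offset + length) without copying the slice first.
    public static byte[] compressSlice(byte[] data, int offset, int length) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DeflaterOutputStream out = new DeflaterOutputStream(bos)) {
            out.write(data, offset, length); // reads only the requested slice
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) {
        // Simulate a reused serializer buffer: 64 bytes allocated, only 40 in use.
        byte[] buffer = new byte[64];
        int position = 40;
        for (int i = 0; i < position; i++) {
            buffer[i] = (byte) (i % 7);
        }
        // Stale bytes past the active end must not leak into the payload.
        Arrays.fill(buffer, position, buffer.length, (byte) 0x55);

        byte[] viaCopy = compressSlice(Arrays.copyOf(buffer, position), 0, position);
        byte[] viaSlice = compressSlice(buffer, 0, position);
        System.out.println(Arrays.equals(viaCopy, viaSlice)); // prints "true"
    }
}
```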
Here is the proposed change:
#### 1. In `Utils.java` (`ZstdUtils` class)
Add an overload that accepts an offset and a length:
```java
public static byte[] compress(byte[] data, int offset, int length, int compressionLevel) {
    if (data == null || length == 0) {
        return new byte[0];
    }
    // Fail fast if [offset, offset + length) does not fit inside data
    // (JDK 9+, requires import java.util.Objects)
    Objects.checkFromIndexSize(offset, length, data.length);
    try (ByteArrayOutputStream bos = new ByteArrayOutputStream(length)) {
        try (ZstdCompressorOutputStream zstdOut = ZstdCompressorOutputStream.builder()
                .setOutputStream(bos)
                .setBufferSize(BUFFER_SIZE)
                .setLevel(compressionLevel)
                .get()) {
            zstdOut.write(data, offset, length); // Write the slice directly, no intermediate copy
            zstdOut.finish();
        }
        return bos.toByteArray();
    } catch (IOException e) {
        throw new RuntimeException("Zstd compression failed", e);
    }
}

// Keep the existing signature for backwards compatibility
public static byte[] compress(byte[] data, int compressionLevel) {
    return compress(data, 0, data == null ? 0 : data.length, compressionLevel);
}
```
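The overload pair can be exercised in isolation with a sketch like the one below. It assumes Deflate in place of Zstd (the level is therefore a `java.util.zip.Deflater` level, not a Zstd level), and `SliceCompressor` is a hypothetical stand-in for `Utils.ZstdUtils`. It shows the delegation pattern and the edge cases: `null` and empty inputs, and an out-of-range slice rejected by `Objects.checkFromIndexSize`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Objects;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;

// Hypothetical stand-in for Utils.ZstdUtils; Deflate replaces Zstd so it runs on the JDK alone.
public final class SliceCompressor {

    private SliceCompressor() {
    }

    // Offset-aware variant: compresses data[offset, offset + length).
    public static byte[] compress(byte[] data, int offset, int length, int level) {
        if (data == null || length == 0) {
            return new byte[0];
        }
        // Throws IndexOutOfBoundsException for a negative or overrunning slice (JDK 9+).
        Objects.checkFromIndexSize(offset, length, data.length);
        Deflater deflater = new Deflater(level);
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DeflaterOutputStream out = new DeflaterOutputStream(bos, deflater)) {
            out.write(data, offset, length);
        } catch (IOException e) {
            throw new UncheckedIOException("compression failed", e);
        } finally {
            // The stream is already closed here; a caller-supplied Deflater must be ended explicitly.
            deflater.end();
        }
        return bos.toByteArray();
    }

    // Whole-array variant kept for existing callers; delegates to the slice version.
    public static byte[] compress(byte[] data, int level) {
        return compress(data, 0, data == null ? 0 : data.length, level);
    }
}
```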
#### 2. In `KryoTupleSerializer.java`
Update the serialization call site to pass the raw buffer along with its
offset and length directly:
```diff
-    if (this.isCompressionEnabled && dataLength > this.compressionThreshold) {
-        byte[] bytesToCompress = Arrays.copyOf(rawBytes, dataLength);
-        return Utils.ZstdUtils.compress(bytesToCompress, this.zstdCompressionLevel);
-    } else {
-        return Arrays.copyOf(rawBytes, dataLength);
-    }
+    if (this.isCompressionEnabled && dataLength > this.compressionThreshold) {
+        return Utils.ZstdUtils.compress(rawBytes, 0, dataLength, this.zstdCompressionLevel);
+    } else {
+        return Arrays.copyOf(rawBytes, dataLength);
+    }
```
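A rough, self-contained model of the updated branch, for anyone who wants to poke at the behavior outside Storm. It assumes a plain `byte[]` plus a position in place of Kryo's `Output` (`getBuffer()` / `position()`) and Deflate in place of Zstd; `PayloadEncoder`, `encode` and `decode` are hypothetical names. It also checks that the compressed payload does not alias the buffer, which is what makes reusing that buffer for the next tuple safe.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.zip.DataFormatException;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.Inflater;

// Hypothetical model of the serializer's final step; not Storm code.
public final class PayloadEncoder {

    private PayloadEncoder() {
    }

    // buffer/position mimic kryoOut.getBuffer() and kryoOut.position().
    public static byte[] encode(byte[] buffer, int position, boolean compressionEnabled, int threshold) {
        if (compressionEnabled && position > threshold) {
            // Compress the active slice directly: no Arrays.copyOf beforehand.
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (DeflaterOutputStream out = new DeflaterOutputStream(bos)) {
                out.write(buffer, 0, position);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            return bos.toByteArray(); // fresh array, independent of buffer
        }
        // Uncompressed path still needs one exact-length copy of the reused buffer.
        return Arrays.copyOf(buffer, position);
    }

    // Test helper: inflates a payload produced by the compressed branch.
    public static byte[] decode(byte[] payload, int originalLength) throws DataFormatException {
        Inflater inflater = new Inflater();
        try {
            inflater.setInput(payload);
            byte[] out = new byte[originalLength];
            int n = 0;
            while (n < out.length && !inflater.finished()) {
                int k = inflater.inflate(out, n, out.length - n);
                if (k == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
                    break; // truncated or unsupported input
                }
                n += k;
            }
            return Arrays.copyOf(out, n);
        } finally {
            inflater.end();
        }
    }
}
```

The uncompressed branch keeps its `Arrays.copyOf` because the returned array must be exactly `dataLength` bytes long and must not alias the Kryo buffer that the serializer reuses.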
This simple optimization removes one of the two payload copies (and its allocation) for every compressed tuple on the serialization side, which reduces GC pressure on that path.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.