churromorales commented on code in PR #12408:
URL: https://github.com/apache/druid/pull/12408#discussion_r863244305
##########
processing/src/main/java/org/apache/druid/segment/data/CompressionStrategy.java:
##########
@@ -344,6 +360,81 @@ public ByteBuffer compress(ByteBuffer in, ByteBuffer out)
}
}
+ public static class ZstdCompressor extends Compressor
+ {
+ private static final ZstdCompressor DEFAULT_COMPRESSOR = new
ZstdCompressor();
+
+ @Override
+ ByteBuffer allocateInBuffer(int inputSize, Closer closer)
+ {
+ ByteBuffer inBuffer = ByteBuffer.allocateDirect(inputSize);
+ closer.register(() -> ByteBufferUtils.free(inBuffer));
+ return inBuffer;
+ }
+
+ @Override
+ ByteBuffer allocateOutBuffer(int inputSize, Closer closer)
+ {
+ ByteBuffer outBuffer = ByteBuffer.allocateDirect((int)
Zstd.compressBound(inputSize));
+ closer.register(() -> ByteBufferUtils.free(outBuffer));
+ return outBuffer;
+ }
+
+ @Override
+ public ByteBuffer compress(ByteBuffer in, ByteBuffer out)
+ {
+ int position = in.position();
+ out.clear();
+ long sizeNeeded = Zstd.compressBound(in.remaining());
+ if (out.remaining() < sizeNeeded) {
+ throw new RuntimeException("Output buffer too small, please allocate
more space. " + sizeNeeded + " required.");
+ }
+ Zstd.compress(out, in, Zstd.maxCompressionLevel());
+ in.position(position);
+ out.flip();
+ return out;
+ }
+ }
+
+ public static class ZstdDecompressor implements Decompressor
+ {
+ private static final ZstdDecompressor DEFAULT_COMPRESSOR = new
ZstdDecompressor();
+
+ @Override
+ public void decompress(ByteBuffer in, int numBytes, ByteBuffer out)
+ {
+ out.clear();
+ // some tests don't use dbb's and zstd jni doesn't allow for non-dbb
byte buffers.
Review Comment:
yeah, everyone should use DBB, but I believe when running the following
test: It failed with:
```[ERROR]
testSanityWithSerde[2](org.apache.druid.segment.data.CompressedColumnarIntsSupplierTest)
Time elapsed: 0 s <<< ERROR!
java.lang.IllegalArgumentException: srcBuff must be a direct buffer
at
com.github.luben.zstd.ZstdDecompressCtx.decompressDirectByteBuffer(ZstdDecompressCtx.java:110)
at
com.github.luben.zstd.Zstd.decompressDirectByteBuffer(Zstd.java:434)
at
org.apache.druid.segment.data.CompressionStrategy$ZstdDecompressor.decompress(CompressionStrategy.java:407)
at
org.apache.druid.segment.data.DecompressingByteBufferObjectStrategy.fromByteBuffer(DecompressingByteBufferObjectStrategy.java:53)
at
org.apache.druid.segment.data.DecompressingByteBufferObjectStrategy.fromByteBuffer(DecompressingByteBufferObjectStrategy.java:28)
at
org.apache.druid.segment.data.GenericIndexed$BufferIndexed.bufferedIndexedGet(GenericIndexed.java:490)
at
org.apache.druid.segment.data.GenericIndexed$3.get(GenericIndexed.java:645)
at
org.apache.druid.segment.data.CompressedColumnarIntsSupplier$CompressedColumnarInts.loadBuffer(CompressedColumnarIntsSupplier.java:321)
at
org.apache.druid.segment.data.CompressedColumnarIntsSupplier$CompressedColumnarInts.get(CompressedColumnarIntsSupplier.java:312)
at
org.apache.druid.segment.data.CompressedColumnarIntsSupplierTest.assertIndexMatchesVals(CompressedColumnarIntsSupplierTest.java:289)
at
org.apache.druid.segment.data.CompressedColumnarIntsSupplierTest.testSanityWithSerde(CompressedColumnarIntsSupplierTest.java:175)
```
I don't know what is the right solution here, I agree what I wrote feels a
bit dirty. I'll defer to you here on how you think we should sort this out..
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]