clintropolis commented on code in PR #12408:
URL: https://github.com/apache/druid/pull/12408#discussion_r877957735


##########
processing/src/main/java/org/apache/druid/segment/data/CompressionStrategy.java:
##########
@@ -344,6 +360,81 @@ public ByteBuffer compress(ByteBuffer in, ByteBuffer out)
     }
   }
 
+  public static class ZstdCompressor extends Compressor
+  {
+    private static final ZstdCompressor DEFAULT_COMPRESSOR = new ZstdCompressor();
+
+    @Override
+    ByteBuffer allocateInBuffer(int inputSize, Closer closer)
+    {
+      ByteBuffer inBuffer = ByteBuffer.allocateDirect(inputSize);
+      closer.register(() -> ByteBufferUtils.free(inBuffer));
+      return inBuffer;
+    }
+
+    @Override
+    ByteBuffer allocateOutBuffer(int inputSize, Closer closer)
+    {
+      ByteBuffer outBuffer = ByteBuffer.allocateDirect((int) Zstd.compressBound(inputSize));
+      closer.register(() -> ByteBufferUtils.free(outBuffer));
+      return outBuffer;
+    }
+
+    @Override
+    public ByteBuffer compress(ByteBuffer in, ByteBuffer out)
+    {
+      int position = in.position();
+      out.clear();
+      long sizeNeeded = Zstd.compressBound(in.remaining());
+      if (out.remaining() < sizeNeeded) {
+        throw new RuntimeException("Output buffer too small, please allocate more space. " + sizeNeeded + " required.");
+      }
+      Zstd.compress(out, in, Zstd.maxCompressionLevel());
+      in.position(position);
+      out.flip();
+      return out;
+    }
+  }
+
+  public static class ZstdDecompressor implements Decompressor
+  {
+    private static final ZstdDecompressor DEFAULT_COMPRESSOR = new ZstdDecompressor();
+
+    @Override
+    public void decompress(ByteBuffer in, int numBytes, ByteBuffer out)
+    {
+      out.clear();
+      // some tests don't use dbb's and zstd jni doesn't allow for non-dbb byte buffers.

Review Comment:
   Sorry for the delay, but I think I have a better idea for what to do here:
instead of cloning into a direct buffer, how about we just use byte arrays with
`Zstd.decompressByteArray` and copy the output byte array into the output buffer?
   
   ```java
       @Override
       public void decompress(ByteBuffer in, int numBytes, ByteBuffer out)
       {
         out.clear();
         if (!in.isDirect() || !out.isDirect()) {
           // fall back to heap byte arrays unless both buffers are direct
           final byte[] inputBytes = new byte[numBytes];
           in.get(inputBytes);
           try (final ResourceHolder<byte[]> outputBytesHolder = CompressedPools.getOutputBytes()) {
             final byte[] outputBytes = outputBytesHolder.get();
             int decompressedBytes = (int) Zstd.decompressByteArray(
                 outputBytes,
                 0,
                 outputBytes.length,
                 inputBytes,
                 0,
                 numBytes
             );
             out.put(outputBytes, 0, decompressedBytes);
             out.flip();
           }
         } else {
           int decompressedBytes = (int) Zstd.decompressDirectByteBuffer(
               out,
               out.position(),
               out.remaining(),
               in,
               in.position(),
               numBytes
           );
           out.limit(out.position() + decompressedBytes);
         }
       }
   ```
   I tested it locally and it seems to work.
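   
   For anyone who wants to poke at the buffer handling of the heap fallback
without zstd-jni on the classpath, here is a minimal, JDK-only sketch. It is
not the code proposed for this PR: `java.util.zip.Deflater`/`Inflater` stand in
for zstd, the scratch output array is allocated per call instead of coming from
`CompressedPools`, and the class and helper names (`HeapFallbackSketch`,
`compress`, `decompress`) are made up for illustration.
   
   ```java
   import java.nio.ByteBuffer;
   import java.nio.charset.StandardCharsets;
   import java.util.Arrays;
   import java.util.zip.DataFormatException;
   import java.util.zip.Deflater;
   import java.util.zip.Inflater;

   class HeapFallbackSketch
   {
     // Hypothetical helper: compress with the JDK Deflater (a stand-in for zstd).
     static byte[] compress(byte[] raw)
     {
       final Deflater deflater = new Deflater();
       try {
         deflater.setInput(raw);
         deflater.finish();
         // generous scratch size for the small inputs used in this sketch
         final byte[] scratch = new byte[raw.length * 2 + 64];
         final int compressedBytes = deflater.deflate(scratch);
         return Arrays.copyOf(scratch, compressedBytes);
       }
       finally {
         deflater.end();
       }
     }

     // Same shape as the heap branch above: copy the input into a byte array,
     // decompress into a byte array, then copy the result into 'out' and flip it.
     // Works for heap and direct buffers alike, since only get/put are used.
     static void decompress(ByteBuffer in, int numBytes, ByteBuffer out)
     {
       out.clear();
       final byte[] inputBytes = new byte[numBytes];
       in.get(inputBytes); // relative get: advances in.position() by numBytes
       final byte[] outputBytes = new byte[out.capacity()]; // assumption: no pooling here
       final Inflater inflater = new Inflater();
       try {
         inflater.setInput(inputBytes, 0, numBytes);
         final int decompressedBytes = inflater.inflate(outputBytes);
         out.put(outputBytes, 0, decompressedBytes);
         out.flip(); // position = 0, limit = decompressedBytes
       }
       catch (DataFormatException e) {
         throw new IllegalStateException(e);
       }
       finally {
         inflater.end();
       }
     }

     public static void main(String[] args)
     {
       final byte[] raw = "druid druid druid druid".getBytes(StandardCharsets.UTF_8);
       final byte[] compressed = compress(raw);
       final ByteBuffer in = ByteBuffer.wrap(compressed); // heap buffer
       final ByteBuffer out = ByteBuffer.allocate(64);    // heap buffer
       decompress(in, compressed.length, out);
       final byte[] roundTripped = new byte[out.remaining()];
       out.get(roundTripped);
       System.out.println(new String(roundTripped, StandardCharsets.UTF_8));
     }
   }
   ```
   
   One thing the sketch makes easy to see: the relative `in.get(inputBytes)`
moves the input position forward, so a caller that reuses `in` afterwards
should save and restore the position if it needs it unchanged.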
   
   
   Also, make sure that any tests modified in this PR that allocate direct
buffers release them; see #12521, which also looks like it has caused a
conflict with this PR. I'll scan through and see if I can spot anything too.
   


