gszadovszky commented on code in PR #1280:
URL: https://github.com/apache/parquet-mr/pull/1280#discussion_r1502301174


##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java:
##########
@@ -250,75 +341,86 @@ public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOEx
     public void release() {}
   }
 
-  public class SnappyDecompressor extends BytesDecompressor {
+  public class SnappyDecompressor extends BaseDecompressor {
+    @Override
+    int decompress(ByteBuffer input, ByteBuffer output) throws IOException {
+      return Snappy.uncompress(input, output);
+    }
 
-    private HeapBytesDecompressor extraDecompressor;
+    @Override
+    void closeDecompressor() {
+      // no-op
+    }
+  }
 
-    public SnappyDecompressor() {
-      this.extraDecompressor = new HeapBytesDecompressor(CompressionCodecName.SNAPPY);
+  public class SnappyCompressor extends BaseCompressor {
+
+    @Override
+    int compress(ByteBuffer input, ByteBuffer output) throws IOException {
+      return Snappy.compress(input, output);
     }
 
     @Override
-    public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException {
-      return extraDecompressor.decompress(bytes, uncompressedSize);
+    int maxCompressedSize(int size) {
+      return Snappy.maxCompressedLength(size);
     }
 
     @Override
-    public void decompress(ByteBuffer src, int compressedSize, ByteBuffer dst, int uncompressedSize)
-        throws IOException {
-      dst.clear();
-      int size = Snappy.uncompress(src, dst);
-      dst.limit(size);
+    public CompressionCodecName getCodecName() {
+      return CompressionCodecName.SNAPPY;
     }
 
     @Override
-    public void release() {}
+    void closeCompressor() {
+      // no-op
+    }
   }
 
-  public class SnappyCompressor extends BytesCompressor {
+  private class ZstdDecompressor extends BaseDecompressor {
+    private final ZstdDecompressCtx context;
 
-    // TODO - this outgoing buffer might be better off not being shared, this seems to
-    // only work because of an extra copy currently happening where this interface is
-    // be consumed
-    private ByteBuffer incoming;
-    private ByteBuffer outgoing;
+    ZstdDecompressor() {
+      context = new ZstdDecompressCtx();
+    }
 
-    /**
-     * Compress a given buffer of bytes
-     * @param bytes
-     * @return
-     * @throws IOException
-     */
     @Override
-    public BytesInput compress(BytesInput bytes) throws IOException {
-      int maxOutputSize = Snappy.maxCompressedLength((int) bytes.size());
-      ByteBuffer bufferIn = bytes.toByteBuffer();
-      outgoing = ensure(outgoing, maxOutputSize);
-      final int size;
-      if (bufferIn.isDirect()) {
-        size = Snappy.compress(bufferIn, outgoing);
-      } else {
-        // Snappy library requires buffers be direct
-        this.incoming = ensure(this.incoming, (int) bytes.size());
-        this.incoming.put(bufferIn);
-        this.incoming.flip();
-        size = Snappy.compress(this.incoming, outgoing);
-      }
+    int decompress(ByteBuffer input, ByteBuffer output) {
+      return context.decompress(output, input);
+    }
 
-      outgoing.limit(size);
+    @Override
+    void closeDecompressor() {
+      context.close();
+    }
+  }
+
+  private class ZstdCompressor extends BaseCompressor {
+    private final ZstdCompressCtx context;

Review Comment:
   There is a big difference between our "usual" codecs, which wrap the Hadoop codecs, and the ones created by the DirectCodecFactory: the former have a stream interface behind our interfaces.
   In my first attempt at implementing the direct Zstd codec, I used the stream implementation for the BytesInput interface and a static method of the Zstd API for the ByteBuffer interface.
   I have since realized that when we use a direct ByteBufferAllocator, the BytesInput objects are also mostly backed by direct ByteBuffers, so we can use them without a stream.
   The static API of the Zstd codec uses a context internally anyway, and the class comments suggest using the context directly instead.
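
   To illustrate the pattern described above (one reusable context, direct ByteBuffer in and out, no stream in between), here is a minimal sketch. It is not the code from this PR: it uses the JDK's `Deflater`/`Inflater` (Java 11+ ByteBuffer overloads) as a stand-in for the Zstd contexts so that it runs without extra dependencies. The class names `ContextCompressor` and `ContextDecompressor`, the `roundTrip` helper, and the output-size bound are hypothetical and chosen only for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

class DirectBufferCodecSketch {

  /** Hypothetical compressor owning one reusable context (Deflater as a stand-in). */
  static final class ContextCompressor implements AutoCloseable {
    private final Deflater context = new Deflater();

    /** Compresses the remaining bytes of input into output; returns the compressed size. */
    int compress(ByteBuffer input, ByteBuffer output) {
      context.reset(); // reuse the same context for every call
      context.setInput(input);
      context.finish();
      int start = output.position();
      while (!context.finished() && output.hasRemaining()) {
        context.deflate(output);
      }
      if (!context.finished()) {
        throw new IllegalStateException("output buffer too small");
      }
      return output.position() - start;
    }

    @Override
    public void close() {
      context.end(); // releases the native resources held by the context
    }
  }

  /** Hypothetical decompressor owning one reusable context (Inflater as a stand-in). */
  static final class ContextDecompressor implements AutoCloseable {
    private final Inflater context = new Inflater();

    /** Decompresses input into output; returns the number of bytes written. */
    int decompress(ByteBuffer input, ByteBuffer output) {
      context.reset();
      context.setInput(input);
      int start = output.position();
      try {
        while (!context.finished() && output.hasRemaining()) {
          int written = context.inflate(output);
          if (written == 0 && (context.needsInput() || context.needsDictionary())) {
            throw new IllegalArgumentException("truncated or unsupported input");
          }
        }
      } catch (DataFormatException e) {
        throw new IllegalArgumentException("corrupt input", e);
      }
      return output.position() - start;
    }

    @Override
    public void close() {
      context.end();
    }
  }

  /** Compresses and decompresses data using only direct buffers, without any stream. */
  public static byte[] roundTrip(byte[] data) {
    ByteBuffer raw = ByteBuffer.allocateDirect(data.length);
    raw.put(data);
    raw.flip();
    // Assumption: a deliberately loose upper bound for the compressed size.
    ByteBuffer compressed = ByteBuffer.allocateDirect(data.length + data.length / 8 + 64);
    ByteBuffer restored = ByteBuffer.allocateDirect(data.length);
    try (ContextCompressor compressor = new ContextCompressor();
        ContextDecompressor decompressor = new ContextDecompressor()) {
      compressor.compress(raw, compressed);
      compressed.flip();
      int size = decompressor.decompress(compressed, restored);
      restored.flip();
      byte[] out = new byte[size];
      restored.get(out);
      return out;
    }
  }

  public static void main(String[] args) {
    byte[] data = "direct buffers, no stream".getBytes(StandardCharsets.UTF_8);
    System.out.println(Arrays.equals(data, roundTrip(data)));
  }
}
```

   With the real Zstd contexts, the two `compress`/`decompress` bodies would shrink to a single call on the context, as in the diff above, and `close()` would map to `closeCompressor()`/`closeDecompressor()`.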



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

