wgtmac commented on code in PR #1280:
URL: https://github.com/apache/parquet-mr/pull/1280#discussion_r1501968943


##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java:
##########
@@ -250,75 +341,86 @@ public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOEx
     public void release() {}
   }
 
-  public class SnappyDecompressor extends BytesDecompressor {
+  public class SnappyDecompressor extends BaseDecompressor {
+    @Override
+    int decompress(ByteBuffer input, ByteBuffer output) throws IOException {
+      return Snappy.uncompress(input, output);
+    }
 
-    private HeapBytesDecompressor extraDecompressor;
+    @Override
+    void closeDecompressor() {
+      // no-op
+    }
+  }
 
-    public SnappyDecompressor() {
-      this.extraDecompressor = new HeapBytesDecompressor(CompressionCodecName.SNAPPY);
+  public class SnappyCompressor extends BaseCompressor {
+
+    @Override
+    int compress(ByteBuffer input, ByteBuffer output) throws IOException {
+      return Snappy.compress(input, output);
     }
 
     @Override
-    public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException {
-      return extraDecompressor.decompress(bytes, uncompressedSize);
+    int maxCompressedSize(int size) {
+      return Snappy.maxCompressedLength(size);
     }
 
     @Override
-    public void decompress(ByteBuffer src, int compressedSize, ByteBuffer dst, int uncompressedSize)
-        throws IOException {
-      dst.clear();
-      int size = Snappy.uncompress(src, dst);
-      dst.limit(size);
+    public CompressionCodecName getCodecName() {
+      return CompressionCodecName.SNAPPY;
     }
 
     @Override
-    public void release() {}
+    void closeCompressor() {
+      // no-op
+    }
   }
 
-  public class SnappyCompressor extends BytesCompressor {
+  private class ZstdDecompressor extends BaseDecompressor {
+    private final ZstdDecompressCtx context;
 
-    // TODO - this outgoing buffer might be better off not being shared, this seems to
-    // only work because of an extra copy currently happening where this interface is
-    // be consumed
-    private ByteBuffer incoming;
-    private ByteBuffer outgoing;
+    ZstdDecompressor() {
+      context = new ZstdDecompressCtx();
+    }
 
-    /**
-     * Compress a given buffer of bytes
-     * @param bytes
-     * @return
-     * @throws IOException
-     */
     @Override
-    public BytesInput compress(BytesInput bytes) throws IOException {
-      int maxOutputSize = Snappy.maxCompressedLength((int) bytes.size());
-      ByteBuffer bufferIn = bytes.toByteBuffer();
-      outgoing = ensure(outgoing, maxOutputSize);
-      final int size;
-      if (bufferIn.isDirect()) {
-        size = Snappy.compress(bufferIn, outgoing);
-      } else {
-        // Snappy library requires buffers be direct
-        this.incoming = ensure(this.incoming, (int) bytes.size());
-        this.incoming.put(bufferIn);
-        this.incoming.flip();
-        size = Snappy.compress(this.incoming, outgoing);
-      }
+    int decompress(ByteBuffer input, ByteBuffer output) {
+      return context.decompress(output, input);
+    }
 
-      outgoing.limit(size);
+    @Override
+    void closeDecompressor() {
+      context.close();
+    }
+  }
+
+  private class ZstdCompressor extends BaseCompressor {
+    private final ZstdCompressCtx context;

Review Comment:
   I'm not familiar with `ZstdCompressCtx`. Does it provide the same 
performance as before? IIUC, the original implementation leverages zstd 
workers, but I don't see that in the new class.
   
   
https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectZstd.java#L91



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java:
##########
@@ -186,6 +174,100 @@ public void release() {
     }
   }
 
+  private abstract class BaseDecompressor extends BytesDecompressor {

Review Comment:
   `BytesDecompressor` is deprecated. Should we switch to 
`CompressionCodecFactory.BytesInputDecompressor` directly?
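
   To make the suggestion concrete, here is a minimal sketch of a base class that implements a decompressor interface directly instead of extending a deprecated abstract class. Everything in it is an assumption for illustration: `BufferDecompressor` is a simplified, hypothetical stand-in for `CompressionCodecFactory.BytesInputDecompressor` (only the `ByteBuffer` method and `release()` are modeled), `InflaterDecompressor` and `roundTrip` are made-up names, and `java.util.zip` replaces Snappy/zstd so the example runs on a plain JDK 11+ with no extra dependencies. It is not the code from this PR.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

public class DecompressorSketch {

  // Hypothetical, simplified stand-in for the non-deprecated decompressor interface.
  interface BufferDecompressor {
    void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize)
        throws IOException;

    void release();
  }

  // The base class implements the interface directly; subclasses only supply
  // the codec-specific hooks (same shape as the hooks in the diff above).
  abstract static class BaseDecompressor implements BufferDecompressor {
    @Override
    public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize)
        throws IOException {
      // Restrict the view of the input to exactly compressedSize bytes.
      ByteBuffer in = input.duplicate();
      in.limit(in.position() + compressedSize);
      output.clear();
      int size = decompress(in, output);
      if (size != uncompressedSize) {
        throw new IOException("Expected " + uncompressedSize + " bytes but got " + size);
      }
      output.flip(); // make the decompressed bytes readable
    }

    @Override
    public void release() {
      closeDecompressor();
    }

    // Writes decompressed bytes into output and returns how many were written.
    abstract int decompress(ByteBuffer input, ByteBuffer output) throws IOException;

    abstract void closeDecompressor();
  }

  // Stand-in codec based on java.util.zip (assumption: used only to keep the sketch runnable).
  static final class InflaterDecompressor extends BaseDecompressor {
    private final Inflater inflater = new Inflater();

    @Override
    int decompress(ByteBuffer input, ByteBuffer output) throws IOException {
      inflater.reset();
      inflater.setInput(input);
      try {
        int total = 0;
        while (!inflater.finished() && output.hasRemaining()) {
          int n = inflater.inflate(output);
          if (n == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
            break; // truncated or unsupported input; the size check in the caller reports it
          }
          total += n;
        }
        return total;
      } catch (DataFormatException e) {
        throw new IOException(e);
      }
    }

    @Override
    void closeDecompressor() {
      inflater.end();
    }
  }

  // Compresses data with Deflater, then decompresses it through the interface.
  static byte[] roundTrip(byte[] data) throws IOException {
    Deflater deflater = new Deflater();
    deflater.setInput(data);
    deflater.finish();
    ByteBuffer compressed = ByteBuffer.allocate(data.length + data.length / 100 + 64);
    while (!deflater.finished() && compressed.hasRemaining()) {
      deflater.deflate(compressed);
    }
    boolean complete = deflater.finished();
    deflater.end();
    if (!complete) {
      throw new IOException("Compressed buffer too small");
    }
    compressed.flip();

    ByteBuffer output = ByteBuffer.allocate(data.length);
    BufferDecompressor decompressor = new InflaterDecompressor();
    try {
      decompressor.decompress(compressed, compressed.remaining(), output, data.length);
    } finally {
      decompressor.release();
    }
    byte[] result = new byte[output.remaining()];
    output.get(result);
    return result;
  }

  public static void main(String[] args) throws IOException {
    byte[] data = "parquet parquet parquet".getBytes(StandardCharsets.UTF_8);
    System.out.println(new String(roundTrip(data), StandardCharsets.UTF_8));
  }
}
```

   Callers in this sketch depend only on the interface type, so the base class's parent can change without touching them.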



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

