wgtmac commented on code in PR #1280:
URL: https://github.com/apache/parquet-mr/pull/1280#discussion_r1501968943
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java:
##########
@@ -250,75 +341,86 @@ public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOEx
public void release() {}
}
- public class SnappyDecompressor extends BytesDecompressor {
+ public class SnappyDecompressor extends BaseDecompressor {
+ @Override
+ int decompress(ByteBuffer input, ByteBuffer output) throws IOException {
+ return Snappy.uncompress(input, output);
+ }
- private HeapBytesDecompressor extraDecompressor;
+ @Override
+ void closeDecompressor() {
+ // no-op
+ }
+ }
- public SnappyDecompressor() {
- this.extraDecompressor = new HeapBytesDecompressor(CompressionCodecName.SNAPPY);
+ public class SnappyCompressor extends BaseCompressor {
+
+ @Override
+ int compress(ByteBuffer input, ByteBuffer output) throws IOException {
+ return Snappy.compress(input, output);
}
@Override
- public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException {
- return extraDecompressor.decompress(bytes, uncompressedSize);
+ int maxCompressedSize(int size) {
+ return Snappy.maxCompressedLength(size);
}
@Override
- public void decompress(ByteBuffer src, int compressedSize, ByteBuffer dst, int uncompressedSize)
- throws IOException {
- dst.clear();
- int size = Snappy.uncompress(src, dst);
- dst.limit(size);
+ public CompressionCodecName getCodecName() {
+ return CompressionCodecName.SNAPPY;
}
@Override
- public void release() {}
+ void closeCompressor() {
+ // no-op
+ }
}
- public class SnappyCompressor extends BytesCompressor {
+ private class ZstdDecompressor extends BaseDecompressor {
+ private final ZstdDecompressCtx context;
- // TODO - this outgoing buffer might be better off not being shared, this seems to
- // only work because of an extra copy currently happening where this interface is
- // be consumed
- private ByteBuffer incoming;
- private ByteBuffer outgoing;
+ ZstdDecompressor() {
+ context = new ZstdDecompressCtx();
+ }
- /**
- * Compress a given buffer of bytes
- * @param bytes
- * @return
- * @throws IOException
- */
@Override
- public BytesInput compress(BytesInput bytes) throws IOException {
- int maxOutputSize = Snappy.maxCompressedLength((int) bytes.size());
- ByteBuffer bufferIn = bytes.toByteBuffer();
- outgoing = ensure(outgoing, maxOutputSize);
- final int size;
- if (bufferIn.isDirect()) {
- size = Snappy.compress(bufferIn, outgoing);
- } else {
- // Snappy library requires buffers be direct
- this.incoming = ensure(this.incoming, (int) bytes.size());
- this.incoming.put(bufferIn);
- this.incoming.flip();
- size = Snappy.compress(this.incoming, outgoing);
- }
+ int decompress(ByteBuffer input, ByteBuffer output) {
+ return context.decompress(output, input);
+ }
- outgoing.limit(size);
+ @Override
+ void closeDecompressor() {
+ context.close();
+ }
+ }
+
+ private class ZstdCompressor extends BaseCompressor {
+ private final ZstdCompressCtx context;
Review Comment:
I'm not familiar with `ZstdCompressCtx`. Does it provide the same
performance as before? IIUC, the original implementation leverages zstd
workers, but I don't see them used in the new class.
https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectZstd.java#L91
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java:
##########
@@ -186,6 +174,100 @@ public void release() {
}
}
+ private abstract class BaseDecompressor extends BytesDecompressor {
Review Comment:
`BytesDecompressor` is deprecated. Should we switch to
`CompressionCodecFactory.BytesInputDecompressor` directly?
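
For context on the shape being reviewed: the hunks above suggest that the new base classes own the buffer handling while each codec supplies only a small hook (`decompress`/`compress`, `maxCompressedSize`, and a close method). The following is a minimal, self-contained sketch of that layout, not the PR's actual code. It assumes Java 11+ and uses `java.util.zip` as a stand-in codec so that it runs without Snappy or zstd-jni on the classpath. The hook names mirror the diff; `CodecSketch`, `decompressPage`, `compressPage`, `DeflateCompressor`, `DeflateDecompressor`, and `roundTrip` are hypothetical names introduced only for illustration.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

public class CodecSketch {

  /** Hypothetical base class: owns buffer handling, delegates codec work to hooks. */
  abstract static class BaseDecompressor {
    // Codec hook, same shape as in the diff: returns the number of bytes written.
    abstract int decompress(ByteBuffer input, ByteBuffer output) throws IOException;

    // Cleanup hook, named as in the diff.
    abstract void closeDecompressor();

    // Shared driver (an assumption of this sketch, not taken from the PR).
    final ByteBuffer decompressPage(ByteBuffer compressed, int uncompressedSize) throws IOException {
      ByteBuffer output = ByteBuffer.allocateDirect(uncompressedSize);
      int size = decompress(compressed, output);
      if (size != uncompressedSize) {
        throw new IOException("expected " + uncompressedSize + " bytes, got " + size);
      }
      output.clear();     // reset position to 0; the written bytes stay in place
      output.limit(size); // expose exactly the decompressed bytes
      return output;
    }
  }

  /** Hypothetical counterpart for compression. */
  abstract static class BaseCompressor {
    abstract int compress(ByteBuffer input, ByteBuffer output) throws IOException;
    abstract int maxCompressedSize(int size);
    abstract void closeCompressor();

    final ByteBuffer compressPage(ByteBuffer raw) throws IOException {
      ByteBuffer output = ByteBuffer.allocateDirect(maxCompressedSize(raw.remaining()));
      int size = compress(raw, output);
      output.clear();
      output.limit(size);
      return output;
    }
  }

  /** Stand-in codec built on java.util.zip; the PR uses Snappy and zstd instead. */
  static class DeflateCompressor extends BaseCompressor {
    private final Deflater deflater = new Deflater();

    @Override
    int compress(ByteBuffer input, ByteBuffer output) throws IOException {
      deflater.reset();
      deflater.setInput(input);
      deflater.finish();
      int total = 0;
      while (!deflater.finished() && output.hasRemaining()) {
        total += deflater.deflate(output);
      }
      if (!deflater.finished()) {
        throw new IOException("output buffer too small");
      }
      return total;
    }

    @Override
    int maxCompressedSize(int size) {
      // Deliberately generous bound for this sketch, not a tight limit.
      return size + (size >> 3) + (size >> 6) + 64;
    }

    @Override
    void closeCompressor() {
      deflater.end();
    }
  }

  static class DeflateDecompressor extends BaseDecompressor {
    private final Inflater inflater = new Inflater();

    @Override
    int decompress(ByteBuffer input, ByteBuffer output) throws IOException {
      inflater.reset();
      inflater.setInput(input);
      try {
        int total = 0;
        while (!inflater.finished() && output.hasRemaining()) {
          int n = inflater.inflate(output);
          if (n == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
            break; // truncated input; the driver reports the size mismatch
          }
          total += n;
        }
        return total;
      } catch (DataFormatException e) {
        throw new IOException(e);
      }
    }

    @Override
    void closeDecompressor() {
      inflater.end();
    }
  }

  /** Compresses and decompresses a string through the two base-class drivers. */
  static String roundTrip(String text) throws IOException {
    byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
    ByteBuffer raw = ByteBuffer.allocateDirect(bytes.length);
    raw.put(bytes);
    raw.flip();
    DeflateCompressor compressor = new DeflateCompressor();
    DeflateDecompressor decompressor = new DeflateDecompressor();
    try {
      ByteBuffer compressed = compressor.compressPage(raw);
      ByteBuffer restored = decompressor.decompressPage(compressed, bytes.length);
      byte[] out = new byte[restored.remaining()];
      restored.get(out);
      return new String(out, StandardCharsets.UTF_8);
    } finally {
      compressor.closeCompressor();
      decompressor.closeDecompressor();
    }
  }

  public static void main(String[] args) throws IOException {
    String text = "parquet parquet parquet parquet";
    System.out.println(text.equals(roundTrip(text)));
  }
}
```

Whether the base class extends the deprecated `BytesDecompressor` or implements the newer interface directly does not change this hook layout. Only the public entry points of the base class would differ.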
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]