gszadovszky commented on code in PR #1280:
URL: https://github.com/apache/parquet-mr/pull/1280#discussion_r1502301174
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java:
##########
@@ -250,75 +341,86 @@ public BytesInput decompress(BytesInput bytes, int
uncompressedSize) throws IOEx
public void release() {}
}
- public class SnappyDecompressor extends BytesDecompressor {
+ public class SnappyDecompressor extends BaseDecompressor {
+ @Override
+ int decompress(ByteBuffer input, ByteBuffer output) throws IOException {
+ return Snappy.uncompress(input, output);
+ }
- private HeapBytesDecompressor extraDecompressor;
+ @Override
+ void closeDecompressor() {
+ // no-op
+ }
+ }
- public SnappyDecompressor() {
- this.extraDecompressor = new
HeapBytesDecompressor(CompressionCodecName.SNAPPY);
+ public class SnappyCompressor extends BaseCompressor {
+
+ @Override
+ int compress(ByteBuffer input, ByteBuffer output) throws IOException {
+ return Snappy.compress(input, output);
}
@Override
- public BytesInput decompress(BytesInput bytes, int uncompressedSize)
throws IOException {
- return extraDecompressor.decompress(bytes, uncompressedSize);
+ int maxCompressedSize(int size) {
+ return Snappy.maxCompressedLength(size);
}
@Override
- public void decompress(ByteBuffer src, int compressedSize, ByteBuffer dst,
int uncompressedSize)
- throws IOException {
- dst.clear();
- int size = Snappy.uncompress(src, dst);
- dst.limit(size);
+ public CompressionCodecName getCodecName() {
+ return CompressionCodecName.SNAPPY;
}
@Override
- public void release() {}
+ void closeCompressor() {
+ // no-op
+ }
}
- public class SnappyCompressor extends BytesCompressor {
+ private class ZstdDecompressor extends BaseDecompressor {
+ private final ZstdDecompressCtx context;
- // TODO - this outgoing buffer might be better off not being shared, this
seems to
- // only work because of an extra copy currently happening where this
interface is
- // be consumed
- private ByteBuffer incoming;
- private ByteBuffer outgoing;
+ ZstdDecompressor() {
+ context = new ZstdDecompressCtx();
+ }
- /**
- * Compress a given buffer of bytes
- * @param bytes
- * @return
- * @throws IOException
- */
@Override
- public BytesInput compress(BytesInput bytes) throws IOException {
- int maxOutputSize = Snappy.maxCompressedLength((int) bytes.size());
- ByteBuffer bufferIn = bytes.toByteBuffer();
- outgoing = ensure(outgoing, maxOutputSize);
- final int size;
- if (bufferIn.isDirect()) {
- size = Snappy.compress(bufferIn, outgoing);
- } else {
- // Snappy library requires buffers be direct
- this.incoming = ensure(this.incoming, (int) bytes.size());
- this.incoming.put(bufferIn);
- this.incoming.flip();
- size = Snappy.compress(this.incoming, outgoing);
- }
+ int decompress(ByteBuffer input, ByteBuffer output) {
+ return context.decompress(output, input);
+ }
- outgoing.limit(size);
+ @Override
+ void closeDecompressor() {
+ context.close();
+ }
+ }
+
+ private class ZstdCompressor extends BaseCompressor {
+ private final ZstdCompressCtx context;
Review Comment:
There is a big difference between our "usual" codecs, which use the Hadoop
codecs, and the ones created by the DirectCodecFactory. The former have a
stream interface behind our interfaces.
In my first attempt at implementing the direct Zstd codec, I used the stream
implementation for the BytesInput interface and a static method of the Zstd
API for the ByteBuffer interface.
I have now realized that when we use a direct ByteBufferAllocator, the
BytesInput objects are also mainly represented by direct ByteBuffers, so we can
work on those buffers directly, without a stream.
The static API of the Zstd codec uses the context internally anyway, and the
class comments suggest using the context directly instead.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]