This is an automated email from the ASF dual-hosted git repository.
gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new 4d53f98eb PARQUET-2440: Avoid getting Hadoop codec for internal compressor/decompressor (#1286)
4d53f98eb is described below
commit 4d53f98eb479794408ad04856b3424bbacfba45a
Author: Gabor Szadovszky <[email protected]>
AuthorDate: Mon Mar 4 09:27:26 2024 +0100
PARQUET-2440: Avoid getting Hadoop codec for internal compressor/decompressor (#1286)
---
.../org/apache/parquet/hadoop/CodecFactory.java | 120 ++++++++++++---------
.../apache/parquet/hadoop/DirectCodecFactory.java | 73 ++++++++-----
2 files changed, 115 insertions(+), 78 deletions(-)
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
index 9df1f7e5e..98c7002fe 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
@@ -25,6 +25,7 @@ import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -32,6 +33,7 @@ import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.parquet.Preconditions;
import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.compression.CompressionCodecFactory;
@@ -52,6 +54,40 @@ public class CodecFactory implements CompressionCodecFactory
{
protected final ParquetConfiguration configuration;
protected final int pageSize;
+ static final BytesDecompressor NO_OP_DECOMPRESSOR = new BytesDecompressor() {
+ @Override
+ public void decompress(ByteBuffer input, int compressedSize, ByteBuffer
output, int uncompressedSize) {
+ Preconditions.checkArgument(
+ compressedSize == uncompressedSize,
+ "Non-compressed data did not have matching compressed and
uncompressed sizes.");
+ output.clear();
+ output.put((ByteBuffer)
input.duplicate().position(0).limit(compressedSize));
+ }
+
+ @Override
+ public BytesInput decompress(BytesInput bytes, int uncompressedSize) {
+ return bytes;
+ }
+
+ @Override
+ public void release() {}
+ };
+
+ static final BytesCompressor NO_OP_COMPRESSOR = new BytesCompressor() {
+ @Override
+ public BytesInput compress(BytesInput bytes) {
+ return bytes;
+ }
+
+ @Override
+ public CompressionCodecName getCodecName() {
+ return CompressionCodecName.UNCOMPRESSED;
+ }
+
+ @Override
+ public void release() {}
+ };
+
/**
* Create a new codec factory.
*
@@ -108,37 +144,28 @@ public class CodecFactory implements
CompressionCodecFactory {
private final CompressionCodec codec;
private final Decompressor decompressor;
- HeapBytesDecompressor(CompressionCodecName codecName) {
- this.codec = getCodec(codecName);
- if (codec != null) {
- decompressor = CodecPool.getDecompressor(codec);
- } else {
- decompressor = null;
- }
+ HeapBytesDecompressor(CompressionCodec codec) {
+ this.codec = Objects.requireNonNull(codec);
+ decompressor = CodecPool.getDecompressor(codec);
}
@Override
public BytesInput decompress(BytesInput bytes, int uncompressedSize)
throws IOException {
final BytesInput decompressed;
- if (codec != null) {
- if (decompressor != null) {
- decompressor.reset();
- }
- InputStream is = codec.createInputStream(bytes.toInputStream(),
decompressor);
-
- // We need to explicitly close the ZstdDecompressorStream here to
release the resources it holds to
- // avoid
- // off-heap memory fragmentation issue, see
https://issues.apache.org/jira/browse/PARQUET-2160.
- // This change will load the decompressor stream into heap a little
earlier, since the problem it solves
- // only happens in the ZSTD codec, so this modification is only made
for ZSTD streams.
- if (codec instanceof ZstandardCodec) {
- decompressed = BytesInput.copy(BytesInput.from(is,
uncompressedSize));
- is.close();
- } else {
- decompressed = BytesInput.from(is, uncompressedSize);
- }
+ if (decompressor != null) {
+ decompressor.reset();
+ }
+ InputStream is = codec.createInputStream(bytes.toInputStream(),
decompressor);
+
+ // We need to explicitly close the ZstdDecompressorStream here to
release the resources it holds to
+ // avoid off-heap memory fragmentation issue, see
https://issues.apache.org/jira/browse/PARQUET-2160.
+ // This change will load the decompressor stream into heap a little
earlier, since the problem it solves
+ // only happens in the ZSTD codec, so this modification is only made for
ZSTD streams.
+ if (codec instanceof ZstandardCodec) {
+ decompressed = BytesInput.copy(BytesInput.from(is, uncompressedSize));
+ is.close();
} else {
- decompressed = bytes;
+ decompressed = BytesInput.from(is, uncompressedSize);
}
return decompressed;
}
@@ -168,36 +195,25 @@ public class CodecFactory implements
CompressionCodecFactory {
private final ByteArrayOutputStream compressedOutBuffer;
private final CompressionCodecName codecName;
- HeapBytesCompressor(CompressionCodecName codecName) {
+ HeapBytesCompressor(CompressionCodecName codecName, CompressionCodec
codec) {
this.codecName = codecName;
- this.codec = getCodec(codecName);
- if (codec != null) {
- this.compressor = CodecPool.getCompressor(codec);
- this.compressedOutBuffer = new ByteArrayOutputStream(pageSize);
- } else {
- this.compressor = null;
- this.compressedOutBuffer = null;
- }
+ this.codec = Objects.requireNonNull(codec);
+ this.compressor = CodecPool.getCompressor(codec);
+ this.compressedOutBuffer = new ByteArrayOutputStream(pageSize);
}
@Override
public BytesInput compress(BytesInput bytes) throws IOException {
- final BytesInput compressedBytes;
- if (codec == null) {
- compressedBytes = bytes;
- } else {
- compressedOutBuffer.reset();
- if (compressor != null) {
- // null compressor for non-native gzip
- compressor.reset();
- }
- try (CompressionOutputStream cos =
codec.createOutputStream(compressedOutBuffer, compressor)) {
- bytes.writeAllTo(cos);
- cos.finish();
- }
- compressedBytes = BytesInput.from(compressedOutBuffer);
+ compressedOutBuffer.reset();
+ if (compressor != null) {
+ // null compressor for non-native gzip
+ compressor.reset();
+ }
+ try (CompressionOutputStream cos =
codec.createOutputStream(compressedOutBuffer, compressor)) {
+ bytes.writeAllTo(cos);
+ cos.finish();
}
- return compressedBytes;
+ return BytesInput.from(compressedOutBuffer);
}
@Override
@@ -233,11 +249,13 @@ public class CodecFactory implements
CompressionCodecFactory {
}
protected BytesCompressor createCompressor(CompressionCodecName codecName) {
- return new HeapBytesCompressor(codecName);
+ CompressionCodec codec = getCodec(codecName);
+ return codec == null ? NO_OP_COMPRESSOR : new
HeapBytesCompressor(codecName, codec);
}
protected BytesDecompressor createDecompressor(CompressionCodecName
codecName) {
- return new HeapBytesDecompressor(codecName);
+ CompressionCodec codec = getCodec(codecName);
+ return codec == null ? NO_OP_DECOMPRESSOR : new
HeapBytesDecompressor(codec);
}
/**
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java
index 3e2ad10b1..f509dce55 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java
@@ -96,35 +96,37 @@ class DirectCodecFactory extends CodecFactory implements
AutoCloseable {
@Override
protected BytesCompressor createCompressor(final CompressionCodecName
codecName) {
-
- CompressionCodec codec = getCodec(codecName);
- if (codec == null) {
- return new NoopCompressor();
- } else if (codecName == CompressionCodecName.SNAPPY) {
- // avoid using the default Snappy codec since it allocates direct
buffers at awkward spots.
- return new SnappyCompressor();
- } else if (codecName == CompressionCodecName.ZSTD) {
- return new ZstdCompressor();
- } else {
- // todo: create class similar to the SnappyCompressor for zlib and
exclude it as
- // snappy is above since it also generates allocateDirect calls.
- return new HeapBytesCompressor(codecName);
+ switch (codecName) {
+ case SNAPPY:
+ // avoid using the default Snappy codec since it allocates direct
buffers at awkward spots.
+ return new SnappyCompressor();
+ case ZSTD:
+ return new ZstdCompressor();
+ // todo: create class similar to the SnappyCompressor for zlib and
exclude it as
+ // snappy is above since it also generates allocateDirect calls.
+ default:
+ return super.createCompressor(codecName);
}
}
@Override
protected BytesDecompressor createDecompressor(final CompressionCodecName
codecName) {
- CompressionCodec codec = getCodec(codecName);
- if (codec == null) {
- return new NoopDecompressor();
- } else if (codecName == CompressionCodecName.SNAPPY) {
- return new SnappyDecompressor();
- } else if (codecName == CompressionCodecName.ZSTD) {
- return new ZstdDecompressor();
- } else if
(DirectCodecPool.INSTANCE.codec(codec).supportsDirectDecompression()) {
- return new FullDirectDecompressor(codecName);
- } else {
- return new IndirectDecompressor(codec);
+ switch (codecName) {
+ case SNAPPY:
+ return new SnappyDecompressor();
+ case ZSTD:
+ return new ZstdDecompressor();
+ default:
+ CompressionCodec codec = getCodec(codecName);
+ if (codec == null) {
+ return NO_OP_DECOMPRESSOR;
+ }
+ DirectCodecPool.CodecPool pool = DirectCodecPool.INSTANCE.codec(codec);
+ if (pool.supportsDirectDecompression()) {
+ return new FullDirectDecompressor(pool.borrowDirectDecompressor());
+ } else {
+ return new IndirectDecompressor(pool.borrowDecompressor());
+ }
}
}
@@ -140,7 +142,11 @@ class DirectCodecFactory extends CodecFactory implements
AutoCloseable {
private final Decompressor decompressor;
public IndirectDecompressor(CompressionCodec codec) {
- this.decompressor =
DirectCodecPool.INSTANCE.codec(codec).borrowDecompressor();
+ this(DirectCodecPool.INSTANCE.codec(codec).borrowDecompressor());
+ }
+
+ private IndirectDecompressor(Decompressor decompressor) {
+ this.decompressor = decompressor;
}
@Override
@@ -280,8 +286,13 @@ class DirectCodecFactory extends CodecFactory implements
AutoCloseable {
private final Object decompressor;
public FullDirectDecompressor(CompressionCodecName codecName) {
- CompressionCodec codec = getCodec(codecName);
- this.decompressor =
DirectCodecPool.INSTANCE.codec(codec).borrowDirectDecompressor();
+ this(DirectCodecPool.INSTANCE
+ .codec(Objects.requireNonNull(getCodec(codecName)))
+ .borrowDirectDecompressor());
+ }
+
+ private FullDirectDecompressor(Object decompressor) {
+ this.decompressor = decompressor;
}
@Override
@@ -320,6 +331,10 @@ class DirectCodecFactory extends CodecFactory implements
AutoCloseable {
}
}
+ /**
+ * @deprecated Use {@link CodecFactory#NO_OP_DECOMPRESSOR} instead
+ */
+ @Deprecated
public class NoopDecompressor extends BytesDecompressor {
@Override
@@ -424,6 +439,10 @@ class DirectCodecFactory extends CodecFactory implements
AutoCloseable {
}
}
+ /**
+ * @deprecated Use {@link CodecFactory#NO_OP_COMPRESSOR} instead
+ */
+ @Deprecated
public static class NoopCompressor extends BytesCompressor {
public NoopCompressor() {}