This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 3bfb2b2ce3 Parquet: Cache codecs by name and level (#8182)
3bfb2b2ce3 is described below
commit 3bfb2b2ce328983d99719de1e6e3f4f6e0209398
Author: Bryan Keller <[email protected]>
AuthorDate: Tue Aug 1 00:22:33 2023 -0700
Parquet: Cache codecs by name and level (#8182)
---
.../iceberg/parquet/ParquetCodecFactory.java | 111 +++++++++------------
.../org/apache/iceberg/parquet/ParquetWriter.java | 3 +-
2 files changed, 51 insertions(+), 63 deletions(-)
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java
index 47b9d158c5..bfcece6259 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java
@@ -18,88 +18,75 @@
  */
 package org.apache.iceberg.parquet;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.parquet.bytes.BytesInput;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.parquet.hadoop.BadConfigurationException;
 import org.apache.parquet.hadoop.CodecFactory;
-import org.apache.parquet.hadoop.codec.ZstandardCodec;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 
 /**
  * This class implements a codec factory that is used when reading from Parquet. It adds a
- * workaround for memory issues encountered when reading from zstd-compressed files. This is no
- * longer used, as Parquet 1.13 includes this fix.
- *
- * @deprecated will be removed in 1.5.0
+ * workaround to cache codecs by name and level, not just by name. This can be removed when this
+ * change is made to Parquet.
  */
-@Deprecated
 public class ParquetCodecFactory extends CodecFactory {
 
   public ParquetCodecFactory(Configuration configuration, int pageSize) {
     super(configuration, pageSize);
   }
 
-  /** Copied and modified from CodecFactory.HeapBytesDecompressor */
-  class HeapBytesDecompressor extends BytesDecompressor {
-
-    private final CompressionCodec codec;
-    private final Decompressor decompressor;
-
-    HeapBytesDecompressor(CompressionCodecName codecName) {
-      this.codec = getCodec(codecName);
-      if (codec != null) {
-        decompressor = CodecPool.getDecompressor(codec);
-      } else {
-        decompressor = null;
-      }
-    }
-
-    @Override
-    public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException {
-      final BytesInput decompressed;
-      if (codec != null) {
-        if (decompressor != null) {
-          decompressor.reset();
-        }
-        if (codec instanceof ZstandardCodec) {
-          // we need to close the zstd input stream ASAP to free up native resources, so
-          // read everything into a buffer and then close it
-          try (InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor)) {
-            decompressed = BytesInput.copy(BytesInput.from(is, uncompressedSize));
-          }
-        } else {
-          InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);
-          decompressed = BytesInput.from(is, uncompressedSize);
-        }
-      } else {
-        decompressed = bytes;
-      }
-      return decompressed;
+  /**
+   * This is copied from {@link CodecFactory} and modified to include the level in the cache key.
+   */
+  @Override
+  protected CompressionCodec getCodec(CompressionCodecName codecName) {
+    String codecClassName = codecName.getHadoopCompressionCodecClassName();
+    if (codecClassName == null) {
+      return null;
     }
-
-    @Override
-    public void decompress(
-        ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize)
-        throws IOException {
-      ByteBuffer decompressed = decompress(BytesInput.from(input), uncompressedSize).toByteBuffer();
-      output.put(decompressed);
+    String cacheKey = cacheKey(codecName);
+    CompressionCodec codec = CODEC_BY_NAME.get(cacheKey);
+    if (codec != null) {
+      return codec;
     }
 
-    @Override
-    public void release() {
-      if (decompressor != null) {
-        CodecPool.returnDecompressor(decompressor);
+    try {
+      Class<?> codecClass;
+      try {
+        codecClass = Class.forName(codecClassName);
+      } catch (ClassNotFoundException e) {
+        // Try to load the class using the job classloader
+        codecClass = configuration.getClassLoader().loadClass(codecClassName);
       }
+      codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, configuration);
+      CODEC_BY_NAME.put(cacheKey, codec);
+      return codec;
+    } catch (ClassNotFoundException e) {
+      throw new BadConfigurationException("Class " + codecClassName + " was not found", e);
     }
   }
 
-  @Override
-  protected BytesDecompressor createDecompressor(CompressionCodecName codecName) {
-    return new HeapBytesDecompressor(codecName);
+  private String cacheKey(CompressionCodecName codecName) {
+    String level = null;
+    switch (codecName) {
+      case GZIP:
+        level = configuration.get("zlib.compress.level");
+        break;
+      case BROTLI:
+        level = configuration.get("compression.brotli.quality");
+        break;
+      case ZSTD:
+        level = configuration.get("parquet.compression.codec.zstd.level");
+        if (level == null) {
+          // keep "io.compression.codec.zstd.level" for backwards compatibility
+          level = configuration.get("io.compression.codec.zstd.level");
+        }
+        break;
+      default:
+        // compression level is not supported; ignore it
+    }
+    String codecClass = codecName.getHadoopCompressionCodecClassName();
+    return level == null ? codecClass : codecClass + ":" + level;
   }
 }
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
index 5770049937..099cffc33b 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
@@ -86,7 +86,8 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
     this.targetRowGroupSize = rowGroupSize;
     this.props = properties;
     this.metadata = ImmutableMap.copyOf(metadata);
-    this.compressor = new CodecFactory(conf, props.getPageSizeThreshold()).getCompressor(codec);
+    this.compressor =
+        new ParquetCodecFactory(conf, props.getPageSizeThreshold()).getCompressor(codec);
     this.parquetSchema = ParquetSchemaUtil.convert(schema, "table");
     this.model = (ParquetValueWriter<T>) createWriterFunc.apply(parquetSchema);
     this.metricsConfig = metricsConfig;