This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 3bfb2b2ce3 Parquet: Cache codecs by name and level (#8182)
3bfb2b2ce3 is described below

commit 3bfb2b2ce328983d99719de1e6e3f4f6e0209398
Author: Bryan Keller <[email protected]>
AuthorDate: Tue Aug 1 00:22:33 2023 -0700

    Parquet: Cache codecs by name and level (#8182)
---
 .../iceberg/parquet/ParquetCodecFactory.java       | 111 +++++++++------------
 .../org/apache/iceberg/parquet/ParquetWriter.java  |   3 +-
 2 files changed, 51 insertions(+), 63 deletions(-)

diff --git 
a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java 
b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java
index 47b9d158c5..bfcece6259 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java
@@ -18,88 +18,75 @@
  */
 package org.apache.iceberg.parquet;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.parquet.bytes.BytesInput;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.parquet.hadoop.BadConfigurationException;
 import org.apache.parquet.hadoop.CodecFactory;
-import org.apache.parquet.hadoop.codec.ZstandardCodec;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 
 /**
  * This class implements a codec factory that is used when reading from 
Parquet. It adds a
- * workaround for memory issues encountered when reading from zstd-compressed 
files. This is no
- * longer used, as Parquet 1.13 includes this fix.
- *
- * @deprecated will be removed in 1.5.0
+ * workaround to cache codecs by name and level, not just by name. This can be 
removed when this
+ * change is made to Parquet.
  */
-@Deprecated
 public class ParquetCodecFactory extends CodecFactory {
 
   public ParquetCodecFactory(Configuration configuration, int pageSize) {
     super(configuration, pageSize);
   }
 
-  /** Copied and modified from CodecFactory.HeapBytesDecompressor */
-  class HeapBytesDecompressor extends BytesDecompressor {
-
-    private final CompressionCodec codec;
-    private final Decompressor decompressor;
-
-    HeapBytesDecompressor(CompressionCodecName codecName) {
-      this.codec = getCodec(codecName);
-      if (codec != null) {
-        decompressor = CodecPool.getDecompressor(codec);
-      } else {
-        decompressor = null;
-      }
-    }
-
-    @Override
-    public BytesInput decompress(BytesInput bytes, int uncompressedSize) 
throws IOException {
-      final BytesInput decompressed;
-      if (codec != null) {
-        if (decompressor != null) {
-          decompressor.reset();
-        }
-        if (codec instanceof ZstandardCodec) {
-          // we need to close the zstd input stream ASAP to free up native 
resources, so
-          // read everything into a buffer and then close it
-          try (InputStream is = codec.createInputStream(bytes.toInputStream(), 
decompressor)) {
-            decompressed = BytesInput.copy(BytesInput.from(is, 
uncompressedSize));
-          }
-        } else {
-          InputStream is = codec.createInputStream(bytes.toInputStream(), 
decompressor);
-          decompressed = BytesInput.from(is, uncompressedSize);
-        }
-      } else {
-        decompressed = bytes;
-      }
-      return decompressed;
+  /**
+   * This is copied from {@link CodecFactory} and modified to include the 
level in the cache key.
+   */
+  @Override
+  protected CompressionCodec getCodec(CompressionCodecName codecName) {
+    String codecClassName = codecName.getHadoopCompressionCodecClassName();
+    if (codecClassName == null) {
+      return null;
     }
-
-    @Override
-    public void decompress(
-        ByteBuffer input, int compressedSize, ByteBuffer output, int 
uncompressedSize)
-        throws IOException {
-      ByteBuffer decompressed = decompress(BytesInput.from(input), 
uncompressedSize).toByteBuffer();
-      output.put(decompressed);
+    String cacheKey = cacheKey(codecName);
+    CompressionCodec codec = CODEC_BY_NAME.get(cacheKey);
+    if (codec != null) {
+      return codec;
     }
 
-    @Override
-    public void release() {
-      if (decompressor != null) {
-        CodecPool.returnDecompressor(decompressor);
+    try {
+      Class<?> codecClass;
+      try {
+        codecClass = Class.forName(codecClassName);
+      } catch (ClassNotFoundException e) {
+        // Try to load the class using the job classloader
+        codecClass = configuration.getClassLoader().loadClass(codecClassName);
       }
+      codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, 
configuration);
+      CODEC_BY_NAME.put(cacheKey, codec);
+      return codec;
+    } catch (ClassNotFoundException e) {
+      throw new BadConfigurationException("Class " + codecClassName + " was 
not found", e);
     }
   }
 
-  @Override
-  protected BytesDecompressor createDecompressor(CompressionCodecName 
codecName) {
-    return new HeapBytesDecompressor(codecName);
+  private String cacheKey(CompressionCodecName codecName) {
+    String level = null;
+    switch (codecName) {
+      case GZIP:
+        level = configuration.get("zlib.compress.level");
+        break;
+      case BROTLI:
+        level = configuration.get("compression.brotli.quality");
+        break;
+      case ZSTD:
+        level = configuration.get("parquet.compression.codec.zstd.level");
+        if (level == null) {
+          // keep "io.compression.codec.zstd.level" for backwards compatibility
+          level = configuration.get("io.compression.codec.zstd.level");
+        }
+        break;
+      default:
+        // compression level is not supported; ignore it
+    }
+    String codecClass = codecName.getHadoopCompressionCodecClassName();
+    return level == null ? codecClass : codecClass + ":" + level;
   }
 }
diff --git 
a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java 
b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
index 5770049937..099cffc33b 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
@@ -86,7 +86,8 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
     this.targetRowGroupSize = rowGroupSize;
     this.props = properties;
     this.metadata = ImmutableMap.copyOf(metadata);
-    this.compressor = new CodecFactory(conf, 
props.getPageSizeThreshold()).getCompressor(codec);
+    this.compressor =
+        new ParquetCodecFactory(conf, 
props.getPageSizeThreshold()).getCompressor(codec);
     this.parquetSchema = ParquetSchemaUtil.convert(schema, "table");
     this.model = (ParquetValueWriter<T>) createWriterFunc.apply(parquetSchema);
     this.metricsConfig = metricsConfig;

Reply via email to