This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new 4de3d9389 PARQUET-2336: Add caching key to CodecFactory (#1134)
4de3d9389 is described below

commit 4de3d938966da2b5d6ec043b77bbbec56123277c
Author: Fokko Driesprong <fo...@apache.org>
AuthorDate: Mon Sep 18 21:45:30 2023 +0200

    PARQUET-2336: Add caching key to CodecFactory (#1134)
    
    * PARQUET-2336: Add caching key to CodecFactory
    
    * Remove old config
    
    * Fix the key
---
 .../org/apache/parquet/hadoop/CodecFactory.java    | 32 +++++++++++---
 .../parquet/hadoop/TestDirectCodecFactory.java     | 51 ++++++++++++++++++++++
 2 files changed, 77 insertions(+), 6 deletions(-)

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
index 1998ea09d..beb1c75ad 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
@@ -173,10 +173,10 @@ public class CodecFactory implements CompressionCodecFactory {
           // null compressor for non-native gzip
           compressor.reset();
         }
-        CompressionOutputStream cos = 
codec.createOutputStream(compressedOutBuffer, compressor);
-        bytes.writeAllTo(cos);
-        cos.finish();
-        cos.close();
+        try (CompressionOutputStream cos = 
codec.createOutputStream(compressedOutBuffer, compressor)) {
+          bytes.writeAllTo(cos);
+          cos.finish();
+        }
         compressedBytes = BytesInput.from(compressedOutBuffer);
       }
       return compressedBytes;
@@ -234,7 +234,8 @@ public class CodecFactory implements CompressionCodecFactory {
     if (codecClassName == null) {
       return null;
     }
-    CompressionCodec codec = CODEC_BY_NAME.get(codecClassName);
+    String codecCacheKey = this.cacheKey(codecName);
+    CompressionCodec codec = CODEC_BY_NAME.get(codecCacheKey);
     if (codec != null) {
       return codec;
     }
@@ -248,13 +249,32 @@ public class CodecFactory implements CompressionCodecFactory {
         codecClass = configuration.getClassLoader().loadClass(codecClassName);
       }
       codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, 
configuration);
-      CODEC_BY_NAME.put(codecClassName, codec);
+      CODEC_BY_NAME.put(codecCacheKey, codec);
       return codec;
     } catch (ClassNotFoundException e) {
       throw new BadConfigurationException("Class " + codecClassName + " was 
not found", e);
     }
   }
 
+  /**
+   * Builds the key that {@code getCodec} uses to look up and store codec instances in
+   * {@code CODEC_BY_NAME}.
+   *
+   * <p>The key starts from the Hadoop codec class name of {@code codecName}. For GZIP, BROTLI
+   * and ZSTD, the matching level/quality property is read from {@code configuration}. When that
+   * property is set, {@code ":" + value} is appended, so configurations that differ in that
+   * property map to different cache entries.
+   *
+   * <p>NOTE(review): only that one property is part of the key, and its raw string value is
+   * appended without normalization (e.g. "5" and " 5" yield different keys). Any other settings
+   * a codec might read from the Configuration are not reflected in the key; confirm that this
+   * is intended.
+   *
+   * @param codecName the codec whose cache key is wanted
+   * @return the codec class name, or {@code className + ":" + level} when a level is configured
+   */
+  private String cacheKey(CompressionCodecName codecName) {
+    String level = null;
+    switch (codecName) {
+      case GZIP:
+        level = configuration.get("zlib.compress.level");
+        break;
+      case BROTLI:
+        level = configuration.get("compression.brotli.quality");
+        break;
+      case ZSTD:
+        level = configuration.get("parquet.compression.codec.zstd.level");
+        break;
+      default:
+        // No level property is consulted for other codecs; the key is the class name alone.
+    }
+    String codecClass = codecName.getHadoopCompressionCodecClassName();
+    // A null level (property not set) leaves the key as the bare class name.
+    return level == null ? codecClass : codecClass + ":" + level;
+  }
+
   @Override
   public void release() {
     for (BytesCompressor compressor : compressors.values()) {
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java
index 8fec515a4..8ebf63325 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java
@@ -23,6 +23,7 @@ import java.util.Random;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.bytes.DirectByteBufferAllocator;
 import org.apache.parquet.bytes.HeapByteBufferAllocator;
@@ -174,5 +175,55 @@ public class TestDirectCodecFactory {
       }
     }
   }
+
+  /**
+   * Test-only subclass that overrides {@code getCodec} with public visibility so the caching
+   * tests in this class can call it.
+   *
+   * <p>NOTE(review): this test lives in the same package directory as CodecFactory, and the tests
+   * call {@code getCodec} through {@code CodecFactory}-typed variables. If the base method is
+   * protected or package-private, that call may already compile without this subclass; confirm.
+   */
+  static class PublicCodecFactory extends CodecFactory {
+    // Widens getCodec to public; construction is delegated unchanged to CodecFactory.
+
+    public PublicCodecFactory(Configuration configuration, int pageSize) {
+      super(configuration, pageSize);
+    }
+
+    public org.apache.hadoop.io.compress.CompressionCodec 
getCodec(CompressionCodecName name) {
+      return super.getCodec(name);
+    }
+  }
+
+  /**
+   * Checks that GZIP codecs are cached per {@code zlib.compress.level}. Two lookups from a
+   * factory configured with level "2" must return the same codec, while a factory configured
+   * with level "5" must get a different codec.
+   */
+  @Test
+  public void cachingKeysGzip() {
+    Configuration config_zlib_2 = new Configuration();
+    config_zlib_2.set("zlib.compress.level", "2");
+
+    Configuration config_zlib_5 = new Configuration();
+    config_zlib_5.set("zlib.compress.level", "5");
+
+    final CodecFactory codecFactory_2 = new PublicCodecFactory(config_zlib_2, 
pageSize);
+    final CodecFactory codecFactory_5 = new PublicCodecFactory(config_zlib_5, 
pageSize);
+
+    CompressionCodec codec_2_1 = 
codecFactory_2.getCodec(CompressionCodecName.GZIP);
+    CompressionCodec codec_2_2 = 
codecFactory_2.getCodec(CompressionCodecName.GZIP);
+    CompressionCodec codec_5_1 = 
codecFactory_5.getCodec(CompressionCodecName.GZIP);
+
+    // Same configuration, same cache key: expected to be the cached instance.
+    // NOTE(review): the intent is instance identity; Assert.assertSame would state that directly.
+    Assert.assertEquals(codec_2_1, codec_2_2);
+    // Different level, different cache key: expected to be a separately created codec.
+    Assert.assertNotEquals(codec_2_1, codec_5_1);
+  }
+
+  /**
+   * Checks that ZSTD codecs are cached per {@code parquet.compression.codec.zstd.level}. Two
+   * lookups from a factory configured with level "2" must return the same codec, while a factory
+   * configured with level "5" must get a different codec.
+   */
+  @Test
+  public void cachingKeysZstd() {
+    Configuration config_zstd_2 = new Configuration();
+    config_zstd_2.set("parquet.compression.codec.zstd.level", "2");
+
+    Configuration config_zstd_5 = new Configuration();
+    config_zstd_5.set("parquet.compression.codec.zstd.level", "5");
+
+    final CodecFactory codecFactory_2 = new PublicCodecFactory(config_zstd_2, 
pageSize);
+    final CodecFactory codecFactory_5 = new PublicCodecFactory(config_zstd_5, 
pageSize);
+
+    CompressionCodec codec_2_1 = 
codecFactory_2.getCodec(CompressionCodecName.ZSTD);
+    CompressionCodec codec_2_2 = 
codecFactory_2.getCodec(CompressionCodecName.ZSTD);
+    CompressionCodec codec_5_1 = 
codecFactory_5.getCodec(CompressionCodecName.ZSTD);
+
+    // Same configuration, same cache key: expected to be the cached instance.
+    // NOTE(review): the intent is instance identity; Assert.assertSame would state that directly.
+    Assert.assertEquals(codec_2_1, codec_2_2);
+    // Different level, different cache key: expected to be a separately created codec.
+    Assert.assertNotEquals(codec_2_1, codec_5_1);
+  }
 }
 

Reply via email to