This is an automated email from the ASF dual-hosted git repository. fokko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push: new 4de3d9389 PARQUET-2336: Add caching key to CodecFactory (#1134) 4de3d9389 is described below commit 4de3d938966da2b5d6ec043b77bbbec56123277c Author: Fokko Driesprong <fo...@apache.org> AuthorDate: Mon Sep 18 21:45:30 2023 +0200 PARQUET-2336: Add caching key to CodecFactory (#1134) * PARQUET-2336: Add caching key to CodecFactory * Remove old config * Fix the key --- .../org/apache/parquet/hadoop/CodecFactory.java | 32 +++++++++++--- .../parquet/hadoop/TestDirectCodecFactory.java | 51 ++++++++++++++++++++++ 2 files changed, 77 insertions(+), 6 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java index 1998ea09d..beb1c75ad 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java @@ -173,10 +173,10 @@ public class CodecFactory implements CompressionCodecFactory { // null compressor for non-native gzip compressor.reset(); } - CompressionOutputStream cos = codec.createOutputStream(compressedOutBuffer, compressor); - bytes.writeAllTo(cos); - cos.finish(); - cos.close(); + try (CompressionOutputStream cos = codec.createOutputStream(compressedOutBuffer, compressor)) { + bytes.writeAllTo(cos); + cos.finish(); + } compressedBytes = BytesInput.from(compressedOutBuffer); } return compressedBytes; @@ -234,7 +234,8 @@ public class CodecFactory implements CompressionCodecFactory { if (codecClassName == null) { return null; } - CompressionCodec codec = CODEC_BY_NAME.get(codecClassName); + String codecCacheKey = this.cacheKey(codecName); + CompressionCodec codec = CODEC_BY_NAME.get(codecCacheKey); if (codec != null) { return codec; } @@ -248,13 +249,32 @@ public class CodecFactory implements CompressionCodecFactory { codecClass = configuration.getClassLoader().loadClass(codecClassName); } codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, configuration); - CODEC_BY_NAME.put(codecClassName, codec); + CODEC_BY_NAME.put(codecCacheKey, codec); return codec; } catch (ClassNotFoundException e) { throw new BadConfigurationException("Class " + codecClassName + " was not found", e); } } + private String cacheKey(CompressionCodecName codecName) { + String level = null; + switch (codecName) { + case GZIP: + level = configuration.get("zlib.compress.level"); + break; + case BROTLI: + level = configuration.get("compression.brotli.quality"); + break; + case ZSTD: + level = configuration.get("parquet.compression.codec.zstd.level"); + break; + default: + // compression level is not supported; ignore it + } + String codecClass = codecName.getHadoopCompressionCodecClassName(); + return level == null ? codecClass : codecClass + ":" + level; + } + @Override public void release() { for (BytesCompressor compressor : compressors.values()) { diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java index 8fec515a4..8ebf63325 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java @@ -23,6 +23,7 @@ import java.util.Random; import java.util.Set; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.bytes.DirectByteBufferAllocator; import org.apache.parquet.bytes.HeapByteBufferAllocator; @@ -174,5 +175,55 @@ public class TestDirectCodecFactory { } } } + + static class PublicCodecFactory extends CodecFactory { + // To make getCodec public + + public PublicCodecFactory(Configuration configuration, int pageSize) { + super(configuration, pageSize); + } + + public org.apache.hadoop.io.compress.CompressionCodec getCodec(CompressionCodecName name) { + return super.getCodec(name); + } + } + + @Test + public void cachingKeysGzip() { + Configuration config_zlib_2 = new Configuration(); + config_zlib_2.set("zlib.compress.level", "2"); + + Configuration config_zlib_5 = new Configuration(); + config_zlib_5.set("zlib.compress.level", "5"); + + final CodecFactory codecFactory_2 = new PublicCodecFactory(config_zlib_2, pageSize); + final CodecFactory codecFactory_5 = new PublicCodecFactory(config_zlib_5, pageSize); + + CompressionCodec codec_2_1 = codecFactory_2.getCodec(CompressionCodecName.GZIP); + CompressionCodec codec_2_2 = codecFactory_2.getCodec(CompressionCodecName.GZIP); + CompressionCodec codec_5_1 = codecFactory_5.getCodec(CompressionCodecName.GZIP); + + Assert.assertEquals(codec_2_1, codec_2_2); + Assert.assertNotEquals(codec_2_1, codec_5_1); + } + + @Test + public void cachingKeysZstd() { + Configuration config_zstd_2 = new Configuration(); + config_zstd_2.set("parquet.compression.codec.zstd.level", "2"); + + Configuration config_zstd_5 = new Configuration(); + config_zstd_5.set("parquet.compression.codec.zstd.level", "5"); + + final CodecFactory codecFactory_2 = new PublicCodecFactory(config_zstd_2, pageSize); + final CodecFactory codecFactory_5 = new PublicCodecFactory(config_zstd_5, pageSize); + + CompressionCodec codec_2_1 = codecFactory_2.getCodec(CompressionCodecName.ZSTD); + CompressionCodec codec_2_2 = codecFactory_2.getCodec(CompressionCodecName.ZSTD); + CompressionCodec codec_5_1 = codecFactory_5.getCodec(CompressionCodecName.ZSTD); + + Assert.assertEquals(codec_2_1, codec_2_2); + Assert.assertNotEquals(codec_2_1, codec_5_1); + } }