This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new 4de3d9389 PARQUET-2336: Add caching key to CodecFactory (#1134)
4de3d9389 is described below
commit 4de3d938966da2b5d6ec043b77bbbec56123277c
Author: Fokko Driesprong <[email protected]>
AuthorDate: Mon Sep 18 21:45:30 2023 +0200
PARQUET-2336: Add caching key to CodecFactory (#1134)
* PARQUET-2336: Add caching key to CodecFactory
* Remove old config
* Fix the key
---
.../org/apache/parquet/hadoop/CodecFactory.java | 32 +++++++++++---
.../parquet/hadoop/TestDirectCodecFactory.java | 51 ++++++++++++++++++++++
2 files changed, 77 insertions(+), 6 deletions(-)
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
index 1998ea09d..beb1c75ad 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
@@ -173,10 +173,10 @@ public class CodecFactory implements CompressionCodecFactory {
// null compressor for non-native gzip
compressor.reset();
}
- CompressionOutputStream cos = codec.createOutputStream(compressedOutBuffer, compressor);
- bytes.writeAllTo(cos);
- cos.finish();
- cos.close();
+ try (CompressionOutputStream cos = codec.createOutputStream(compressedOutBuffer, compressor)) {
+ bytes.writeAllTo(cos);
+ cos.finish();
+ }
compressedBytes = BytesInput.from(compressedOutBuffer);
}
return compressedBytes;
@@ -234,7 +234,8 @@ public class CodecFactory implements CompressionCodecFactory {
if (codecClassName == null) {
return null;
}
- CompressionCodec codec = CODEC_BY_NAME.get(codecClassName);
+ String codecCacheKey = this.cacheKey(codecName);
+ CompressionCodec codec = CODEC_BY_NAME.get(codecCacheKey);
if (codec != null) {
return codec;
}
@@ -248,13 +249,32 @@ public class CodecFactory implements CompressionCodecFactory {
codecClass = configuration.getClassLoader().loadClass(codecClassName);
}
codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, configuration);
- CODEC_BY_NAME.put(codecClassName, codec);
+ CODEC_BY_NAME.put(codecCacheKey, codec);
return codec;
} catch (ClassNotFoundException e) {
throw new BadConfigurationException("Class " + codecClassName + " was not found", e);
}
}
+ private String cacheKey(CompressionCodecName codecName) {
+ String level = null;
+ switch (codecName) {
+ case GZIP:
+ level = configuration.get("zlib.compress.level");
+ break;
+ case BROTLI:
+ level = configuration.get("compression.brotli.quality");
+ break;
+ case ZSTD:
+ level = configuration.get("parquet.compression.codec.zstd.level");
+ break;
+ default:
+ // compression level is not supported; ignore it
+ }
+ String codecClass = codecName.getHadoopCompressionCodecClassName();
+ return level == null ? codecClass : codecClass + ":" + level;
+ }
+
@Override
public void release() {
for (BytesCompressor compressor : compressors.values()) {
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java
index 8fec515a4..8ebf63325 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java
@@ -23,6 +23,7 @@ import java.util.Random;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.bytes.DirectByteBufferAllocator;
import org.apache.parquet.bytes.HeapByteBufferAllocator;
@@ -174,5 +175,55 @@ public class TestDirectCodecFactory {
}
}
}
+
+ static class PublicCodecFactory extends CodecFactory {
+ // To make getCodec public
+
+ public PublicCodecFactory(Configuration configuration, int pageSize) {
+ super(configuration, pageSize);
+ }
+
+ public org.apache.hadoop.io.compress.CompressionCodec getCodec(CompressionCodecName name) {
+ return super.getCodec(name);
+ }
+ }
+
+ @Test
+ public void cachingKeysGzip() {
+ Configuration config_zlib_2 = new Configuration();
+ config_zlib_2.set("zlib.compress.level", "2");
+
+ Configuration config_zlib_5 = new Configuration();
+ config_zlib_5.set("zlib.compress.level", "5");
+
+ final CodecFactory codecFactory_2 = new PublicCodecFactory(config_zlib_2, pageSize);
+ final CodecFactory codecFactory_5 = new PublicCodecFactory(config_zlib_5, pageSize);
+
+ CompressionCodec codec_2_1 = codecFactory_2.getCodec(CompressionCodecName.GZIP);
+ CompressionCodec codec_2_2 = codecFactory_2.getCodec(CompressionCodecName.GZIP);
+ CompressionCodec codec_5_1 = codecFactory_5.getCodec(CompressionCodecName.GZIP);
+
+ Assert.assertEquals(codec_2_1, codec_2_2);
+ Assert.assertNotEquals(codec_2_1, codec_5_1);
+ }
+
+ @Test
+ public void cachingKeysZstd() {
+ Configuration config_zstd_2 = new Configuration();
+ config_zstd_2.set("parquet.compression.codec.zstd.level", "2");
+
+ Configuration config_zstd_5 = new Configuration();
+ config_zstd_5.set("parquet.compression.codec.zstd.level", "5");
+
+ final CodecFactory codecFactory_2 = new PublicCodecFactory(config_zstd_2, pageSize);
+ final CodecFactory codecFactory_5 = new PublicCodecFactory(config_zstd_5, pageSize);
+
+ CompressionCodec codec_2_1 = codecFactory_2.getCodec(CompressionCodecName.ZSTD);
+ CompressionCodec codec_2_2 = codecFactory_2.getCodec(CompressionCodecName.ZSTD);
+ CompressionCodec codec_5_1 = codecFactory_5.getCodec(CompressionCodecName.ZSTD);
+
+ Assert.assertEquals(codec_2_1, codec_2_2);
+ Assert.assertNotEquals(codec_2_1, codec_5_1);
+ }
}