This is an automated email from the ASF dual-hosted git repository.
ndimiduk pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.5 by this push:
new 18dac457333 HBASE-29218: Reduce calls to Configuration#get() in decompression path (#6857)
18dac457333 is described below
commit 18dac457333712950ebcb1bf693fcbadbe074528
Author: Nick Dimiduk <[email protected]>
AuthorDate: Fri Apr 25 10:32:49 2025 +0200
HBASE-29218: Reduce calls to Configuration#get() in decompression path (#6857)
Signed-off-by: Nick Dimiduk <[email protected]>
Co-authored-by: Charles Connell <[email protected]>
---
hbase-common/pom.xml | 4 -
.../io/compress/ByteBuffDecompressionCodec.java | 4 +
.../hbase/io/compress/ByteBuffDecompressor.java | 8 ++
.../apache/hadoop/hbase/io/compress/CodecPool.java | 24 +++---
.../hadoop/hbase/io/compress/Compression.java | 28 +++++++
.../hadoop/hbase/io/compress/DictionaryCache.java | 6 +-
.../encoding/HFileBlockDefaultDecodingContext.java | 8 +-
.../apache/hadoop/hbase/io/hfile/HFileContext.java | 37 ++++++++-
.../hadoop/hbase/io/hfile/HFileContextBuilder.java | 16 +++-
.../io/compress/zstd/ZstdByteBuffDecompressor.java | 72 ++++++----------
.../hadoop/hbase/io/compress/zstd/ZstdCodec.java | 42 +++++++++-
.../zstd/ZstdHFileDecompressionContext.java | 97 ++++++++++++++++++++++
.../apache/hadoop/hbase/io/hfile/HFileInfo.java | 2 +
13 files changed, 269 insertions(+), 79 deletions(-)
diff --git a/hbase-common/pom.xml b/hbase-common/pom.xml
index 899e1522957..b8124a67c9a 100644
--- a/hbase-common/pom.xml
+++ b/hbase-common/pom.xml
@@ -109,10 +109,6 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-crypto</artifactId>
</dependency>
- <dependency>
- <groupId>com.github.ben-manes.caffeine</groupId>
- <artifactId>caffeine</artifactId>
- </dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/ByteBuffDecompressionCodec.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/ByteBuffDecompressionCodec.java
index 821f0d82544..233fc0160bd 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/ByteBuffDecompressionCodec.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/ByteBuffDecompressionCodec.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.io.compress;
+import org.apache.hadoop.conf.Configuration;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
@@ -26,4 +27,7 @@ public interface ByteBuffDecompressionCodec {
ByteBuffDecompressor createByteBuffDecompressor();
+ Compression.HFileDecompressionContext
+ getDecompressionContextFromConfiguration(Configuration conf);
+
}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/ByteBuffDecompressor.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/ByteBuffDecompressor.java
index 8a0ff71919a..432b903fe4d 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/ByteBuffDecompressor.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/ByteBuffDecompressor.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.io.compress;
+import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.Closeable;
import java.io.IOException;
import org.apache.hadoop.hbase.nio.ByteBuff;
@@ -45,4 +46,11 @@ public interface ByteBuffDecompressor extends Closeable {
*/
boolean canDecompress(ByteBuff output, ByteBuff input);
+ /**
+ * Call before every use of {@link #canDecompress(ByteBuff, ByteBuff)} and
+ * {@link #decompress(ByteBuff, ByteBuff, int)} to reinitialize the decompressor with settings
+ * from the HFileInfo. This can matter because ByteBuffDecompressors are reused many times.
+ */
+ void reinit(@Nullable Compression.HFileDecompressionContext newHFileDecompressionContext);
+
}
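For reference, the caller-side pattern this contract implies looks roughly like the sketch below. This is illustrative code only, not part of the commit; the local names (fileContext, output, input, onDiskSizeWithoutHeader) are assumptions, mirroring HFileBlockDefaultDecodingContext further down in this diff.

    import java.io.IOException;
    import org.apache.hadoop.hbase.io.compress.ByteBuffDecompressor;
    import org.apache.hadoop.hbase.io.compress.Compression;
    import org.apache.hadoop.hbase.io.hfile.HFileContext;
    import org.apache.hadoop.hbase.nio.ByteBuff;

    final class ReinitSketch {
      // Hypothetical helper: decompress one block with a pooled ByteBuffDecompressor.
      static void decompressBlock(HFileContext fileContext, ByteBuff output, ByteBuff input,
        int onDiskSizeWithoutHeader) throws IOException {
        Compression.Algorithm algo = fileContext.getCompression();
        ByteBuffDecompressor decompressor = algo.getByteBuffDecompressor();
        try {
          // Re-apply HFile-scoped settings before every use; the pooled instance may
          // last have served a file with a different (or no) dictionary.
          decompressor.reinit(fileContext.getDecompressionContext());
          if (decompressor.canDecompress(output, input)) {
            decompressor.decompress(output, input, onDiskSizeWithoutHeader);
          }
        } finally {
          algo.returnByteBuffDecompressor(decompressor);
        }
      }
    }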
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/CodecPool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/CodecPool.java
index 6c376fa5c3f..1d8aeca8412 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/CodecPool.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/CodecPool.java
@@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hbase.io.compress;
-import com.github.benmanes.caffeine.cache.Caffeine;
-import com.github.benmanes.caffeine.cache.LoadingCache;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.util.Comparator;
import java.util.NavigableSet;
@@ -37,6 +35,10 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
+import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
+import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
+
/**
* A global compressor/decompressor pool used to save and reuse (possibly native)
* compression/decompression codecs. Copied from the class of the same name in hadoop-common and
@@ -56,7 +58,12 @@ public class CodecPool {
NavigableSet<ByteBuffDecompressor>> BYTE_BUFF_DECOMPRESSOR_POOL = new ConcurrentHashMap<>();
private static <T> LoadingCache<Class<T>, AtomicInteger> createCache() {
- return Caffeine.newBuilder().build(key -> new AtomicInteger());
+ return CacheBuilder.newBuilder().build(new CacheLoader<Class<T>, AtomicInteger>() {
+ @Override
+ public AtomicInteger load(Class<T> key) throws Exception {
+ return new AtomicInteger();
+ }
+ });
}
/**
@@ -108,26 +115,19 @@ public class CodecPool {
/**
* Copied from hadoop-common without significant modification.
*/
- @SuppressWarnings("unchecked")
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(
- value = "NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE",
- justification = "LoadingCache will compute value if absent")
private static <T> int getLeaseCount(LoadingCache<Class<T>, AtomicInteger> usageCounts,
Class<? extends T> codecClass) {
- return usageCounts.get((Class<T>) codecClass).get();
+ return usageCounts.getUnchecked((Class<T>) codecClass).get();
}
/**
* Copied from hadoop-common without significant modification.
*/
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(
- value = "NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE",
- justification = "LoadingCache will compute value if absent")
private static <T> void updateLeaseCount(LoadingCache<Class<T>, AtomicInteger> usageCounts,
T codec, int delta) {
if (codec != null && usageCounts != null) {
Class<T> codecClass = ReflectionUtils.getClass(codec);
- usageCounts.get(codecClass).addAndGet(delta);
+ usageCounts.getUnchecked(codecClass).addAndGet(delta);
}
}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/Compression.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/Compression.java
index 7886be5b1fc..63d317ee2f1 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/Compression.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/Compression.java
@@ -17,8 +17,10 @@
*/
package org.apache.hadoop.hbase.io.compress;
+import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
+import java.io.Closeable;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -26,6 +28,7 @@ import java.io.OutputStream;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
@@ -552,11 +555,36 @@ public final class Compression {
}
}
+ /**
+ * Get an object that holds settings used by ByteBuffDecompressor. It's expensive to pull these
+ * from a Configuration object every time we decompress a block, so pull them here when, for
+ * example, opening an HFile, and reuse the returned HFileDecompressionContext as much as
+ * possible. The concrete class of this object will be one that is specific to the codec
+ * implementation in use. You don't need to inspect it yourself, just pass it along to
+ * {@link ByteBuffDecompressor#reinit(HFileDecompressionContext)}.
+ */
+ @Nullable
+ public HFileDecompressionContext
+ getHFileDecompressionContextForConfiguration(Configuration conf) {
+ if (supportsByteBuffDecompression()) {
+ return ((ByteBuffDecompressionCodec) getCodec(conf))
+ .getDecompressionContextFromConfiguration(conf);
+ } else {
+ return null;
+ }
+ }
+
public String getName() {
return compressName;
}
}
+ /**
+ * See {@link Algorithm#getHFileDecompressionContextForConfiguration(Configuration)}.
+ */
+ public static abstract class HFileDecompressionContext implements Closeable, HeapSize {
+ }
+
public static Algorithm getCompressionAlgorithmByName(String compressName) {
Algorithm[] algos = Algorithm.class.getEnumConstants();
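To make the intended life cycle concrete: the context is meant to be resolved once per store file (the HFileInfo hunk at the end of this diff does exactly that) and then carried on the HFileContext. A rough, illustrative sketch of that open-time wiring, using the builder method added later in this commit; the helper name and parameters are made up:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.io.compress.Compression;
    import org.apache.hadoop.hbase.io.hfile.HFileContext;
    import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;

    final class OpenTimeWiringSketch {
      // Hypothetical helper: resolve the decompression context once per file open, so the
      // per-block decompression path never has to call Configuration#get().
      static HFileContext contextFor(Configuration conf, Compression.Algorithm algo) {
        // Null unless the codec supports ByteBuff decompression.
        Compression.HFileDecompressionContext decompressionContext =
          algo.getHFileDecompressionContextForConfiguration(conf);
        return new HFileContextBuilder().withCompression(algo)
          .withDecompressionContext(decompressionContext).build();
      }
    }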
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/DictionaryCache.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/DictionaryCache.java
index 1d6e25675f2..78fa448b63d 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/DictionaryCache.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/DictionaryCache.java
@@ -59,11 +59,11 @@ public final class DictionaryCache {
* @param path the hadoop Path where the dictionary is located, as a String
* @return the dictionary bytes if successful, null otherwise
*/
- public static byte[] getDictionary(final Configuration conf, final String path)
- throws IOException {
+ public static byte[] getDictionary(final Configuration conf, final String path) {
if (path == null || path.isEmpty()) {
return null;
}
+
// Create the dictionary loading cache if we haven't already
if (CACHE == null) {
synchronized (DictionaryCache.class) {
@@ -91,7 +91,7 @@ public final class DictionaryCache {
try {
return CACHE.get(path);
} catch (ExecutionException e) {
- throw new IOException(e);
+ throw new RuntimeException("Unable to load dictionary at " + path, e);
}
}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java
index 2cdbdc620e0..81f8e5fa6a2 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java
@@ -139,9 +139,7 @@ public class HFileBlockDefaultDecodingContext implements HFileBlockDecodingConte
Compression.Algorithm compression = fileContext.getCompression();
ByteBuffDecompressor decompressor = compression.getByteBuffDecompressor();
try {
- if (decompressor instanceof CanReinit) {
- ((CanReinit) decompressor).reinit(conf);
- }
+ decompressor.reinit(fileContext.getDecompressionContext());
decompressor.decompress(blockBufferWithoutHeader, onDiskBlock, onDiskSizeWithoutHeader);
} finally {
compression.returnByteBuffDecompressor(decompressor);
@@ -160,9 +158,7 @@ public class HFileBlockDefaultDecodingContext implements HFileBlockDecodingConte
} else {
ByteBuffDecompressor decompressor = fileContext.getCompression().getByteBuffDecompressor();
try {
- if (decompressor instanceof CanReinit) {
- ((CanReinit) decompressor).reinit(conf);
- }
+ decompressor.reinit(fileContext.getDecompressionContext());
// Even if we have a ByteBuffDecompressor, we still need to check if it can decompress
// our particular ByteBuffs
return decompressor.canDecompress(blockBufferWithoutHeader, onDiskBlock);
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java
index b371cf83867..af56f401be6 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.io.hfile;
+import edu.umd.cs.findbugs.annotations.Nullable;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.HConstants;
@@ -50,6 +52,11 @@ public class HFileContext implements HeapSize, Cloneable {
private boolean includesTags;
/** Compression algorithm used **/
private Compression.Algorithm compressAlgo = Compression.Algorithm.NONE;
+ /**
+ * Details used by compression algorithm that are more efficiently loaded once and then reused
+ **/
+ @Nullable
+ private Compression.HFileDecompressionContext decompressionContext = null;
/** Whether tags to be compressed or not **/
private boolean compressTags;
/** the checksum type **/
@@ -80,6 +87,7 @@ public class HFileContext implements HeapSize, Cloneable {
this.includesMvcc = context.includesMvcc;
this.includesTags = context.includesTags;
this.compressAlgo = context.compressAlgo;
+ this.decompressionContext = context.decompressionContext;
this.compressTags = context.compressTags;
this.checksumType = context.checksumType;
this.bytesPerChecksum = context.bytesPerChecksum;
@@ -95,14 +103,16 @@ public class HFileContext implements HeapSize, Cloneable {
}
HFileContext(boolean useHBaseChecksum, boolean includesMvcc, boolean includesTags,
- Compression.Algorithm compressAlgo, boolean compressTags, ChecksumType checksumType,
- int bytesPerChecksum, int blockSize, DataBlockEncoding encoding,
- Encryption.Context cryptoContext, long fileCreateTime, String hfileName, byte[] columnFamily,
- byte[] tableName, CellComparator cellComparator, IndexBlockEncoding indexBlockEncoding) {
+ Compression.Algorithm compressAlgo, Compression.HFileDecompressionContext decompressionContext,
+ boolean compressTags, ChecksumType checksumType, int bytesPerChecksum, int blockSize,
+ DataBlockEncoding encoding, Encryption.Context cryptoContext, long fileCreateTime,
+ String hfileName, byte[] columnFamily, byte[] tableName, CellComparator cellComparator,
+ IndexBlockEncoding indexBlockEncoding) {
this.usesHBaseChecksum = useHBaseChecksum;
this.includesMvcc = includesMvcc;
this.includesTags = includesTags;
this.compressAlgo = compressAlgo;
+ this.decompressionContext = decompressionContext;
this.compressTags = compressTags;
this.checksumType = checksumType;
this.bytesPerChecksum = bytesPerChecksum;
@@ -140,6 +150,20 @@ public class HFileContext implements HeapSize, Cloneable {
return compressAlgo;
}
+ /**
+ * Get an object that, if non-null, may be cast into a codec-specific type that exposes some
+ * information from the store-file-specific Configuration that is relevant to decompression. For
+ * example, ZSTD tables can have "hbase.io.compress.zstd.dictionary" on their table descriptor,
+ * and decompressions of blocks in that table must use that dictionary. It's cheaper for HBase to
+ * load these settings into an object of their own once and check this upon each block
+ * decompression, than it is to call into {@link Configuration#get(String)} on each block
+ * decompression.
+ */
+ @Nullable
+ public Compression.HFileDecompressionContext getDecompressionContext() {
+ return decompressionContext;
+ }
+
public boolean isUseHBaseChecksum() {
return usesHBaseChecksum;
}
@@ -237,6 +261,9 @@ public class HFileContext implements HeapSize, Cloneable {
if (this.tableName != null) {
size += ClassSize.sizeOfByteArray(this.tableName.length);
}
+ if (this.decompressionContext != null) {
+ size += this.decompressionContext.heapSize();
+ }
return size;
}
@@ -273,6 +300,8 @@ public class HFileContext implements HeapSize, Cloneable {
sb.append(compressAlgo);
sb.append(", compressTags=");
sb.append(compressTags);
+ sb.append(", decompressionContext=");
+ sb.append(decompressionContext);
sb.append(", cryptoContext=[");
sb.append(cryptoContext);
sb.append("]");
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContextBuilder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContextBuilder.java
index 0394f12144e..341461b26b1 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContextBuilder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContextBuilder.java
@@ -17,8 +17,10 @@
*/
package org.apache.hadoop.hbase.io.hfile;
+import edu.umd.cs.findbugs.annotations.Nullable;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
@@ -42,6 +44,8 @@ public class HFileContextBuilder {
private boolean includesTags = false;
/** Compression algorithm used **/
private Algorithm compression = Algorithm.NONE;
+ @Nullable
+ private Compression.HFileDecompressionContext decompressionContext = null;
/** Whether tags to be compressed or not **/
private boolean compressTags = false;
/** the checksum type **/
@@ -73,6 +77,7 @@ public class HFileContextBuilder {
this.includesMvcc = hfc.isIncludesMvcc();
this.includesTags = hfc.isIncludesTags();
this.compression = hfc.getCompression();
+ this.decompressionContext = hfc.getDecompressionContext();
this.compressTags = hfc.isCompressTags();
this.checkSumType = hfc.getChecksumType();
this.bytesPerChecksum = hfc.getBytesPerChecksum();
@@ -107,6 +112,12 @@ public class HFileContextBuilder {
return this;
}
+ public HFileContextBuilder
+ withDecompressionContext(@Nullable Compression.HFileDecompressionContext decompressionContext) {
+ this.decompressionContext = decompressionContext;
+ return this;
+ }
+
public HFileContextBuilder withCompressTags(boolean compressTags) {
this.compressTags = compressTags;
return this;
@@ -169,7 +180,8 @@ public class HFileContextBuilder {
public HFileContext build() {
return new HFileContext(usesHBaseChecksum, includesMvcc, includesTags, compression,
- compressTags, checkSumType, bytesPerChecksum, blockSize, encoding, cryptoContext,
- fileCreateTime, hfileName, columnFamily, tableName, cellComparator, indexBlockEncoding);
+ decompressionContext, compressTags, checkSumType, bytesPerChecksum, blockSize, encoding,
+ cryptoContext, fileCreateTime, hfileName, columnFamily, tableName, cellComparator,
+ indexBlockEncoding);
}
}
diff --git a/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdByteBuffDecompressor.java b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdByteBuffDecompressor.java
index d71d46e2946..9a466b9f9ee 100644
--- a/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdByteBuffDecompressor.java
+++ b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdByteBuffDecompressor.java
@@ -22,10 +22,9 @@ import com.github.luben.zstd.ZstdDictDecompress;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.IOException;
import java.nio.ByteBuffer;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.compress.BlockDecompressorHelper;
import org.apache.hadoop.hbase.io.compress.ByteBuffDecompressor;
-import org.apache.hadoop.hbase.io.compress.CanReinit;
+import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.yetus.audience.InterfaceAudience;
@@ -34,21 +33,18 @@ import org.apache.yetus.audience.InterfaceAudience;
* Glue for ByteBuffDecompressor on top of zstd-jni
*/
@InterfaceAudience.Private
-public class ZstdByteBuffDecompressor implements ByteBuffDecompressor, CanReinit {
+public class ZstdByteBuffDecompressor implements ByteBuffDecompressor {
protected int dictId;
- @Nullable
- protected ZstdDictDecompress dict;
protected ZstdDecompressCtx ctx;
// Intended to be set to false by some unit tests
private boolean allowByteBuffDecompression;
- ZstdByteBuffDecompressor(@Nullable byte[] dictionary) {
+ ZstdByteBuffDecompressor(@Nullable byte[] dictionaryBytes) {
ctx = new ZstdDecompressCtx();
- if (dictionary != null) {
- this.dictId = ZstdCodec.getDictionaryId(dictionary);
- this.dict = new ZstdDictDecompress(dictionary);
- this.ctx.loadDict(this.dict);
+ if (dictionaryBytes != null) {
+ this.ctx.loadDict(new ZstdDictDecompress(dictionaryBytes));
+ dictId = ZstdCodec.getDictionaryId(dictionaryBytes);
}
allowByteBuffDecompression = true;
}
@@ -100,44 +96,30 @@ public class ZstdByteBuffDecompressor implements ByteBuffDecompressor, CanReinit
}
@Override
- public void close() {
- ctx.close();
- if (dict != null) {
- dict.close();
- }
- }
-
- @Override
- public void reinit(Configuration conf) {
- if (conf != null) {
- // Dictionary may have changed
- byte[] b = ZstdCodec.getDictionary(conf);
- if (b != null) {
- // Don't casually create dictionary objects; they consume native memory
- int thisDictId = ZstdCodec.getDictionaryId(b);
- if (dict == null || dictId != thisDictId) {
- dictId = thisDictId;
- ZstdDictDecompress oldDict = dict;
- dict = new ZstdDictDecompress(b);
- ctx.loadDict(dict);
- if (oldDict != null) {
- oldDict.close();
- }
+ public void reinit(@Nullable Compression.HFileDecompressionContext newHFileDecompressionContext) {
+ if (newHFileDecompressionContext != null) {
+ if (newHFileDecompressionContext instanceof ZstdHFileDecompressionContext) {
+ ZstdHFileDecompressionContext zstdContext =
+ (ZstdHFileDecompressionContext) newHFileDecompressionContext;
+ allowByteBuffDecompression = zstdContext.isAllowByteBuffDecompression();
+ if (zstdContext.getDict() == null && dictId != 0) {
+ ctx.loadDict((byte[]) null);
+ dictId = 0;
+ } else if (zstdContext.getDictId() != dictId) {
+ this.ctx.loadDict(zstdContext.getDict());
+ this.dictId = zstdContext.getDictId();
}
} else {
- ZstdDictDecompress oldDict = dict;
- dict = null;
- dictId = 0;
- // loadDict((byte[]) accepts null to clear the dictionary
- ctx.loadDict((byte[]) null);
- if (oldDict != null) {
- oldDict.close();
- }
+ throw new IllegalArgumentException(
+ "ZstdByteBuffDecompression#reinit() was given an
HFileDecompressionContext that was not "
+ + "a ZstdHFileDecompressionContext, this should never happen");
}
-
- // unit test helper
- this.allowByteBuffDecompression =
- conf.getBoolean("hbase.io.compress.zstd.allowByteBuffDecompression",
true);
}
}
+
+ @Override
+ public void close() {
+ ctx.close();
+ }
+
}
diff --git a/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCodec.java b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCodec.java
index b06b93e3167..e934aa12c6c 100644
--- a/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCodec.java
+++ b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCodec.java
@@ -18,17 +18,23 @@
package org.apache.hadoop.hbase.io.compress.zstd;
import com.github.luben.zstd.Zstd;
+import com.github.luben.zstd.ZstdDictDecompress;
+import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hbase.io.compress.ByteBuffDecompressionCodec;
import org.apache.hadoop.hbase.io.compress.ByteBuffDecompressor;
+import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.DictionaryCache;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.compress.BlockCompressorStream;
import org.apache.hadoop.io.compress.BlockDecompressorStream;
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -38,6 +44,9 @@ import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
+import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
+
/**
* Hadoop ZStandard codec implemented with zstd-jni.
* <p>
@@ -51,6 +60,9 @@ public class ZstdCodec implements Configurable, CompressionCodec, ByteBuffDecomp
public static final int ZSTD_BUFFER_SIZE_DEFAULT = 256 * 1024;
public static final String ZSTD_DICTIONARY_KEY = "hbase.io.compress.zstd.dictionary";
+ private static final Cache<String, Pair<ZstdDictDecompress, Integer>> DECOMPRESS_DICT_CACHE =
+ CacheBuilder.newBuilder().maximumSize(100).expireAfterAccess(10, TimeUnit.MINUTES).build();
+
private Configuration conf;
private int bufferSize;
private int level;
@@ -125,6 +137,12 @@ public class ZstdCodec implements Configurable, CompressionCodec, ByteBuffDecomp
return ZstdByteBuffDecompressor.class;
}
+ @Override
+ public Compression.HFileDecompressionContext
+ getDecompressionContextFromConfiguration(Configuration conf) {
+ return ZstdHFileDecompressionContext.fromConfiguration(conf);
+ }
+
@Override
public String getDefaultExtension() {
return ".zst";
@@ -145,12 +163,30 @@ public class ZstdCodec implements Configurable, CompressionCodec, ByteBuffDecomp
return size > 0 ? size : ZSTD_BUFFER_SIZE_DEFAULT;
}
+ @Nullable
static byte[] getDictionary(final Configuration conf) {
String path = conf.get(ZSTD_DICTIONARY_KEY);
+ return DictionaryCache.getDictionary(conf, path);
+ }
+
+ /**
+ * Returns dictionary and its ID number, useful for comparing to other dictionaries for equality
+ */
+ @Nullable
+ static Pair<ZstdDictDecompress, Integer> getDecompressDictionary(final Configuration conf) {
+ String path = conf.get(ZSTD_DICTIONARY_KEY);
+ if (path == null) {
+ return null;
+ }
+
try {
- return DictionaryCache.getDictionary(conf, path);
- } catch (IOException e) {
- throw new RuntimeException("Unable to load dictionary at " + path, e);
+ return DECOMPRESS_DICT_CACHE.get(path, () -> {
+ byte[] dictBytes = DictionaryCache.getDictionary(conf, path);
+ int dictId = getDictionaryId(dictBytes);
+ return new Pair<>(new ZstdDictDecompress(dictBytes), dictId);
+ });
+ } catch (ExecutionException e) {
+ throw new RuntimeException("Unable to load ZSTD dictionary", e);
}
}
diff --git a/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdHFileDecompressionContext.java b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdHFileDecompressionContext.java
new file mode 100644
index 00000000000..ccca038ac19
--- /dev/null
+++ b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdHFileDecompressionContext.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.zstd;
+
+import com.github.luben.zstd.ZstdDictDecompress;
+import edu.umd.cs.findbugs.annotations.Nullable;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Holds HFile-level settings used by ZstdByteBuffDecompressor. It's expensive
to pull these from a
+ * Configuration object every time we decompress a block, so pull them upon
opening an HFile, and
+ * reuse them in every block that gets decompressed.
+ */
[email protected]
+public final class ZstdHFileDecompressionContext extends Compression.HFileDecompressionContext {
+
+ public static final long FIXED_OVERHEAD =
+ ClassSize.estimateBase(ZstdHFileDecompressionContext.class, false);
+
+ @Nullable
+ private final ZstdDictDecompress dict;
+ private final int dictId;
+ // Intended to be set to false by some unit tests
+ private final boolean allowByteBuffDecompression;
+
+ private ZstdHFileDecompressionContext(@Nullable ZstdDictDecompress dict, int dictId,
+ boolean allowByteBuffDecompression) {
+ this.dict = dict;
+ this.dictId = dictId;
+ this.allowByteBuffDecompression = allowByteBuffDecompression;
+ }
+
+ @Nullable
+ public ZstdDictDecompress getDict() {
+ return dict;
+ }
+
+ public int getDictId() {
+ return dictId;
+ }
+
+ public boolean isAllowByteBuffDecompression() {
+ return allowByteBuffDecompression;
+ }
+
+ public static ZstdHFileDecompressionContext fromConfiguration(Configuration conf) {
+ boolean allowByteBuffDecompression =
+ conf.getBoolean("hbase.io.compress.zstd.allowByteBuffDecompression", true);
+ Pair<ZstdDictDecompress, Integer> dictAndId = ZstdCodec.getDecompressDictionary(conf);
+ if (dictAndId != null) {
+ return new ZstdHFileDecompressionContext(dictAndId.getFirst(), dictAndId.getSecond(),
+ allowByteBuffDecompression);
+ } else {
+ return new ZstdHFileDecompressionContext(null, 0, allowByteBuffDecompression);
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (dict != null) {
+ dict.close();
+ }
+ }
+
+ @Override
+ public long heapSize() {
+ // ZstdDictDecompress objects are cached and shared between ZstdHFileDecompressionContexts, so
+ // don't include ours in our heap size.
+ return FIXED_OVERHEAD;
+ }
+
+ @Override
+ public String toString() {
+ return "ZstdHFileDecompressionContext{dictId=" + dictId + ",
allowByteBuffDecompression="
+ + allowByteBuffDecompression + '}';
+ }
+}
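A small illustrative usage sketch for this new class (not part of the commit; the helper name and dictionary path parameter are made up). Repeated calls against the same dictionary path reuse the ZstdDictDecompress cached in ZstdCodec rather than re-reading and re-parsing the dictionary file:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.io.compress.zstd.ZstdCodec;
    import org.apache.hadoop.hbase.io.compress.zstd.ZstdHFileDecompressionContext;

    final class ZstdContextSketch {
      // Hypothetical helper: build the zstd-specific context once for a given configuration.
      static ZstdHFileDecompressionContext load(Configuration conf, String dictPath) {
        if (dictPath != null) {
          conf.set(ZstdCodec.ZSTD_DICTIONARY_KEY, dictPath);
        }
        ZstdHFileDecompressionContext ctx = ZstdHFileDecompressionContext.fromConfiguration(conf);
        // toString() reports dictId (0 when no dictionary is configured) and
        // whether ByteBuff decompression is allowed.
        System.out.println(ctx);
        return ctx;
      }
    }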
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileInfo.java
index 17bdadc461d..4c1192aa015 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileInfo.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileInfo.java
@@ -397,6 +397,8 @@ public class HFileInfo implements SortedMap<byte[], byte[]> {
throws IOException {
HFileContextBuilder builder = new HFileContextBuilder().withHBaseCheckSum(true)
.withHFileName(path.getName()).withCompression(trailer.getCompressionCodec())
+ .withDecompressionContext(
+ trailer.getCompressionCodec().getHFileDecompressionContextForConfiguration(conf))
.withCellComparator(FixedFileTrailer.createComparator(trailer.getComparatorClassName()));
// Check for any key material available
byte[] keyBytes = trailer.getEncryptionKey();