This is an automated email from the ASF dual-hosted git repository.

ndimiduk pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 9f60f1821f8 HBASE-28485 Re-use ZstdDecompressCtx/ZstdCompressCtx for performance (#5797)
9f60f1821f8 is described below

commit 9f60f1821f88ac26b29c111d9d0ec1e0aad2c52e
Author: Charles Connell <char...@connells.org>
AuthorDate: Tue Apr 9 04:20:47 2024 -0400

    HBASE-28485 Re-use ZstdDecompressCtx/ZstdCompressCtx for performance (#5797)
    
    Co-authored-by: Charles Connell <cconn...@hubspot.com>
    Signed-off-by: Andrew Purtell <apurt...@apache.org>
    Signed-off-by: Nick Dimiduk <ndimi...@apache.org>
---
 .../hbase/io/compress/zstd/ZstdCompressor.java       | 20 ++++++++++++++------
 .../hbase/io/compress/zstd/ZstdDecompressor.java     | 19 ++++++++++++-------
 2 files changed, 26 insertions(+), 13 deletions(-)

diff --git a/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCompressor.java b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCompressor.java
index 4d34d4825d3..b48db9106fb 100644
--- a/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCompressor.java
+++ b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCompressor.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.io.compress.zstd;
 
 import com.github.luben.zstd.Zstd;
+import com.github.luben.zstd.ZstdCompressCtx;
 import com.github.luben.zstd.ZstdDictCompress;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -39,6 +40,7 @@ public class ZstdCompressor implements CanReinit, Compressor {
   protected long bytesRead, bytesWritten;
   protected int dictId;
   protected ZstdDictCompress dict;
+  protected ZstdCompressCtx ctx;
 
   ZstdCompressor(final int level, final int bufferSize, final byte[] dictionary) {
     this.level = level;
@@ -46,9 +48,12 @@ public class ZstdCompressor implements CanReinit, Compressor {
     this.inBuf = ByteBuffer.allocateDirect(bufferSize);
     this.outBuf = ByteBuffer.allocateDirect(bufferSize);
     this.outBuf.position(bufferSize);
+    this.ctx = new ZstdCompressCtx();
+    this.ctx.setLevel(level);
     if (dictionary != null) {
       this.dictId = ZstdCodec.getDictionaryId(dictionary);
       this.dict = new ZstdDictCompress(dictionary, level);
+      this.ctx.loadDict(this.dict);
     }
   }
 
@@ -79,12 +84,7 @@ public class ZstdCompressor implements CanReinit, Compressor {
         } else {
           outBuf.clear();
         }
-        int written;
-        if (dict != null) {
-          written = Zstd.compress(outBuf, inBuf, dict);
-        } else {
-          written = Zstd.compress(outBuf, inBuf, level);
-        }
+        int written = ctx.compress(outBuf, inBuf);
         bytesWritten += written;
         inBuf.clear();
         finished = true;
@@ -170,6 +170,14 @@ public class ZstdCompressor implements CanReinit, Compressor {
     bytesWritten = 0;
     finish = false;
     finished = false;
+    ctx.reset();
+    ctx.setLevel(level);
+    if (dict != null) {
+      ctx.loadDict(dict);
+    } else {
+      // loadDict(byte[]) accepts null to clear the dictionary
+      ctx.loadDict((byte[]) null);
+    }
   }
 
   @Override
diff --git a/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdDecompressor.java b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdDecompressor.java
index ef0a0f87651..79826c96d5e 100644
--- a/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdDecompressor.java
+++ b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdDecompressor.java
@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.hbase.io.compress.zstd;
 
-import com.github.luben.zstd.Zstd;
+import com.github.luben.zstd.ZstdDecompressCtx;
 import com.github.luben.zstd.ZstdDictDecompress;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -39,15 +39,18 @@ public class ZstdDecompressor implements CanReinit, Decompressor {
   protected boolean finished;
   protected int dictId;
   protected ZstdDictDecompress dict;
+  protected ZstdDecompressCtx ctx;
 
   ZstdDecompressor(final int bufferSize, final byte[] dictionary) {
     this.bufferSize = bufferSize;
     this.inBuf = ByteBuffer.allocateDirect(bufferSize);
     this.outBuf = ByteBuffer.allocateDirect(bufferSize);
     this.outBuf.position(bufferSize);
+    this.ctx = new ZstdDecompressCtx();
     if (dictionary != null) {
       this.dictId = ZstdCodec.getDictionaryId(dictionary);
       this.dict = new ZstdDictDecompress(dictionary);
+      this.ctx.loadDict(this.dict);
     }
   }
 
@@ -67,12 +70,7 @@ public class ZstdDecompressor implements CanReinit, Decompressor {
       int remaining = inBuf.remaining();
       inLen -= remaining;
       outBuf.clear();
-      int written;
-      if (dict != null) {
-        written = Zstd.decompress(outBuf, inBuf, dict);
-      } else {
-        written = Zstd.decompress(outBuf, inBuf);
-      }
+      int written = ctx.decompress(outBuf, inBuf);
       inBuf.clear();
       outBuf.flip();
       int n = Math.min(written, len);
@@ -109,6 +107,13 @@ public class ZstdDecompressor implements CanReinit, Decompressor {
     outBuf.clear();
     outBuf.position(outBuf.capacity());
     finished = false;
+    ctx.reset();
+    if (dict != null) {
+      ctx.loadDict(dict);
+    } else {
+      // loadDict(byte[]) accepts null to clear the dictionary
+      ctx.loadDict((byte[]) null);
+    }
   }
 
   @Override

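For reference, below is a minimal, self-contained sketch of the reuse pattern this commit adopts: allocate one zstd-jni context per (de)compressor and reuse it across calls, instead of the one-shot static Zstd.compress()/Zstd.decompress() helpers, which set up and tear down a native context on every invocation. The class name ZstdCtxReuseDemo, the sample input, and the loop are illustrative assumptions; only the ZstdCompressCtx/ZstdDecompressCtx calls mirror the zstd-jni API used in the diff above.

import com.github.luben.zstd.ZstdCompressCtx;
import com.github.luben.zstd.ZstdDecompressCtx;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class ZstdCtxReuseDemo {
  public static void main(String[] args) throws Exception {
    byte[] input = "hello zstd context reuse".getBytes(StandardCharsets.UTF_8);
    // One long-lived context per direction, reused across calls, mirroring
    // what ZstdCompressor/ZstdDecompressor now do with their ctx fields.
    try (ZstdCompressCtx cctx = new ZstdCompressCtx();
         ZstdDecompressCtx dctx = new ZstdDecompressCtx()) {
      cctx.setLevel(3); // the context retains the level across calls
      for (int i = 0; i < 3; i++) {
        byte[] compressed = cctx.compress(input);
        byte[] restored = dctx.decompress(compressed, input.length);
        if (!Arrays.equals(input, restored)) {
          throw new AssertionError("round-trip mismatch");
        }
      }
    }
  }
}

As in the patch, ctx.reset() followed by loadDict() returns a context to a known state when a codec instance is reinitialized.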