This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/main by this push:
     new 25beaa26c ORC-1613: Zstd decompression supports direct buffer
25beaa26c is described below

commit 25beaa26c86e316ab58a0950514b9d99df893fc5
Author: sychen <[email protected]>
AuthorDate: Wed Feb 7 18:52:31 2024 -0800

    ORC-1613: Zstd decompression supports direct buffer
    
    ### What changes were proposed in this pull request?
    `ZstdCodec` implements the `DirectDecompressionCodec` interface.
    
    ### Why are the changes needed?
    `zstd-jni` supports direct buffer decompression, which can reduce buffer copying.
    
    ### How was this patch tested?
    Added a unit test (`TestZstd#testZstdDirectDecompress`).
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #1789 from cxzl25/ORC-1613.
    
    Authored-by: sychen <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../src/java/org/apache/orc/impl/ZstdCodec.java    | 18 ++++++++++-
 .../src/test/org/apache/orc/impl/TestZstd.java     | 35 ++++++++++++++++++++++
 2 files changed, 52 insertions(+), 1 deletion(-)

diff --git a/java/core/src/java/org/apache/orc/impl/ZstdCodec.java b/java/core/src/java/org/apache/orc/impl/ZstdCodec.java
index 3818eb3f1..6703a82c1 100644
--- a/java/core/src/java/org/apache/orc/impl/ZstdCodec.java
+++ b/java/core/src/java/org/apache/orc/impl/ZstdCodec.java
@@ -25,7 +25,7 @@ import org.apache.orc.CompressionKind;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-public class ZstdCodec implements CompressionCodec {
+public class ZstdCodec implements CompressionCodec, DirectDecompressionCodec {
   private ZstdOptions zstdOptions = null;
   private ZstdCompressCtx zstdCompressCtx = null;
 
@@ -214,6 +214,11 @@ public class ZstdCodec implements CompressionCodec {
 
   @Override
   public void decompress(ByteBuffer in, ByteBuffer out) throws IOException {
+    if (in.isDirect() && out.isDirect()) {
+      directDecompress(in, out);
+      return;
+    }
+
     int srcOffset = in.arrayOffset() + in.position();
     int srcSize = in.remaining();
     int dstOffset = out.arrayOffset() + out.position();
@@ -227,6 +232,17 @@ public class ZstdCodec implements CompressionCodec {
     out.flip();
   }
 
+  @Override
+  public boolean isAvailable() {
+    return true;
+  }
+
+  @Override
+  public void directDecompress(ByteBuffer in, ByteBuffer out) throws 
IOException {
+    Zstd.decompress(out, in);
+    out.flip();
+  }
+
   @Override
   public void reset() {
 
diff --git a/java/core/src/test/org/apache/orc/impl/TestZstd.java b/java/core/src/test/org/apache/orc/impl/TestZstd.java
index b424feb82..dfa310a40 100644
--- a/java/core/src/test/org/apache/orc/impl/TestZstd.java
+++ b/java/core/src/test/org/apache/orc/impl/TestZstd.java
@@ -28,8 +28,10 @@ import org.apache.orc.CompressionCodec;
 import org.apache.orc.CompressionKind;
 import org.junit.jupiter.api.Test;
 
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
 public class TestZstd {
@@ -119,4 +121,37 @@ public class TestZstd {
         "zstd-jni compression with aircompressor decompression did not return"
             + " the input!");
   }
+
+  @Test
+  public void testZstdDirectDecompress() {
+    ByteBuffer in = ByteBuffer.allocate(10000);
+    ByteBuffer out = ByteBuffer.allocate(10000);
+    ByteBuffer directOut = ByteBuffer.allocateDirect(10000);
+    ByteBuffer directResult = ByteBuffer.allocateDirect(10000);
+    for (int i = 0; i < 10000; i++) {
+      in.put((byte) i);
+    }
+    in.flip();
+    try (ZstdCodec zstdCodec = new ZstdCodec()) {
+      // write bytes to heap buffer.
+      assertTrue(zstdCodec.compress(in, out, null,
+              zstdCodec.getDefaultOptions()));
+      int position = out.position();
+      out.flip();
+      // copy heap buffer to direct buffer.
+      directOut.put(out.array());
+      directOut.flip();
+      directOut.limit(position);
+
+      zstdCodec.decompress(directOut, directResult);
+
+      // copy result from direct buffer to heap.
+      byte[] heapBytes = new byte[in.array().length];
+      directResult.get(heapBytes, 0, directResult.limit());
+
+      assertArrayEquals(in.array(), heapBytes);
+    } catch (Exception e) {
+      fail(e);
+    }
+  }
 }

Reply via email to