This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new fb40a3904 ORC-1613: Zstd decompression supports direct buffer
fb40a3904 is described below
commit fb40a3904640c8c5437dd1f3217e46a205648dbc
Author: sychen <[email protected]>
AuthorDate: Wed Feb 7 18:52:31 2024 -0800
ORC-1613: Zstd decompression supports direct buffer
### What changes were proposed in this pull request?
`ZstdCodec` implements the `DirectDecompressionCodec` interface.
### Why are the changes needed?
`zstd-jni` supports direct-buffer decompression, which can reduce buffer
copying.
### How was this patch tested?
Added a unit test, `TestZstd.testZstdDirectDecompress`.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #1789 from cxzl25/ORC-1613.
Authored-by: sychen <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 25beaa26c86e316ab58a0950514b9d99df893fc5)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../src/java/org/apache/orc/impl/ZstdCodec.java | 18 ++++++++++-
.../src/test/org/apache/orc/impl/TestZstd.java | 35 ++++++++++++++++++++++
2 files changed, 52 insertions(+), 1 deletion(-)
diff --git a/java/core/src/java/org/apache/orc/impl/ZstdCodec.java b/java/core/src/java/org/apache/orc/impl/ZstdCodec.java
index 3818eb3f1..6703a82c1 100644
--- a/java/core/src/java/org/apache/orc/impl/ZstdCodec.java
+++ b/java/core/src/java/org/apache/orc/impl/ZstdCodec.java
@@ -25,7 +25,7 @@ import org.apache.orc.CompressionKind;
import java.io.IOException;
import java.nio.ByteBuffer;
-public class ZstdCodec implements CompressionCodec {
+public class ZstdCodec implements CompressionCodec, DirectDecompressionCodec {
private ZstdOptions zstdOptions = null;
private ZstdCompressCtx zstdCompressCtx = null;
@@ -214,6 +214,11 @@ public class ZstdCodec implements CompressionCodec {
@Override
public void decompress(ByteBuffer in, ByteBuffer out) throws IOException {
+ if (in.isDirect() && out.isDirect()) {
+ directDecompress(in, out);
+ return;
+ }
+
int srcOffset = in.arrayOffset() + in.position();
int srcSize = in.remaining();
int dstOffset = out.arrayOffset() + out.position();
@@ -227,6 +232,17 @@ public class ZstdCodec implements CompressionCodec {
out.flip();
}
+ @Override
+ public boolean isAvailable() {
+ return true;
+ }
+
+ @Override
+ public void directDecompress(ByteBuffer in, ByteBuffer out) throws
IOException {
+ Zstd.decompress(out, in);
+ out.flip();
+ }
+
@Override
public void reset() {
diff --git a/java/core/src/test/org/apache/orc/impl/TestZstd.java b/java/core/src/test/org/apache/orc/impl/TestZstd.java
index b424feb82..dfa310a40 100644
--- a/java/core/src/test/org/apache/orc/impl/TestZstd.java
+++ b/java/core/src/test/org/apache/orc/impl/TestZstd.java
@@ -28,8 +28,10 @@ import org.apache.orc.CompressionCodec;
import org.apache.orc.CompressionKind;
import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
public class TestZstd {
@@ -119,4 +121,37 @@ public class TestZstd {
"zstd-jni compression with aircompressor decompression did not return"
+ " the input!");
}
+
+  // Round-trips heap-compressed data through the direct-buffer decompress path.
+  @Test
+  public void testZstdDirectDecompress() throws Exception {
+    final int size = 10000;
+    ByteBuffer in = ByteBuffer.allocate(size);
+    ByteBuffer out = ByteBuffer.allocate(size);
+    ByteBuffer directOut = ByteBuffer.allocateDirect(size);
+    ByteBuffer directResult = ByteBuffer.allocateDirect(size);
+    for (int i = 0; i < size; i++) {
+      in.put((byte) i);
+    }
+    in.flip();
+    try (ZstdCodec zstdCodec = new ZstdCodec()) {
+      // Compress into a heap buffer; compress() must succeed for this input.
+      assertTrue(zstdCodec.compress(in, out, null,
+          zstdCodec.getDefaultOptions()));
+      out.flip();
+      // Copy only the compressed bytes (position..limit of `out`), not its
+      // whole backing array, so the two capacities need not match.
+      directOut.put(out);
+      directOut.flip();
+
+      // Both buffers are direct, so decompress() takes the direct path.
+      zstdCodec.decompress(directOut, directResult);
+
+      // Every input byte must come back, in order, and nothing more.
+      assertEquals(size, directResult.remaining());
+      byte[] heapBytes = new byte[directResult.remaining()];
+      directResult.get(heapBytes);
+      assertArrayEquals(in.array(), heapBytes);
+    }
+  }
}