This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 9532a46c0 [#2072] improvement: Corrected buffer offsets in
NoOpCodec.decompress (#2072)
9532a46c0 is described below
commit 9532a46c00dbc0e5e74f49d7f0ee638f501b2a15
Author: Zhen Wang <[email protected]>
AuthorDate: Mon Sep 2 14:20:10 2024 +0800
[#2072] improvement: Corrected buffer offsets in NoOpCodec.decompress
(#2072)
### What changes were proposed in this pull request?
Corrected buffer offsets in NoOpCodec.decompress
### Why are the changes needed?
Make NoOpCodec.decompress logic consistent with other codecs.
Fix: # (issue)
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
added unit tests
---
.../uniffle/common/compression/NoOpCodec.java | 6 +--
.../common/compression/CompressionTest.java | 48 ++++++++++++++++++++++
2 files changed, 51 insertions(+), 3 deletions(-)
diff --git
a/common/src/main/java/org/apache/uniffle/common/compression/NoOpCodec.java
b/common/src/main/java/org/apache/uniffle/common/compression/NoOpCodec.java
index 4cd111a34..89cd05771 100644
--- a/common/src/main/java/org/apache/uniffle/common/compression/NoOpCodec.java
+++ b/common/src/main/java/org/apache/uniffle/common/compression/NoOpCodec.java
@@ -31,9 +31,9 @@ public class NoOpCodec extends Codec {
@Override
public void decompress(ByteBuffer src, int uncompressedLen, ByteBuffer dest,
int destOffset) {
- dest.put(src);
- dest.position(destOffset);
- dest.limit(destOffset + uncompressedLen);
+ ByteBuffer destDuplicated = dest.duplicate();
+ destDuplicated.position(destOffset);
+ destDuplicated.put(src.duplicate());
}
@Override
diff --git
a/common/src/test/java/org/apache/uniffle/common/compression/CompressionTest.java
b/common/src/test/java/org/apache/uniffle/common/compression/CompressionTest.java
index ac5af5aa7..31238b11f 100644
---
a/common/src/test/java/org/apache/uniffle/common/compression/CompressionTest.java
+++
b/common/src/test/java/org/apache/uniffle/common/compression/CompressionTest.java
@@ -19,14 +19,17 @@ package org.apache.uniffle.common.compression;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import org.apache.commons.lang3.RandomUtils;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.util.ByteBufferUtils;
import static org.apache.uniffle.common.config.RssClientConf.COMPRESSION_TYPE;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
@@ -147,4 +150,49 @@ public class CompressionTest {
dest.get(res);
assertArrayEquals(originData, res);
}
+
+ @Test
+ public void checkDecompressBufferOffsets() {
+ byte[] data = RandomUtils.nextBytes(1024);
+ // Snappy decompression does not support non-zero offset for destination
direct ByteBuffer
+ Codec.Type[] types = {Codec.Type.ZSTD, Codec.Type.LZ4, Codec.Type.NOOP};
+ Boolean[] isDirects = {true, false};
+ for (Boolean isDirect : isDirects) {
+ for (Codec.Type type : types) {
+ Codec codec = Codec.newInstance(new RssConf().set(COMPRESSION_TYPE,
type)).get();
+ byte[] compressed = codec.compress(data);
+
+ ByteBuffer src;
+ if (isDirect) {
+ src = ByteBuffer.allocateDirect(compressed.length);
+ } else {
+ src = ByteBuffer.allocate(compressed.length);
+ }
+ src.put(compressed);
+ src.flip();
+
+ ByteBuffer dest;
+ if (isDirect) {
+ dest = ByteBuffer.allocateDirect(2048);
+ } else {
+ dest = ByteBuffer.allocate(2048);
+ }
+ codec.decompress(src, 1024, dest, 0);
+ assertEquals(0, src.position());
+ assertEquals(compressed.length, src.limit());
+ assertEquals(0, dest.position());
+ assertEquals(2048, dest.limit());
+ assertArrayEquals(
+ data,
Arrays.copyOfRange(ByteBufferUtils.bufferToArray(dest.duplicate()), 0, 1024));
+
+ codec.decompress(src, 1024, dest, 1024);
+ assertEquals(0, src.position());
+ assertEquals(compressed.length, src.limit());
+ assertEquals(0, dest.position());
+ assertEquals(2048, dest.limit());
+ assertArrayEquals(
+ data,
Arrays.copyOfRange(ByteBufferUtils.bufferToArray(dest.duplicate()), 1024,
2048));
+ }
+ }
+ }
}