This is an automated email from the ASF dual-hosted git repository.
xianjin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 4a17a586 [#593][part-1] feat: Codec compress support ByteBuffer (#830)
4a17a586 is described below
commit 4a17a5866494fed87d5497737d259488de0ba6d6
Author: xumanbu <[email protected]>
AuthorDate: Mon Apr 24 13:04:13 2023 +0800
[#593][part-1] feat: Codec compress support ByteBuffer (#830)
### What changes were proposed in this pull request?
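1. Add `ByteBuffer`-based compression to `Codec`: a new `compress(ByteBuffer src, ByteBuffer dest)` method plus `maxCompressedLength(int)` for sizing the destination buffer, implemented by the LZ4, NoOp, Snappy and Zstd codecs.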
### Why are the changes needed?
Fix: #593
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit tests: new `ByteBuffer` compression cases in `CompressionTest`.
---
.../apache/uniffle/common/compression/Codec.java | 15 +++++
.../uniffle/common/compression/Lz4Codec.java | 18 ++++++
.../uniffle/common/compression/NoOpCodec.java | 12 ++++
.../uniffle/common/compression/SnappyCodec.java | 28 +++++++++
.../uniffle/common/compression/ZstdCodec.java | 22 ++++++++
.../common/compression/CompressionTest.java | 66 ++++++++++++++++++++++
6 files changed, 161 insertions(+)
diff --git
a/common/src/main/java/org/apache/uniffle/common/compression/Codec.java
b/common/src/main/java/org/apache/uniffle/common/compression/Codec.java
index f5ebc02a..511cdd09 100644
--- a/common/src/main/java/org/apache/uniffle/common/compression/Codec.java
+++ b/common/src/main/java/org/apache/uniffle/common/compression/Codec.java
@@ -55,6 +55,21 @@ public abstract class Codec {
*/
public abstract byte[] compress(byte[] src);
+ /**
+ * Compresses the readable bytes of {@code src} (from its position to its limit) into
+ * {@code dest}, starting at {@code dest}'s current position.
+ *
+ * <p>The position of {@code src} is left unchanged; the position of {@code dest} is
+ * advanced by the number of compressed bytes written.
+ *
+ * <p>Callers must ensure {@code dest.remaining() >= maxCompressedLength(src.remaining())}.
+ * The Snappy and Zstd implementations require both buffers to be of the same kind
+ * (both direct or both heap).
+ *
+ * @param src the buffer holding the data to compress
+ * @param dest the buffer that receives the compressed data
+ * @return the number of compressed bytes written to {@code dest}
+ */
+ public abstract int compress(ByteBuffer src, ByteBuffer dest);
+
+ /**
+ * Returns the maximum size that compressing {@code sourceLength} bytes can produce.
+ *
+ * @param sourceLength the number of uncompressed bytes
+ * @return an upper bound on the compressed size, suitable for sizing the {@code dest}
+ *     buffer passed to {@link #compress(ByteBuffer, ByteBuffer)}
+ */
+ public abstract int maxCompressedLength(int sourceLength);
+
public enum Type {
LZ4,
ZSTD,
diff --git
a/common/src/main/java/org/apache/uniffle/common/compression/Lz4Codec.java
b/common/src/main/java/org/apache/uniffle/common/compression/Lz4Codec.java
index 59b6df6f..d1fccb37 100644
--- a/common/src/main/java/org/apache/uniffle/common/compression/Lz4Codec.java
+++ b/common/src/main/java/org/apache/uniffle/common/compression/Lz4Codec.java
@@ -21,6 +21,8 @@ import java.nio.ByteBuffer;
import net.jpountz.lz4.LZ4Factory;
+import org.apache.uniffle.common.exception.RssException;
+
public class Lz4Codec extends Codec {
private LZ4Factory lz4Factory;
@@ -38,4 +40,20 @@ public class Lz4Codec extends Codec {
public byte[] compress(byte[] src) {
return lz4Factory.fastCompressor().compress(src);
}
+
+ /**
+ * Compresses {@code src} into {@code dest} with the LZ4 fast compressor. No buffer-kind
+ * check is performed here, so heap and direct buffers may be mixed. A duplicate of
+ * {@code src} is handed to the compressor so the caller's source position is preserved.
+ *
+ * @return the number of bytes by which {@code dest}'s position advanced
+ */
+ @Override
+ public int compress(ByteBuffer src, ByteBuffer dest) {
+ try {
+ final int start = dest.position();
+ final ByteBuffer input = src.duplicate();
+ lz4Factory.fastCompressor().compress(input, dest);
+ final int end = dest.position();
+ return end - start;
+ } catch (Exception e) {
+ throw new RssException("Failed to compress by Lz4", e);
+ }
+ }
+
+ /** Delegates to the LZ4 fast compressor's {@code maxCompressedLength}. */
+ @Override
+ public int maxCompressedLength(int sourceLength) {
+ final int bound = lz4Factory.fastCompressor().maxCompressedLength(sourceLength);
+ return bound;
+ }
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/compression/NoOpCodec.java
b/common/src/main/java/org/apache/uniffle/common/compression/NoOpCodec.java
index 14b9773e..40f508b8 100644
--- a/common/src/main/java/org/apache/uniffle/common/compression/NoOpCodec.java
+++ b/common/src/main/java/org/apache/uniffle/common/compression/NoOpCodec.java
@@ -34,4 +34,16 @@ public class NoOpCodec extends Codec {
System.arraycopy(src, 0, dst, 0, src.length);
return dst;
}
+
+ /**
+ * Copies the readable bytes of {@code src} into {@code dest} unchanged. A duplicate of
+ * {@code src} is read so its position is preserved; {@code dest}'s position advances.
+ *
+ * @return the number of bytes copied
+ */
+ @Override
+ public int compress(ByteBuffer src, ByteBuffer dest) {
+ final ByteBuffer view = src.duplicate();
+ final int copied = view.remaining();
+ dest.put(view);
+ return copied;
+ }
+
+ /** No compression is applied, so the output is exactly as long as the input. */
+ @Override
+ public int maxCompressedLength(int sourceLength) {
+ final int bound = sourceLength;
+ return bound;
+ }
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/compression/SnappyCodec.java
b/common/src/main/java/org/apache/uniffle/common/compression/SnappyCodec.java
index 41e4a8a5..0e9a57c2 100644
---
a/common/src/main/java/org/apache/uniffle/common/compression/SnappyCodec.java
+++
b/common/src/main/java/org/apache/uniffle/common/compression/SnappyCodec.java
@@ -64,4 +64,32 @@ public class SnappyCodec extends Codec {
throw new RssException("Failed to uncompress by Snappy", e);
}
}
+
+ /**
+ * Compresses the readable bytes of {@code src} into {@code dest} with Snappy.
+ *
+ * <p>Both buffers must be of the same kind: both direct or both heap (array-backed).
+ * {@code src}'s position is left unchanged and {@code dest}'s position is advanced by the
+ * compressed size.
+ *
+ * @return the number of compressed bytes written to {@code dest}
+ * @throws IllegalStateException if one buffer is direct and the other is not
+ * @throws RssException if Snappy fails
+ */
+ @Override
+ public int compress(ByteBuffer src, ByteBuffer dest) {
+ try {
+ if (src.isDirect() && dest.isDirect()) {
+ int destOff = dest.position();
+ // Hand Snappy duplicates so it cannot disturb the caller's buffer state, then advance
+ // dest's position explicitly to stay consistent with the other codecs.
+ int compressedSize = Snappy.compress(src.duplicate(), dest.duplicate());
+ dest.position(destOff + compressedSize);
+ return compressedSize;
+ }
+ if (!src.isDirect() && !dest.isDirect()) {
+ int destOff = dest.position();
+ // array() exposes the whole backing array, so indices must include arrayOffset()
+ // (non-zero e.g. for buffers created with slice()).
+ int compressedSize = Snappy.compress(src.array(), src.arrayOffset() + src.position(),
+ src.remaining(), dest.array(), dest.arrayOffset() + destOff);
+ dest.position(destOff + compressedSize);
+ return compressedSize;
+ }
+ } catch (Exception e) {
+ throw new RssException("Failed to compress by Snappy", e);
+ }
+ throw new IllegalStateException("Snappy only supports the same type of bytebuffer compression.");
+ }
+
+ /** Delegates to {@code Snappy.maxCompressedLength} for the given input size. */
+ @Override
+ public int maxCompressedLength(int sourceLength) {
+ final int bound = Snappy.maxCompressedLength(sourceLength);
+ return bound;
+ }
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/compression/ZstdCodec.java
b/common/src/main/java/org/apache/uniffle/common/compression/ZstdCodec.java
index 4381c661..0f507e16 100644
--- a/common/src/main/java/org/apache/uniffle/common/compression/ZstdCodec.java
+++ b/common/src/main/java/org/apache/uniffle/common/compression/ZstdCodec.java
@@ -63,4 +63,26 @@ public class ZstdCodec extends Codec {
public byte[] compress(byte[] src) {
return Zstd.compress(src, compressionLevel);
}
+
+ /**
+ * Compresses the readable bytes of {@code src} into {@code dest} with Zstd at the
+ * configured compression level.
+ *
+ * <p>Both buffers must be of the same kind: both direct or both heap (array-backed).
+ * {@code src}'s position is left unchanged and {@code dest}'s position is advanced by the
+ * compressed size on both paths.
+ *
+ * @return the number of compressed bytes written to {@code dest}
+ * @throws IllegalStateException if one buffer is direct and the other is not
+ * @throws RssException if Zstd fails or reports an error code
+ */
+ @Override
+ public int compress(ByteBuffer src, ByteBuffer dest) {
+ try {
+ if (src.isDirect() && dest.isDirect()) {
+ // Compress from a duplicate so the caller's src position is preserved.
+ return Zstd.compress(dest, src.duplicate(), compressionLevel);
+ }
+ if (!src.isDirect() && !dest.isDirect()) {
+ int destOff = dest.position();
+ // compressByteArray works on raw arrays: it does not move the buffer positions and it
+ // returns either the compressed size or a Zstd error code, so handle both here.
+ long result = Zstd.compressByteArray(dest.array(), dest.arrayOffset() + destOff,
+ dest.remaining(), src.array(), src.arrayOffset() + src.position(), src.remaining(),
+ compressionLevel);
+ if (Zstd.isError(result)) {
+ throw new RssException("Zstd compression error: " + Zstd.getErrorName(result));
+ }
+ int compressedSize = (int) result;
+ dest.position(destOff + compressedSize);
+ return compressedSize;
+ }
+ } catch (Exception e) {
+ throw new RssException("Failed to compress by Zstd", e);
+ }
+ throw new IllegalStateException("Zstd only supports the same type of bytebuffer compression.");
+ }
+
+ /**
+ * Returns Zstd's compress bound for {@code sourceLength} input bytes.
+ *
+ * @param sourceLength the number of uncompressed bytes
+ * @return the bound as an {@code int}
+ * @throws ArithmeticException if the bound does not fit in an {@code int}
+ */
+ @Override
+ public int maxCompressedLength(int sourceLength) {
+ // compressBound returns a long; fail loudly instead of silently wrapping to a negative size.
+ return Math.toIntExact(Zstd.compressBound(sourceLength));
+ }
}
diff --git
a/common/src/test/java/org/apache/uniffle/common/compression/CompressionTest.java
b/common/src/test/java/org/apache/uniffle/common/compression/CompressionTest.java
index fb140b63..cf76b1fa 100644
---
a/common/src/test/java/org/apache/uniffle/common/compression/CompressionTest.java
+++
b/common/src/test/java/org/apache/uniffle/common/compression/CompressionTest.java
@@ -30,6 +30,8 @@ import org.apache.uniffle.common.config.RssConf;
import static org.apache.uniffle.common.config.RssClientConf.COMPRESSION_TYPE;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class CompressionTest {
@@ -79,5 +81,69 @@ public class CompressionTest {
codec.decompress(ByteBuffer.wrap(compressed), size, recycledDst, 0);
recycledDst.get(res);
assertArrayEquals(data, res);
+
+ // case4: use off heap bytebuffer compress
+ ByteBuffer srcBuffer = ByteBuffer.allocateDirect(size);
+ ByteBuffer destBuffer =
ByteBuffer.allocateDirect(codec.maxCompressedLength(size));
+ testCompressWithByteBuffer(codec, data, srcBuffer, destBuffer, 0);
+
+ // case5: use on heap bytebuffer compress
+ srcBuffer = ByteBuffer.allocate(size);
+ destBuffer = ByteBuffer.allocate(codec.maxCompressedLength(size));
+ testCompressWithByteBuffer(codec, data, srcBuffer, destBuffer, 0);
+
+ // case6: src buffer is on heap && dest buffer is off heap
+ srcBuffer = ByteBuffer.allocate(size);
+ destBuffer = ByteBuffer.allocateDirect(codec.maxCompressedLength(size));
+ testCompressWithByteBuffer(codec, data, srcBuffer, destBuffer, 0);
+
+ // case7: src buffer is off heap && dest buffer is on heap
+ srcBuffer = ByteBuffer.allocateDirect(size);
+ destBuffer = ByteBuffer.allocate(codec.maxCompressedLength(size));
+ testCompressWithByteBuffer(codec, data, srcBuffer, destBuffer, 0);
+
+ // case8: use src&dest bytebuffer with offset
+ int destOffset = 10;
+ srcBuffer = ByteBuffer.allocateDirect(size + destOffset);
+ destBuffer = ByteBuffer.allocateDirect(codec.maxCompressedLength(size) +
destOffset);
+ testCompressWithByteBuffer(codec, data, srcBuffer, destBuffer, destOffset);
}
+
+ /**
+ * Writes {@code originData} into {@code srcBuffer} starting at {@code destOffset}, compresses
+ * it into {@code destBuffer} (also starting at {@code destOffset}) and verifies the round trip.
+ * For Snappy and Zstd, mixing a direct and a heap buffer must throw IllegalStateException.
+ */
+ private void testCompressWithByteBuffer(Codec codec, byte[] originData, ByteBuffer srcBuffer,
+ ByteBuffer destBuffer, int destOffset) {
+ srcBuffer.position(destOffset);
+ srcBuffer.put(originData);
+ srcBuffer.flip();
+ srcBuffer.position(destOffset);
+ destBuffer.position(destOffset);
+ if (!isSameType(srcBuffer, destBuffer) && (codec instanceof SnappyCodec || codec instanceof ZstdCodec)) {
+ Exception thrown = null;
+ try {
+ codec.compress(srcBuffer, destBuffer);
+ } catch (Exception e) {
+ thrown = e;
+ }
+ // Fail when nothing is thrown, not only when the wrong exception type is thrown.
+ assertTrue(thrown instanceof IllegalStateException,
+ "Expected IllegalStateException for mixed buffer types but got " + thrown);
+ } else {
+ codec.compress(srcBuffer, destBuffer);
+ // compress must leave the source position untouched.
+ assertEquals(destOffset, srcBuffer.position());
+ destBuffer.flip();
+ destBuffer.position(destOffset);
+ srcBuffer.clear();
+ checkCompressedData(codec, originData, srcBuffer, destBuffer);
+ }
+ }
+
+ /** Returns true when both buffers are non-null and are either both direct or both heap. */
+ private boolean isSameType(ByteBuffer srcBuffer, ByteBuffer destBuffer) {
+ if (srcBuffer != null && destBuffer != null) {
+ return srcBuffer.isDirect() == destBuffer.isDirect();
+ }
+ return false;
+ }
+
+ /**
+ * Decompresses {@code src} (the compressed bytes) into {@code dest} at offset 0, then reads
+ * {@code originData.length} bytes from {@code dest} and asserts they equal {@code originData}.
+ */
+ private void checkCompressedData(Codec codec, byte[] originData, ByteBuffer dest, ByteBuffer src) {
+ final int expectedLength = originData.length;
+ codec.decompress(src, expectedLength, dest, 0);
+ final byte[] decompressed = new byte[expectedLength];
+ dest.get(decompressed);
+ assertArrayEquals(originData, decompressed);
+ }
+
}