This is an automated email from the ASF dual-hosted git repository.

xianjin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a17a586 [#593][part-1] feat: Codec compress support ByteBuffer (#830)
4a17a586 is described below

commit 4a17a5866494fed87d5497737d259488de0ba6d6
Author: xumanbu <[email protected]>
AuthorDate: Mon Apr 24 13:04:13 2023 +0800

    [#593][part-1] feat: Codec compress support ByteBuffer (#830)
    
    ### What changes were proposed in this pull request?
    1. Add `ByteBuffer`-based compression to `Codec`: `compress(ByteBuffer, ByteBuffer)` and `maxCompressedLength(int)`.
    
    ### Why are the changes needed?
    Fix: #593
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Unit tests in `CompressionTest`.
---
 .../apache/uniffle/common/compression/Codec.java   | 15 +++++
 .../uniffle/common/compression/Lz4Codec.java       | 18 ++++++
 .../uniffle/common/compression/NoOpCodec.java      | 12 ++++
 .../uniffle/common/compression/SnappyCodec.java    | 28 +++++++++
 .../uniffle/common/compression/ZstdCodec.java      | 22 ++++++++
 .../common/compression/CompressionTest.java        | 66 ++++++++++++++++++++++
 6 files changed, 161 insertions(+)

diff --git a/common/src/main/java/org/apache/uniffle/common/compression/Codec.java b/common/src/main/java/org/apache/uniffle/common/compression/Codec.java
index f5ebc02a..511cdd09 100644
--- a/common/src/main/java/org/apache/uniffle/common/compression/Codec.java
+++ b/common/src/main/java/org/apache/uniffle/common/compression/Codec.java
@@ -55,6 +55,31 @@ public abstract class Codec {
    */
   public abstract byte[] compress(byte[] src);
 
+  /**
+   * Compresses the remaining bytes of {@code src} into {@code dest}.
+   *
+   * <p>The bytes in {@code [src.position(), src.limit())} are written in compressed form to
+   * {@code dest} starting at {@code dest.position()}. On return, the position of {@code dest}
+   * has been advanced by the compressed size, while the position of {@code src} is unchanged.
+   *
+   * <p>Snappy and Zstd require both buffers to be of the same kind (both direct or both heap);
+   * mixing them throws {@link IllegalStateException}.
+   *
+   * @param src the data to compress
+   * @param dest the destination buffer; {@code dest.remaining()} must be at least
+   *     {@code maxCompressedLength(src.remaining())}
+   * @return the compressed size in bytes
+   */
+  public abstract int compress(ByteBuffer src, ByteBuffer dest);
+
+  /**
+   * Returns the maximum compressed size for an input of the given length.
+   *
+   * @param sourceLength the length of the uncompressed input in bytes
+   * @return an upper bound on the compressed size in bytes
+   */
+  public abstract int maxCompressedLength(int sourceLength);
+
   public enum Type {
     LZ4,
     ZSTD,
diff --git a/common/src/main/java/org/apache/uniffle/common/compression/Lz4Codec.java b/common/src/main/java/org/apache/uniffle/common/compression/Lz4Codec.java
index 59b6df6f..d1fccb37 100644
--- a/common/src/main/java/org/apache/uniffle/common/compression/Lz4Codec.java
+++ b/common/src/main/java/org/apache/uniffle/common/compression/Lz4Codec.java
@@ -21,6 +21,8 @@ import java.nio.ByteBuffer;
 
 import net.jpountz.lz4.LZ4Factory;
 
+import org.apache.uniffle.common.exception.RssException;
+
 public class Lz4Codec extends Codec {
 
   private LZ4Factory lz4Factory;
@@ -38,4 +40,22 @@ public class Lz4Codec extends Codec {
   public byte[] compress(byte[] src) {
     return lz4Factory.fastCompressor().compress(src);
   }
+
+  @Override
+  public int compress(ByteBuffer src, ByteBuffer dest) {
+    try {
+      int destOff = dest.position();
+      // LZ4Compressor.compress(ByteBuffer, ByteBuffer) advances the positions of both buffers;
+      // compress a duplicate of src so that the caller's src position is left unchanged.
+      lz4Factory.fastCompressor().compress(src.duplicate(), dest);
+      return dest.position() - destOff;
+    } catch (Exception e) {
+      throw new RssException("Failed to compress by Lz4", e);
+    }
+  }
+
+  @Override
+  public int maxCompressedLength(int sourceLength) {
+    return lz4Factory.fastCompressor().maxCompressedLength(sourceLength);
+  }
 }
diff --git a/common/src/main/java/org/apache/uniffle/common/compression/NoOpCodec.java b/common/src/main/java/org/apache/uniffle/common/compression/NoOpCodec.java
index 14b9773e..40f508b8 100644
--- a/common/src/main/java/org/apache/uniffle/common/compression/NoOpCodec.java
+++ b/common/src/main/java/org/apache/uniffle/common/compression/NoOpCodec.java
@@ -34,4 +34,18 @@ public class NoOpCodec extends Codec {
     System.arraycopy(src, 0, dst, 0, src.length);
     return dst;
   }
+
+  @Override
+  public int compress(ByteBuffer src, ByteBuffer dest) {
+    int destOff = dest.position();
+    // Copy from a duplicate so that the position of src is left unchanged.
+    dest.put(src.duplicate());
+    return dest.position() - destOff;
+  }
+
+  @Override
+  public int maxCompressedLength(int sourceLength) {
+    // The data is copied verbatim, so the output is exactly as long as the input.
+    return sourceLength;
+  }
 }
diff --git a/common/src/main/java/org/apache/uniffle/common/compression/SnappyCodec.java b/common/src/main/java/org/apache/uniffle/common/compression/SnappyCodec.java
index 41e4a8a5..0e9a57c2 100644
--- a/common/src/main/java/org/apache/uniffle/common/compression/SnappyCodec.java
+++ b/common/src/main/java/org/apache/uniffle/common/compression/SnappyCodec.java
@@ -64,4 +64,36 @@ public class SnappyCodec extends Codec {
       throw new RssException("Failed to uncompress by Snappy", e);
     }
   }
+
+  @Override
+  public int compress(ByteBuffer src, ByteBuffer dest) {
+    try {
+      if (src.isDirect() && dest.isDirect()) {
+        int destOff = dest.position();
+        // Snappy.compress(ByteBuffer, ByteBuffer) resets the limit of its output buffer, so work
+        // on duplicates and advance dest.position explicitly; this leaves src and dest.limit
+        // untouched, consistent with the other codecs.
+        int compressedSize = Snappy.compress(src.duplicate(), dest.duplicate());
+        dest.position(destOff + compressedSize);
+        return compressedSize;
+      }
+      if (!src.isDirect() && !dest.isDirect()) {
+        int destOff = dest.position();
+        // Heap buffers may be slices or wraps of a larger array, so buffer positions must be
+        // translated into array indices via arrayOffset().
+        int compressedSize = Snappy.compress(src.array(), src.arrayOffset() + src.position(),
+            src.remaining(), dest.array(), dest.arrayOffset() + destOff);
+        dest.position(destOff + compressedSize);
+        return compressedSize;
+      }
+    } catch (Exception e) {
+      throw new RssException("Failed to compress by Snappy", e);
+    }
+    throw new IllegalStateException("Snappy only supports the same type of bytebuffer compression.");
+  }
+
+  @Override
+  public int maxCompressedLength(int sourceLength) {
+    return Snappy.maxCompressedLength(sourceLength);
+  }
 }
diff --git a/common/src/main/java/org/apache/uniffle/common/compression/ZstdCodec.java b/common/src/main/java/org/apache/uniffle/common/compression/ZstdCodec.java
index 4381c661..0f507e16 100644
--- a/common/src/main/java/org/apache/uniffle/common/compression/ZstdCodec.java
+++ b/common/src/main/java/org/apache/uniffle/common/compression/ZstdCodec.java
@@ -63,4 +63,39 @@ public class ZstdCodec extends Codec {
   public byte[] compress(byte[] src) {
     return Zstd.compress(src, compressionLevel);
   }
+
+  @Override
+  public int compress(ByteBuffer src, ByteBuffer dest) {
+    try {
+      if (src.isDirect() && dest.isDirect()) {
+        // Zstd.compress(ByteBuffer, ByteBuffer, int) advances the positions of both buffers;
+        // compress a duplicate of src so that the caller's src position is left unchanged.
+        return Zstd.compress(dest, src.duplicate(), compressionLevel);
+      }
+      if (!src.isDirect() && !dest.isDirect()) {
+        int destOff = dest.position();
+        // Heap buffers may be slices or wraps of a larger array, so buffer positions must be
+        // translated into array indices via arrayOffset().
+        long compressedSize = Zstd.compressByteArray(dest.array(), dest.arrayOffset() + destOff,
+            dest.remaining(), src.array(), src.arrayOffset() + src.position(), src.remaining(),
+            compressionLevel);
+        // compressByteArray reports failure through an error code instead of throwing.
+        if (Zstd.isError(compressedSize)) {
+          throw new RssException(Zstd.getErrorName(compressedSize));
+        }
+        // Unlike the direct-buffer API, compressByteArray does not move dest, so advance its
+        // position here as required by the Codec#compress contract.
+        dest.position(destOff + (int) compressedSize);
+        return (int) compressedSize;
+      }
+    } catch (Exception e) {
+      throw new RssException("Failed to compress by Zstd", e);
+    }
+    throw new IllegalStateException("Zstd only supports the same type of bytebuffer compression.");
+  }
+
+  @Override
+  public int maxCompressedLength(int sourceLength) {
+    return (int) Zstd.compressBound(sourceLength);
+  }
 }
diff --git a/common/src/test/java/org/apache/uniffle/common/compression/CompressionTest.java b/common/src/test/java/org/apache/uniffle/common/compression/CompressionTest.java
index fb140b63..cf76b1fa 100644
--- a/common/src/test/java/org/apache/uniffle/common/compression/CompressionTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/compression/CompressionTest.java
@@ -30,6 +30,8 @@ import org.apache.uniffle.common.config.RssConf;
 
 import static org.apache.uniffle.common.config.RssClientConf.COMPRESSION_TYPE;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class CompressionTest {
 
@@ -79,5 +81,78 @@ public class CompressionTest {
     codec.decompress(ByteBuffer.wrap(compressed), size, recycledDst, 0);
     recycledDst.get(res);
     assertArrayEquals(data, res);
+
+    // case4: use off heap bytebuffer compress
+    ByteBuffer srcBuffer = ByteBuffer.allocateDirect(size);
+    ByteBuffer destBuffer = ByteBuffer.allocateDirect(codec.maxCompressedLength(size));
+    testCompressWithByteBuffer(codec, data, srcBuffer, destBuffer, 0);
+
+    // case5: use on heap bytebuffer compress
+    srcBuffer = ByteBuffer.allocate(size);
+    destBuffer = ByteBuffer.allocate(codec.maxCompressedLength(size));
+    testCompressWithByteBuffer(codec, data, srcBuffer, destBuffer, 0);
+
+    // case6: src buffer is on heap && dest buffer is off heap
+    srcBuffer = ByteBuffer.allocate(size);
+    destBuffer = ByteBuffer.allocateDirect(codec.maxCompressedLength(size));
+    testCompressWithByteBuffer(codec, data, srcBuffer, destBuffer, 0);
+
+    // case7: src buffer is off heap && dest buffer is on heap
+    srcBuffer = ByteBuffer.allocateDirect(size);
+    destBuffer = ByteBuffer.allocate(codec.maxCompressedLength(size));
+    testCompressWithByteBuffer(codec, data, srcBuffer, destBuffer, 0);
+
+    // case8: use off heap src&dest bytebuffer with offset
+    int offset = 10;
+    srcBuffer = ByteBuffer.allocateDirect(size + offset);
+    destBuffer = ByteBuffer.allocateDirect(codec.maxCompressedLength(size) + offset);
+    testCompressWithByteBuffer(codec, data, srcBuffer, destBuffer, offset);
+
+    // case9: use on heap src&dest bytebuffer with offset
+    srcBuffer = ByteBuffer.allocate(size + offset);
+    destBuffer = ByteBuffer.allocate(codec.maxCompressedLength(size) + offset);
+    testCompressWithByteBuffer(codec, data, srcBuffer, destBuffer, offset);
   }
+
+  private void testCompressWithByteBuffer(Codec codec, byte[] originData, ByteBuffer srcBuffer,
+      ByteBuffer destBuffer, int offset) {
+    srcBuffer.position(offset);
+    srcBuffer.put(originData);
+    srcBuffer.flip();
+    srcBuffer.position(offset);
+    destBuffer.position(offset);
+    if (!isSameType(srcBuffer, destBuffer) && (codec instanceof SnappyCodec || codec instanceof ZstdCodec)) {
+      // Snappy and Zstd reject mixed direct/heap buffers: the call must throw, not silently pass.
+      assertThrows(IllegalStateException.class, () -> codec.compress(srcBuffer, destBuffer));
+      return;
+    }
+    int compressedSize = codec.compress(srcBuffer, destBuffer);
+    // compress must keep the src position and advance dest by exactly the compressed size.
+    assertEquals(offset, srcBuffer.position());
+    assertEquals(offset + compressedSize, destBuffer.position());
+    destBuffer.flip();
+    destBuffer.position(offset);
+    checkCompressedData(codec, originData, destBuffer, srcBuffer.isDirect());
+  }
+
+  private boolean isSameType(ByteBuffer srcBuffer, ByteBuffer destBuffer) {
+    if (srcBuffer == null || destBuffer == null) {
+      return false;
+    }
+    return srcBuffer.isDirect() == destBuffer.isDirect();
+  }
+
+  private void checkCompressedData(Codec codec, byte[] originData, ByteBuffer compressed,
+      boolean directTarget) {
+    // Decompress into a fresh buffer: reusing the src buffer, which still holds the original
+    // bytes, would let a decompression that writes nothing go unnoticed.
+    ByteBuffer target = directTarget
+        ? ByteBuffer.allocateDirect(originData.length)
+        : ByteBuffer.allocate(originData.length);
+    codec.decompress(compressed, originData.length, target, 0);
+    byte[] res = new byte[originData.length];
+    target.get(res);
+    assertArrayEquals(originData, res);
+  }
+
 }

Reply via email to