This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 784cd7c1 [ISSUE #959] Automatically select the appropriate 
decompression algorithm according to magic code (#960)
784cd7c1 is described below

commit 784cd7c1f2147fc0f31af3de2dcce28082caa73f
Author: qianye <wuxingcan....@alibaba-inc.com>
AuthorDate: Fri Mar 14 15:55:23 2025 +0800

    [ISSUE #959] Automatically select the appropriate decompression algorithm 
according to magic code (#960)
---
 java/client/pom.xml                                |  9 +++
 .../client/java/message/MessageViewImpl.java       |  2 +-
 .../rocketmq/client/java/misc/Utilities.java       | 86 ++++++++++++++++++----
 .../rocketmq/client/java/misc/UtilitiesTest.java   | 24 +++++-
 java/pom.xml                                       | 14 ++++
 5 files changed, 115 insertions(+), 20 deletions(-)

diff --git a/java/client/pom.xml b/java/client/pom.xml
index 3c8ceb49..83a5e85a 100644
--- a/java/client/pom.xml
+++ b/java/client/pom.xml
@@ -127,6 +127,15 @@
             <artifactId>awaitility</artifactId>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>com.github.luben</groupId>
+            <artifactId>zstd-jni</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.lz4</groupId>
+            <artifactId>lz4-java</artifactId>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageViewImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageViewImpl.java
index b657f934..aa9db400 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageViewImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageViewImpl.java
@@ -274,7 +274,7 @@ public class MessageViewImpl implements MessageView {
         switch (bodyEncoding) {
             case GZIP:
                 try {
-                    body = Utilities.uncompressBytesGzip(body);
+                    body = Utilities.decompressBytes(body);
                 } catch (IOException e) {
                     log.error("Failed to uncompress message body, topic={}, 
messageId={}", topic, messageId);
                     corrupted = true;
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/misc/Utilities.java 
b/java/client/src/main/java/org/apache/rocketmq/client/java/misc/Utilities.java
index 35916a5b..f7c71f1e 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/misc/Utilities.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/misc/Utilities.java
@@ -18,9 +18,14 @@
 package org.apache.rocketmq.client.java.misc;
 
 import apache.rocketmq.v2.ReceiveMessageRequest;
+import com.github.luben.zstd.ZstdInputStream;
+import com.github.luben.zstd.ZstdOutputStream;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.FilterInputStream;
+import java.io.FilterOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.lang.management.ManagementFactory;
 import java.lang.management.RuntimeMXBean;
 import java.net.InetAddress;
@@ -35,7 +40,11 @@ import java.util.Map;
 import java.util.Random;
 import java.util.zip.CRC32;
 import java.util.zip.DeflaterOutputStream;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
 import java.util.zip.InflaterInputStream;
+import net.jpountz.lz4.LZ4FrameInputStream;
+import net.jpountz.lz4.LZ4FrameOutputStream;
 import org.apache.commons.lang3.StringUtils;
 
 public class Utilities {
@@ -140,37 +149,59 @@ public class Utilities {
         }
     }
 
-    public static byte[] compressBytesGzip(final byte[] src, final int level) 
throws IOException {
-        ByteArrayOutputStream byteArrayOutputStream = new 
ByteArrayOutputStream(src.length);
+    public static byte[] compressBytesGZIP(final byte[] src) throws 
IOException {
+        try (ByteArrayOutputStream byteArrayOutputStream = new 
ByteArrayOutputStream(src.length)) {
+            try (FilterOutputStream outputStream = new 
GZIPOutputStream(byteArrayOutputStream)) {
+                outputStream.write(src);
+                outputStream.flush();
+            }
+            return byteArrayOutputStream.toByteArray();
+        }
+    }
+
+    public static byte[] compressBytesZSTD(final byte[] src, final int level) 
throws IOException {
+        try (ByteArrayOutputStream byteArrayOutputStream = new 
ByteArrayOutputStream(src.length)) {
+            try (FilterOutputStream outputStream = new 
ZstdOutputStream(byteArrayOutputStream, level)) {
+                outputStream.write(src);
+                outputStream.flush();
+            }
+            return byteArrayOutputStream.toByteArray();
+        }
+    }
+
+    public static byte[] compressBytesZLIB(final byte[] src, final int level) 
throws IOException {
         java.util.zip.Deflater defeater = new java.util.zip.Deflater(level);
-        DeflaterOutputStream deflaterOutputStream =
-            new DeflaterOutputStream(byteArrayOutputStream, defeater);
-        try {
+        try (ByteArrayOutputStream byteArrayOutputStream = new 
ByteArrayOutputStream(src.length);
+             DeflaterOutputStream deflaterOutputStream =
+                 new DeflaterOutputStream(byteArrayOutputStream, defeater)) {
             deflaterOutputStream.write(src);
             deflaterOutputStream.finish();
-            deflaterOutputStream.close();
-
             return byteArrayOutputStream.toByteArray();
         } finally {
-            try {
-                byteArrayOutputStream.close();
-            } catch (IOException ignore) {
-                // Exception not expected here.
-            }
             defeater.end();
         }
     }
 
-    public static byte[] uncompressBytesGzip(final byte[] src) throws 
IOException {
+    public static byte[] compressBytesLZ4(byte[] src) throws IOException {
+        try (ByteArrayOutputStream byteArrayOutputStream = new 
ByteArrayOutputStream(src.length)) {
+            try (FilterOutputStream outputStream = new 
LZ4FrameOutputStream(byteArrayOutputStream)) {
+                outputStream.write(src);
+                outputStream.flush();
+            }
+            return byteArrayOutputStream.toByteArray();
+        }
+    }
+
+    public static byte[] decompressBytes(final byte[] src) throws IOException {
         byte[] uncompressData = new byte[src.length];
 
         ByteArrayInputStream byteArrayInputStream = new 
ByteArrayInputStream(src);
-        InflaterInputStream inflaterInputStream = new 
InflaterInputStream(byteArrayInputStream);
+        FilterInputStream filterInputStream = getStreamByMagicCode(src, 
byteArrayInputStream);
         ByteArrayOutputStream byteArrayOutputStream = new 
ByteArrayOutputStream(src.length);
 
         try {
             int length;
-            while ((length = inflaterInputStream.read(uncompressData, 0, 
uncompressData.length)) > 0) {
+            while ((length = filterInputStream.read(uncompressData, 0, 
uncompressData.length)) > 0) {
                 byteArrayOutputStream.write(uncompressData, 0, length);
             }
             byteArrayOutputStream.flush();
@@ -183,7 +214,7 @@ public class Utilities {
                 // Exception not expected here.
             }
             try {
-                inflaterInputStream.close();
+                filterInputStream.close();
             } catch (IOException ignore) {
                 // Exception not expected here.
             }
@@ -195,6 +226,29 @@ public class Utilities {
         }
     }
 
+    private static FilterInputStream getStreamByMagicCode(byte[] src, 
InputStream inputStream) throws IOException {
+        // Automatically select the appropriate decompression algorithm 
according to magic code
+        // GZIP magic code: 0x1F 0x8B
+        // ZLIB magic code: 0x78
+        // LZ4 magic code: 0x04 0x22 0x4D 0x18
+        // ZSTD magic code: 0x28 0xB5 0x2F 0xFD
+        FilterInputStream filterInputStream;
+        if ((src[0] & 0xFF) == 0x1F && (src[1] & 0xFF) == 0x8B) {
+            filterInputStream = new GZIPInputStream(inputStream);
+        } else if ((src[0] & 0xFF) == 0x78) {
+            filterInputStream = new InflaterInputStream(inputStream);
+        } else if ((src[0] & 0xFF) == 0x04 && (src[1] & 0xFF) == 0x22 && 
(src[2] & 0xFF) == 0x4D
+            && (src[3] & 0xFF) == 0x18) {
+            filterInputStream = new LZ4FrameInputStream(inputStream);
+        } else if (((src[0] & 0xFF) == 0x28 && (src[1] & 0xFF) == 0xB5 && 
(src[2] & 0xFF) == 0x2F
+            && (src[3] & 0xFF) == 0xFD)) {
+            filterInputStream = new ZstdInputStream(inputStream);
+        } else {
+            throw new IOException("Unknown compression format");
+        }
+        return filterInputStream;
+    }
+
     public static String encodeHexString(ByteBuffer byteBuffer, boolean 
toLowerCase) {
         return new String(encodeHex(byteBuffer, toLowerCase));
     }
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/misc/UtilitiesTest.java
 
b/java/client/src/test/java/org/apache/rocketmq/client/java/misc/UtilitiesTest.java
index b59c377a..ac4e172d 100644
--- 
a/java/client/src/test/java/org/apache/rocketmq/client/java/misc/UtilitiesTest.java
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/misc/UtilitiesTest.java
@@ -32,9 +32,27 @@ public class UtilitiesTest {
     @Test
     public void testCompressAndUncompressByteArray() throws IOException {
         final byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
-        final byte[] compressedBytes = Utilities.compressBytesGzip(bytes, 5);
-        final byte[] originalBytes = 
Utilities.uncompressBytesGzip(compressedBytes);
-        assertEquals(new String(originalBytes, StandardCharsets.UTF_8), body);
+        {
+            final byte[] compressedBytes = Utilities.compressBytesZLIB(bytes, 
5);
+            final byte[] originalBytes = 
Utilities.decompressBytes(compressedBytes);
+            assertEquals(new String(originalBytes, StandardCharsets.UTF_8), 
body);
+        }
+        {
+            final byte[] compressedBytes = Utilities.compressBytesLZ4(bytes);
+            final byte[] originalBytes = 
Utilities.decompressBytes(compressedBytes);
+            assertEquals(new String(originalBytes, StandardCharsets.UTF_8), 
body);
+        }
+        {
+            final byte[] compressedBytes = Utilities.compressBytesZSTD(bytes, 
3);
+            final byte[] originalBytes = 
Utilities.decompressBytes(compressedBytes);
+            assertEquals(new String(originalBytes, StandardCharsets.UTF_8), 
body);
+        }
+        {
+            final byte[] compressedBytes = Utilities.compressBytesGZIP(bytes);
+            final byte[] originalBytes = 
Utilities.decompressBytes(compressedBytes);
+            assertEquals(new String(originalBytes, StandardCharsets.UTF_8), 
body);
+        }
+
     }
 
     @Test
diff --git a/java/pom.xml b/java/pom.xml
index 0694f407..bb7f7c78 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -74,6 +74,9 @@
         <maven-source-plugin.version>3.2.1</maven-source-plugin.version>
         <maven-shade-plugin.version>3.2.4</maven-shade-plugin.version>
         <maven-antrun-plugin.version>1.8</maven-antrun-plugin.version>
+
+        <zstd-jni.version>1.5.2-2</zstd-jni.version>
+        <lz4-java.version>1.8.0</lz4-java.version>
     </properties>
 
     <distributionManagement>
@@ -250,6 +253,17 @@
                 <version>${bcpkix.version}</version>
                 <scope>test</scope>
             </dependency>
+
+            <dependency>
+                <groupId>org.lz4</groupId>
+                <artifactId>lz4-java</artifactId>
+                <version>${lz4-java.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>com.github.luben</groupId>
+                <artifactId>zstd-jni</artifactId>
+                <version>${zstd-jni.version}</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 

Reply via email to