This is an automated email from the ASF dual-hosted git repository. lizhimin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push: new 784cd7c1 [ISSUE #959] Automatically select the appropriate decompression algorithm according to magic code (#960) 784cd7c1 is described below commit 784cd7c1f2147fc0f31af3de2dcce28082caa73f Author: qianye <wuxingcan....@alibaba-inc.com> AuthorDate: Fri Mar 14 15:55:23 2025 +0800 [ISSUE #959] Automatically select the appropriate decompression algorithm according to magic code (#960) --- java/client/pom.xml | 9 +++ .../client/java/message/MessageViewImpl.java | 2 +- .../rocketmq/client/java/misc/Utilities.java | 86 ++++++++++++++++++---- .../rocketmq/client/java/misc/UtilitiesTest.java | 24 +++++- java/pom.xml | 14 ++++ 5 files changed, 115 insertions(+), 20 deletions(-) diff --git a/java/client/pom.xml b/java/client/pom.xml index 3c8ceb49..83a5e85a 100644 --- a/java/client/pom.xml +++ b/java/client/pom.xml @@ -127,6 +127,15 @@ <artifactId>awaitility</artifactId> <scope>test</scope> </dependency> + + <dependency> + <groupId>com.github.luben</groupId> + <artifactId>zstd-jni</artifactId> + </dependency> + <dependency> + <groupId>org.lz4</groupId> + <artifactId>lz4-java</artifactId> + </dependency> </dependencies> <build> diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageViewImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageViewImpl.java index b657f934..aa9db400 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageViewImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageViewImpl.java @@ -274,7 +274,7 @@ public class MessageViewImpl implements MessageView { switch (bodyEncoding) { case GZIP: try { - body = Utilities.uncompressBytesGzip(body); + body = Utilities.decompressBytes(body); } catch (IOException e) { log.error("Failed to uncompress message body, topic={}, messageId={}", topic, messageId); corrupted = true; diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/misc/Utilities.java b/java/client/src/main/java/org/apache/rocketmq/client/java/misc/Utilities.java index 35916a5b..f7c71f1e 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/misc/Utilities.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/misc/Utilities.java @@ -18,9 +18,14 @@ package org.apache.rocketmq.client.java.misc; import apache.rocketmq.v2.ReceiveMessageRequest; +import com.github.luben.zstd.ZstdInputStream; +import com.github.luben.zstd.ZstdOutputStream; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.io.FilterInputStream; +import java.io.FilterOutputStream; import java.io.IOException; +import java.io.InputStream; import java.lang.management.ManagementFactory; import java.lang.management.RuntimeMXBean; import java.net.InetAddress; @@ -35,7 +40,11 @@ import java.util.Map; import java.util.Random; import java.util.zip.CRC32; import java.util.zip.DeflaterOutputStream; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; import java.util.zip.InflaterInputStream; +import net.jpountz.lz4.LZ4FrameInputStream; +import net.jpountz.lz4.LZ4FrameOutputStream; import org.apache.commons.lang3.StringUtils; public class Utilities { @@ -140,37 +149,59 @@ public class Utilities { } } - public static byte[] compressBytesGzip(final byte[] src, final int level) throws IOException { - ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(src.length); + public static byte[] compressBytesGZIP(final byte[] src) throws IOException { + try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(src.length)) { + try (FilterOutputStream outputStream = new GZIPOutputStream(byteArrayOutputStream)) { + outputStream.write(src); + outputStream.flush(); + } + return byteArrayOutputStream.toByteArray(); + } + } + + public static byte[] compressBytesZSTD(final byte[] src, final int level) throws IOException { + try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(src.length)) { + try (FilterOutputStream outputStream = new ZstdOutputStream(byteArrayOutputStream, level)) { + outputStream.write(src); + outputStream.flush(); + } + return byteArrayOutputStream.toByteArray(); + } + } + + public static byte[] compressBytesZLIB(final byte[] src, final int level) throws IOException { java.util.zip.Deflater defeater = new java.util.zip.Deflater(level); - DeflaterOutputStream deflaterOutputStream = - new DeflaterOutputStream(byteArrayOutputStream, defeater); - try { + try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(src.length); + DeflaterOutputStream deflaterOutputStream = + new DeflaterOutputStream(byteArrayOutputStream, defeater)) { deflaterOutputStream.write(src); deflaterOutputStream.finish(); - deflaterOutputStream.close(); - return byteArrayOutputStream.toByteArray(); } finally { - try { - byteArrayOutputStream.close(); - } catch (IOException ignore) { - // Exception not expected here. - } defeater.end(); } } - public static byte[] uncompressBytesGzip(final byte[] src) throws IOException { + public static byte[] compressBytesLZ4(byte[] src) throws IOException { + try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(src.length)) { + try (FilterOutputStream outputStream = new LZ4FrameOutputStream(byteArrayOutputStream)) { + outputStream.write(src); + outputStream.flush(); + } + return byteArrayOutputStream.toByteArray(); + } + } + + public static byte[] decompressBytes(final byte[] src) throws IOException { byte[] uncompressData = new byte[src.length]; ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(src); - InflaterInputStream inflaterInputStream = new InflaterInputStream(byteArrayInputStream); + FilterInputStream filterInputStream = getStreamByMagicCode(src, byteArrayInputStream); ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(src.length); try { int length; - while ((length = inflaterInputStream.read(uncompressData, 0, uncompressData.length)) > 0) { + while ((length = filterInputStream.read(uncompressData, 0, uncompressData.length)) > 0) { byteArrayOutputStream.write(uncompressData, 0, length); } byteArrayOutputStream.flush(); @@ -183,7 +214,7 @@ public class Utilities { // Exception not expected here. } try { - inflaterInputStream.close(); + filterInputStream.close(); } catch (IOException ignore) { // Exception not expected here. } @@ -195,6 +226,29 @@ public class Utilities { } } + private static FilterInputStream getStreamByMagicCode(byte[] src, InputStream inputStream) throws IOException { + // Automatically select the appropriate decompression algorithm according to magic code + // GZIP magic code: 0x1F 0x8B + // ZLIB magic code: 0x78 + // LZ4 magic code: 0x04 0x22 0x4D 0x18 + // ZSTD magic code: 0x28 0xB5 0x2F 0xFD + FilterInputStream filterInputStream; + if ((src[0] & 0xFF) == 0x1F && (src[1] & 0xFF) == 0x8B) { + filterInputStream = new GZIPInputStream(inputStream); + } else if ((src[0] & 0xFF) == 0x78) { + filterInputStream = new InflaterInputStream(inputStream); + } else if ((src[0] & 0xFF) == 0x04 && (src[1] & 0xFF) == 0x22 && (src[2] & 0xFF) == 0x4D + && (src[3] & 0xFF) == 0x18) { + filterInputStream = new LZ4FrameInputStream(inputStream); + } else if (((src[0] & 0xFF) == 0x28 && (src[1] & 0xFF) == 0xB5 && (src[2] & 0xFF) == 0x2F + && (src[3] & 0xFF) == 0xFD)) { + filterInputStream = new ZstdInputStream(inputStream); + } else { + throw new IOException("Unknown compression format"); + } + return filterInputStream; + } + public static String encodeHexString(ByteBuffer byteBuffer, boolean toLowerCase) { return new String(encodeHex(byteBuffer, toLowerCase)); } diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/misc/UtilitiesTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/misc/UtilitiesTest.java index b59c377a..ac4e172d 100644 --- a/java/client/src/test/java/org/apache/rocketmq/client/java/misc/UtilitiesTest.java +++ b/java/client/src/test/java/org/apache/rocketmq/client/java/misc/UtilitiesTest.java @@ -32,9 +32,27 @@ public class UtilitiesTest { @Test public void testCompressAndUncompressByteArray() throws IOException { final byte[] bytes = body.getBytes(StandardCharsets.UTF_8); - final byte[] compressedBytes = Utilities.compressBytesGzip(bytes, 5); - final byte[] originalBytes = Utilities.uncompressBytesGzip(compressedBytes); - assertEquals(new String(originalBytes, StandardCharsets.UTF_8), body); + { + final byte[] compressedBytes = Utilities.compressBytesZLIB(bytes, 5); + final byte[] originalBytes = Utilities.decompressBytes(compressedBytes); + assertEquals(new String(originalBytes, StandardCharsets.UTF_8), body); + } + { + final byte[] compressedBytes = Utilities.compressBytesLZ4(bytes); + final byte[] originalBytes = Utilities.decompressBytes(compressedBytes); + assertEquals(new String(originalBytes, StandardCharsets.UTF_8), body); + } + { + final byte[] compressedBytes = Utilities.compressBytesZSTD(bytes, 3); + final byte[] originalBytes = Utilities.decompressBytes(compressedBytes); + assertEquals(new String(originalBytes, StandardCharsets.UTF_8), body); + } + { + final byte[] compressedBytes = Utilities.compressBytesGZIP(bytes); + final byte[] originalBytes = Utilities.decompressBytes(compressedBytes); + assertEquals(new String(originalBytes, StandardCharsets.UTF_8), body); + } + } @Test diff --git a/java/pom.xml b/java/pom.xml index 0694f407..bb7f7c78 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -74,6 +74,9 @@ <maven-source-plugin.version>3.2.1</maven-source-plugin.version> <maven-shade-plugin.version>3.2.4</maven-shade-plugin.version> <maven-antrun-plugin.version>1.8</maven-antrun-plugin.version> + + <zstd-jni.version>1.5.2-2</zstd-jni.version> + <lz4-java.version>1.8.0</lz4-java.version> </properties> <distributionManagement> @@ -250,6 +253,17 @@ <version>${bcpkix.version}</version> <scope>test</scope> </dependency> + + <dependency> + <groupId>org.lz4</groupId> + <artifactId>lz4-java</artifactId> + <version>${lz4-java.version}</version> + </dependency> + <dependency> + <groupId>com.github.luben</groupId> + <artifactId>zstd-jni</artifactId> + <version>${zstd-jni.version}</version> + </dependency> </dependencies> </dependencyManagement>