This is an automated email from the ASF dual-hosted git repository. yong pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 099a49e6cf91f243f31689020ba58f85391307b8 Author: ran <[email protected]> AuthorDate: Mon Feb 22 20:40:10 2021 +0800 [Compression] Fix ByteBuffer allocate error in the AirliftUtils (#9667) Fixes #9666 ### Motivation The compressed data length may be bigger than the original data length (e.g. the source text is not repeated, such as "abcde"), so we can't use the uncompressed length as the allocated length to initial the ByteBuffer. ### Modifications Use the capacity of the ByteBuffer instead of the uncompressed length as the allocated length. ### Verifying this change Update the existing tests. (cherry picked from commit aa65220fa496add95b04971c29400446157c43b4) --- .../pulsar/common/compression/AirliftUtils.java | 4 +-- .../common/compression/CompressionCodecLZ4.java | 2 +- .../common/compression/CompressionCodecSnappy.java | 2 +- .../common/compression/CompressionCodecZstd.java | 2 +- .../common/compression/CompressorCodecTest.java | 38 ++++++++++++++++------ 5 files changed, 33 insertions(+), 15 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/AirliftUtils.java b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/AirliftUtils.java index 3bfc609..e1480d2 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/AirliftUtils.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/AirliftUtils.java @@ -25,10 +25,10 @@ import java.nio.ByteBuffer; */ public abstract class AirliftUtils { - static ByteBuffer ensureAirliftSupported(ByteBuffer encodedNio, int uncompressedLength) { + static ByteBuffer ensureAirliftSupported(ByteBuffer encodedNio) { if (!encodedNio.isDirect() && !encodedNio.hasArray()) { // airlift needs a raw ByteArray - ByteBuffer copy = ByteBuffer.allocate(uncompressedLength); + ByteBuffer copy = ByteBuffer.allocate(encodedNio.capacity()); copy.put(encodedNio); copy.flip(); encodedNio = copy; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecLZ4.java b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecLZ4.java index 12a03d1..b65b488 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecLZ4.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecLZ4.java @@ -96,7 +96,7 @@ public class CompressionCodecLZ4 implements CompressionCodec { } else { ByteBuffer uncompressedNio = uncompressed.nioBuffer(0, uncompressedLength); ByteBuffer encodedNio = encoded.nioBuffer(encoded.readerIndex(), encoded.readableBytes()); - encodedNio = AirliftUtils.ensureAirliftSupported(encodedNio, uncompressedLength); + encodedNio = AirliftUtils.ensureAirliftSupported(encodedNio); LZ4_DECOMPRESSOR.get().decompress(encodedNio, uncompressedNio); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecSnappy.java b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecSnappy.java index 1e31edc..3bbfd0ce 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecSnappy.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecSnappy.java @@ -103,7 +103,7 @@ public class CompressionCodecSnappy implements CompressionCodec { ByteBuffer uncompressedNio = uncompressed.nioBuffer(0, uncompressedLength); ByteBuffer encodedNio = encoded.nioBuffer(encoded.readerIndex(), encoded.readableBytes()); - encodedNio = AirliftUtils.ensureAirliftSupported(encodedNio, uncompressedLength); + encodedNio = AirliftUtils.ensureAirliftSupported(encodedNio); SNAPPY_DECOMPRESSOR.get().decompress(encodedNio, uncompressedNio); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZstd.java b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZstd.java index 18caee6..84f8d0c 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZstd.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZstd.java @@ -97,7 +97,7 @@ public class CompressionCodecZstd implements CompressionCodec { } else { ByteBuffer uncompressedNio = uncompressed.nioBuffer(0, uncompressedLength); ByteBuffer encodedNio = encoded.nioBuffer(encoded.readerIndex(), encoded.readableBytes()); - encodedNio = AirliftUtils.ensureAirliftSupported(encodedNio, uncompressedLength); + encodedNio = AirliftUtils.ensureAirliftSupported(encodedNio); ZSTD_DECOMPRESSOR.get().decompress(encodedNio, uncompressedNio); } diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java index ec84741..50e358f 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java @@ -36,13 +36,31 @@ import org.testng.annotations.Test; public class CompressorCodecTest { - private static String text = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Cras id massa odio. Duis commodo ligula sed efficitur cursus. Aliquam sollicitudin, tellus quis suscipit tincidunt, erat sem efficitur nulla, in feugiat diam ex a dolor. Vestibulum ante ipsum primis in faucibus orci luctus et ultrices posuere cubilia Curae; Vestibulum ac volutpat nisl, vel aliquam elit. Maecenas auctor aliquet turpis, id ullamcorper metus. Ut tincidunt et magna non ultrices. Quisqu [...] + private static final String text = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Cras id massa odio. Duis commodo ligula sed efficitur cursus. Aliquam sollicitudin, tellus quis suscipit tincidunt, erat sem efficitur nulla, in feugiat diam ex a dolor. Vestibulum ante ipsum primis in faucibus orci luctus et ultrices posuere cubilia Curae; Vestibulum ac volutpat nisl, vel aliquam elit. Maecenas auctor aliquet turpis, id ullamcorper metus. Ut tincidunt et magna non ultrices. [...] + + private static final String noRepeatedText = "abcde"; private static final String zipCompressedText = "789c54914d4ec4300c85aff20e50f502acd0b0840542b03789a7b2e4249dd8461c1f7706a16157d5f1fbf9fc3c2637c86ed150878e09130735f6056574e3e2ec31415576b1227d03abf88ad32483543432238c2a63c55388e5566ba30ea86ca104e30a3e9fa5c82153625ad88a47954b50830dd5eba84a5fe0ac1a86cb2193cf4a5a3a5c7a911a3d03f1244fc17627d843951648c79963939c57495dfe06ddfaacf86073f90ccd86d49d7fcbee535ada1c8b1425e786318b40a3787eb323d4a71436ecc3822767f84f51219c62123ffcd32df81a1abea77f17d3055 [...] private static final String lz4CompressedText = "f1aa4c6f72656d20697073756d20646f6c6f722073697420616d65742c20636f6e73656374657475722061646970697363696e6720656c69742e2043726173206964206d61737361206f64696f2e204475697320636f6d6d6f646f206c6967756c612073656420656666696369747572206375727375732e20416c697175616d20736f6c6c696369747564696e2c2074656c6c757320717569732073757363697069742074696e636964756e742c20657261742073656d206566663600f20e72206e756c6c612c20696e2066657567696174206469616d206578206 [...] private static final String zstdCompressedText= "28b52ffd600c017d0900c6573c1b9027314099d2a79d3ee4d712d9a201c61c60008415bcfbc24c1802330033003300b7c8cf78602958ceaf800dfc81a64c2a3eeea453b0a13f33625ef9e407f9ac9a96a5c172a156c8faf4c1c3c36533b98b1d1c08857e58e9c8dac1e62ee86e4db1b4d231176af67899abf5c92b3711cebd3375617edc61bad0227e013672f36cbd7e38980f4b5337564a740f132e00a3b305d3ef862ddbb14d4c9c563f7a6b35e76aedf975d0b986d752befc24a6865cf6d6704d43b0414e3007466e7902218719d848d234846e5e74b645e4b7c [...] private static final String snappyCompressedText = "8c04f0b44c6f72656d20697073756d20646f6c6f722073697420616d65742c20636f6e73656374657475722061646970697363696e6720656c69742e2043726173206964206d61737361206f64696f2e204475697320636f6d6d6f646f206c6967756c612073656420656666696369747572206375727375732e20416c697175616d20736f6c6c696369747564696e2c2074656c6c757320717569732073757363697069742074696e636964756e742c20657261742073656d1d516c6e756c6c612c20696e2066657567696174206469616d20657820612005d0 [...] + @DataProvider(name = "codecAndText") + public Object[][] codecAndTextProvider() { + return new Object[][] { + { CompressionType.NONE, noRepeatedText}, + { CompressionType.NONE, text }, + { CompressionType.LZ4, noRepeatedText}, + { CompressionType.LZ4, text }, + { CompressionType.ZLIB, noRepeatedText}, + { CompressionType.ZLIB, text }, + { CompressionType.ZSTD, noRepeatedText}, + { CompressionType.ZSTD, text }, + { CompressionType.SNAPPY, noRepeatedText}, + { CompressionType.SNAPPY, text } + }; + } + @DataProvider(name = "codec") public Object[][] codecProvider() { return new Object[][] { @@ -54,10 +72,10 @@ public class CompressorCodecTest { }; } - @Test(dataProvider = "codec") - void testCompressDecompress(CompressionType type, String compressedText) throws IOException { + @Test(dataProvider = "codecAndText") + void testCompressDecompress(CompressionType type, String sourceText) throws IOException { CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(type); - byte[] data = text.getBytes(); + byte[] data = sourceText.getBytes(); ByteBuf raw = PulsarByteBufAllocator.DEFAULT.directBuffer(); raw.writeBytes(data); @@ -83,10 +101,10 @@ public class CompressorCodecTest { assertEquals(compressed.refCnt(), 0); } - @Test(dataProvider = "codec") - void testDecompressReadonlyByteBuf(CompressionType type, String compressedText) throws IOException { + @Test(dataProvider = "codecAndText") + void testDecompressReadonlyByteBuf(CompressionType type, String sourceText) throws IOException { CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(type); - byte[] data = text.getBytes(); + byte[] data = sourceText.getBytes(); ByteBuf raw = PulsarByteBufAllocator.DEFAULT.directBuffer(); raw.writeBytes(data); @@ -124,10 +142,10 @@ public class CompressorCodecTest { assertEquals(uncompressed, Unpooled.EMPTY_BUFFER); } - @Test(dataProvider = "codec") - void testMultpileUsages(CompressionType type, String compressedText) throws IOException { + @Test(dataProvider = "codecAndText") + void testMultpileUsages(CompressionType type, String sourceText) throws IOException { CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(type); - byte[] data = text.getBytes(); + byte[] data = sourceText.getBytes(); for (int i = 0; i < 5; i++) { ByteBuf raw = PulsarByteBufAllocator.DEFAULT.directBuffer();
