This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new aa65220  [Compression] Fix ByteBuffer allocate error in the 
AirliftUtils (#9667)
aa65220 is described below

commit aa65220fa496add95b04971c29400446157c43b4
Author: ran <[email protected]>
AuthorDate: Mon Feb 22 20:40:10 2021 +0800

    [Compression] Fix ByteBuffer allocate error in the AirliftUtils (#9667)
    
    Fixes #9666
    
    ### Motivation
    
    The compressed data length may be bigger than the original data length 
(e.g. the source text is not repeated, such as "abcde"), so we can't use the 
uncompressed length as the allocated length to initial the ByteBuffer.
    
    ### Modifications
    
    Use the capacity of the ByteBuffer instead of the uncompressed length as 
the allocated length.
    
    ### Verifying this change
    
    Update the existing tests.
---
 .../pulsar/common/compression/AirliftUtils.java    |  4 +--
 .../common/compression/CompressionCodecLZ4.java    |  2 +-
 .../common/compression/CompressionCodecSnappy.java |  2 +-
 .../common/compression/CompressionCodecZstd.java   |  2 +-
 .../common/compression/CompressorCodecTest.java    | 38 ++++++++++++++++------
 5 files changed, 33 insertions(+), 15 deletions(-)

diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/AirliftUtils.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/AirliftUtils.java
index 3bfc609..e1480d2 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/AirliftUtils.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/AirliftUtils.java
@@ -25,10 +25,10 @@ import java.nio.ByteBuffer;
  */
 public abstract class AirliftUtils {
 
-    static ByteBuffer ensureAirliftSupported(ByteBuffer encodedNio, int 
uncompressedLength) {
+    static ByteBuffer ensureAirliftSupported(ByteBuffer encodedNio) {
         if (!encodedNio.isDirect() && !encodedNio.hasArray()) {
             // airlift needs a raw ByteArray
-            ByteBuffer copy = ByteBuffer.allocate(uncompressedLength);
+            ByteBuffer copy = ByteBuffer.allocate(encodedNio.capacity());
             copy.put(encodedNio);
             copy.flip();
             encodedNio = copy;
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecLZ4.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecLZ4.java
index a2cf16d..efc497f 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecLZ4.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecLZ4.java
@@ -94,7 +94,7 @@ public class CompressionCodecLZ4 implements CompressionCodec {
         } else {
             ByteBuffer uncompressedNio = uncompressed.nioBuffer(0, 
uncompressedLength);
             ByteBuffer encodedNio = encoded.nioBuffer(encoded.readerIndex(), 
encoded.readableBytes());
-            encodedNio = AirliftUtils.ensureAirliftSupported(encodedNio, 
uncompressedLength);
+            encodedNio = AirliftUtils.ensureAirliftSupported(encodedNio);
             LZ4_DECOMPRESSOR.get().decompress(encodedNio, uncompressedNio);
         }
 
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecSnappy.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecSnappy.java
index 9464992..4bd95e3 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecSnappy.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecSnappy.java
@@ -101,7 +101,7 @@ public class CompressionCodecSnappy implements 
CompressionCodec {
             ByteBuffer uncompressedNio = uncompressed.nioBuffer(0, 
uncompressedLength);
             ByteBuffer encodedNio = encoded.nioBuffer(encoded.readerIndex(), 
encoded.readableBytes());
 
-            encodedNio = AirliftUtils.ensureAirliftSupported(encodedNio, 
uncompressedLength);
+            encodedNio = AirliftUtils.ensureAirliftSupported(encodedNio);
             SNAPPY_DECOMPRESSOR.get().decompress(encodedNio, uncompressedNio);
         }
 
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZstd.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZstd.java
index f41f083..78a110a 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZstd.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZstd.java
@@ -95,7 +95,7 @@ public class CompressionCodecZstd implements CompressionCodec 
{
         } else {
             ByteBuffer uncompressedNio = uncompressed.nioBuffer(0, 
uncompressedLength);
             ByteBuffer encodedNio = encoded.nioBuffer(encoded.readerIndex(), 
encoded.readableBytes());
-            encodedNio = AirliftUtils.ensureAirliftSupported(encodedNio, 
uncompressedLength);
+            encodedNio = AirliftUtils.ensureAirliftSupported(encodedNio);
             ZSTD_DECOMPRESSOR.get().decompress(encodedNio, uncompressedNio);
         }
 
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java
index 4e2f623..5518df5 100644
--- 
a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java
@@ -36,13 +36,31 @@ import org.testng.annotations.Test;
 
 public class CompressorCodecTest {
 
-    private static String text = "Lorem ipsum dolor sit amet, consectetur 
adipiscing elit. Cras id massa odio. Duis commodo ligula sed efficitur cursus. 
Aliquam sollicitudin, tellus quis suscipit tincidunt, erat sem efficitur nulla, 
in feugiat diam ex a dolor. Vestibulum ante ipsum primis in faucibus orci 
luctus et ultrices posuere cubilia Curae; Vestibulum ac volutpat nisl, vel 
aliquam elit. Maecenas auctor aliquet turpis, id ullamcorper metus. Ut 
tincidunt et magna non ultrices. Quisqu [...]
+    private static final String text = "Lorem ipsum dolor sit amet, 
consectetur adipiscing elit. Cras id massa odio. Duis commodo ligula sed 
efficitur cursus. Aliquam sollicitudin, tellus quis suscipit tincidunt, erat 
sem efficitur nulla, in feugiat diam ex a dolor. Vestibulum ante ipsum primis 
in faucibus orci luctus et ultrices posuere cubilia Curae; Vestibulum ac 
volutpat nisl, vel aliquam elit. Maecenas auctor aliquet turpis, id ullamcorper 
metus. Ut tincidunt et magna non ultrices.  [...]
+
+    private static final String noRepeatedText = "abcde";
 
     private static final String zipCompressedText = 
"789c54914d4ec4300c85aff20e50f502acd0b0840542b03789a7b2e4249dd8461c1f7706a16157d5f1fbf9fc3c2637c86ed150878e09130735f6056574e3e2ec31415576b1227d03abf88ad32483543432238c2a63c55388e5566ba30ea86ca104e30a3e9fa5c82153625ad88a47954b50830dd5eba84a5fe0ac1a86cb2193cf4a5a3a5c7a911a3d03f1244fc17627d843951648c79963939c57495dfe06ddfaacf86073f90ccd86d49d7fcbee535ada1c8b1425e786318b40a3787eb323d4a71436ecc3822767f84f51219c62123ffcd32df81a1abea77f17d3055
 [...]
     private static final String lz4CompressedText = 
"f1aa4c6f72656d20697073756d20646f6c6f722073697420616d65742c20636f6e73656374657475722061646970697363696e6720656c69742e2043726173206964206d61737361206f64696f2e204475697320636f6d6d6f646f206c6967756c612073656420656666696369747572206375727375732e20416c697175616d20736f6c6c696369747564696e2c2074656c6c757320717569732073757363697069742074696e636964756e742c20657261742073656d206566663600f20e72206e756c6c612c20696e2066657567696174206469616d206578206
 [...]
     private static final String zstdCompressedText= 
"28b52ffd600c017d0900c6573c1b9027314099d2a79d3ee4d712d9a201c61c60008415bcfbc24c1802330033003300b7c8cf78602958ceaf800dfc81a64c2a3eeea453b0a13f33625ef9e407f9ac9a96a5c172a156c8faf4c1c3c36533b98b1d1c08857e58e9c8dac1e62ee86e4db1b4d231176af67899abf5c92b3711cebd3375617edc61bad0227e013672f36cbd7e38980f4b5337564a740f132e00a3b305d3ef862ddbb14d4c9c563f7a6b35e76aedf975d0b986d752befc24a6865cf6d6704d43b0414e3007466e7902218719d848d234846e5e74b645e4b7c
 [...]
     private static final String snappyCompressedText = 
"8c04f0b44c6f72656d20697073756d20646f6c6f722073697420616d65742c20636f6e73656374657475722061646970697363696e6720656c69742e2043726173206964206d61737361206f64696f2e204475697320636f6d6d6f646f206c6967756c612073656420656666696369747572206375727375732e20416c697175616d20736f6c6c696369747564696e2c2074656c6c757320717569732073757363697069742074696e636964756e742c20657261742073656d1d516c6e756c6c612c20696e2066657567696174206469616d20657820612005d0
 [...]
 
+    @DataProvider(name = "codecAndText")
+    public Object[][] codecAndTextProvider() {
+        return new Object[][] {
+                { CompressionType.NONE, noRepeatedText},
+                { CompressionType.NONE, text },
+                { CompressionType.LZ4, noRepeatedText},
+                { CompressionType.LZ4, text },
+                { CompressionType.ZLIB, noRepeatedText},
+                { CompressionType.ZLIB, text },
+                { CompressionType.ZSTD, noRepeatedText},
+                { CompressionType.ZSTD, text },
+                { CompressionType.SNAPPY, noRepeatedText},
+                { CompressionType.SNAPPY, text }
+        };
+    }
+
     @DataProvider(name = "codec")
     public Object[][] codecProvider() {
         return new Object[][] {
@@ -54,10 +72,10 @@ public class CompressorCodecTest {
         };
     }
 
-    @Test(dataProvider = "codec")
-    void testCompressDecompress(CompressionType type, String compressedText) 
throws IOException {
+    @Test(dataProvider = "codecAndText")
+    void testCompressDecompress(CompressionType type, String sourceText) 
throws IOException {
         CompressionCodec codec = 
CompressionCodecProvider.getCompressionCodec(type);
-        byte[] data = text.getBytes();
+        byte[] data = sourceText.getBytes();
         ByteBuf raw = PulsarByteBufAllocator.DEFAULT.directBuffer();
         raw.writeBytes(data);
 
@@ -83,10 +101,10 @@ public class CompressorCodecTest {
         assertEquals(compressed.refCnt(), 0);
     }
 
-    @Test(dataProvider = "codec")
-    void testDecompressReadonlyByteBuf(CompressionType type, String 
compressedText) throws IOException {
+    @Test(dataProvider = "codecAndText")
+    void testDecompressReadonlyByteBuf(CompressionType type, String 
sourceText) throws IOException {
         CompressionCodec codec = 
CompressionCodecProvider.getCompressionCodec(type);
-        byte[] data = text.getBytes();
+        byte[] data = sourceText.getBytes();
         ByteBuf raw = PulsarByteBufAllocator.DEFAULT.directBuffer();
         raw.writeBytes(data);
 
@@ -124,10 +142,10 @@ public class CompressorCodecTest {
         assertEquals(uncompressed, Unpooled.EMPTY_BUFFER);
     }
 
-    @Test(dataProvider = "codec")
-    void testMultpileUsages(CompressionType type, String compressedText) 
throws IOException {
+    @Test(dataProvider = "codecAndText")
+    void testMultpileUsages(CompressionType type, String sourceText) throws 
IOException {
         CompressionCodec codec = 
CompressionCodecProvider.getCompressionCodec(type);
-        byte[] data = text.getBytes();
+        byte[] data = sourceText.getBytes();
 
         for (int i = 0; i < 5; i++) {
             ByteBuf raw = PulsarByteBufAllocator.DEFAULT.directBuffer();

Reply via email to