This is an automated email from the ASF dual-hosted git repository.

brandonwilliams pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6a42c21  replace LZ4FastDecompressor with LZ4SafeDecompressor
6a42c21 is described below

commit 6a42c21cb3e357caf4d7b2e8328f0f8f46f5269b
Author: Bereng <[email protected]>
AuthorDate: Mon Apr 20 13:40:43 2020 +0200

    replace LZ4FastDecompressor with LZ4SafeDecompressor
    
    Patch by Berenguer Blasi, reviewed by brandonwilliams for CASSANDRA-15560
---
 CHANGES.txt                                         |  1 +
 .../apache/cassandra/io/compress/LZ4Compressor.java | 21 +++++++++++++--------
 .../org/apache/cassandra/net/FrameDecoderLZ4.java   | 11 ++++++-----
 .../apache/cassandra/net/FrameDecoderLegacyLZ4.java |  9 +++++----
 .../async/StreamCompressionSerializer.java          |  4 ++--
 .../compress/StreamCompressionInputStream.java      |  6 +++---
 .../apache/cassandra/test/microbench/Sample.java    |  4 ++--
 .../async/StreamCompressionSerializerTest.java      |  4 ++--
 8 files changed, 34 insertions(+), 26 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index a1bf2fc..89c8d7d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-alpha5
+ * replace LZ4FastDecompressor with LZ4SafeDecompressor (CASSANDRA-15560)
  * Fix buffer pool NPE with concurrent release due to in-progress tiny pool 
eviction (CASSANDRA-15726)
  * Avoid race condition when completing stream sessions (CASSANDRA-15666)
  * Flush with fast compressors by default (CASSANDRA-15379)
diff --git a/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java 
b/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
index 30ec8ba..6c333b7 100644
--- a/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
+++ b/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
@@ -78,7 +78,7 @@ public class LZ4Compressor implements ICompressor
     }
 
     private final net.jpountz.lz4.LZ4Compressor compressor;
-    private final net.jpountz.lz4.LZ4FastDecompressor decompressor;
+    private final net.jpountz.lz4.LZ4SafeDecompressor decompressor;
     @VisibleForTesting
     final String compressorType;
     @VisibleForTesting
@@ -107,7 +107,7 @@ public class LZ4Compressor implements ICompressor
             }
         }
 
-        decompressor = lz4Factory.fastDecompressor();
+        decompressor = lz4Factory.safeDecompressor();
     }
 
     public int initialCompressedBufferLength(int chunkLength)
@@ -141,20 +141,24 @@ public class LZ4Compressor implements ICompressor
                 | ((input[inputOffset + 2] & 0xFF) << 16)
                 | ((input[inputOffset + 3] & 0xFF) << 24);
 
-        final int compressedLength;
+        final int writtenLength;
         try
         {
-            compressedLength = decompressor.decompress(input, inputOffset + 
INTEGER_BYTES,
-                                                       output, outputOffset, 
decompressedLength);
+            writtenLength = decompressor.decompress(input,
+                                                    inputOffset + 
INTEGER_BYTES,
+                                                    inputLength - 
INTEGER_BYTES,
+                                                    output,
+                                                    outputOffset,
+                                                    decompressedLength);
         }
         catch (LZ4Exception e)
         {
             throw new IOException(e);
         }
 
-        if (compressedLength != inputLength - INTEGER_BYTES)
+        if (writtenLength != decompressedLength)
         {
-            throw new IOException("Compressed lengths mismatch");
+            throw new IOException("Decompressed lengths mismatch");
         }
 
         return decompressedLength;
@@ -169,7 +173,8 @@ public class LZ4Compressor implements ICompressor
 
         try
         {
-            int compressedLength = decompressor.decompress(input, 
input.position(), output, output.position(), decompressedLength);
+            int compressedLength = input.remaining();
+            decompressor.decompress(input, input.position(), 
input.remaining(), output, output.position(), decompressedLength);
             input.position(input.position() + compressedLength);
             output.position(output.position() + decompressedLength);
         }
diff --git a/src/java/org/apache/cassandra/net/FrameDecoderLZ4.java 
b/src/java/org/apache/cassandra/net/FrameDecoderLZ4.java
index 941139a..2b32d18 100644
--- a/src/java/org/apache/cassandra/net/FrameDecoderLZ4.java
+++ b/src/java/org/apache/cassandra/net/FrameDecoderLZ4.java
@@ -24,7 +24,7 @@ import java.util.zip.CRC32;
 
 import io.netty.channel.ChannelPipeline;
 import net.jpountz.lz4.LZ4Factory;
-import net.jpountz.lz4.LZ4FastDecompressor;
+import net.jpountz.lz4.LZ4SafeDecompressor;
 
 import static org.apache.cassandra.net.Crc.*;
 
@@ -61,7 +61,7 @@ final class FrameDecoderLZ4 extends FrameDecoderWith8bHeader
 {
     public static FrameDecoderLZ4 fast(BufferPoolAllocator allocator)
     {
-        return new FrameDecoderLZ4(allocator, 
LZ4Factory.fastestInstance().fastDecompressor());
+        return new FrameDecoderLZ4(allocator, 
LZ4Factory.fastestInstance().safeDecompressor());
     }
 
     private static final int HEADER_LENGTH = 8;
@@ -85,9 +85,9 @@ final class FrameDecoderLZ4 extends FrameDecoderWith8bHeader
         return ((int) (header8b >>> 40)) & 0xFFFFFF;
     }
 
-    private final LZ4FastDecompressor decompressor;
+    private final LZ4SafeDecompressor decompressor;
 
-    private FrameDecoderLZ4(BufferPoolAllocator allocator, LZ4FastDecompressor 
decompressor)
+    private FrameDecoderLZ4(BufferPoolAllocator allocator, LZ4SafeDecompressor 
decompressor)
     {
         super(allocator);
         this.decompressor = decompressor;
@@ -141,7 +141,8 @@ final class FrameDecoderLZ4 extends FrameDecoderWith8bHeader
             ByteBuffer out = allocator.get(uncompressedLength);
             try
             {
-                decompressor.decompress(input, begin + HEADER_LENGTH, out, 0, 
uncompressedLength);
+                int sourceLength = end - (begin + HEADER_LENGTH + 
TRAILER_LENGTH);
+                decompressor.decompress(input, begin + HEADER_LENGTH, 
sourceLength, out, 0, uncompressedLength);
                 return new IntactFrame(isSelfContained, 
ShareableBytes.wrap(out));
             }
             catch (Throwable t)
diff --git a/src/java/org/apache/cassandra/net/FrameDecoderLegacyLZ4.java 
b/src/java/org/apache/cassandra/net/FrameDecoderLegacyLZ4.java
index f2556a5..bf6bc17 100644
--- a/src/java/org/apache/cassandra/net/FrameDecoderLegacyLZ4.java
+++ b/src/java/org/apache/cassandra/net/FrameDecoderLegacyLZ4.java
@@ -28,7 +28,7 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.ChannelPipeline;
 import io.netty.handler.codec.compression.Lz4FrameDecoder;
 import net.jpountz.lz4.LZ4Factory;
-import net.jpountz.lz4.LZ4FastDecompressor;
+import net.jpountz.lz4.LZ4SafeDecompressor;
 import net.jpountz.xxhash.XXHash32;
 import net.jpountz.xxhash.XXHashFactory;
 import org.apache.cassandra.utils.memory.BufferPool;
@@ -101,8 +101,8 @@ class FrameDecoderLegacyLZ4 extends FrameDecoderLegacy
         private static final XXHash32 xxhash =
             XXHashFactory.fastestInstance().hash32();
 
-        private static final LZ4FastDecompressor decompressor =
-            LZ4Factory.fastestInstance().fastDecompressor();
+        private static final LZ4SafeDecompressor decompressor =
+            LZ4Factory.fastestInstance().safeDecompressor();
 
         private final BufferPoolAllocator allocator;
 
@@ -245,7 +245,8 @@ class FrameDecoderLegacyLZ4 extends FrameDecoderLegacy
             ByteBuffer out = allocator.get(header.uncompressedLength);
             try
             {
-                decompressor.decompress(buf, begin + HEADER_LENGTH, out, 0, 
header.uncompressedLength);
+                int sourceLength = end - (begin + HEADER_LENGTH);
+                decompressor.decompress(buf, begin + HEADER_LENGTH, 
sourceLength, out, 0, header.uncompressedLength);
                 validateChecksum(out, 0, header);
                 return ShareableBytes.wrap(out);
             }
diff --git 
a/src/java/org/apache/cassandra/streaming/async/StreamCompressionSerializer.java
 
b/src/java/org/apache/cassandra/streaming/async/StreamCompressionSerializer.java
index 1d83409..fc1bde2 100644
--- 
a/src/java/org/apache/cassandra/streaming/async/StreamCompressionSerializer.java
+++ 
b/src/java/org/apache/cassandra/streaming/async/StreamCompressionSerializer.java
@@ -25,7 +25,7 @@ import java.nio.channels.ReadableByteChannel;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import net.jpountz.lz4.LZ4Compressor;
-import net.jpountz.lz4.LZ4FastDecompressor;
+import net.jpountz.lz4.LZ4SafeDecompressor;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.net.AsyncStreamingOutputPlus;
 
@@ -73,7 +73,7 @@ public class StreamCompressionSerializer
     /**
      * @return A buffer with decompressed data.
      */
-    public ByteBuf deserialize(LZ4FastDecompressor decompressor, DataInputPlus 
in, int version) throws IOException
+    public ByteBuf deserialize(LZ4SafeDecompressor decompressor, DataInputPlus 
in, int version) throws IOException
     {
         final int compressedLength = in.readInt();
         final int uncompressedLength = in.readInt();
diff --git 
a/src/java/org/apache/cassandra/streaming/compress/StreamCompressionInputStream.java
 
b/src/java/org/apache/cassandra/streaming/compress/StreamCompressionInputStream.java
index 50d746a..ceed532 100644
--- 
a/src/java/org/apache/cassandra/streaming/compress/StreamCompressionInputStream.java
+++ 
b/src/java/org/apache/cassandra/streaming/compress/StreamCompressionInputStream.java
@@ -25,7 +25,7 @@ import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
 import net.jpountz.lz4.LZ4Factory;
-import net.jpountz.lz4.LZ4FastDecompressor;
+import net.jpountz.lz4.LZ4SafeDecompressor;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.RebufferingInputStream;
 import org.apache.cassandra.net.AsyncStreamingInputPlus;
@@ -38,7 +38,7 @@ public class StreamCompressionInputStream extends 
RebufferingInputStream impleme
      */
     private final DataInputPlus dataInputPlus;
 
-    private final LZ4FastDecompressor decompressor;
+    private final LZ4SafeDecompressor decompressor;
     private final int protocolVersion;
     private final StreamCompressionSerializer deserializer;
 
@@ -54,7 +54,7 @@ public class StreamCompressionInputStream extends 
RebufferingInputStream impleme
 
         this.dataInputPlus = dataInputPlus;
         this.protocolVersion = protocolVersion;
-        this.decompressor = LZ4Factory.fastestInstance().fastDecompressor();
+        this.decompressor = LZ4Factory.fastestInstance().safeDecompressor();
 
         ByteBufAllocator allocator = dataInputPlus instanceof 
AsyncStreamingInputPlus
                                      ? 
((AsyncStreamingInputPlus)dataInputPlus).getAllocator()
diff --git a/test/microbench/org/apache/cassandra/test/microbench/Sample.java 
b/test/microbench/org/apache/cassandra/test/microbench/Sample.java
index 1f149c0..52f7c3e 100644
--- a/test/microbench/org/apache/cassandra/test/microbench/Sample.java
+++ b/test/microbench/org/apache/cassandra/test/microbench/Sample.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.test.microbench;
 
 import net.jpountz.lz4.LZ4Compressor;
 import net.jpountz.lz4.LZ4Factory;
-import net.jpountz.lz4.LZ4FastDecompressor;
+import net.jpountz.lz4.LZ4SafeDecompressor;
 import org.openjdk.jmh.annotations.*;
 import org.xerial.snappy.Snappy;
 
@@ -56,7 +56,7 @@ public class Sample
     private byte[][] snappyBytes;
     private byte[][] rawBytes;
 
-    private LZ4FastDecompressor lz4Decompressor = 
LZ4Factory.fastestInstance().fastDecompressor();
+    private LZ4SafeDecompressor lz4Decompressor = 
LZ4Factory.fastestInstance().safeDecompressor();
 
     private LZ4Compressor lz4Compressor = 
LZ4Factory.fastestInstance().fastCompressor();
 
diff --git 
a/test/unit/org/apache/cassandra/streaming/async/StreamCompressionSerializerTest.java
 
b/test/unit/org/apache/cassandra/streaming/async/StreamCompressionSerializerTest.java
index a88092e..dab6001 100644
--- 
a/test/unit/org/apache/cassandra/streaming/async/StreamCompressionSerializerTest.java
+++ 
b/test/unit/org/apache/cassandra/streaming/async/StreamCompressionSerializerTest.java
@@ -34,7 +34,7 @@ import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
 import net.jpountz.lz4.LZ4Compressor;
 import net.jpountz.lz4.LZ4Factory;
-import net.jpountz.lz4.LZ4FastDecompressor;
+import net.jpountz.lz4.LZ4SafeDecompressor;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.FileUtils;
@@ -48,7 +48,7 @@ public class StreamCompressionSerializerTest
     private final ByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT;
     private final StreamCompressionSerializer serializer = new 
StreamCompressionSerializer(allocator);
     private final LZ4Compressor compressor = 
LZ4Factory.fastestInstance().fastCompressor();
-    private final LZ4FastDecompressor decompressor = 
LZ4Factory.fastestInstance().fastDecompressor();
+    private final LZ4SafeDecompressor decompressor = 
LZ4Factory.fastestInstance().safeDecompressor();
 
     private ByteBuffer input;
     private ByteBuffer compressed;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to