This is an automated email from the ASF dual-hosted git repository.
brandonwilliams pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6a42c21 replace LZ4FastDecompressor with LZ4SafeDecompressor
6a42c21 is described below
commit 6a42c21cb3e357caf4d7b2e8328f0f8f46f5269b
Author: Bereng <[email protected]>
AuthorDate: Mon Apr 20 13:40:43 2020 +0200
replace LZ4FastDecompressor with LZ4SafeDecompressor
Patch by Berenguer Blasi, reviewed by brandonwilliams for CASSANDRA-15560
---
CHANGES.txt | 1 +
.../apache/cassandra/io/compress/LZ4Compressor.java | 21 +++++++++++++--------
.../org/apache/cassandra/net/FrameDecoderLZ4.java | 11 ++++++-----
.../apache/cassandra/net/FrameDecoderLegacyLZ4.java | 9 +++++----
.../async/StreamCompressionSerializer.java | 4 ++--
.../compress/StreamCompressionInputStream.java | 6 +++---
.../apache/cassandra/test/microbench/Sample.java | 4 ++--
.../async/StreamCompressionSerializerTest.java | 4 ++--
8 files changed, 34 insertions(+), 26 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index a1bf2fc..89c8d7d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-alpha5
+ * replace LZ4FastDecompressor with LZ4SafeDecompressor (CASSANDRA-15560)
* Fix buffer pool NPE with concurrent release due to in-progress tiny pool eviction (CASSANDRA-15726)
* Avoid race condition when completing stream sessions (CASSANDRA-15666)
* Flush with fast compressors by default (CASSANDRA-15379)
diff --git a/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java b/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
index 30ec8ba..6c333b7 100644
--- a/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
+++ b/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
@@ -78,7 +78,7 @@ public class LZ4Compressor implements ICompressor
}
private final net.jpountz.lz4.LZ4Compressor compressor;
- private final net.jpountz.lz4.LZ4FastDecompressor decompressor;
+ private final net.jpountz.lz4.LZ4SafeDecompressor decompressor;
@VisibleForTesting
final String compressorType;
@VisibleForTesting
@@ -107,7 +107,7 @@ public class LZ4Compressor implements ICompressor
}
}
- decompressor = lz4Factory.fastDecompressor();
+ decompressor = lz4Factory.safeDecompressor();
}
public int initialCompressedBufferLength(int chunkLength)
@@ -141,20 +141,24 @@ public class LZ4Compressor implements ICompressor
| ((input[inputOffset + 2] & 0xFF) << 16)
| ((input[inputOffset + 3] & 0xFF) << 24);
- final int compressedLength;
+ final int writtenLength;
try
{
- compressedLength = decompressor.decompress(input, inputOffset +
INTEGER_BYTES,
- output, outputOffset,
decompressedLength);
+ writtenLength = decompressor.decompress(input,
+ inputOffset +
INTEGER_BYTES,
+ inputLength -
INTEGER_BYTES,
+ output,
+ outputOffset,
+ decompressedLength);
}
catch (LZ4Exception e)
{
throw new IOException(e);
}
- if (compressedLength != inputLength - INTEGER_BYTES)
+ if (writtenLength != decompressedLength)
{
- throw new IOException("Compressed lengths mismatch");
+ throw new IOException("Decompressed lengths mismatch");
}
return decompressedLength;
@@ -169,7 +173,8 @@ public class LZ4Compressor implements ICompressor
try
{
- int compressedLength = decompressor.decompress(input,
input.position(), output, output.position(), decompressedLength);
+ int compressedLength = input.remaining();
+ decompressor.decompress(input, input.position(),
input.remaining(), output, output.position(), decompressedLength);
input.position(input.position() + compressedLength);
output.position(output.position() + decompressedLength);
}
diff --git a/src/java/org/apache/cassandra/net/FrameDecoderLZ4.java b/src/java/org/apache/cassandra/net/FrameDecoderLZ4.java
index 941139a..2b32d18 100644
--- a/src/java/org/apache/cassandra/net/FrameDecoderLZ4.java
+++ b/src/java/org/apache/cassandra/net/FrameDecoderLZ4.java
@@ -24,7 +24,7 @@ import java.util.zip.CRC32;
import io.netty.channel.ChannelPipeline;
import net.jpountz.lz4.LZ4Factory;
-import net.jpountz.lz4.LZ4FastDecompressor;
+import net.jpountz.lz4.LZ4SafeDecompressor;
import static org.apache.cassandra.net.Crc.*;
@@ -61,7 +61,7 @@ final class FrameDecoderLZ4 extends FrameDecoderWith8bHeader
{
public static FrameDecoderLZ4 fast(BufferPoolAllocator allocator)
{
- return new FrameDecoderLZ4(allocator,
LZ4Factory.fastestInstance().fastDecompressor());
+ return new FrameDecoderLZ4(allocator,
LZ4Factory.fastestInstance().safeDecompressor());
}
private static final int HEADER_LENGTH = 8;
@@ -85,9 +85,9 @@ final class FrameDecoderLZ4 extends FrameDecoderWith8bHeader
return ((int) (header8b >>> 40)) & 0xFFFFFF;
}
- private final LZ4FastDecompressor decompressor;
+ private final LZ4SafeDecompressor decompressor;
- private FrameDecoderLZ4(BufferPoolAllocator allocator, LZ4FastDecompressor
decompressor)
+ private FrameDecoderLZ4(BufferPoolAllocator allocator, LZ4SafeDecompressor
decompressor)
{
super(allocator);
this.decompressor = decompressor;
@@ -141,7 +141,8 @@ final class FrameDecoderLZ4 extends FrameDecoderWith8bHeader
ByteBuffer out = allocator.get(uncompressedLength);
try
{
- decompressor.decompress(input, begin + HEADER_LENGTH, out, 0,
uncompressedLength);
+ int sourceLength = end - (begin + HEADER_LENGTH +
TRAILER_LENGTH);
+ decompressor.decompress(input, begin + HEADER_LENGTH,
sourceLength, out, 0, uncompressedLength);
return new IntactFrame(isSelfContained,
ShareableBytes.wrap(out));
}
catch (Throwable t)
diff --git a/src/java/org/apache/cassandra/net/FrameDecoderLegacyLZ4.java b/src/java/org/apache/cassandra/net/FrameDecoderLegacyLZ4.java
index f2556a5..bf6bc17 100644
--- a/src/java/org/apache/cassandra/net/FrameDecoderLegacyLZ4.java
+++ b/src/java/org/apache/cassandra/net/FrameDecoderLegacyLZ4.java
@@ -28,7 +28,7 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.compression.Lz4FrameDecoder;
import net.jpountz.lz4.LZ4Factory;
-import net.jpountz.lz4.LZ4FastDecompressor;
+import net.jpountz.lz4.LZ4SafeDecompressor;
import net.jpountz.xxhash.XXHash32;
import net.jpountz.xxhash.XXHashFactory;
import org.apache.cassandra.utils.memory.BufferPool;
@@ -101,8 +101,8 @@ class FrameDecoderLegacyLZ4 extends FrameDecoderLegacy
private static final XXHash32 xxhash =
XXHashFactory.fastestInstance().hash32();
- private static final LZ4FastDecompressor decompressor =
- LZ4Factory.fastestInstance().fastDecompressor();
+ private static final LZ4SafeDecompressor decompressor =
+ LZ4Factory.fastestInstance().safeDecompressor();
private final BufferPoolAllocator allocator;
@@ -245,7 +245,8 @@ class FrameDecoderLegacyLZ4 extends FrameDecoderLegacy
ByteBuffer out = allocator.get(header.uncompressedLength);
try
{
- decompressor.decompress(buf, begin + HEADER_LENGTH, out, 0,
header.uncompressedLength);
+ int sourceLength = end - (begin + HEADER_LENGTH);
+ decompressor.decompress(buf, begin + HEADER_LENGTH,
sourceLength, out, 0, header.uncompressedLength);
validateChecksum(out, 0, header);
return ShareableBytes.wrap(out);
}
diff --git a/src/java/org/apache/cassandra/streaming/async/StreamCompressionSerializer.java b/src/java/org/apache/cassandra/streaming/async/StreamCompressionSerializer.java
index 1d83409..fc1bde2 100644
--- a/src/java/org/apache/cassandra/streaming/async/StreamCompressionSerializer.java
+++ b/src/java/org/apache/cassandra/streaming/async/StreamCompressionSerializer.java
@@ -25,7 +25,7 @@ import java.nio.channels.ReadableByteChannel;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import net.jpountz.lz4.LZ4Compressor;
-import net.jpountz.lz4.LZ4FastDecompressor;
+import net.jpountz.lz4.LZ4SafeDecompressor;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.net.AsyncStreamingOutputPlus;
@@ -73,7 +73,7 @@ public class StreamCompressionSerializer
/**
* @return A buffer with decompressed data.
*/
- public ByteBuf deserialize(LZ4FastDecompressor decompressor, DataInputPlus
in, int version) throws IOException
+ public ByteBuf deserialize(LZ4SafeDecompressor decompressor, DataInputPlus
in, int version) throws IOException
{
final int compressedLength = in.readInt();
final int uncompressedLength = in.readInt();
diff --git a/src/java/org/apache/cassandra/streaming/compress/StreamCompressionInputStream.java b/src/java/org/apache/cassandra/streaming/compress/StreamCompressionInputStream.java
index 50d746a..ceed532 100644
--- a/src/java/org/apache/cassandra/streaming/compress/StreamCompressionInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/StreamCompressionInputStream.java
@@ -25,7 +25,7 @@ import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import net.jpountz.lz4.LZ4Factory;
-import net.jpountz.lz4.LZ4FastDecompressor;
+import net.jpountz.lz4.LZ4SafeDecompressor;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.RebufferingInputStream;
import org.apache.cassandra.net.AsyncStreamingInputPlus;
@@ -38,7 +38,7 @@ public class StreamCompressionInputStream extends RebufferingInputStream impleme
*/
private final DataInputPlus dataInputPlus;
- private final LZ4FastDecompressor decompressor;
+ private final LZ4SafeDecompressor decompressor;
private final int protocolVersion;
private final StreamCompressionSerializer deserializer;
@@ -54,7 +54,7 @@ public class StreamCompressionInputStream extends RebufferingInputStream impleme
this.dataInputPlus = dataInputPlus;
this.protocolVersion = protocolVersion;
- this.decompressor = LZ4Factory.fastestInstance().fastDecompressor();
+ this.decompressor = LZ4Factory.fastestInstance().safeDecompressor();
ByteBufAllocator allocator = dataInputPlus instanceof
AsyncStreamingInputPlus
?
((AsyncStreamingInputPlus)dataInputPlus).getAllocator()
diff --git a/test/microbench/org/apache/cassandra/test/microbench/Sample.java b/test/microbench/org/apache/cassandra/test/microbench/Sample.java
index 1f149c0..52f7c3e 100644
--- a/test/microbench/org/apache/cassandra/test/microbench/Sample.java
+++ b/test/microbench/org/apache/cassandra/test/microbench/Sample.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.test.microbench;
import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory;
-import net.jpountz.lz4.LZ4FastDecompressor;
+import net.jpountz.lz4.LZ4SafeDecompressor;
import org.openjdk.jmh.annotations.*;
import org.xerial.snappy.Snappy;
@@ -56,7 +56,7 @@ public class Sample
private byte[][] snappyBytes;
private byte[][] rawBytes;
- private LZ4FastDecompressor lz4Decompressor =
LZ4Factory.fastestInstance().fastDecompressor();
+ private LZ4SafeDecompressor lz4Decompressor =
LZ4Factory.fastestInstance().safeDecompressor();
private LZ4Compressor lz4Compressor =
LZ4Factory.fastestInstance().fastCompressor();
diff --git a/test/unit/org/apache/cassandra/streaming/async/StreamCompressionSerializerTest.java b/test/unit/org/apache/cassandra/streaming/async/StreamCompressionSerializerTest.java
index a88092e..dab6001 100644
--- a/test/unit/org/apache/cassandra/streaming/async/StreamCompressionSerializerTest.java
+++ b/test/unit/org/apache/cassandra/streaming/async/StreamCompressionSerializerTest.java
@@ -34,7 +34,7 @@ import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory;
-import net.jpountz.lz4.LZ4FastDecompressor;
+import net.jpountz.lz4.LZ4SafeDecompressor;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.FileUtils;
@@ -48,7 +48,7 @@ public class StreamCompressionSerializerTest
private final ByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT;
private final StreamCompressionSerializer serializer = new
StreamCompressionSerializer(allocator);
private final LZ4Compressor compressor =
LZ4Factory.fastestInstance().fastCompressor();
- private final LZ4FastDecompressor decompressor =
LZ4Factory.fastestInstance().fastDecompressor();
+ private final LZ4SafeDecompressor decompressor =
LZ4Factory.fastestInstance().safeDecompressor();
private ByteBuffer input;
private ByteBuffer compressed;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]