This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3ba688e  KAFKA-9203: Check for buggy LZ4 libraries and remove corresponding workarounds (#10196)
3ba688e is described below

commit 3ba688e75f29b2089317b3cf7331192ce81ea3f0
Author: Xavier Léauté <[email protected]>
AuthorDate: Fri Jul 23 14:32:41 2021 -0700

    KAFKA-9203: Check for buggy LZ4 libraries and remove corresponding workarounds (#10196)
    
    * Remove the workarounds that were added back in https://github.com/apache/kafka/pull/7769
    * Add a check to detect buggy LZ4 library versions
    
    This check allows us to safely remove the workarounds for buggy
    LZ4 versions without users encountering cryptic errors if they
    accidentally have an older LZ4 library on the classpath, as
    described in KAFKA-9203.
    
    With this change the user will get a clear error message indicating
    what the problem might be if they encounter this situation.
    
    Note: This now instantiates a compressor in the decompression code.
    This should be safe with respect to JNI libraries, since we always use
    `LZ4Factory.fastestInstance()` which takes care of falling back to a pure
    Java implementation if JNI libraries are not present.
    
    This was tested with lz4 1.3.0 to make sure it triggers the exception when running
    `KafkaLZ4Test`.
    
    Reviewers: Manikumar Reddy <[email protected]>, Ismael Juma <[email protected]>
    
    Co-authored-by: Ismael Juma <[email protected]>
---
 .../common/compress/KafkaLZ4BlockInputStream.java  | 85 ++++++++++++++--------
 1 file changed, 56 insertions(+), 29 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/compress/KafkaLZ4BlockInputStream.java
 
b/clients/src/main/java/org/apache/kafka/common/compress/KafkaLZ4BlockInputStream.java
index 85e7f7b..e2fbd5a 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/compress/KafkaLZ4BlockInputStream.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/compress/KafkaLZ4BlockInputStream.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.compress;
 
+import net.jpountz.lz4.LZ4Compressor;
 import net.jpountz.lz4.LZ4Exception;
 import net.jpountz.lz4.LZ4Factory;
 import net.jpountz.lz4.LZ4SafeDecompressor;
@@ -51,6 +52,19 @@ public final class KafkaLZ4BlockInputStream extends 
InputStream {
     private static final LZ4SafeDecompressor DECOMPRESSOR = 
LZ4Factory.fastestInstance().safeDecompressor();
     private static final XXHash32 CHECKSUM = 
XXHashFactory.fastestInstance().hash32();
 
+    private static final RuntimeException BROKEN_LZ4_EXCEPTION;
+    // https://issues.apache.org/jira/browse/KAFKA-9203
+    // detect buggy lz4 libraries on the classpath
+    static {
+        RuntimeException exception = null;
+        try {
+            detectBrokenLz4Version();
+        } catch (RuntimeException e) {
+            exception = e;
+        }
+        BROKEN_LZ4_EXCEPTION = exception;
+    }
+
     private final ByteBuffer in;
     private final boolean ignoreFlagDescriptorChecksum;
     private final BufferSupplier bufferSupplier;
@@ -73,16 +87,14 @@ public final class KafkaLZ4BlockInputStream extends 
InputStream {
      * @throws IOException
      */
     public KafkaLZ4BlockInputStream(ByteBuffer in, BufferSupplier 
bufferSupplier, boolean ignoreFlagDescriptorChecksum) throws IOException {
+        if (BROKEN_LZ4_EXCEPTION != null) {
+            throw BROKEN_LZ4_EXCEPTION;
+        }
         this.ignoreFlagDescriptorChecksum = ignoreFlagDescriptorChecksum;
         this.in = in.duplicate().order(ByteOrder.LITTLE_ENDIAN);
         this.bufferSupplier = bufferSupplier;
         readHeader();
         decompressionBuffer = bufferSupplier.get(maxBlockSize);
-        if (!decompressionBuffer.hasArray() || 
decompressionBuffer.arrayOffset() != 0) {
-            // require array backed decompression buffer with zero offset
-            // to simplify workaround for 
https://github.com/lz4/lz4-java/pull/65
-            throw new RuntimeException("decompression buffer must have backing 
array with zero array offset");
-        }
         finished = false;
     }
 
@@ -132,10 +144,7 @@ public final class KafkaLZ4BlockInputStream extends 
InputStream {
 
         int len = in.position() - in.reset().position();
 
-        int hash = in.hasArray() ?
-                       // workaround for 
https://github.com/lz4/lz4-java/pull/65
-                       CHECKSUM.hash(in.array(), in.arrayOffset() + 
in.position(), len, 0) :
-                       CHECKSUM.hash(in, in.position(), len, 0);
+        int hash = CHECKSUM.hash(in, in.position(), len, 0);
         in.position(in.position() + len);
         if (in.get() != (byte) ((hash >> 8) & 0xFF)) {
             throw new IOException(DESCRIPTOR_HASH_MISMATCH);
@@ -173,22 +182,8 @@ public final class KafkaLZ4BlockInputStream extends 
InputStream {
 
         if (compressed) {
             try {
-                // workaround for https://github.com/lz4/lz4-java/pull/65
-                final int bufferSize;
-                if (in.hasArray()) {
-                    bufferSize = DECOMPRESSOR.decompress(
-                        in.array(),
-                        in.position() + in.arrayOffset(),
-                        blockSize,
-                        decompressionBuffer.array(),
-                        0,
-                        maxBlockSize
-                    );
-                } else {
-                    // decompressionBuffer has zero arrayOffset, so we don't 
need to worry about
-                    // https://github.com/lz4/lz4-java/pull/65
-                    bufferSize = DECOMPRESSOR.decompress(in, in.position(), 
blockSize, decompressionBuffer, 0, maxBlockSize);
-                }
+                final int bufferSize = DECOMPRESSOR.decompress(in, 
in.position(), blockSize, decompressionBuffer, 0,
+                    maxBlockSize);
                 decompressionBuffer.position(0);
                 decompressionBuffer.limit(bufferSize);
                 decompressedBuffer = decompressionBuffer;
@@ -202,10 +197,7 @@ public final class KafkaLZ4BlockInputStream extends 
InputStream {
 
         // verify checksum
         if (flg.isBlockChecksumSet()) {
-            // workaround for https://github.com/lz4/lz4-java/pull/65
-            int hash = in.hasArray() ?
-                       CHECKSUM.hash(in.array(), in.arrayOffset() + 
in.position(), blockSize, 0) :
-                       CHECKSUM.hash(in, in.position(), blockSize, 0);
+            int hash = CHECKSUM.hash(in, in.position(), blockSize, 0);
             in.position(in.position() + blockSize);
             if (hash != in.getInt()) {
                 throw new IOException(BLOCK_HASH_MISMATCH);
@@ -288,4 +280,39 @@ public final class KafkaLZ4BlockInputStream extends 
InputStream {
     public boolean markSupported() {
         return false;
     }
+
+    /**
+     * Checks whether the version of lz4 on the classpath has the fix for 
reading from ByteBuffers with
+     * non-zero array offsets (see https://github.com/lz4/lz4-java/pull/65)
+     */
+    static void detectBrokenLz4Version() {
+        byte[] source = new byte[]{1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3, 3, 
3};
+        final LZ4Compressor compressor = 
LZ4Factory.fastestInstance().fastCompressor();
+
+        final byte[] compressed = new 
byte[compressor.maxCompressedLength(source.length)];
+        final int compressedLength = compressor.compress(source, 0, 
source.length, compressed, 0,
+                                                         compressed.length);
+
+        // allocate an array-backed ByteBuffer with non-zero array-offset 
containing the compressed data
+        // a buggy decompressor will read the data from the beginning of the 
underlying array instead of
+        // the beginning of the ByteBuffer, failing to decompress the invalid 
data.
+        final byte[] zeroes = {0, 0, 0, 0, 0};
+        ByteBuffer nonZeroOffsetBuffer = ByteBuffer
+            .allocate(zeroes.length + compressed.length) // allocates the 
backing array with extra space to offset the data
+            .put(zeroes) // prepend invalid bytes (zeros) before the 
compressed data in the array
+            .slice() // create a new ByteBuffer sharing the underlying array, 
offset to start on the compressed data
+            .put(compressed); // write the compressed data at the beginning of 
this new buffer
+
+        ByteBuffer dest = ByteBuffer.allocate(source.length);
+        try {
+            DECOMPRESSOR.decompress(nonZeroOffsetBuffer, 0, compressedLength, 
dest, 0, source.length);
+        } catch (Exception e) {
+            throw new RuntimeException("Kafka has detected detected a buggy 
lz4-java library (< 1.4.x) on the classpath."
+                                       + " If you are using Kafka client 
libraries, make sure your application does not"
+                                       + " accidentally override the version 
provided by Kafka or include multiple versions"
+                                       + " of the library on the classpath. 
The lz4-java version on the classpath should"
+                                       + " match the version the Kafka client 
libraries depend on. Adding -verbose:class"
+                                       + " to your JVM arguments may help 
understand which lz4-java version is getting loaded.", e);
+        }
+    }
 }

Reply via email to