This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3ba688e KAFKA-9203: Check for buggy LZ4 libraries and remove
corresponding workarounds (#10196)
3ba688e is described below
commit 3ba688e75f29b2089317b3cf7331192ce81ea3f0
Author: Xavier Léauté <[email protected]>
AuthorDate: Fri Jul 23 14:32:41 2021 -0700
KAFKA-9203: Check for buggy LZ4 libraries and remove corresponding
workarounds (#10196)
* Remove the workarounds that were added back in
https://github.com/apache/kafka/pull/7769
* Add a check to detect buggy LZ4 library versions
This check allows us to safely remove the workarounds for buggy
LZ4 versions without users encountering cryptic errors if they
accidentally have an older LZ4 library on the classpath, as
described in KAFKA-9203.
With this change the user will get a clear error message indicating
what the problem might be if they encounter this situation.
Note: This now instantiates a compressor in the decompression code.
This should be safe with respect to JNI libraries, since we always use
`LZ4Factory.fastestInstance()` which takes care of falling back to a pure
Java implementation if JNI libraries are not present.
This was tested with lz4 1.3.0 to make sure it triggers the exception when
running
`KafkaLZ4Test`.
Reviewers: Manikumar Reddy <[email protected]>, Ismael Juma
<[email protected]>
Co-authored-by: Ismael Juma <[email protected]>
---
.../common/compress/KafkaLZ4BlockInputStream.java | 85 ++++++++++++++--------
1 file changed, 56 insertions(+), 29 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/compress/KafkaLZ4BlockInputStream.java
b/clients/src/main/java/org/apache/kafka/common/compress/KafkaLZ4BlockInputStream.java
index 85e7f7b..e2fbd5a 100644
---
a/clients/src/main/java/org/apache/kafka/common/compress/KafkaLZ4BlockInputStream.java
+++
b/clients/src/main/java/org/apache/kafka/common/compress/KafkaLZ4BlockInputStream.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.compress;
+import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Exception;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4SafeDecompressor;
@@ -51,6 +52,19 @@ public final class KafkaLZ4BlockInputStream extends
InputStream {
private static final LZ4SafeDecompressor DECOMPRESSOR =
LZ4Factory.fastestInstance().safeDecompressor();
private static final XXHash32 CHECKSUM =
XXHashFactory.fastestInstance().hash32();
+ private static final RuntimeException BROKEN_LZ4_EXCEPTION;
+ // https://issues.apache.org/jira/browse/KAFKA-9203
+ // detect buggy lz4 libraries on the classpath
+ static {
+ RuntimeException exception = null;
+ try {
+ detectBrokenLz4Version();
+ } catch (RuntimeException e) {
+ exception = e;
+ }
+ BROKEN_LZ4_EXCEPTION = exception;
+ }
+
private final ByteBuffer in;
private final boolean ignoreFlagDescriptorChecksum;
private final BufferSupplier bufferSupplier;
@@ -73,16 +87,14 @@ public final class KafkaLZ4BlockInputStream extends
InputStream {
* @throws IOException
*/
public KafkaLZ4BlockInputStream(ByteBuffer in, BufferSupplier
bufferSupplier, boolean ignoreFlagDescriptorChecksum) throws IOException {
+ if (BROKEN_LZ4_EXCEPTION != null) {
+ throw BROKEN_LZ4_EXCEPTION;
+ }
this.ignoreFlagDescriptorChecksum = ignoreFlagDescriptorChecksum;
this.in = in.duplicate().order(ByteOrder.LITTLE_ENDIAN);
this.bufferSupplier = bufferSupplier;
readHeader();
decompressionBuffer = bufferSupplier.get(maxBlockSize);
- if (!decompressionBuffer.hasArray() ||
decompressionBuffer.arrayOffset() != 0) {
- // require array backed decompression buffer with zero offset
- // to simplify workaround for
https://github.com/lz4/lz4-java/pull/65
- throw new RuntimeException("decompression buffer must have backing
array with zero array offset");
- }
finished = false;
}
@@ -132,10 +144,7 @@ public final class KafkaLZ4BlockInputStream extends
InputStream {
int len = in.position() - in.reset().position();
- int hash = in.hasArray() ?
- // workaround for
https://github.com/lz4/lz4-java/pull/65
- CHECKSUM.hash(in.array(), in.arrayOffset() +
in.position(), len, 0) :
- CHECKSUM.hash(in, in.position(), len, 0);
+ int hash = CHECKSUM.hash(in, in.position(), len, 0);
in.position(in.position() + len);
if (in.get() != (byte) ((hash >> 8) & 0xFF)) {
throw new IOException(DESCRIPTOR_HASH_MISMATCH);
@@ -173,22 +182,8 @@ public final class KafkaLZ4BlockInputStream extends
InputStream {
if (compressed) {
try {
- // workaround for https://github.com/lz4/lz4-java/pull/65
- final int bufferSize;
- if (in.hasArray()) {
- bufferSize = DECOMPRESSOR.decompress(
- in.array(),
- in.position() + in.arrayOffset(),
- blockSize,
- decompressionBuffer.array(),
- 0,
- maxBlockSize
- );
- } else {
- // decompressionBuffer has zero arrayOffset, so we don't
need to worry about
- // https://github.com/lz4/lz4-java/pull/65
- bufferSize = DECOMPRESSOR.decompress(in, in.position(),
blockSize, decompressionBuffer, 0, maxBlockSize);
- }
+ final int bufferSize = DECOMPRESSOR.decompress(in,
in.position(), blockSize, decompressionBuffer, 0,
+ maxBlockSize);
decompressionBuffer.position(0);
decompressionBuffer.limit(bufferSize);
decompressedBuffer = decompressionBuffer;
@@ -202,10 +197,7 @@ public final class KafkaLZ4BlockInputStream extends
InputStream {
// verify checksum
if (flg.isBlockChecksumSet()) {
- // workaround for https://github.com/lz4/lz4-java/pull/65
- int hash = in.hasArray() ?
- CHECKSUM.hash(in.array(), in.arrayOffset() +
in.position(), blockSize, 0) :
- CHECKSUM.hash(in, in.position(), blockSize, 0);
+ int hash = CHECKSUM.hash(in, in.position(), blockSize, 0);
in.position(in.position() + blockSize);
if (hash != in.getInt()) {
throw new IOException(BLOCK_HASH_MISMATCH);
@@ -288,4 +280,39 @@ public final class KafkaLZ4BlockInputStream extends
InputStream {
public boolean markSupported() {
return false;
}
+
+ /**
+ * Checks whether the version of lz4 on the classpath has the fix for
reading from ByteBuffers with
+ * non-zero array offsets (see https://github.com/lz4/lz4-java/pull/65)
+ */
+ static void detectBrokenLz4Version() {
+ byte[] source = new byte[]{1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3, 3,
3};
+ final LZ4Compressor compressor =
LZ4Factory.fastestInstance().fastCompressor();
+
+ final byte[] compressed = new
byte[compressor.maxCompressedLength(source.length)];
+ final int compressedLength = compressor.compress(source, 0,
source.length, compressed, 0,
+ compressed.length);
+
+ // allocate an array-backed ByteBuffer with non-zero array-offset
containing the compressed data
+ // a buggy decompressor will read the data from the beginning of the
underlying array instead of
+ // the beginning of the ByteBuffer, failing to decompress the invalid
data.
+ final byte[] zeroes = {0, 0, 0, 0, 0};
+ ByteBuffer nonZeroOffsetBuffer = ByteBuffer
+ .allocate(zeroes.length + compressed.length) // allocates the
backing array with extra space to offset the data
+ .put(zeroes) // prepend invalid bytes (zeros) before the
compressed data in the array
+ .slice() // create a new ByteBuffer sharing the underlying array,
offset to start on the compressed data
+ .put(compressed); // write the compressed data at the beginning of
this new buffer
+
+ ByteBuffer dest = ByteBuffer.allocate(source.length);
+ try {
+ DECOMPRESSOR.decompress(nonZeroOffsetBuffer, 0, compressedLength,
dest, 0, source.length);
+ } catch (Exception e) {
+            throw new RuntimeException("Kafka has detected a buggy
lz4-java library (< 1.4.x) on the classpath."
+ + " If you are using Kafka client
libraries, make sure your application does not"
+ + " accidentally override the version
provided by Kafka or include multiple versions"
+ + " of the library on the classpath.
The lz4-java version on the classpath should"
+ + " match the version the Kafka client
libraries depend on. Adding -verbose:class"
+ + " to your JVM arguments may help
understand which lz4-java version is getting loaded.", e);
+ }
+ }
}