This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch 2.8 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 55c751ecd2899dd8bfd9da9de01df5e9a4c7690d Author: Ismael Juma <[email protected]> AuthorDate: Sun Feb 14 08:12:25 2021 -0800 KAFKA-12327: Remove MethodHandle usage in CompressionType (#10123) We don't really need it and it causes problems in older Android versions and GraalVM native image usage (there are workarounds for the latter). Move the logic to separate classes that are only invoked when the relevant compression library is actually used. Place such classes in their own package and enforce via checkstyle that only these classes refer to compression library packages. To avoid cyclic dependencies, moved `BufferSupplier` to the `utils` package. Reviewers: Chia-Ping Tsai <[email protected]> --- checkstyle/import-control.xml | 12 +++- .../kafka/clients/consumer/internals/Fetcher.java | 2 +- .../KafkaLZ4BlockInputStream.java | 11 +-- .../KafkaLZ4BlockOutputStream.java | 2 +- .../kafka/common/compress/SnappyFactory.java | 50 +++++++++++++ .../apache/kafka/common/compress/ZstdFactory.java | 58 +++++++++++++++ .../common/record/AbstractLegacyRecordBatch.java | 1 + .../kafka/common/record/CompressionType.java | 82 +++++----------------- .../kafka/common/record/DefaultRecordBatch.java | 1 + .../kafka/common/record/FileLogInputStream.java | 1 + .../apache/kafka/common/record/MemoryRecords.java | 1 + .../kafka/common/record/MutableRecordBatch.java | 1 + .../apache/kafka/common/record/RecordBatch.java | 1 + .../common/{record => utils}/BufferSupplier.java | 2 +- .../clients/consumer/internals/FetcherTest.java | 2 +- .../common/{record => compress}/KafkaLZ4Test.java | 5 +- .../kafka/common/record/BufferSupplierTest.java | 1 + .../kafka/common/record/CompressionTypeTest.java | 3 + .../common/record/DefaultRecordBatchTest.java | 1 + .../kafka/common/record/MemoryRecordsTest.java | 1 + core/src/main/scala/kafka/log/LogCleaner.scala | 3 +- core/src/main/scala/kafka/log/LogSegment.scala | 3 +- core/src/main/scala/kafka/log/LogValidator.scala | 5 +- 
core/src/test/scala/unit/kafka/log/LogTest.scala | 3 +- gradle/spotbugs-exclude.xml | 2 +- .../kafka/jmh/record/BaseRecordBatchBenchmark.java | 2 +- .../org/apache/kafka/raft/KafkaRaftClient.java | 2 +- .../kafka/raft/internals/RecordsBatchReader.java | 2 +- .../raft/internals/RecordsBatchReaderTest.java | 2 +- .../apache/kafka/snapshot/FileRawSnapshotTest.java | 2 +- .../apache/kafka/snapshot/SnapshotWriterTest.java | 2 +- 31 files changed, 171 insertions(+), 95 deletions(-) diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index b658370..bc0491e 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -69,6 +69,15 @@ <allow pkg="org.apache.kafka.common.metrics" /> </subpackage> + <!-- Third-party compression libraries should only be referenced from this package --> + <subpackage name="compress"> + <allow pkg="com.github.luben.zstd" /> + <allow pkg="net.jpountz.lz4" /> + <allow pkg="net.jpountz.xxhash" /> + <allow pkg="org.apache.kafka.common.compress" /> + <allow pkg="org.xerial.snappy" /> + </subpackage> + <subpackage name="message"> <allow pkg="com.fasterxml.jackson" /> <allow pkg="org.apache.kafka.common.protocol" /> @@ -144,7 +153,7 @@ </subpackage> <subpackage name="record"> - <allow pkg="net.jpountz" /> + <allow pkg="org.apache.kafka.common.compress" /> <allow pkg="org.apache.kafka.common.header" /> <allow pkg="org.apache.kafka.common.record" /> <allow pkg="org.apache.kafka.common.message" /> @@ -152,7 +161,6 @@ <allow pkg="org.apache.kafka.common.protocol" /> <allow pkg="org.apache.kafka.common.protocol.types" /> <allow pkg="org.apache.kafka.common.errors" /> - <allow pkg="com.github.luben.zstd" /> </subpackage> <subpackage name="header"> diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index f71d2c4..78b26e6 100644 --- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -62,7 +62,7 @@ import org.apache.kafka.common.metrics.stats.Value; import org.apache.kafka.common.metrics.stats.WindowedCount; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.record.BufferSupplier; +import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.record.ControlRecordType; import org.apache.kafka.common.record.Record; import org.apache.kafka.common.record.RecordBatch; diff --git a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java b/clients/src/main/java/org/apache/kafka/common/compress/KafkaLZ4BlockInputStream.java similarity index 95% rename from clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java rename to clients/src/main/java/org/apache/kafka/common/compress/KafkaLZ4BlockInputStream.java index 850b1e9..85e7f7b 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java +++ b/clients/src/main/java/org/apache/kafka/common/compress/KafkaLZ4BlockInputStream.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.kafka.common.record; +package org.apache.kafka.common.compress; import net.jpountz.lz4.LZ4Exception; import net.jpountz.lz4.LZ4Factory; @@ -22,16 +22,17 @@ import net.jpountz.lz4.LZ4SafeDecompressor; import net.jpountz.xxhash.XXHash32; import net.jpountz.xxhash.XXHashFactory; -import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.BD; -import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.FLG; +import org.apache.kafka.common.compress.KafkaLZ4BlockOutputStream.BD; +import org.apache.kafka.common.compress.KafkaLZ4BlockOutputStream.FLG; +import org.apache.kafka.common.utils.BufferSupplier; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; import java.nio.ByteOrder; -import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK; -import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.MAGIC; +import static org.apache.kafka.common.compress.KafkaLZ4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK; +import static org.apache.kafka.common.compress.KafkaLZ4BlockOutputStream.MAGIC; /** * A partial implementation of the v1.5.1 LZ4 Frame format. diff --git a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java b/clients/src/main/java/org/apache/kafka/common/compress/KafkaLZ4BlockOutputStream.java similarity index 99% rename from clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java rename to clients/src/main/java/org/apache/kafka/common/compress/KafkaLZ4BlockOutputStream.java index 591ab16..5c5aee4 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java +++ b/clients/src/main/java/org/apache/kafka/common/compress/KafkaLZ4BlockOutputStream.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.kafka.common.record; +package org.apache.kafka.common.compress; import java.io.IOException; import java.io.OutputStream; diff --git a/clients/src/main/java/org/apache/kafka/common/compress/SnappyFactory.java b/clients/src/main/java/org/apache/kafka/common/compress/SnappyFactory.java new file mode 100644 index 0000000..b56273d --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/compress/SnappyFactory.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.common.compress; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.utils.ByteBufferInputStream; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.xerial.snappy.SnappyInputStream; +import org.xerial.snappy.SnappyOutputStream; + +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +public class SnappyFactory { + + private SnappyFactory() { } + + public static OutputStream wrapForOutput(ByteBufferOutputStream buffer) { + try { + return new SnappyOutputStream(buffer); + } catch (Throwable e) { + throw new KafkaException(e); + } + } + + public static InputStream wrapForInput(ByteBuffer buffer) { + try { + return new SnappyInputStream(new ByteBufferInputStream(buffer)); + } catch (Throwable e) { + throw new KafkaException(e); + } + } + +} diff --git a/clients/src/main/java/org/apache/kafka/common/compress/ZstdFactory.java b/clients/src/main/java/org/apache/kafka/common/compress/ZstdFactory.java new file mode 100644 index 0000000..8f4735e --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/compress/ZstdFactory.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.common.compress; + +import com.github.luben.zstd.RecyclingBufferPool; +import com.github.luben.zstd.ZstdInputStreamNoFinalizer; +import com.github.luben.zstd.ZstdOutputStreamNoFinalizer; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.ByteBufferInputStream; +import org.apache.kafka.common.utils.ByteBufferOutputStream; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +public class ZstdFactory { + + private ZstdFactory() { } + + public static OutputStream wrapForOutput(ByteBufferOutputStream buffer) { + try { + // Set input buffer (uncompressed) to 16 KB (none by default) to ensure reasonable performance + // in cases where the caller passes a small number of bytes to write (potentially a single byte). + return new BufferedOutputStream(new ZstdOutputStreamNoFinalizer(buffer, RecyclingBufferPool.INSTANCE), 16 * 1024); + } catch (Throwable e) { + throw new KafkaException(e); + } + } + + public static InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) { + try { + // Set output buffer (uncompressed) to 16 KB (none by default) to ensure reasonable performance + // in cases where the caller reads a small number of bytes (potentially a single byte). 
+ return new BufferedInputStream(new ZstdInputStreamNoFinalizer(new ByteBufferInputStream(buffer), + RecyclingBufferPool.INSTANCE), 16 * 1024); + } catch (Throwable e) { + throw new KafkaException(e); + } + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java index 8363764..59b2c683 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.errors.CorruptRecordException; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.utils.AbstractIterator; +import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.utils.ByteBufferOutputStream; import org.apache.kafka.common.utils.ByteUtils; import org.apache.kafka.common.utils.CloseableIterator; diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java index c2694ca..1b9754f 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java +++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java @@ -16,9 +16,12 @@ */ package org.apache.kafka.common.record; -import com.github.luben.zstd.BufferPool; -import com.github.luben.zstd.RecyclingBufferPool; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.compress.KafkaLZ4BlockInputStream; +import org.apache.kafka.common.compress.KafkaLZ4BlockOutputStream; +import org.apache.kafka.common.compress.SnappyFactory; +import org.apache.kafka.common.compress.ZstdFactory; +import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.utils.ByteBufferInputStream; import 
org.apache.kafka.common.utils.ByteBufferOutputStream; @@ -26,9 +29,6 @@ import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.InputStream; import java.io.OutputStream; -import java.lang.invoke.MethodHandle; -import java.lang.invoke.MethodHandles; -import java.lang.invoke.MethodType; import java.nio.ByteBuffer; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; @@ -49,6 +49,7 @@ public enum CompressionType { } }, + // Shipped with the JDK GZIP(1, "gzip", 1.0f) { @Override public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) { @@ -76,23 +77,21 @@ public enum CompressionType { } }, + // We should only load classes from a given compression library when we actually use said compression library. This + // is because compression libraries include native code for a set of platforms and we want to avoid errors + // in case the platform is not supported and the compression library is not actually used. + // To ensure this, we only reference compression library code from classes that are only invoked when actual usage + // happens. 
+ SNAPPY(2, "snappy", 1.0f) { @Override public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) { - try { - return (OutputStream) SnappyConstructors.OUTPUT.invoke(buffer); - } catch (Throwable e) { - throw new KafkaException(e); - } + return SnappyFactory.wrapForOutput(buffer); } @Override public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) { - try { - return (InputStream) SnappyConstructors.INPUT.invoke(new ByteBufferInputStream(buffer)); - } catch (Throwable e) { - throw new KafkaException(e); - } + return SnappyFactory.wrapForInput(buffer); } }, @@ -120,28 +119,12 @@ public enum CompressionType { ZSTD(4, "zstd", 1.0f) { @Override public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) { - try { - // Set input buffer (uncompressed) to 16 KB (none by default) to ensure reasonable performance - // in cases where the caller passes a small number of bytes to write (potentially a single byte). - // It's ok to reference `RecyclingBufferPool` since it doesn't load any native libraries - return new BufferedOutputStream((OutputStream) ZstdConstructors.OUTPUT.invoke(buffer, RecyclingBufferPool.INSTANCE), - 16 * 1024); - } catch (Throwable e) { - throw new KafkaException(e); - } + return ZstdFactory.wrapForOutput(buffer); } @Override public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) { - try { - // Set output buffer (uncompressed) to 16 KB (none by default) to ensure reasonable performance - // in cases where the caller reads a small number of bytes (potentially a single byte). - // It's ok to reference `RecyclingBufferPool` since it doesn't load any native libraries. 
- return new BufferedInputStream((InputStream) ZstdConstructors.INPUT.invoke(new ByteBufferInputStream(buffer), - RecyclingBufferPool.INSTANCE), 16 * 1024); - } catch (Throwable e) { - throw new KafkaException(e); - } + return ZstdFactory.wrapForInput(buffer, messageVersion, decompressionBufferSupplier); } }; @@ -207,37 +190,4 @@ public enum CompressionType { else throw new IllegalArgumentException("Unknown compression name: " + name); } - - // We should only have a runtime dependency on compression algorithms in case the native libraries don't support - // some platforms. - // - // For Snappy and Zstd, we dynamically load the classes and rely on the initialization-on-demand holder idiom to ensure - // they're only loaded if used. - // - // For LZ4 we are using org.apache.kafka classes, which should always be in the classpath, and would not trigger - // an error until KafkaLZ4BlockInputStream is initialized, which only happens if LZ4 is actually used. - - private static class SnappyConstructors { - static final MethodHandle INPUT = findConstructor("org.xerial.snappy.SnappyInputStream", - MethodType.methodType(void.class, InputStream.class)); - static final MethodHandle OUTPUT = findConstructor("org.xerial.snappy.SnappyOutputStream", - MethodType.methodType(void.class, OutputStream.class)); - } - - private static class ZstdConstructors { - // It's ok to reference `BufferPool` since it doesn't load any native libraries - static final MethodHandle INPUT = findConstructor("com.github.luben.zstd.ZstdInputStreamNoFinalizer", - MethodType.methodType(void.class, InputStream.class, BufferPool.class)); - static final MethodHandle OUTPUT = findConstructor("com.github.luben.zstd.ZstdOutputStreamNoFinalizer", - MethodType.methodType(void.class, OutputStream.class, BufferPool.class)); - } - - private static MethodHandle findConstructor(String className, MethodType methodType) { - try { - return MethodHandles.publicLookup().findConstructor(Class.forName(className), methodType); - 
} catch (ReflectiveOperationException e) { - throw new RuntimeException(e); - } - } - } diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java index 33709c0..62cab8f 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.InvalidRecordException; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.errors.CorruptRecordException; import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.utils.ByteBufferOutputStream; import org.apache.kafka.common.utils.ByteUtils; import org.apache.kafka.common.utils.CloseableIterator; diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java index 15c09de..10837d6 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java +++ b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.errors.CorruptRecordException; import org.apache.kafka.common.record.AbstractLegacyRecordBatch.LegacyFileChannelRecordBatch; import org.apache.kafka.common.record.DefaultRecordBatch.DefaultFileChannelRecordBatch; +import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.utils.CloseableIterator; import org.apache.kafka.common.utils.Utils; diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java index 82a54af..7d14f67 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java +++ 
b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.message.LeaderChangeMessage; import org.apache.kafka.common.network.TransferableChannel; import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention; import org.apache.kafka.common.utils.AbstractIterator; +import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.utils.ByteBufferOutputStream; import org.apache.kafka.common.utils.CloseableIterator; import org.apache.kafka.common.utils.Time; diff --git a/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java index 8c0dc23..fc924b0 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.record; +import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.utils.ByteBufferOutputStream; import org.apache.kafka.common.utils.CloseableIterator; diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java index 65a6a95..1cff7a2 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.record; +import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.utils.CloseableIterator; import java.nio.ByteBuffer; diff --git a/clients/src/main/java/org/apache/kafka/common/record/BufferSupplier.java b/clients/src/main/java/org/apache/kafka/common/utils/BufferSupplier.java similarity index 99% rename from clients/src/main/java/org/apache/kafka/common/record/BufferSupplier.java rename to 
clients/src/main/java/org/apache/kafka/common/utils/BufferSupplier.java index 1a6c92c..1688d10 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/BufferSupplier.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/BufferSupplier.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.kafka.common.record; +package org.apache.kafka.common.utils; import java.nio.ByteBuffer; import java.util.ArrayDeque; diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 7af9ede..2b9df62 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -65,7 +65,7 @@ import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.network.NetworkReceive; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.record.BufferSupplier; +import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.ControlRecordType; import org.apache.kafka.common.record.DefaultRecordBatch; diff --git a/clients/src/test/java/org/apache/kafka/common/record/KafkaLZ4Test.java b/clients/src/test/java/org/apache/kafka/common/compress/KafkaLZ4Test.java similarity index 98% rename from clients/src/test/java/org/apache/kafka/common/record/KafkaLZ4Test.java rename to clients/src/test/java/org/apache/kafka/common/compress/KafkaLZ4Test.java index 5f35f7d..a03c830 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/KafkaLZ4Test.java +++ b/clients/src/test/java/org/apache/kafka/common/compress/KafkaLZ4Test.java @@ -14,10 +14,11 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.kafka.common.record; +package org.apache.kafka.common.compress; import net.jpountz.xxhash.XXHashFactory; +import org.apache.kafka.common.utils.BufferSupplier; import org.junit.jupiter.api.extension.ExtensionContext; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -34,7 +35,7 @@ import java.util.List; import java.util.Random; import java.util.stream.Stream; -import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK; +import static org.apache.kafka.common.compress.KafkaLZ4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; diff --git a/clients/src/test/java/org/apache/kafka/common/record/BufferSupplierTest.java b/clients/src/test/java/org/apache/kafka/common/record/BufferSupplierTest.java index 9ead288..e580be5 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/BufferSupplierTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/BufferSupplierTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.record; +import org.apache.kafka.common.utils.BufferSupplier; import org.junit.jupiter.api.Test; import java.nio.ByteBuffer; diff --git a/clients/src/test/java/org/apache/kafka/common/record/CompressionTypeTest.java b/clients/src/test/java/org/apache/kafka/common/record/CompressionTypeTest.java index af696c3..16b560d 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/CompressionTypeTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/CompressionTypeTest.java @@ -16,6 +16,9 @@ */ package org.apache.kafka.common.record; +import org.apache.kafka.common.compress.KafkaLZ4BlockInputStream; +import org.apache.kafka.common.compress.KafkaLZ4BlockOutputStream; +import org.apache.kafka.common.utils.BufferSupplier; 
import org.apache.kafka.common.utils.ByteBufferOutputStream; import org.junit.jupiter.api.Test; diff --git a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java index 4e86906..9cd744a 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.InvalidRecordException; import org.apache.kafka.common.errors.CorruptRecordException; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.utils.CloseableIterator; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.test.TestUtils; diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java index cab667a..ebac0bd 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.message.LeaderChangeMessage; import org.apache.kafka.common.message.LeaderChangeMessage.Voter; import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention; +import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.extension.ExtensionContext; diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 3225d9d..df9722c 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ 
b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -21,7 +21,6 @@ import java.io.{File, IOException} import java.nio._ import java.util.Date import java.util.concurrent.TimeUnit - import kafka.common._ import kafka.metrics.KafkaMetricsGroup import kafka.server.{BrokerReconfigurable, KafkaConfig, LogDirFailureChannel} @@ -32,7 +31,7 @@ import org.apache.kafka.common.errors.{CorruptRecordException, KafkaStorageExcep import org.apache.kafka.common.record.MemoryRecords.RecordFilter import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention import org.apache.kafka.common.record._ -import org.apache.kafka.common.utils.Time +import org.apache.kafka.common.utils.{BufferSupplier, Time} import scala.jdk.CollectionConverters._ import scala.collection.mutable.ListBuffer diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index b43833d..37882ff 100755 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -20,7 +20,6 @@ import java.io.{File, IOException} import java.nio.file.{Files, NoSuchFileException} import java.nio.file.attribute.FileTime import java.util.concurrent.TimeUnit - import kafka.common.LogSegmentOffsetOverflowException import kafka.metrics.{KafkaMetricsGroup, KafkaTimer} import kafka.server.epoch.LeaderEpochFileCache @@ -30,7 +29,7 @@ import org.apache.kafka.common.InvalidRecordException import org.apache.kafka.common.errors.CorruptRecordException import org.apache.kafka.common.record.FileRecords.{LogOffsetPosition, TimestampAndOffset} import org.apache.kafka.common.record._ -import org.apache.kafka.common.utils.Time +import org.apache.kafka.common.utils.{BufferSupplier, Time} import scala.jdk.CollectionConverters._ import scala.math._ diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala index b2e6222..056be10 100644 --- a/core/src/main/scala/kafka/log/LogValidator.scala 
+++ b/core/src/main/scala/kafka/log/LogValidator.scala @@ -17,19 +17,18 @@ package kafka.log import java.nio.ByteBuffer - import kafka.api.{ApiVersion, KAFKA_2_1_IV0} import kafka.common.{LongRef, RecordValidationException} import kafka.message.{CompressionCodec, NoCompressionCodec, ZStdCompressionCodec} import kafka.server.BrokerTopicStats import kafka.utils.Logging import org.apache.kafka.common.errors.{CorruptRecordException, InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException} -import org.apache.kafka.common.record.{AbstractRecords, BufferSupplier, CompressionType, MemoryRecords, Record, RecordBatch, RecordConversionStats, TimestampType} +import org.apache.kafka.common.record.{AbstractRecords, CompressionType, MemoryRecords, Record, RecordBatch, RecordConversionStats, TimestampType} import org.apache.kafka.common.InvalidRecordException import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.ProduceResponse.RecordError -import org.apache.kafka.common.utils.Time +import org.apache.kafka.common.utils.{BufferSupplier, Time} import scala.collection.{Seq, mutable} import scala.jdk.CollectionConverters._ diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index dab9eb1..1a953c5 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -23,7 +23,6 @@ import java.nio.file.{Files, Paths} import java.util.concurrent.{Callable, Executors} import java.util.regex.Pattern import java.util.{Collections, Optional, Properties} - import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0} import kafka.common.{OffsetsOutOfOrderException, RecordValidationException, UnexpectedAppendOffsetException} import kafka.log.Log.DeleteDirSuffix @@ -41,7 +40,7 @@ import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention import 
org.apache.kafka.common.record._ import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction import org.apache.kafka.common.requests.{ListOffsetsRequest, ListOffsetsResponse} -import org.apache.kafka.common.utils.{Time, Utils} +import org.apache.kafka.common.utils.{BufferSupplier, Time, Utils} import org.easymock.EasyMock import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml index 3d0e0d2..ab60dfd 100644 --- a/gradle/spotbugs-exclude.xml +++ b/gradle/spotbugs-exclude.xml @@ -95,7 +95,7 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read <!-- false positive in Java 11, related to https://github.com/spotbugs/spotbugs/issues/756 but more complex --> <Match> - <Class name="org.apache.kafka.common.record.KafkaLZ4BlockOutputStream"/> + <Class name="org.apache.kafka.common.compress.KafkaLZ4BlockOutputStream"/> <Bug pattern="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE"/> </Match> diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/BaseRecordBatchBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/BaseRecordBatchBenchmark.java index 834652e..30f908e 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/BaseRecordBatchBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/BaseRecordBatchBenchmark.java @@ -19,7 +19,7 @@ package org.apache.kafka.jmh.record; import kafka.server.BrokerTopicStats; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.record.AbstractRecords; -import org.apache.kafka.common.record.BufferSupplier; +import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.MemoryRecordsBuilder; diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java 
b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index 09f8672..2823186 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -40,7 +40,7 @@ import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ApiMessage; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.record.BufferSupplier; +import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.Records; diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java index 9a4c6b5..0817138 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java @@ -18,7 +18,7 @@ package org.apache.kafka.raft.internals; import org.apache.kafka.common.protocol.DataInputStreamReadable; import org.apache.kafka.common.protocol.Readable; -import org.apache.kafka.common.record.BufferSupplier; +import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.record.DefaultRecordBatch; import org.apache.kafka.common.record.FileRecords; import org.apache.kafka.common.record.MemoryRecords; diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java index 7dc1769..78ffd51 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.raft.internals; -import 
org.apache.kafka.common.record.BufferSupplier; +import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.FileRecords; import org.apache.kafka.common.record.MemoryRecords; diff --git a/raft/src/test/java/org/apache/kafka/snapshot/FileRawSnapshotTest.java b/raft/src/test/java/org/apache/kafka/snapshot/FileRawSnapshotTest.java index 37dac9f..dc4f635 100644 --- a/raft/src/test/java/org/apache/kafka/snapshot/FileRawSnapshotTest.java +++ b/raft/src/test/java/org/apache/kafka/snapshot/FileRawSnapshotTest.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.snapshot; -import org.apache.kafka.common.record.BufferSupplier.GrowableBufferSupplier; +import org.apache.kafka.common.utils.BufferSupplier.GrowableBufferSupplier; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.Record; diff --git a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterTest.java b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterTest.java index 35652c7..27bdff2 100644 --- a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterTest.java +++ b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterTest.java @@ -22,7 +22,7 @@ import java.util.Collections; import java.util.List; import java.util.Random; import java.util.Set; -import org.apache.kafka.common.record.BufferSupplier.GrowableBufferSupplier; +import org.apache.kafka.common.utils.BufferSupplier.GrowableBufferSupplier; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.raft.OffsetAndEpoch; import org.apache.kafka.raft.RaftClientTestContext;
