This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.8 by this push:
new a75d4d4e2b4 KAFKA-17227: Refactor compression code to only load codecs
when used (#16782) (#16811)
a75d4d4e2b4 is described below
commit a75d4d4e2b4cab5d32c8bbd57b048445a9cb560e
Author: Mickael Maison <[email protected]>
AuthorDate: Tue Aug 6 20:56:46 2024 +0200
KAFKA-17227: Refactor compression code to only load codecs when used
(#16782) (#16811)
Reviewers: Chia-Ping Tsai <[email protected]>
---
checkstyle/import-control.xml | 6 +-
.../kafka/clients/producer/ProducerConfig.java | 9 +-
.../kafka/common/compress/GzipCompression.java | 33 +-----
.../common/compress/Lz4BlockOutputStream.java | 3 +-
.../kafka/common/compress/Lz4Compression.java | 14 +--
.../kafka/common/compress/ZstdCompression.java | 13 +--
.../kafka/common/record/CompressionType.java | 125 ++++++++++++++++++++-
.../kafka/common/compress/GzipCompressionTest.java | 22 ++--
.../kafka/common/compress/Lz4CompressionTest.java | 13 ++-
.../kafka/common/compress/ZstdCompressionTest.java | 11 +-
core/src/main/scala/kafka/server/KafkaConfig.scala | 7 +-
.../scala/unit/kafka/log/LogValidatorTest.scala | 10 +-
.../scala/unit/kafka/server/KafkaConfigTest.scala | 9 +-
gradle/dependencies.gradle | 2 +
.../server/record/BrokerCompressionTypeTest.java | 6 +-
.../kafka/storage/internals/log/LogConfig.java | 16 ++-
16 files changed, 196 insertions(+), 103 deletions(-)
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 2f90548ffa9..59f93ad3a62 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -78,7 +78,10 @@
<allow pkg="net.jpountz.xxhash" />
<allow pkg="org.xerial.snappy" />
<allow pkg="org.apache.kafka.common.compress" />
- <allow class="org.apache.kafka.common.record.CompressionType"
exact-match="true" />
+ <allow class="org.apache.kafka.common.record.CompressionType" />
+ <allow class="org.apache.kafka.common.record.CompressionType.GZIP" />
+ <allow class="org.apache.kafka.common.record.CompressionType.LZ4" />
+ <allow class="org.apache.kafka.common.record.CompressionType.ZSTD" />
<allow class="org.apache.kafka.common.record.RecordBatch"
exact-match="true" />
</subpackage>
@@ -152,6 +155,7 @@
</subpackage>
<subpackage name="record">
+ <allow class="org.apache.kafka.common.config.ConfigDef.Range.between"
exact-match="true" />
<allow pkg="org.apache.kafka.common.compress" />
<allow pkg="org.apache.kafka.common.header" />
<allow pkg="org.apache.kafka.common.record" />
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index a59ee81b4a9..0b2d477aff6 100644
---
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -19,9 +19,6 @@ package org.apache.kafka.clients.producer;
import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.MetadataRecoveryStrategy;
-import org.apache.kafka.common.compress.GzipCompression;
-import org.apache.kafka.common.compress.Lz4Compression;
-import org.apache.kafka.common.compress.ZstdCompression;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
@@ -380,9 +377,9 @@ public class ProducerConfig extends AbstractConfig {
Importance.LOW,
ACKS_DOC)
.define(COMPRESSION_TYPE_CONFIG, Type.STRING,
CompressionType.NONE.name, in(Utils.enumOptions(CompressionType.class)),
Importance.HIGH, COMPRESSION_TYPE_DOC)
- .define(COMPRESSION_GZIP_LEVEL_CONFIG,
Type.INT, GzipCompression.DEFAULT_LEVEL, new GzipCompression.LevelValidator(),
Importance.MEDIUM, COMPRESSION_GZIP_LEVEL_DOC)
- .define(COMPRESSION_LZ4_LEVEL_CONFIG,
Type.INT, Lz4Compression.DEFAULT_LEVEL, between(Lz4Compression.MIN_LEVEL,
Lz4Compression.MAX_LEVEL), Importance.MEDIUM, COMPRESSION_LZ4_LEVEL_DOC)
- .define(COMPRESSION_ZSTD_LEVEL_CONFIG,
Type.INT, ZstdCompression.DEFAULT_LEVEL, between(ZstdCompression.MIN_LEVEL,
ZstdCompression.MAX_LEVEL), Importance.MEDIUM, COMPRESSION_ZSTD_LEVEL_DOC)
+ .define(COMPRESSION_GZIP_LEVEL_CONFIG,
Type.INT, CompressionType.GZIP.defaultLevel(),
CompressionType.GZIP.levelValidator(), Importance.MEDIUM,
COMPRESSION_GZIP_LEVEL_DOC)
+ .define(COMPRESSION_LZ4_LEVEL_CONFIG,
Type.INT, CompressionType.LZ4.defaultLevel(),
CompressionType.LZ4.levelValidator(), Importance.MEDIUM,
COMPRESSION_LZ4_LEVEL_DOC)
+ .define(COMPRESSION_ZSTD_LEVEL_CONFIG,
Type.INT, CompressionType.ZSTD.defaultLevel(),
CompressionType.ZSTD.levelValidator(), Importance.MEDIUM,
COMPRESSION_ZSTD_LEVEL_DOC)
.define(BATCH_SIZE_CONFIG, Type.INT, 16384,
atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC)
.define(PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG, Type.BOOLEAN, true,
Importance.LOW, PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_DOC)
.define(PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG, Type.LONG, 0, atLeast(0),
Importance.LOW, PARTITIONER_AVAILABILITY_TIMEOUT_MS_DOC)
diff --git
a/clients/src/main/java/org/apache/kafka/common/compress/GzipCompression.java
b/clients/src/main/java/org/apache/kafka/common/compress/GzipCompression.java
index 52e38700c46..e900359530a 100644
---
a/clients/src/main/java/org/apache/kafka/common/compress/GzipCompression.java
+++
b/clients/src/main/java/org/apache/kafka/common/compress/GzipCompression.java
@@ -17,8 +17,6 @@
package org.apache.kafka.common.compress;
import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.config.ConfigDef;
-import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.ByteBufferInputStream;
@@ -30,14 +28,11 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Objects;
-import java.util.zip.Deflater;
import java.util.zip.GZIPInputStream;
-public class GzipCompression implements Compression {
+import static org.apache.kafka.common.record.CompressionType.GZIP;
- public static final int MIN_LEVEL = Deflater.BEST_SPEED;
- public static final int MAX_LEVEL = Deflater.BEST_COMPRESSION;
- public static final int DEFAULT_LEVEL = Deflater.DEFAULT_COMPRESSION;
+public class GzipCompression implements Compression {
private final int level;
@@ -47,7 +42,7 @@ public class GzipCompression implements Compression {
@Override
public CompressionType type() {
- return CompressionType.GZIP;
+ return GZIP;
}
@Override
@@ -101,10 +96,10 @@ public class GzipCompression implements Compression {
}
public static class Builder implements
Compression.Builder<GzipCompression> {
- private int level = DEFAULT_LEVEL;
+ private int level = GZIP.defaultLevel();
public Builder level(int level) {
- if ((level < MIN_LEVEL || MAX_LEVEL < level) && level !=
DEFAULT_LEVEL) {
+ if ((level < GZIP.minLevel() || GZIP.maxLevel() < level) && level
!= GZIP.defaultLevel()) {
throw new IllegalArgumentException("gzip doesn't support given
compression level: " + level);
}
@@ -117,22 +112,4 @@ public class GzipCompression implements Compression {
return new GzipCompression(level);
}
}
-
- public static class LevelValidator implements ConfigDef.Validator {
-
- @Override
- public void ensureValid(String name, Object o) {
- if (o == null)
- throw new ConfigException(name, null, "Value must be
non-null");
- int level = ((Number) o).intValue();
- if (level > MAX_LEVEL || (level < MIN_LEVEL && level !=
DEFAULT_LEVEL)) {
- throw new ConfigException(name, o, "Value must be between " +
MIN_LEVEL + " and " + MAX_LEVEL + " or equal to " + DEFAULT_LEVEL);
- }
- }
-
- @Override
- public String toString() {
- return "[" + MIN_LEVEL + ",...," + MAX_LEVEL + "] or " +
DEFAULT_LEVEL;
- }
- }
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/compress/Lz4BlockOutputStream.java
b/clients/src/main/java/org/apache/kafka/common/compress/Lz4BlockOutputStream.java
index 97e370a383c..ad2d86aeba6 100644
---
a/clients/src/main/java/org/apache/kafka/common/compress/Lz4BlockOutputStream.java
+++
b/clients/src/main/java/org/apache/kafka/common/compress/Lz4BlockOutputStream.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.compress;
import java.io.IOException;
import java.io.OutputStream;
+import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.utils.ByteUtils;
import net.jpountz.lz4.LZ4Compressor;
@@ -75,7 +76,7 @@ public final class Lz4BlockOutputStream extends OutputStream {
*
* For backward compatibility, Lz4BlockOutputStream uses
fastCompressor with default compression level but, with the other level, it
uses highCompressor.
*/
- compressor = level == Lz4Compression.DEFAULT_LEVEL ?
LZ4Factory.fastestInstance().fastCompressor() :
LZ4Factory.fastestInstance().highCompressor(level);
+ compressor = level == CompressionType.LZ4.defaultLevel() ?
LZ4Factory.fastestInstance().fastCompressor() :
LZ4Factory.fastestInstance().highCompressor(level);
checksum = XXHashFactory.fastestInstance().hash32();
this.useBrokenFlagDescriptorChecksum = useBrokenFlagDescriptorChecksum;
bd = new BD(blockSize);
diff --git
a/clients/src/main/java/org/apache/kafka/common/compress/Lz4Compression.java
b/clients/src/main/java/org/apache/kafka/common/compress/Lz4Compression.java
index 42c1a1a1417..f3e80447a14 100644
--- a/clients/src/main/java/org/apache/kafka/common/compress/Lz4Compression.java
+++ b/clients/src/main/java/org/apache/kafka/common/compress/Lz4Compression.java
@@ -28,13 +28,9 @@ import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Objects;
-public class Lz4Compression implements Compression {
+import static org.apache.kafka.common.record.CompressionType.LZ4;
- // These values come from net.jpountz.lz4.LZ4Constants
- // We may need to update them if the lz4 library changes these values.
- public static final int MIN_LEVEL = 1;
- public static final int MAX_LEVEL = 17;
- public static final int DEFAULT_LEVEL = 9;
+public class Lz4Compression implements Compression {
private final int level;
@@ -44,7 +40,7 @@ public class Lz4Compression implements Compression {
@Override
public CompressionType type() {
- return CompressionType.LZ4;
+ return LZ4;
}
@Override
@@ -89,10 +85,10 @@ public class Lz4Compression implements Compression {
}
public static class Builder implements Compression.Builder<Lz4Compression>
{
- private int level = DEFAULT_LEVEL;
+ private int level = LZ4.defaultLevel();
public Builder level(int level) {
- if (level < MIN_LEVEL || MAX_LEVEL < level) {
+ if (level < LZ4.minLevel() || LZ4.maxLevel() < level) {
throw new IllegalArgumentException("lz4 doesn't support given
compression level: " + level);
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/compress/ZstdCompression.java
b/clients/src/main/java/org/apache/kafka/common/compress/ZstdCompression.java
index 0664fd2dbb0..d3514563a5d 100644
---
a/clients/src/main/java/org/apache/kafka/common/compress/ZstdCompression.java
+++
b/clients/src/main/java/org/apache/kafka/common/compress/ZstdCompression.java
@@ -19,7 +19,6 @@ package org.apache.kafka.common.compress;
import com.github.luben.zstd.BufferPool;
import com.github.luben.zstd.RecyclingBufferPool;
-import com.github.luben.zstd.Zstd;
import com.github.luben.zstd.ZstdInputStreamNoFinalizer;
import com.github.luben.zstd.ZstdOutputStreamNoFinalizer;
import org.apache.kafka.common.KafkaException;
@@ -36,11 +35,9 @@ import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Objects;
-public class ZstdCompression implements Compression {
+import static org.apache.kafka.common.record.CompressionType.ZSTD;
- public static final int MIN_LEVEL = Zstd.minCompressionLevel();
- public static final int MAX_LEVEL = Zstd.maxCompressionLevel();
- public static final int DEFAULT_LEVEL = Zstd.defaultCompressionLevel();
+public class ZstdCompression implements Compression {
private final int level;
@@ -50,7 +47,7 @@ public class ZstdCompression implements Compression {
@Override
public CompressionType type() {
- return CompressionType.ZSTD;
+ return ZSTD;
}
@Override
@@ -124,10 +121,10 @@ public class ZstdCompression implements Compression {
}
public static class Builder implements
Compression.Builder<ZstdCompression> {
- private int level = DEFAULT_LEVEL;
+ private int level = ZSTD.defaultLevel();
public Builder level(int level) {
- if (MAX_LEVEL < level || level < MIN_LEVEL) {
+ if (level < ZSTD.minLevel() || ZSTD.maxLevel() < level) {
throw new IllegalArgumentException("zstd doesn't support given
compression level: " + level);
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
index 12efafc8b55..05b8c45358b 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
@@ -16,6 +16,13 @@
*/
package org.apache.kafka.common.record;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+
+import java.util.zip.Deflater;
+
+import static org.apache.kafka.common.config.ConfigDef.Range.between;
+
/**
* The compression type to use
*/
@@ -23,7 +30,46 @@ public enum CompressionType {
NONE((byte) 0, "none", 1.0f),
// Shipped with the JDK
- GZIP((byte) 1, "gzip", 1.0f),
+ GZIP((byte) 1, "gzip", 1.0f) {
+ public static final int MIN_LEVEL = Deflater.BEST_SPEED;
+ public static final int MAX_LEVEL = Deflater.BEST_COMPRESSION;
+ public static final int DEFAULT_LEVEL = Deflater.DEFAULT_COMPRESSION;
+
+ @Override
+ public int defaultLevel() {
+ return DEFAULT_LEVEL;
+ }
+
+ @Override
+ public int maxLevel() {
+ return MAX_LEVEL;
+ }
+
+ @Override
+ public int minLevel() {
+ return MIN_LEVEL;
+ }
+
+ @Override
+ public ConfigDef.Validator levelValidator() {
+ return new ConfigDef.Validator() {
+ @Override
+ public void ensureValid(String name, Object o) {
+ if (o == null)
+ throw new ConfigException(name, null, "Value must be
non-null");
+ int level = ((Number) o).intValue();
+ if (level > MAX_LEVEL || (level < MIN_LEVEL && level !=
DEFAULT_LEVEL)) {
+ throw new ConfigException(name, o, "Value must be
between " + MIN_LEVEL + " and " + MAX_LEVEL + " or equal to " + DEFAULT_LEVEL);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "[" + MIN_LEVEL + ",...," + MAX_LEVEL + "] or " +
DEFAULT_LEVEL;
+ }
+ };
+ }
+ },
// We should only load classes from a given compression library when we
actually use said compression library. This
// is because compression libraries include native code for a set of
platforms and we want to avoid errors
@@ -31,8 +77,65 @@ public enum CompressionType {
// To ensure this, we only reference compression library code from classes
that are only invoked when actual usage
// happens.
SNAPPY((byte) 2, "snappy", 1.0f),
- LZ4((byte) 3, "lz4", 1.0f),
- ZSTD((byte) 4, "zstd", 1.0f);
+ LZ4((byte) 3, "lz4", 1.0f) {
+ // These values come from net.jpountz.lz4.LZ4Constants
+ // We may need to update them if the lz4 library changes these values.
+ private static final int MIN_LEVEL = 1;
+ private static final int MAX_LEVEL = 17;
+ private static final int DEFAULT_LEVEL = 9;
+
+ @Override
+ public int defaultLevel() {
+ return DEFAULT_LEVEL;
+ }
+
+ @Override
+ public int maxLevel() {
+ return MAX_LEVEL;
+ }
+
+ @Override
+ public int minLevel() {
+ return MIN_LEVEL;
+ }
+
+ @Override
+ public ConfigDef.Validator levelValidator() {
+ return between(MIN_LEVEL, MAX_LEVEL);
+ }
+ },
+ ZSTD((byte) 4, "zstd", 1.0f) {
+ // These values come from the zstd library. We don't use the
Zstd.minCompressionLevel(),
+ // Zstd.maxCompressionLevel() and Zstd.defaultCompressionLevel()
methods to not load the Zstd library
+ // while parsing configuration.
+ // See ZSTD_minCLevel in
https://github.com/facebook/zstd/blob/dev/lib/compress/zstd_compress.c#L6987
+ // and ZSTD_TARGETLENGTH_MAX
https://github.com/facebook/zstd/blob/dev/lib/zstd.h#L1249
+ private static final int MIN_LEVEL = -131072;
+ // See ZSTD_MAX_CLEVEL in
https://github.com/facebook/zstd/blob/dev/lib/compress/clevels.h#L19
+ private static final int MAX_LEVEL = 22;
+ // See ZSTD_CLEVEL_DEFAULT in
https://github.com/facebook/zstd/blob/dev/lib/zstd.h#L129
+ private static final int DEFAULT_LEVEL = 3;
+
+ @Override
+ public int defaultLevel() {
+ return DEFAULT_LEVEL;
+ }
+
+ @Override
+ public int maxLevel() {
+ return MAX_LEVEL;
+ }
+
+ @Override
+ public int minLevel() {
+ return MIN_LEVEL;
+ }
+
+ @Override
+ public ConfigDef.Validator levelValidator() {
+ return between(MIN_LEVEL, MAX_LEVEL);
+ }
+ };
// compression type is represented by two bits in the attributes field of
the record batch header, so `byte` is
// large enough
@@ -78,6 +181,22 @@ public enum CompressionType {
throw new IllegalArgumentException("Unknown compression name: " +
name);
}
+ public int defaultLevel() {
+ throw new UnsupportedOperationException("Compression levels are not
defined for this compression type: " + name);
+ }
+
+ public int maxLevel() {
+ throw new UnsupportedOperationException("Compression levels are not
defined for this compression type: " + name);
+ }
+
+ public int minLevel() {
+ throw new UnsupportedOperationException("Compression levels are not
defined for this compression type: " + name);
+ }
+
+ public ConfigDef.Validator levelValidator() {
+ throw new UnsupportedOperationException("Compression levels are not
defined for this compression type: " + name);
+ }
+
@Override
public String toString() {
return name;
diff --git
a/clients/src/test/java/org/apache/kafka/common/compress/GzipCompressionTest.java
b/clients/src/test/java/org/apache/kafka/common/compress/GzipCompressionTest.java
index 394eae4c924..caf2631d8f6 100644
---
a/clients/src/test/java/org/apache/kafka/common/compress/GzipCompressionTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/compress/GzipCompressionTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.compress;
+import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.utils.BufferSupplier;
@@ -29,6 +30,7 @@ import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
+import static org.apache.kafka.common.record.CompressionType.GZIP;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -41,7 +43,7 @@ public class GzipCompressionTest {
byte[] data = String.join("", Collections.nCopies(256,
"data")).getBytes(StandardCharsets.UTF_8);
for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0,
RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)) {
- for (int level : Arrays.asList(GzipCompression.MIN_LEVEL,
GzipCompression.DEFAULT_LEVEL, GzipCompression.MAX_LEVEL)) {
+ for (int level : Arrays.asList(GZIP.minLevel(),
GZIP.defaultLevel(), GZIP.maxLevel())) {
GzipCompression compression = builder.level(level).build();
ByteBufferOutputStream bufferStream = new
ByteBufferOutputStream(4);
try (OutputStream out =
compression.wrapForOutput(bufferStream, magic)) {
@@ -64,21 +66,21 @@ public class GzipCompressionTest {
public void testCompressionLevels() {
GzipCompression.Builder builder = Compression.gzip();
- assertThrows(IllegalArgumentException.class, () ->
builder.level(GzipCompression.MIN_LEVEL - 1));
- assertThrows(IllegalArgumentException.class, () ->
builder.level(GzipCompression.MAX_LEVEL + 1));
+ assertThrows(IllegalArgumentException.class, () ->
builder.level(GZIP.minLevel() - 1));
+ assertThrows(IllegalArgumentException.class, () ->
builder.level(GZIP.maxLevel() + 1));
- builder.level(GzipCompression.MIN_LEVEL);
- builder.level(GzipCompression.MAX_LEVEL);
+ builder.level(GZIP.minLevel());
+ builder.level(GZIP.maxLevel());
}
@Test
public void testLevelValidator() {
- GzipCompression.LevelValidator validator = new
GzipCompression.LevelValidator();
- for (int level = GzipCompression.MIN_LEVEL; level <=
GzipCompression.MAX_LEVEL; level++) {
+ ConfigDef.Validator validator = GZIP.levelValidator();
+ for (int level = GZIP.minLevel(); level <= GZIP.maxLevel(); level++) {
validator.ensureValid("", level);
}
- validator.ensureValid("", GzipCompression.DEFAULT_LEVEL);
- assertThrows(ConfigException.class, () -> validator.ensureValid("",
GzipCompression.MIN_LEVEL - 1));
- assertThrows(ConfigException.class, () -> validator.ensureValid("",
GzipCompression.MAX_LEVEL + 1));
+ validator.ensureValid("", GZIP.defaultLevel());
+ assertThrows(ConfigException.class, () -> validator.ensureValid("",
GZIP.minLevel() - 1));
+ assertThrows(ConfigException.class, () -> validator.ensureValid("",
GZIP.maxLevel() + 1));
}
}
diff --git
a/clients/src/test/java/org/apache/kafka/common/compress/Lz4CompressionTest.java
b/clients/src/test/java/org/apache/kafka/common/compress/Lz4CompressionTest.java
index e0f85489856..a043f2eb7dc 100644
---
a/clients/src/test/java/org/apache/kafka/common/compress/Lz4CompressionTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/compress/Lz4CompressionTest.java
@@ -44,6 +44,7 @@ import java.util.Random;
import java.util.stream.Stream;
import static
org.apache.kafka.common.compress.Lz4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK;
+import static org.apache.kafka.common.record.CompressionType.LZ4;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -89,7 +90,7 @@ public class Lz4CompressionTest {
byte[] data = String.join("", Collections.nCopies(256,
"data")).getBytes(StandardCharsets.UTF_8);
for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0,
RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)) {
- for (int level : Arrays.asList(Lz4Compression.MIN_LEVEL,
Lz4Compression.DEFAULT_LEVEL, Lz4Compression.MAX_LEVEL)) {
+ for (int level : Arrays.asList(LZ4.minLevel(), LZ4.defaultLevel(),
LZ4.maxLevel())) {
Lz4Compression compression = builder.level(level).build();
ByteBufferOutputStream bufferStream = new
ByteBufferOutputStream(4);
try (OutputStream out =
compression.wrapForOutput(bufferStream, magic)) {
@@ -112,11 +113,11 @@ public class Lz4CompressionTest {
public void testCompressionLevels() {
Lz4Compression.Builder builder = Compression.lz4();
- assertThrows(IllegalArgumentException.class, () ->
builder.level(Lz4Compression.MIN_LEVEL - 1));
- assertThrows(IllegalArgumentException.class, () ->
builder.level(Lz4Compression.MAX_LEVEL + 1));
+ assertThrows(IllegalArgumentException.class, () ->
builder.level(LZ4.minLevel() - 1));
+ assertThrows(IllegalArgumentException.class, () ->
builder.level(LZ4.maxLevel() + 1));
- builder.level(Lz4Compression.MIN_LEVEL);
- builder.level(Lz4Compression.MAX_LEVEL);
+ builder.level(LZ4.minLevel());
+ builder.level(LZ4.maxLevel());
}
private static class Payload {
@@ -191,7 +192,7 @@ public class Lz4CompressionTest {
for (boolean ignore : Arrays.asList(false, true))
for (boolean blockChecksum : Arrays.asList(false,
true))
for (boolean close : Arrays.asList(false, true))
- for (int level :
Arrays.asList(Lz4Compression.MIN_LEVEL, Lz4Compression.DEFAULT_LEVEL,
Lz4Compression.MAX_LEVEL))
+ for (int level : Arrays.asList(LZ4.minLevel(),
LZ4.defaultLevel(), LZ4.maxLevel()))
arguments.add(Arguments.of(new
Args(broken, ignore, level, blockChecksum, close, payload)));
return arguments.stream();
diff --git
a/clients/src/test/java/org/apache/kafka/common/compress/ZstdCompressionTest.java
b/clients/src/test/java/org/apache/kafka/common/compress/ZstdCompressionTest.java
index 2e16d4b36b8..6f13054db31 100644
---
a/clients/src/test/java/org/apache/kafka/common/compress/ZstdCompressionTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/compress/ZstdCompressionTest.java
@@ -28,6 +28,7 @@ import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
+import static org.apache.kafka.common.record.CompressionType.ZSTD;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -40,7 +41,7 @@ public class ZstdCompressionTest {
byte[] data = String.join("", Collections.nCopies(256,
"data")).getBytes(StandardCharsets.UTF_8);
for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0,
RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)) {
- for (int level : Arrays.asList(ZstdCompression.MIN_LEVEL,
ZstdCompression.DEFAULT_LEVEL, ZstdCompression.MAX_LEVEL)) {
+ for (int level : Arrays.asList(ZSTD.minLevel(),
ZSTD.defaultLevel(), ZSTD.maxLevel())) {
ZstdCompression compression = builder.level(level).build();
ByteBufferOutputStream bufferStream = new
ByteBufferOutputStream(4);
try (OutputStream out =
compression.wrapForOutput(bufferStream, magic)) {
@@ -63,10 +64,10 @@ public class ZstdCompressionTest {
public void testCompressionLevels() {
ZstdCompression.Builder builder = Compression.zstd();
- assertThrows(IllegalArgumentException.class, () ->
builder.level(ZstdCompression.MIN_LEVEL - 1));
- assertThrows(IllegalArgumentException.class, () ->
builder.level(ZstdCompression.MAX_LEVEL + 1));
+ assertThrows(IllegalArgumentException.class, () ->
builder.level(ZSTD.minLevel() - 1));
+ assertThrows(IllegalArgumentException.class, () ->
builder.level(ZSTD.maxLevel() + 1));
- builder.level(ZstdCompression.MIN_LEVEL);
- builder.level(ZstdCompression.MAX_LEVEL);
+ builder.level(ZSTD.minLevel());
+ builder.level(ZSTD.maxLevel());
}
}
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 364a647364b..f3ccaeaae10 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -24,7 +24,6 @@ import kafka.cluster.EndPoint
import kafka.utils.{CoreUtils, Logging}
import kafka.utils.Implicits._
import org.apache.kafka.common.Reconfigurable
-import org.apache.kafka.common.compress.{GzipCompression, Lz4Compression,
ZstdCompression}
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef,
ConfigException, ConfigResource, SaslConfigs, SecurityConfig, SslClientAuth,
SslConfigs, TopicConfig}
import org.apache.kafka.common.config.ConfigDef.{CaseInsensitiveValidString,
ConfigKey, ValidList, ValidString}
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
@@ -306,9 +305,9 @@ object KafkaConfig {
.define(GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_CONFIG,
SHORT, GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_DEFAULT, HIGH,
GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_DOC)
.define(ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, BOOLEAN,
ServerConfigs.DELETE_TOPIC_ENABLE_DEFAULT, HIGH,
ServerConfigs.DELETE_TOPIC_ENABLE_DOC)
.define(ServerConfigs.COMPRESSION_TYPE_CONFIG, STRING,
LogConfig.DEFAULT_COMPRESSION_TYPE,
ValidString.in(BrokerCompressionType.names.asScala.toSeq:_*), HIGH,
ServerConfigs.COMPRESSION_TYPE_DOC)
- .define(ServerConfigs.COMPRESSION_GZIP_LEVEL_CONFIG, INT,
GzipCompression.DEFAULT_LEVEL, new GzipCompression.LevelValidator(), MEDIUM,
ServerConfigs.COMPRESSION_GZIP_LEVEL_DOC)
- .define(ServerConfigs.COMPRESSION_LZ4_LEVEL_CONFIG, INT,
Lz4Compression.DEFAULT_LEVEL, between(Lz4Compression.MIN_LEVEL,
Lz4Compression.MAX_LEVEL), MEDIUM, ServerConfigs.COMPRESSION_LZ4_LEVEL_DOC)
- .define(ServerConfigs.COMPRESSION_ZSTD_LEVEL_CONFIG, INT,
ZstdCompression.DEFAULT_LEVEL, between(ZstdCompression.MIN_LEVEL,
ZstdCompression.MAX_LEVEL), MEDIUM, ServerConfigs.COMPRESSION_ZSTD_LEVEL_DOC)
+ .define(ServerConfigs.COMPRESSION_GZIP_LEVEL_CONFIG, INT,
CompressionType.GZIP.defaultLevel, CompressionType.GZIP.levelValidator, MEDIUM,
ServerConfigs.COMPRESSION_GZIP_LEVEL_DOC)
+ .define(ServerConfigs.COMPRESSION_LZ4_LEVEL_CONFIG, INT,
CompressionType.LZ4.defaultLevel, CompressionType.LZ4.levelValidator, MEDIUM,
ServerConfigs.COMPRESSION_LZ4_LEVEL_DOC)
+ .define(ServerConfigs.COMPRESSION_ZSTD_LEVEL_CONFIG, INT,
CompressionType.ZSTD.defaultLevel, CompressionType.ZSTD.levelValidator, MEDIUM,
ServerConfigs.COMPRESSION_ZSTD_LEVEL_DOC)
/** ********* Transaction management configuration ***********/
.define(TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG,
INT, TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_DEFAULT,
atLeast(1), HIGH,
TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_DOC)
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index c2b29bcc6d6..82b5999a6aa 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -20,7 +20,7 @@ import java.nio.ByteBuffer
import java.util.concurrent.TimeUnit
import kafka.server.{BrokerTopicStats, RequestLocal}
import kafka.utils.TestUtils.meterCount
-import org.apache.kafka.common.compress.{Compression, GzipCompression,
Lz4Compression}
+import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.errors.{InvalidTimestampException,
UnsupportedCompressionTypeException, UnsupportedForMessageFormatException}
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.{PrimitiveRef, Time}
@@ -1570,11 +1570,11 @@ class LogValidatorTest {
List.fill(256)("data").mkString("").getBytes
)
// Records from the producer were created with gzip max level
- val gzipMax: Compression =
Compression.gzip().level(GzipCompression.MAX_LEVEL).build()
+ val gzipMax: Compression =
Compression.gzip().level(CompressionType.GZIP.maxLevel()).build()
val recordsGzipMax = createRecords(records, RecordBatch.MAGIC_VALUE_V2,
RecordBatch.NO_TIMESTAMP, gzipMax)
// The topic is configured with gzip min level
- val gzipMin: Compression =
Compression.gzip().level(GzipCompression.MIN_LEVEL).build()
+ val gzipMin: Compression =
Compression.gzip().level(CompressionType.GZIP.minLevel()).build()
val recordsGzipMin = createRecords(records, RecordBatch.MAGIC_VALUE_V2,
RecordBatch.NO_TIMESTAMP, gzipMin)
// ensure data compressed with gzip max and min is different
@@ -1608,11 +1608,11 @@ class LogValidatorTest {
List.fill(256)("data").mkString("").getBytes
)
// Records from the producer were created with gzip max level
- val gzipMax: Compression =
Compression.gzip().level(GzipCompression.MAX_LEVEL).build()
+ val gzipMax: Compression =
Compression.gzip().level(CompressionType.GZIP.maxLevel()).build()
val recordsGzipMax = createRecords(records, RecordBatch.MAGIC_VALUE_V2,
RecordBatch.NO_TIMESTAMP, gzipMax)
// The topic is configured with lz4 min level
- val lz4Min: Compression =
Compression.lz4().level(Lz4Compression.MIN_LEVEL).build()
+ val lz4Min: Compression =
Compression.lz4().level(CompressionType.LZ4.minLevel()).build()
val recordsLz4Min = createRecords(records, RecordBatch.MAGIC_VALUE_V2,
RecordBatch.NO_TIMESTAMP, lz4Min)
val validator = new LogValidator(recordsGzipMax,
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 266b64560fb..bebddbc4684 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -30,7 +30,6 @@ import org.apache.kafka.common.metrics.Sensor
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.record.{CompressionType, Records}
import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.common.compress.{GzipCompression, Lz4Compression,
ZstdCompression}
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy
import org.apache.kafka.coordinator.group.Group.GroupType
@@ -716,7 +715,7 @@ class KafkaConfigTest {
def testInvalidGzipCompressionLevel(): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port
= 8181)
props.setProperty(ServerConfigs.COMPRESSION_TYPE_CONFIG, "gzip")
- props.setProperty(ServerConfigs.COMPRESSION_GZIP_LEVEL_CONFIG,
(GzipCompression.MAX_LEVEL + 1).toString)
+ props.setProperty(ServerConfigs.COMPRESSION_GZIP_LEVEL_CONFIG,
(CompressionType.GZIP.maxLevel() + 1).toString)
assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
}
@@ -724,7 +723,7 @@ class KafkaConfigTest {
def testInvalidLz4CompressionLevel(): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port
= 8181)
props.setProperty(ServerConfigs.COMPRESSION_TYPE_CONFIG, "lz4")
- props.setProperty(ServerConfigs.COMPRESSION_LZ4_LEVEL_CONFIG,
(Lz4Compression.MAX_LEVEL + 1).toString)
+ props.setProperty(ServerConfigs.COMPRESSION_LZ4_LEVEL_CONFIG,
(CompressionType.LZ4.maxLevel() + 1).toString)
assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
}
@@ -732,7 +731,7 @@ class KafkaConfigTest {
def testInvalidZstdCompressionLevel(): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port
= 8181)
props.setProperty(ServerConfigs.COMPRESSION_TYPE_CONFIG, "zstd")
- props.setProperty(ServerConfigs.COMPRESSION_ZSTD_LEVEL_CONFIG,
(ZstdCompression.MAX_LEVEL + 1).toString)
+ props.setProperty(ServerConfigs.COMPRESSION_ZSTD_LEVEL_CONFIG,
(CompressionType.ZSTD.maxLevel() + 1).toString)
assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
}
@@ -946,7 +945,7 @@ class KafkaConfigTest {
case ServerConfigs.COMPRESSION_GZIP_LEVEL_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
case ServerConfigs.COMPRESSION_LZ4_LEVEL_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
- case ServerConfigs.COMPRESSION_ZSTD_LEVEL_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number",
ZstdCompression.MAX_LEVEL + 1)
+ case ServerConfigs.COMPRESSION_ZSTD_LEVEL_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number",
CompressionType.ZSTD.maxLevel() + 1)
//SSL Configs
case BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG =>
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 2ee8bf4d749..2d3e013fee1 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -143,6 +143,7 @@ versions += [
kafka_35: "3.5.2",
kafka_36: "3.6.2",
kafka_37: "3.7.0",
+ // When updating lz4 make sure the compression levels in
org.apache.kafka.common.record.CompressionType are still valid
lz4: "1.8.0",
mavenArtifact: "3.9.6",
metrics: "2.2.0",
@@ -166,6 +167,7 @@ versions += [
zinc: "1.9.2",
zookeeper: "3.8.4",
// When updating the zstd version, please do as well in
docker/native/native-image-configs/resource-config.json
+ // Also make sure the compression levels in
org.apache.kafka.common.record.CompressionType are still valid
zstd: "1.5.6-4"
]
diff --git
a/server-common/src/test/java/org/apache/kafka/server/record/BrokerCompressionTypeTest.java
b/server-common/src/test/java/org/apache/kafka/server/record/BrokerCompressionTypeTest.java
index 62ac8eab059..41463aff4d2 100644
---
a/server-common/src/test/java/org/apache/kafka/server/record/BrokerCompressionTypeTest.java
+++
b/server-common/src/test/java/org/apache/kafka/server/record/BrokerCompressionTypeTest.java
@@ -32,13 +32,13 @@ public class BrokerCompressionTypeTest {
@Test
public void testTargetCompressionType() {
- GzipCompression gzipWithLevel =
Compression.gzip().level(GzipCompression.MAX_LEVEL).build();
+ GzipCompression gzipWithLevel =
Compression.gzip().level(CompressionType.GZIP.maxLevel()).build();
assertEquals(gzipWithLevel,
BrokerCompressionType.targetCompression(Optional.of(gzipWithLevel),
CompressionType.ZSTD));
SnappyCompression snappy = Compression.snappy().build();
assertEquals(snappy,
BrokerCompressionType.targetCompression(Optional.of(snappy),
CompressionType.LZ4));
- Lz4Compression lz4WithLevel =
Compression.lz4().level(Lz4Compression.MAX_LEVEL).build();
+ Lz4Compression lz4WithLevel =
Compression.lz4().level(CompressionType.LZ4.maxLevel()).build();
assertEquals(lz4WithLevel,
BrokerCompressionType.targetCompression(Optional.of(lz4WithLevel),
CompressionType.ZSTD));
- ZstdCompression zstdWithLevel =
Compression.zstd().level(ZstdCompression.MAX_LEVEL).build();
+ ZstdCompression zstdWithLevel =
Compression.zstd().level(CompressionType.ZSTD.maxLevel()).build();
assertEquals(zstdWithLevel,
BrokerCompressionType.targetCompression(Optional.of(zstdWithLevel),
CompressionType.GZIP));
GzipCompression gzip = Compression.gzip().build();
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
index ec5dec77c5f..fc7bcec9027 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
@@ -42,9 +42,6 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import org.apache.kafka.common.compress.Compression;
-import org.apache.kafka.common.compress.GzipCompression;
-import org.apache.kafka.common.compress.Lz4Compression;
-import org.apache.kafka.common.compress.ZstdCompression;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.ConfigKey;
@@ -53,6 +50,7 @@ import org.apache.kafka.common.config.ConfigDef.ValidList;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.LegacyRecord;
import org.apache.kafka.common.record.RecordVersion;
import org.apache.kafka.common.record.Records;
@@ -246,12 +244,12 @@ public class LogConfig extends AbstractConfig {
TopicConfig.MIN_IN_SYNC_REPLICAS_DOC)
.define(TopicConfig.COMPRESSION_TYPE_CONFIG, STRING,
DEFAULT_COMPRESSION_TYPE, in(BrokerCompressionType.names().toArray(new
String[0])),
MEDIUM, TopicConfig.COMPRESSION_TYPE_DOC)
- .define(TopicConfig.COMPRESSION_GZIP_LEVEL_CONFIG, INT,
GzipCompression.DEFAULT_LEVEL,
- new GzipCompression.LevelValidator(), MEDIUM,
TopicConfig.COMPRESSION_GZIP_LEVEL_DOC)
- .define(TopicConfig.COMPRESSION_LZ4_LEVEL_CONFIG, INT,
Lz4Compression.DEFAULT_LEVEL,
- between(Lz4Compression.MIN_LEVEL, Lz4Compression.MAX_LEVEL),
MEDIUM, TopicConfig.COMPRESSION_LZ4_LEVEL_DOC)
- .define(TopicConfig.COMPRESSION_ZSTD_LEVEL_CONFIG, INT,
ZstdCompression.DEFAULT_LEVEL,
- between(ZstdCompression.MIN_LEVEL, ZstdCompression.MAX_LEVEL),
MEDIUM, TopicConfig.COMPRESSION_ZSTD_LEVEL_DOC)
+ .define(TopicConfig.COMPRESSION_GZIP_LEVEL_CONFIG, INT,
CompressionType.GZIP.defaultLevel(),
+ CompressionType.GZIP.levelValidator(), MEDIUM,
TopicConfig.COMPRESSION_GZIP_LEVEL_DOC)
+ .define(TopicConfig.COMPRESSION_LZ4_LEVEL_CONFIG, INT,
CompressionType.LZ4.defaultLevel(),
+ CompressionType.LZ4.levelValidator(), MEDIUM,
TopicConfig.COMPRESSION_LZ4_LEVEL_DOC)
+ .define(TopicConfig.COMPRESSION_ZSTD_LEVEL_CONFIG, INT,
CompressionType.ZSTD.defaultLevel(),
+ CompressionType.ZSTD.levelValidator(), MEDIUM,
TopicConfig.COMPRESSION_ZSTD_LEVEL_DOC)
.define(TopicConfig.PREALLOCATE_CONFIG, BOOLEAN,
DEFAULT_PREALLOCATE, MEDIUM, TopicConfig.PREALLOCATE_DOC)
.define(MESSAGE_FORMAT_VERSION_CONFIG, STRING,
DEFAULT_MESSAGE_FORMAT_VERSION, new MetadataVersionValidator(), MEDIUM,
MESSAGE_FORMAT_VERSION_DOC)