This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.9 by this push:
     new 4e6508b5e35 KAFKA-17227: Refactor compression code to only load codecs when used (#16782)
4e6508b5e35 is described below

commit 4e6508b5e3522c931d027a33f5d8a02716bf0e56
Author: Mickael Maison <[email protected]>
AuthorDate: Tue Aug 6 11:01:21 2024 +0200

    KAFKA-17227: Refactor compression code to only load codecs when used (#16782)
    
    
    Reviewers: Chia-Ping Tsai <[email protected]>, Josep Prat <[email protected]>

---
 checkstyle/import-control.xml                      |   6 +-
 .../kafka/clients/producer/ProducerConfig.java     |   9 +-
 .../kafka/common/compress/GzipCompression.java     |  33 +-----
 .../common/compress/Lz4BlockOutputStream.java      |   3 +-
 .../kafka/common/compress/Lz4Compression.java      |  14 +--
 .../kafka/common/compress/ZstdCompression.java     |  13 +--
 .../kafka/common/record/CompressionType.java       | 125 ++++++++++++++++++++-
 .../kafka/common/compress/GzipCompressionTest.java |  22 ++--
 .../kafka/common/compress/Lz4CompressionTest.java  |  13 ++-
 .../kafka/common/compress/ZstdCompressionTest.java |  11 +-
 .../scala/unit/kafka/log/LogValidatorTest.scala    |  10 +-
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |   9 +-
 gradle/dependencies.gradle                         |   2 +
 .../server/record/BrokerCompressionTypeTest.java   |   6 +-
 .../apache/kafka/server/config/ServerConfigs.java  |  11 +-
 .../kafka/storage/internals/log/LogConfig.java     |  16 ++-
 16 files changed, 197 insertions(+), 106 deletions(-)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 3f8212f9976..abf98458005 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -76,7 +76,10 @@
       <allow pkg="net.jpountz.xxhash" />
       <allow pkg="org.xerial.snappy" />
       <allow pkg="org.apache.kafka.common.compress" />
-      <allow class="org.apache.kafka.common.record.CompressionType" 
exact-match="true" />
+      <allow class="org.apache.kafka.common.record.CompressionType" />
+      <allow class="org.apache.kafka.common.record.CompressionType.GZIP" />
+      <allow class="org.apache.kafka.common.record.CompressionType.LZ4" />
+      <allow class="org.apache.kafka.common.record.CompressionType.ZSTD" />
       <allow class="org.apache.kafka.common.record.RecordBatch" 
exact-match="true" />
     </subpackage>
 
@@ -150,6 +153,7 @@
     </subpackage>
 
     <subpackage name="record">
+      <allow class="org.apache.kafka.common.config.ConfigDef.Range.between" 
exact-match="true" />
       <allow pkg="org.apache.kafka.common.compress" />
       <allow pkg="org.apache.kafka.common.header" />
       <allow pkg="org.apache.kafka.common.record" />
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index acfb760fbab..8b293395313 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -19,9 +19,6 @@ package org.apache.kafka.clients.producer;
 import org.apache.kafka.clients.ClientDnsLookup;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.MetadataRecoveryStrategy;
-import org.apache.kafka.common.compress.GzipCompression;
-import org.apache.kafka.common.compress.Lz4Compression;
-import org.apache.kafka.common.compress.ZstdCompression;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
@@ -381,9 +378,9 @@ public class ProducerConfig extends AbstractConfig {
                                         Importance.LOW,
                                         ACKS_DOC)
                                 .define(COMPRESSION_TYPE_CONFIG, Type.STRING, 
CompressionType.NONE.name, in(Utils.enumOptions(CompressionType.class)), 
Importance.HIGH, COMPRESSION_TYPE_DOC)
-                                .define(COMPRESSION_GZIP_LEVEL_CONFIG, 
Type.INT, GzipCompression.DEFAULT_LEVEL, new GzipCompression.LevelValidator(), 
Importance.MEDIUM, COMPRESSION_GZIP_LEVEL_DOC)
-                                .define(COMPRESSION_LZ4_LEVEL_CONFIG, 
Type.INT, Lz4Compression.DEFAULT_LEVEL, between(Lz4Compression.MIN_LEVEL, 
Lz4Compression.MAX_LEVEL), Importance.MEDIUM, COMPRESSION_LZ4_LEVEL_DOC)
-                                .define(COMPRESSION_ZSTD_LEVEL_CONFIG, 
Type.INT, ZstdCompression.DEFAULT_LEVEL, between(ZstdCompression.MIN_LEVEL, 
ZstdCompression.MAX_LEVEL), Importance.MEDIUM, COMPRESSION_ZSTD_LEVEL_DOC)
+                                .define(COMPRESSION_GZIP_LEVEL_CONFIG, 
Type.INT, CompressionType.GZIP.defaultLevel(), 
CompressionType.GZIP.levelValidator(), Importance.MEDIUM, 
COMPRESSION_GZIP_LEVEL_DOC)
+                                .define(COMPRESSION_LZ4_LEVEL_CONFIG, 
Type.INT, CompressionType.LZ4.defaultLevel(), 
CompressionType.LZ4.levelValidator(), Importance.MEDIUM, 
COMPRESSION_LZ4_LEVEL_DOC)
+                                .define(COMPRESSION_ZSTD_LEVEL_CONFIG, 
Type.INT, CompressionType.ZSTD.defaultLevel(), 
CompressionType.ZSTD.levelValidator(), Importance.MEDIUM, 
COMPRESSION_ZSTD_LEVEL_DOC)
                                 .define(BATCH_SIZE_CONFIG, Type.INT, 16384, 
atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC)
                                 
.define(PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG, Type.BOOLEAN, true, 
Importance.LOW, PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_DOC)
                                 
.define(PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG, Type.LONG, 0, atLeast(0), 
Importance.LOW, PARTITIONER_AVAILABILITY_TIMEOUT_MS_DOC)
diff --git 
a/clients/src/main/java/org/apache/kafka/common/compress/GzipCompression.java 
b/clients/src/main/java/org/apache/kafka/common/compress/GzipCompression.java
index 52e38700c46..e900359530a 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/compress/GzipCompression.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/compress/GzipCompression.java
@@ -17,8 +17,6 @@
 package org.apache.kafka.common.compress;
 
 import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.config.ConfigDef;
-import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.common.utils.ByteBufferInputStream;
@@ -30,14 +28,11 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.Objects;
-import java.util.zip.Deflater;
 import java.util.zip.GZIPInputStream;
 
-public class GzipCompression implements Compression {
+import static org.apache.kafka.common.record.CompressionType.GZIP;
 
-    public static final int MIN_LEVEL = Deflater.BEST_SPEED;
-    public static final int MAX_LEVEL = Deflater.BEST_COMPRESSION;
-    public static final int DEFAULT_LEVEL = Deflater.DEFAULT_COMPRESSION;
+public class GzipCompression implements Compression {
 
     private final int level;
 
@@ -47,7 +42,7 @@ public class GzipCompression implements Compression {
 
     @Override
     public CompressionType type() {
-        return CompressionType.GZIP;
+        return GZIP;
     }
 
     @Override
@@ -101,10 +96,10 @@ public class GzipCompression implements Compression {
     }
 
     public static class Builder implements 
Compression.Builder<GzipCompression> {
-        private int level = DEFAULT_LEVEL;
+        private int level = GZIP.defaultLevel();
 
         public Builder level(int level) {
-            if ((level < MIN_LEVEL || MAX_LEVEL < level) && level != 
DEFAULT_LEVEL) {
+            if ((level < GZIP.minLevel() || GZIP.maxLevel() < level) && level 
!= GZIP.defaultLevel()) {
                 throw new IllegalArgumentException("gzip doesn't support given 
compression level: " + level);
             }
 
@@ -117,22 +112,4 @@ public class GzipCompression implements Compression {
             return new GzipCompression(level);
         }
     }
-
-    public static class LevelValidator implements ConfigDef.Validator {
-
-        @Override
-        public void ensureValid(String name, Object o) {
-            if (o == null)
-                throw new ConfigException(name, null, "Value must be 
non-null");
-            int level = ((Number) o).intValue();
-            if (level > MAX_LEVEL || (level < MIN_LEVEL && level != 
DEFAULT_LEVEL)) {
-                throw new ConfigException(name, o, "Value must be between " + 
MIN_LEVEL + " and " + MAX_LEVEL + " or equal to " + DEFAULT_LEVEL);
-            }
-        }
-
-        @Override
-        public String toString() {
-            return "[" + MIN_LEVEL + ",...," + MAX_LEVEL + "] or " + 
DEFAULT_LEVEL;
-        }
-    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/compress/Lz4BlockOutputStream.java
 
b/clients/src/main/java/org/apache/kafka/common/compress/Lz4BlockOutputStream.java
index e55b84e286c..10a0a81b1d7 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/compress/Lz4BlockOutputStream.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/compress/Lz4BlockOutputStream.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.compress;
 
+import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.utils.ByteUtils;
 
 import net.jpountz.lz4.LZ4Compressor;
@@ -75,7 +76,7 @@ public final class Lz4BlockOutputStream extends OutputStream {
          *
          * For backward compatibility, Lz4BlockOutputStream uses 
fastCompressor with default compression level but, with the other level, it 
uses highCompressor.
          */
-        compressor = level == Lz4Compression.DEFAULT_LEVEL ? 
LZ4Factory.fastestInstance().fastCompressor() : 
LZ4Factory.fastestInstance().highCompressor(level);
+        compressor = level == CompressionType.LZ4.defaultLevel() ? 
LZ4Factory.fastestInstance().fastCompressor() : 
LZ4Factory.fastestInstance().highCompressor(level);
         checksum = XXHashFactory.fastestInstance().hash32();
         this.useBrokenFlagDescriptorChecksum = useBrokenFlagDescriptorChecksum;
         bd = new BD(blockSize);
diff --git 
a/clients/src/main/java/org/apache/kafka/common/compress/Lz4Compression.java 
b/clients/src/main/java/org/apache/kafka/common/compress/Lz4Compression.java
index 42c1a1a1417..f3e80447a14 100644
--- a/clients/src/main/java/org/apache/kafka/common/compress/Lz4Compression.java
+++ b/clients/src/main/java/org/apache/kafka/common/compress/Lz4Compression.java
@@ -28,13 +28,9 @@ import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.Objects;
 
-public class Lz4Compression implements Compression {
+import static org.apache.kafka.common.record.CompressionType.LZ4;
 
-    // These values come from net.jpountz.lz4.LZ4Constants
-    // We may need to update them if the lz4 library changes these values.
-    public static final int MIN_LEVEL = 1;
-    public static final int MAX_LEVEL = 17;
-    public static final int DEFAULT_LEVEL = 9;
+public class Lz4Compression implements Compression {
 
     private final int level;
 
@@ -44,7 +40,7 @@ public class Lz4Compression implements Compression {
 
     @Override
     public CompressionType type() {
-        return CompressionType.LZ4;
+        return LZ4;
     }
 
     @Override
@@ -89,10 +85,10 @@ public class Lz4Compression implements Compression {
     }
 
     public static class Builder implements Compression.Builder<Lz4Compression> 
{
-        private int level = DEFAULT_LEVEL;
+        private int level = LZ4.defaultLevel();
 
         public Builder level(int level) {
-            if (level < MIN_LEVEL || MAX_LEVEL < level) {
+            if (level < LZ4.minLevel() || LZ4.maxLevel() < level) {
                 throw new IllegalArgumentException("lz4 doesn't support given 
compression level: " + level);
             }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/common/compress/ZstdCompression.java 
b/clients/src/main/java/org/apache/kafka/common/compress/ZstdCompression.java
index 4ddf23e78de..728797e6a82 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/compress/ZstdCompression.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/compress/ZstdCompression.java
@@ -26,7 +26,6 @@ import org.apache.kafka.common.utils.ChunkedBytesStream;
 
 import com.github.luben.zstd.BufferPool;
 import com.github.luben.zstd.RecyclingBufferPool;
-import com.github.luben.zstd.Zstd;
 import com.github.luben.zstd.ZstdInputStreamNoFinalizer;
 import com.github.luben.zstd.ZstdOutputStreamNoFinalizer;
 
@@ -37,11 +36,9 @@ import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.Objects;
 
-public class ZstdCompression implements Compression {
+import static org.apache.kafka.common.record.CompressionType.ZSTD;
 
-    public static final int MIN_LEVEL = Zstd.minCompressionLevel();
-    public static final int MAX_LEVEL = Zstd.maxCompressionLevel();
-    public static final int DEFAULT_LEVEL = Zstd.defaultCompressionLevel();
+public class ZstdCompression implements Compression {
 
     private final int level;
 
@@ -51,7 +48,7 @@ public class ZstdCompression implements Compression {
 
     @Override
     public CompressionType type() {
-        return CompressionType.ZSTD;
+        return ZSTD;
     }
 
     @Override
@@ -125,10 +122,10 @@ public class ZstdCompression implements Compression {
     }
 
     public static class Builder implements 
Compression.Builder<ZstdCompression> {
-        private int level = DEFAULT_LEVEL;
+        private int level = ZSTD.defaultLevel();
 
         public Builder level(int level) {
-            if (MAX_LEVEL < level || level < MIN_LEVEL) {
+            if (level < ZSTD.minLevel() || ZSTD.maxLevel() < level) {
                 throw new IllegalArgumentException("zstd doesn't support given 
compression level: " + level);
             }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java 
b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
index 12efafc8b55..05b8c45358b 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
@@ -16,6 +16,13 @@
  */
 package org.apache.kafka.common.record;
 
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+
+import java.util.zip.Deflater;
+
+import static org.apache.kafka.common.config.ConfigDef.Range.between;
+
 /**
  * The compression type to use
  */
@@ -23,7 +30,46 @@ public enum CompressionType {
     NONE((byte) 0, "none", 1.0f),
 
     // Shipped with the JDK
-    GZIP((byte) 1, "gzip", 1.0f),
+    GZIP((byte) 1, "gzip", 1.0f) {
+        public static final int MIN_LEVEL = Deflater.BEST_SPEED;
+        public static final int MAX_LEVEL = Deflater.BEST_COMPRESSION;
+        public static final int DEFAULT_LEVEL = Deflater.DEFAULT_COMPRESSION;
+
+        @Override
+        public int defaultLevel() {
+            return DEFAULT_LEVEL;
+        }
+
+        @Override
+        public int maxLevel() {
+            return MAX_LEVEL;
+        }
+
+        @Override
+        public int minLevel() {
+            return MIN_LEVEL;
+        }
+
+        @Override
+        public ConfigDef.Validator levelValidator() {
+            return new ConfigDef.Validator() {
+                @Override
+                public void ensureValid(String name, Object o) {
+                    if (o == null)
+                        throw new ConfigException(name, null, "Value must be 
non-null");
+                    int level = ((Number) o).intValue();
+                    if (level > MAX_LEVEL || (level < MIN_LEVEL && level != 
DEFAULT_LEVEL)) {
+                        throw new ConfigException(name, o, "Value must be 
between " + MIN_LEVEL + " and " + MAX_LEVEL + " or equal to " + DEFAULT_LEVEL);
+                    }
+                }
+
+                @Override
+                public String toString() {
+                    return "[" + MIN_LEVEL + ",...," + MAX_LEVEL + "] or " + 
DEFAULT_LEVEL;
+                }
+            };
+        }
+    },
 
     // We should only load classes from a given compression library when we 
actually use said compression library. This
     // is because compression libraries include native code for a set of 
platforms and we want to avoid errors
@@ -31,8 +77,65 @@ public enum CompressionType {
     // To ensure this, we only reference compression library code from classes 
that are only invoked when actual usage
     // happens.
     SNAPPY((byte) 2, "snappy", 1.0f),
-    LZ4((byte) 3, "lz4", 1.0f),
-    ZSTD((byte) 4, "zstd", 1.0f);
+    LZ4((byte) 3, "lz4", 1.0f) {
+        // These values come from net.jpountz.lz4.LZ4Constants
+        // We may need to update them if the lz4 library changes these values.
+        private static final int MIN_LEVEL = 1;
+        private static final int MAX_LEVEL = 17;
+        private static final int DEFAULT_LEVEL = 9;
+
+        @Override
+        public int defaultLevel() {
+            return DEFAULT_LEVEL;
+        }
+
+        @Override
+        public int maxLevel() {
+            return MAX_LEVEL;
+        }
+
+        @Override
+        public int minLevel() {
+            return MIN_LEVEL;
+        }
+
+        @Override
+        public ConfigDef.Validator levelValidator() {
+            return between(MIN_LEVEL, MAX_LEVEL);
+        }
+    },
+    ZSTD((byte) 4, "zstd", 1.0f) {
+        // These values come from the zstd library. We don't use the 
Zstd.minCompressionLevel(),
+        // Zstd.maxCompressionLevel() and Zstd.defaultCompressionLevel() 
methods to not load the Zstd library
+        // while parsing configuration.
+        // See ZSTD_minCLevel in 
https://github.com/facebook/zstd/blob/dev/lib/compress/zstd_compress.c#L6987
+        // and ZSTD_TARGETLENGTH_MAX 
https://github.com/facebook/zstd/blob/dev/lib/zstd.h#L1249
+        private static final int MIN_LEVEL = -131072;
+        // See ZSTD_MAX_CLEVEL in 
https://github.com/facebook/zstd/blob/dev/lib/compress/clevels.h#L19
+        private static final int MAX_LEVEL = 22;
+        // See ZSTD_CLEVEL_DEFAULT in 
https://github.com/facebook/zstd/blob/dev/lib/zstd.h#L129
+        private static final int DEFAULT_LEVEL = 3;
+
+        @Override
+        public int defaultLevel() {
+            return DEFAULT_LEVEL;
+        }
+
+        @Override
+        public int maxLevel() {
+            return MAX_LEVEL;
+        }
+
+        @Override
+        public int minLevel() {
+            return MIN_LEVEL;
+        }
+
+        @Override
+        public ConfigDef.Validator levelValidator() {
+            return between(MIN_LEVEL, MAX_LEVEL);
+        }
+    };
 
     // compression type is represented by two bits in the attributes field of 
the record batch header, so `byte` is
     // large enough
@@ -78,6 +181,22 @@ public enum CompressionType {
             throw new IllegalArgumentException("Unknown compression name: " + 
name);
     }
 
+    public int defaultLevel() {
+        throw new UnsupportedOperationException("Compression levels are not 
defined for this compression type: " + name);
+    }
+
+    public int maxLevel() {
+        throw new UnsupportedOperationException("Compression levels are not 
defined for this compression type: " + name);
+    }
+
+    public int minLevel() {
+        throw new UnsupportedOperationException("Compression levels are not 
defined for this compression type: " + name);
+    }
+
+    public ConfigDef.Validator levelValidator() {
+        throw new UnsupportedOperationException("Compression levels are not 
defined for this compression type: " + name);
+    }
+
     @Override
     public String toString() {
         return name;
diff --git 
a/clients/src/test/java/org/apache/kafka/common/compress/GzipCompressionTest.java
 
b/clients/src/test/java/org/apache/kafka/common/compress/GzipCompressionTest.java
index 2837d96a5d0..660ba02a877 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/compress/GzipCompressionTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/compress/GzipCompressionTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.compress;
 
+import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.utils.BufferSupplier;
@@ -30,6 +31,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Collections;
 
+import static org.apache.kafka.common.record.CompressionType.GZIP;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -42,7 +44,7 @@ public class GzipCompressionTest {
         byte[] data = String.join("", Collections.nCopies(256, 
"data")).getBytes(StandardCharsets.UTF_8);
 
         for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, 
RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)) {
-            for (int level : Arrays.asList(GzipCompression.MIN_LEVEL, 
GzipCompression.DEFAULT_LEVEL, GzipCompression.MAX_LEVEL)) {
+            for (int level : Arrays.asList(GZIP.minLevel(), 
GZIP.defaultLevel(), GZIP.maxLevel())) {
                 GzipCompression compression = builder.level(level).build();
                 ByteBufferOutputStream bufferStream = new 
ByteBufferOutputStream(4);
                 try (OutputStream out = 
compression.wrapForOutput(bufferStream, magic)) {
@@ -65,21 +67,21 @@ public class GzipCompressionTest {
     public void testCompressionLevels() {
         GzipCompression.Builder builder = Compression.gzip();
 
-        assertThrows(IllegalArgumentException.class, () -> 
builder.level(GzipCompression.MIN_LEVEL - 1));
-        assertThrows(IllegalArgumentException.class, () -> 
builder.level(GzipCompression.MAX_LEVEL + 1));
+        assertThrows(IllegalArgumentException.class, () -> 
builder.level(GZIP.minLevel() - 1));
+        assertThrows(IllegalArgumentException.class, () -> 
builder.level(GZIP.maxLevel() + 1));
 
-        builder.level(GzipCompression.MIN_LEVEL);
-        builder.level(GzipCompression.MAX_LEVEL);
+        builder.level(GZIP.minLevel());
+        builder.level(GZIP.maxLevel());
     }
 
     @Test
     public void testLevelValidator() {
-        GzipCompression.LevelValidator validator = new 
GzipCompression.LevelValidator();
-        for (int level = GzipCompression.MIN_LEVEL; level <= 
GzipCompression.MAX_LEVEL; level++) {
+        ConfigDef.Validator validator = GZIP.levelValidator();
+        for (int level = GZIP.minLevel(); level <= GZIP.maxLevel(); level++) {
             validator.ensureValid("", level);
         }
-        validator.ensureValid("", GzipCompression.DEFAULT_LEVEL);
-        assertThrows(ConfigException.class, () -> validator.ensureValid("", 
GzipCompression.MIN_LEVEL - 1));
-        assertThrows(ConfigException.class, () -> validator.ensureValid("", 
GzipCompression.MAX_LEVEL + 1));
+        validator.ensureValid("", GZIP.defaultLevel());
+        assertThrows(ConfigException.class, () -> validator.ensureValid("", 
GZIP.minLevel() - 1));
+        assertThrows(ConfigException.class, () -> validator.ensureValid("", 
GZIP.maxLevel() + 1));
     }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/compress/Lz4CompressionTest.java
 
b/clients/src/test/java/org/apache/kafka/common/compress/Lz4CompressionTest.java
index a4f1152830d..e0a1a7aab94 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/compress/Lz4CompressionTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/compress/Lz4CompressionTest.java
@@ -45,6 +45,7 @@ import java.util.Random;
 import java.util.stream.Stream;
 
 import static 
org.apache.kafka.common.compress.Lz4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK;
+import static org.apache.kafka.common.record.CompressionType.LZ4;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -90,7 +91,7 @@ public class Lz4CompressionTest {
         byte[] data = String.join("", Collections.nCopies(256, 
"data")).getBytes(StandardCharsets.UTF_8);
 
         for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, 
RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)) {
-            for (int level : Arrays.asList(Lz4Compression.MIN_LEVEL, 
Lz4Compression.DEFAULT_LEVEL, Lz4Compression.MAX_LEVEL)) {
+            for (int level : Arrays.asList(LZ4.minLevel(), LZ4.defaultLevel(), 
LZ4.maxLevel())) {
                 Lz4Compression compression = builder.level(level).build();
                 ByteBufferOutputStream bufferStream = new 
ByteBufferOutputStream(4);
                 try (OutputStream out = 
compression.wrapForOutput(bufferStream, magic)) {
@@ -113,11 +114,11 @@ public class Lz4CompressionTest {
     public void testCompressionLevels() {
         Lz4Compression.Builder builder = Compression.lz4();
 
-        assertThrows(IllegalArgumentException.class, () -> 
builder.level(Lz4Compression.MIN_LEVEL - 1));
-        assertThrows(IllegalArgumentException.class, () -> 
builder.level(Lz4Compression.MAX_LEVEL + 1));
+        assertThrows(IllegalArgumentException.class, () -> 
builder.level(LZ4.minLevel() - 1));
+        assertThrows(IllegalArgumentException.class, () -> 
builder.level(LZ4.maxLevel() + 1));
 
-        builder.level(Lz4Compression.MIN_LEVEL);
-        builder.level(Lz4Compression.MAX_LEVEL);
+        builder.level(LZ4.minLevel());
+        builder.level(LZ4.maxLevel());
     }
 
     private static class Payload {
@@ -192,7 +193,7 @@ public class Lz4CompressionTest {
                     for (boolean ignore : Arrays.asList(false, true))
                         for (boolean blockChecksum : Arrays.asList(false, 
true))
                             for (boolean close : Arrays.asList(false, true))
-                                for (int level : 
Arrays.asList(Lz4Compression.MIN_LEVEL, Lz4Compression.DEFAULT_LEVEL, 
Lz4Compression.MAX_LEVEL))
+                                for (int level : Arrays.asList(LZ4.minLevel(), 
LZ4.defaultLevel(), LZ4.maxLevel()))
                                     arguments.add(Arguments.of(new 
Args(broken, ignore, level, blockChecksum, close, payload)));
 
             return arguments.stream();
diff --git 
a/clients/src/test/java/org/apache/kafka/common/compress/ZstdCompressionTest.java
 
b/clients/src/test/java/org/apache/kafka/common/compress/ZstdCompressionTest.java
index 176291b8c56..30ace681b08 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/compress/ZstdCompressionTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/compress/ZstdCompressionTest.java
@@ -29,6 +29,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Collections;
 
+import static org.apache.kafka.common.record.CompressionType.ZSTD;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -41,7 +42,7 @@ public class ZstdCompressionTest {
         byte[] data = String.join("", Collections.nCopies(256, 
"data")).getBytes(StandardCharsets.UTF_8);
 
         for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, 
RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)) {
-            for (int level : Arrays.asList(ZstdCompression.MIN_LEVEL, 
ZstdCompression.DEFAULT_LEVEL, ZstdCompression.MAX_LEVEL)) {
+            for (int level : Arrays.asList(ZSTD.minLevel(), 
ZSTD.defaultLevel(), ZSTD.maxLevel())) {
                 ZstdCompression compression = builder.level(level).build();
                 ByteBufferOutputStream bufferStream = new 
ByteBufferOutputStream(4);
                 try (OutputStream out = 
compression.wrapForOutput(bufferStream, magic)) {
@@ -64,10 +65,10 @@ public class ZstdCompressionTest {
     public void testCompressionLevels() {
         ZstdCompression.Builder builder = Compression.zstd();
 
-        assertThrows(IllegalArgumentException.class, () -> 
builder.level(ZstdCompression.MIN_LEVEL - 1));
-        assertThrows(IllegalArgumentException.class, () -> 
builder.level(ZstdCompression.MAX_LEVEL + 1));
+        assertThrows(IllegalArgumentException.class, () -> 
builder.level(ZSTD.minLevel() - 1));
+        assertThrows(IllegalArgumentException.class, () -> 
builder.level(ZSTD.maxLevel() + 1));
 
-        builder.level(ZstdCompression.MIN_LEVEL);
-        builder.level(ZstdCompression.MAX_LEVEL);
+        builder.level(ZSTD.minLevel());
+        builder.level(ZSTD.maxLevel());
     }
 }
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala 
b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index 61f2f2feb1e..dceee266f6d 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -20,7 +20,7 @@ import java.nio.ByteBuffer
 import java.util.concurrent.TimeUnit
 import kafka.server.{BrokerTopicStats, RequestLocal}
 import kafka.utils.TestUtils.meterCount
-import org.apache.kafka.common.compress.{Compression, GzipCompression, 
Lz4Compression}
+import org.apache.kafka.common.compress.Compression
 import org.apache.kafka.common.errors.{CorruptRecordException, 
InvalidTimestampException, UnsupportedCompressionTypeException, 
UnsupportedForMessageFormatException}
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.{PrimitiveRef, Time}
@@ -1619,11 +1619,11 @@ class LogValidatorTest {
       List.fill(256)("data").mkString("").getBytes
     )
     // Records from the producer were created with gzip max level
-    val gzipMax: Compression = 
Compression.gzip().level(GzipCompression.MAX_LEVEL).build()
+    val gzipMax: Compression = 
Compression.gzip().level(CompressionType.GZIP.maxLevel()).build()
     val recordsGzipMax = createRecords(records, RecordBatch.MAGIC_VALUE_V2, 
RecordBatch.NO_TIMESTAMP, gzipMax)
 
     // The topic is configured with gzip min level
-    val gzipMin: Compression = 
Compression.gzip().level(GzipCompression.MIN_LEVEL).build()
+    val gzipMin: Compression = 
Compression.gzip().level(CompressionType.GZIP.minLevel()).build()
     val recordsGzipMin = createRecords(records, RecordBatch.MAGIC_VALUE_V2, 
RecordBatch.NO_TIMESTAMP, gzipMin)
 
     // ensure data compressed with gzip max and min is different
@@ -1657,11 +1657,11 @@ class LogValidatorTest {
       List.fill(256)("data").mkString("").getBytes
     )
     // Records from the producer were created with gzip max level
-    val gzipMax: Compression = 
Compression.gzip().level(GzipCompression.MAX_LEVEL).build()
+    val gzipMax: Compression = 
Compression.gzip().level(CompressionType.GZIP.maxLevel()).build()
     val recordsGzipMax = createRecords(records, RecordBatch.MAGIC_VALUE_V2, 
RecordBatch.NO_TIMESTAMP, gzipMax)
 
     // The topic is configured with lz4 min level
-    val lz4Min: Compression = 
Compression.lz4().level(Lz4Compression.MIN_LEVEL).build()
+    val lz4Min: Compression = 
Compression.lz4().level(CompressionType.LZ4.minLevel()).build()
     val recordsLz4Min = createRecords(records, RecordBatch.MAGIC_VALUE_V2, 
RecordBatch.NO_TIMESTAMP, lz4Min)
 
     val validator = new LogValidator(recordsGzipMax,
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 6a8d4a6564a..20555da96f6 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -30,7 +30,6 @@ import org.apache.kafka.common.metrics.Sensor
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.record.{CompressionType, Records}
 import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.common.compress.{GzipCompression, Lz4Compression, ZstdCompression}
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
 import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy
 import org.apache.kafka.coordinator.group.Group.GroupType
@@ -769,7 +768,7 @@ class KafkaConfigTest {
   def testInvalidGzipCompressionLevel(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
     props.setProperty(ServerConfigs.COMPRESSION_TYPE_CONFIG, "gzip")
-    props.setProperty(ServerConfigs.COMPRESSION_GZIP_LEVEL_CONFIG, (GzipCompression.MAX_LEVEL + 1).toString)
+    props.setProperty(ServerConfigs.COMPRESSION_GZIP_LEVEL_CONFIG, (CompressionType.GZIP.maxLevel() + 1).toString)
     assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
   }
 
@@ -777,7 +776,7 @@ class KafkaConfigTest {
   def testInvalidLz4CompressionLevel(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
     props.setProperty(ServerConfigs.COMPRESSION_TYPE_CONFIG, "lz4")
-    props.setProperty(ServerConfigs.COMPRESSION_LZ4_LEVEL_CONFIG, (Lz4Compression.MAX_LEVEL + 1).toString)
+    props.setProperty(ServerConfigs.COMPRESSION_LZ4_LEVEL_CONFIG, (CompressionType.LZ4.maxLevel() + 1).toString)
     assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
   }
 
@@ -785,7 +784,7 @@ class KafkaConfigTest {
   def testInvalidZstdCompressionLevel(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
     props.setProperty(ServerConfigs.COMPRESSION_TYPE_CONFIG, "zstd")
-    props.setProperty(ServerConfigs.COMPRESSION_ZSTD_LEVEL_CONFIG, (ZstdCompression.MAX_LEVEL + 1).toString)
+    props.setProperty(ServerConfigs.COMPRESSION_ZSTD_LEVEL_CONFIG, (CompressionType.ZSTD.maxLevel() + 1).toString)
     assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
   }
 
@@ -999,7 +998,7 @@ class KafkaConfigTest {
 
         case ServerConfigs.COMPRESSION_GZIP_LEVEL_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
         case ServerConfigs.COMPRESSION_LZ4_LEVEL_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
-        case ServerConfigs.COMPRESSION_ZSTD_LEVEL_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", ZstdCompression.MAX_LEVEL + 1)
+        case ServerConfigs.COMPRESSION_ZSTD_LEVEL_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", CompressionType.ZSTD.maxLevel() + 1)
 
         //SSL Configs
         case BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG =>
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index c67167731e7..8ca49dd674b 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -136,6 +136,7 @@ versions += [
   kafka_36: "3.6.2",
   kafka_37: "3.7.1",
   kafka_38: "3.8.0",
+  // When updating lz4 make sure the compression levels in org.apache.kafka.common.record.CompressionType are still valid
   lz4: "1.8.0",
   mavenArtifact: "3.9.6",
   metrics: "2.2.0",
@@ -158,6 +159,7 @@ versions += [
   zinc: "1.9.2",
   zookeeper: "3.8.4",
   // When updating the zstd version, please do as well in docker/native/native-image-configs/resource-config.json
+  // Also make sure the compression levels in org.apache.kafka.common.record.CompressionType are still valid
   zstd: "1.5.6-4",
   junitPlatform: "1.10.2"
 ]
diff --git a/server-common/src/test/java/org/apache/kafka/server/record/BrokerCompressionTypeTest.java b/server-common/src/test/java/org/apache/kafka/server/record/BrokerCompressionTypeTest.java
index ae6c3d76e3a..96f8a09d33a 100644
--- a/server-common/src/test/java/org/apache/kafka/server/record/BrokerCompressionTypeTest.java
+++ b/server-common/src/test/java/org/apache/kafka/server/record/BrokerCompressionTypeTest.java
@@ -33,13 +33,13 @@ public class BrokerCompressionTypeTest {
 
     @Test
     public void testTargetCompressionType() {
-        GzipCompression gzipWithLevel = Compression.gzip().level(GzipCompression.MAX_LEVEL).build();
+        GzipCompression gzipWithLevel = Compression.gzip().level(CompressionType.GZIP.maxLevel()).build();
         assertEquals(gzipWithLevel, BrokerCompressionType.targetCompression(Optional.of(gzipWithLevel), CompressionType.ZSTD));
         SnappyCompression snappy = Compression.snappy().build();
         assertEquals(snappy, BrokerCompressionType.targetCompression(Optional.of(snappy), CompressionType.LZ4));
-        Lz4Compression lz4WithLevel = Compression.lz4().level(Lz4Compression.MAX_LEVEL).build();
+        Lz4Compression lz4WithLevel = Compression.lz4().level(CompressionType.LZ4.maxLevel()).build();
         assertEquals(lz4WithLevel, BrokerCompressionType.targetCompression(Optional.of(lz4WithLevel), CompressionType.ZSTD));
-        ZstdCompression zstdWithLevel = Compression.zstd().level(ZstdCompression.MAX_LEVEL).build();
+        ZstdCompression zstdWithLevel = Compression.zstd().level(CompressionType.ZSTD.maxLevel()).build();
         assertEquals(zstdWithLevel, BrokerCompressionType.targetCompression(Optional.of(zstdWithLevel), CompressionType.GZIP));
 
         GzipCompression gzip = Compression.gzip().build();
diff --git a/server/src/main/java/org/apache/kafka/server/config/ServerConfigs.java b/server/src/main/java/org/apache/kafka/server/config/ServerConfigs.java
index fc39e451481..883eaf19feb 100644
--- a/server/src/main/java/org/apache/kafka/server/config/ServerConfigs.java
+++ b/server/src/main/java/org/apache/kafka/server/config/ServerConfigs.java
@@ -17,11 +17,9 @@
 package org.apache.kafka.server.config;
 
 import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.common.compress.GzipCompression;
-import org.apache.kafka.common.compress.Lz4Compression;
-import org.apache.kafka.common.compress.ZstdCompression;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.server.authorizer.Authorizer;
 import org.apache.kafka.server.record.BrokerCompressionType;
 import org.apache.kafka.storage.internals.log.LogConfig;
@@ -30,7 +28,6 @@ import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
 import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
 import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
-import static org.apache.kafka.common.config.ConfigDef.Range.between;
 import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
 import static org.apache.kafka.common.config.ConfigDef.Type.INT;
 import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
@@ -163,9 +160,9 @@ public class ServerConfigs {
             .define(CONTROLLED_SHUTDOWN_ENABLE_CONFIG, BOOLEAN, CONTROLLED_SHUTDOWN_ENABLE_DEFAULT, MEDIUM, CONTROLLED_SHUTDOWN_ENABLE_DOC)
             .define(DELETE_TOPIC_ENABLE_CONFIG, BOOLEAN, DELETE_TOPIC_ENABLE_DEFAULT, HIGH, DELETE_TOPIC_ENABLE_DOC)
             .define(COMPRESSION_TYPE_CONFIG, STRING, LogConfig.DEFAULT_COMPRESSION_TYPE, ConfigDef.ValidString.in(BrokerCompressionType.names().toArray(new String[0])), HIGH, COMPRESSION_TYPE_DOC)
-            .define(COMPRESSION_GZIP_LEVEL_CONFIG, INT, GzipCompression.DEFAULT_LEVEL, new GzipCompression.LevelValidator(), MEDIUM, COMPRESSION_GZIP_LEVEL_DOC)
-            .define(COMPRESSION_LZ4_LEVEL_CONFIG, INT, Lz4Compression.DEFAULT_LEVEL, between(Lz4Compression.MIN_LEVEL, Lz4Compression.MAX_LEVEL), MEDIUM, COMPRESSION_LZ4_LEVEL_DOC)
-            .define(COMPRESSION_ZSTD_LEVEL_CONFIG, INT, ZstdCompression.DEFAULT_LEVEL, between(ZstdCompression.MIN_LEVEL, ZstdCompression.MAX_LEVEL), MEDIUM, COMPRESSION_ZSTD_LEVEL_DOC)
+            .define(COMPRESSION_GZIP_LEVEL_CONFIG, INT, CompressionType.GZIP.defaultLevel(), CompressionType.GZIP.levelValidator(), MEDIUM, COMPRESSION_GZIP_LEVEL_DOC)
+            .define(COMPRESSION_LZ4_LEVEL_CONFIG, INT, CompressionType.LZ4.defaultLevel(), CompressionType.LZ4.levelValidator(), MEDIUM, COMPRESSION_LZ4_LEVEL_DOC)
+            .define(COMPRESSION_ZSTD_LEVEL_CONFIG, INT, CompressionType.ZSTD.defaultLevel(), CompressionType.ZSTD.levelValidator(), MEDIUM, COMPRESSION_ZSTD_LEVEL_DOC)
             /** ********* Fetch Configuration **************/
             .define(MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS_CONFIG, INT, MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS_DEFAULT, atLeast(0), MEDIUM, MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS_DOC)
             .define(FETCH_MAX_BYTES_CONFIG, INT, FETCH_MAX_BYTES_DEFAULT, atLeast(1024), MEDIUM, FETCH_MAX_BYTES_DOC)
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
index 7f8fe4a9826..8d6df1ee93e 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
@@ -17,9 +17,6 @@
 package org.apache.kafka.storage.internals.log;
 
 import org.apache.kafka.common.compress.Compression;
-import org.apache.kafka.common.compress.GzipCompression;
-import org.apache.kafka.common.compress.Lz4Compression;
-import org.apache.kafka.common.compress.ZstdCompression;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.ConfigKey;
@@ -28,6 +25,7 @@ import org.apache.kafka.common.config.ConfigDef.ValidList;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.LegacyRecord;
 import org.apache.kafka.common.record.RecordVersion;
 import org.apache.kafka.common.record.Records;
@@ -300,12 +298,12 @@ public class LogConfig extends AbstractConfig {
                         TopicConfig.MIN_IN_SYNC_REPLICAS_DOC)
                 .define(TopicConfig.COMPRESSION_TYPE_CONFIG, STRING, DEFAULT_COMPRESSION_TYPE, in(BrokerCompressionType.names().toArray(new String[0])),
                         MEDIUM, TopicConfig.COMPRESSION_TYPE_DOC)
-                .define(TopicConfig.COMPRESSION_GZIP_LEVEL_CONFIG, INT, GzipCompression.DEFAULT_LEVEL,
-                        new GzipCompression.LevelValidator(), MEDIUM, TopicConfig.COMPRESSION_GZIP_LEVEL_DOC)
-                .define(TopicConfig.COMPRESSION_LZ4_LEVEL_CONFIG, INT, Lz4Compression.DEFAULT_LEVEL,
-                        between(Lz4Compression.MIN_LEVEL, Lz4Compression.MAX_LEVEL), MEDIUM, TopicConfig.COMPRESSION_LZ4_LEVEL_DOC)
-                .define(TopicConfig.COMPRESSION_ZSTD_LEVEL_CONFIG, INT, ZstdCompression.DEFAULT_LEVEL,
-                        between(ZstdCompression.MIN_LEVEL, ZstdCompression.MAX_LEVEL), MEDIUM, TopicConfig.COMPRESSION_ZSTD_LEVEL_DOC)
+                .define(TopicConfig.COMPRESSION_GZIP_LEVEL_CONFIG, INT, CompressionType.GZIP.defaultLevel(),
+                        CompressionType.GZIP.levelValidator(), MEDIUM, TopicConfig.COMPRESSION_GZIP_LEVEL_DOC)
+                .define(TopicConfig.COMPRESSION_LZ4_LEVEL_CONFIG, INT, CompressionType.LZ4.defaultLevel(),
+                        CompressionType.LZ4.levelValidator(), MEDIUM, TopicConfig.COMPRESSION_LZ4_LEVEL_DOC)
+                .define(TopicConfig.COMPRESSION_ZSTD_LEVEL_CONFIG, INT, CompressionType.ZSTD.defaultLevel(),
+                        CompressionType.ZSTD.levelValidator(), MEDIUM, TopicConfig.COMPRESSION_ZSTD_LEVEL_DOC)
                 .define(TopicConfig.PREALLOCATE_CONFIG, BOOLEAN, DEFAULT_PREALLOCATE, MEDIUM, TopicConfig.PREALLOCATE_DOC)
                 .define(MESSAGE_FORMAT_VERSION_CONFIG, STRING, DEFAULT_MESSAGE_FORMAT_VERSION, new MetadataVersionValidator(), MEDIUM,
                         MESSAGE_FORMAT_VERSION_DOC)


Reply via email to