This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1df232c839 MINOR: Supplement the description of `Valid Values` in the 
documentation of `compression.type` (#11985)
1df232c839 is described below

commit 1df232c839f4568718a52c04aad72b69beb52026
Author: RivenSun <91005273+rivens...@users.noreply.github.com>
AuthorDate: Wed Apr 13 12:24:57 2022 +0800

    MINOR: Supplement the description of `Valid Values` in the documentation of 
`compression.type` (#11985)
    
    A validator is added to ProducerConfig.COMPRESSION_TYPE_CONFIG and 
KafkaConfig.CompressionTypeProp, so the corresponding test cases are updated to 
verify that an invalid value of compression.type throws a ConfigException.
    
    Reviewers: Mickael Maison <mickael.mai...@gmail.com>, Guozhang Wang 
<wangg...@gmail.com>
---
 .../org/apache/kafka/clients/producer/ProducerConfig.java     |  4 +++-
 .../java/org/apache/kafka/common/record/CompressionType.java  |  6 ++++++
 .../org/apache/kafka/clients/producer/ProducerConfigTest.java | 11 +++++++++++
 core/src/main/scala/kafka/server/KafkaConfig.scala            |  6 +++---
 core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala   |  2 +-
 5 files changed, 24 insertions(+), 5 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index afc1e55cdf..8fec07a297 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -26,7 +26,9 @@ import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.SecurityConfig;
 import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -329,7 +331,7 @@ public class ProducerConfig extends AbstractConfig {
                                         in("all", "-1", "0", "1"),
                                         Importance.LOW,
                                         ACKS_DOC)
-                                .define(COMPRESSION_TYPE_CONFIG, Type.STRING, 
"none", Importance.HIGH, COMPRESSION_TYPE_DOC)
+                                .define(COMPRESSION_TYPE_CONFIG, Type.STRING, 
CompressionType.NONE.name, in(Utils.enumOptions(CompressionType.class)), 
Importance.HIGH, COMPRESSION_TYPE_DOC)
                                 .define(BATCH_SIZE_CONFIG, Type.INT, 16384, 
atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC)
                                 .define(LINGER_MS_CONFIG, Type.LONG, 0, 
atLeast(0), Importance.MEDIUM, LINGER_MS_DOC)
                                 .define(DELIVERY_TIMEOUT_MS_CONFIG, Type.INT, 
120 * 1000, atLeast(0), Importance.MEDIUM, DELIVERY_TIMEOUT_MS_DOC)
diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java 
b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
index 1b9754ffab..c526929b72 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
@@ -190,4 +190,10 @@ public enum CompressionType {
         else
             throw new IllegalArgumentException("Unknown compression name: " + 
name);
     }
+
+    @Override
+    public String toString() {
+        return name;
+    }
+
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
index a2f318bebc..ae9de7b70a 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
@@ -19,12 +19,14 @@ package org.apache.kafka.clients.producer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.config.ConfigException;
 import org.junit.jupiter.api.Test;
 
 import java.util.HashMap;
 import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class ProducerConfigTest {
 
@@ -59,4 +61,13 @@ public class ProducerConfigTest {
         
assertEquals(newConfigs.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG), 
keySerializerClass);
         
assertEquals(newConfigs.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG), 
valueSerializerClass);
     }
+
+    @Test
+    public void testInvalidCompressionType() {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
keySerializerClass);
+        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
valueSerializerClass);
+        configs.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "abc");
+        assertThrows(ConfigException.class, () -> new ProducerConfig(configs));
+    }
 }
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 7dd9276d6d..6fe0acaa9d 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -25,7 +25,7 @@ import kafka.coordinator.group.OffsetConfig
 import kafka.coordinator.transaction.{TransactionLog, TransactionStateManager}
 import kafka.log.LogConfig
 import kafka.log.LogConfig.MessageFormatVersion
-import kafka.message.{BrokerCompressionCodec, CompressionCodec, 
ZStdCompressionCodec}
+import kafka.message.{BrokerCompressionCodec, CompressionCodec, 
ProducerCompressionCodec, ZStdCompressionCodec}
 import kafka.security.authorizer.AuthorizerUtils
 import kafka.server.KafkaConfig.{ControllerListenerNamesProp, 
ListenerSecurityProtocolMapProp}
 import kafka.server.KafkaRaftServer.{BrokerRole, ControllerRole, ProcessRole}
@@ -227,7 +227,7 @@ object Defaults {
 
   val DeleteTopicEnable = true
 
-  val CompressionType = "producer"
+  val CompressionType = ProducerCompressionCodec.name
 
   val MaxIdMapSnapshots = 2
   /** ********* Kafka Metrics Configuration ***********/
@@ -1257,7 +1257,7 @@ object KafkaConfig {
       .define(OffsetCommitTimeoutMsProp, INT, Defaults.OffsetCommitTimeoutMs, 
atLeast(1), HIGH, OffsetCommitTimeoutMsDoc)
       .define(OffsetCommitRequiredAcksProp, SHORT, 
Defaults.OffsetCommitRequiredAcks, HIGH, OffsetCommitRequiredAcksDoc)
       .define(DeleteTopicEnableProp, BOOLEAN, Defaults.DeleteTopicEnable, 
HIGH, DeleteTopicEnableDoc)
-      .define(CompressionTypeProp, STRING, Defaults.CompressionType, HIGH, 
CompressionTypeDoc)
+      .define(CompressionTypeProp, STRING, Defaults.CompressionType, 
in(BrokerCompressionCodec.brokerCompressionOptions:_*), HIGH, 
CompressionTypeDoc)
 
       /** ********* Transaction management configuration ***********/
       .define(TransactionalIdExpirationMsProp, INT, 
Defaults.TransactionalIdExpirationMs, atLeast(1), HIGH, 
TransactionalIdExpirationMsDoc)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index a6597d8815..ed31dba41a 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -657,7 +657,7 @@ class KafkaConfigTest {
   def testInvalidCompressionType(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= 8181)
     props.put(KafkaConfig.CompressionTypeProp, "abc")
-    assertThrows(classOf[IllegalArgumentException], () => 
KafkaConfig.fromProps(props))
+    assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
   }
 
   @Test

Reply via email to