This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 1df232c839 MINOR: Supplement the description of `Valid Values` in the documentation of `compression.type` (#11985) 1df232c839 is described below commit 1df232c839f4568718a52c04aad72b69beb52026 Author: RivenSun <91005273+rivens...@users.noreply.github.com> AuthorDate: Wed Apr 13 12:24:57 2022 +0800 MINOR: Supplement the description of `Valid Values` in the documentation of `compression.type` (#11985) Because a validator is added to ProducerConfig.COMPRESSION_TYPE_CONFIG and KafkaConfig.CompressionTypeProp, the corresponding testCase is improved to verify whether the wrong value of compression.type will throw a ConfigException. Reviewers: Mickael Maison <mickael.mai...@gmail.com>, Guozhang Wang <wangg...@gmail.com> --- .../org/apache/kafka/clients/producer/ProducerConfig.java | 4 +++- .../java/org/apache/kafka/common/record/CompressionType.java | 6 ++++++ .../org/apache/kafka/clients/producer/ProducerConfigTest.java | 11 +++++++++++ core/src/main/scala/kafka/server/KafkaConfig.scala | 6 +++--- core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala | 2 +- 5 files changed, 24 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index afc1e55cdf..8fec07a297 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -26,7 +26,9 @@ import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.SecurityConfig; import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -329,7 +331,7 @@ public class ProducerConfig extends AbstractConfig { in("all", "-1", "0", "1"), Importance.LOW, ACKS_DOC) - .define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC) + .define(COMPRESSION_TYPE_CONFIG, Type.STRING, CompressionType.NONE.name, in(Utils.enumOptions(CompressionType.class)), Importance.HIGH, COMPRESSION_TYPE_DOC) .define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC) .define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0), Importance.MEDIUM, LINGER_MS_DOC) .define(DELIVERY_TIMEOUT_MS_CONFIG, Type.INT, 120 * 1000, atLeast(0), Importance.MEDIUM, DELIVERY_TIMEOUT_MS_DOC) diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java index 1b9754ffab..c526929b72 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java +++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java @@ -190,4 +190,10 @@ public enum CompressionType { else throw new IllegalArgumentException("Unknown compression name: " + name); } + + @Override + public String toString() { + return name; + } + } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java index a2f318bebc..ae9de7b70a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java @@ -19,12 +19,14 @@ package org.apache.kafka.clients.producer; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.config.ConfigException; import org.junit.jupiter.api.Test; import java.util.HashMap; import java.util.Map; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; public class ProducerConfigTest { @@ -59,4 +61,13 @@ public class ProducerConfigTest { assertEquals(newConfigs.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG), keySerializerClass); assertEquals(newConfigs.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG), valueSerializerClass); } + + @Test + public void testInvalidCompressionType() { + Map<String, Object> configs = new HashMap<>(); + configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass); + configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass); + configs.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "abc"); + assertThrows(ConfigException.class, () -> new ProducerConfig(configs)); + } } diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 7dd9276d6d..6fe0acaa9d 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -25,7 +25,7 @@ import kafka.coordinator.group.OffsetConfig import kafka.coordinator.transaction.{TransactionLog, TransactionStateManager} import kafka.log.LogConfig import kafka.log.LogConfig.MessageFormatVersion -import kafka.message.{BrokerCompressionCodec, CompressionCodec, ZStdCompressionCodec} +import kafka.message.{BrokerCompressionCodec, CompressionCodec, ProducerCompressionCodec, ZStdCompressionCodec} import kafka.security.authorizer.AuthorizerUtils import kafka.server.KafkaConfig.{ControllerListenerNamesProp, ListenerSecurityProtocolMapProp} import kafka.server.KafkaRaftServer.{BrokerRole, ControllerRole, ProcessRole} @@ -227,7 +227,7 @@ object Defaults { val DeleteTopicEnable = true - val CompressionType = "producer" + val CompressionType = ProducerCompressionCodec.name val MaxIdMapSnapshots = 2 /** ********* Kafka Metrics Configuration ***********/ @@ -1257,7 +1257,7 @@ object KafkaConfig { .define(OffsetCommitTimeoutMsProp, INT, Defaults.OffsetCommitTimeoutMs, atLeast(1), HIGH, OffsetCommitTimeoutMsDoc) .define(OffsetCommitRequiredAcksProp, SHORT, Defaults.OffsetCommitRequiredAcks, HIGH, OffsetCommitRequiredAcksDoc) .define(DeleteTopicEnableProp, BOOLEAN, Defaults.DeleteTopicEnable, HIGH, DeleteTopicEnableDoc) - .define(CompressionTypeProp, STRING, Defaults.CompressionType, HIGH, CompressionTypeDoc) + .define(CompressionTypeProp, STRING, Defaults.CompressionType, in(BrokerCompressionCodec.brokerCompressionOptions:_*), HIGH, CompressionTypeDoc) /** ********* Transaction management configuration ***********/ .define(TransactionalIdExpirationMsProp, INT, Defaults.TransactionalIdExpirationMs, atLeast(1), HIGH, TransactionalIdExpirationMsDoc) diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index a6597d8815..ed31dba41a 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -657,7 +657,7 @@ class KafkaConfigTest { def testInvalidCompressionType(): Unit = { val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) props.put(KafkaConfig.CompressionTypeProp, "abc") - assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props)) + assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)) } @Test