This is an automated email from the ASF dual-hosted git repository.
divijv pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push:
new 041afb73ecb KAFKA-15053: Use case insensitive validator for
security.protocol config (#13831)
041afb73ecb is described below
commit 041afb73ecb081592cd2f9dcf7964545df7c606b
Author: Bo Gao <[email protected]>
AuthorDate: Thu Jun 29 01:13:21 2023 -0700
KAFKA-15053: Use case insensitive validator for security.protocol config
(#13831)
Fixed a regression described in KAFKA-15053 in which security.protocol only
accepted uppercase values such as PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. With
this fix, both lowercase and uppercase values are supported (e.g.
PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL, plaintext, ssl, sasl_plaintext,
sasl_ssl).
Reviewers: Chris Egerton <[email protected]>, Divij Vaidya <[email protected]>
---
.../org/apache/kafka/clients/admin/AdminClientConfig.java | 3 ++-
.../org/apache/kafka/clients/consumer/ConsumerConfig.java | 3 ++-
.../org/apache/kafka/clients/producer/ProducerConfig.java | 3 ++-
.../apache/kafka/clients/consumer/ConsumerConfigTest.java | 13 +++++++++++++
.../apache/kafka/clients/producer/ProducerConfigTest.java | 13 +++++++++++++
.../org/apache/kafka/connect/mirror/MirrorClientConfig.java | 2 +-
.../apache/kafka/connect/mirror/MirrorConnectorConfig.java | 2 +-
.../org/apache/kafka/connect/mirror/MirrorMakerConfig.java | 2 +-
.../kafka/connect/mirror/MirrorConnectorConfigTest.java | 9 +++++++++
.../apache/kafka/connect/mirror/MirrorMakerConfigTest.java | 10 ++++++++++
.../connect/runtime/distributed/DistributedConfig.java | 4 ++--
.../connect/runtime/distributed/DistributedConfigTest.java | 12 ++++++++++++
.../main/scala/kafka/admin/BrokerApiVersionsCommand.scala | 2 +-
docs/security.html | 2 +-
.../main/java/org/apache/kafka/streams/StreamsConfig.java | 2 +-
.../java/org/apache/kafka/streams/StreamsConfigTest.java | 9 +++++++++
16 files changed, 80 insertions(+), 11 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
index 37af3864103..9dfc32398ab 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
@@ -214,7 +214,8 @@ public class AdminClientConfig extends AbstractConfig {
.define(SECURITY_PROTOCOL_CONFIG,
Type.STRING,
DEFAULT_SECURITY_PROTOCOL,
-
in(Utils.enumOptions(SecurityProtocol.class)),
+ ConfigDef.CaseInsensitiveValidString
+
.in(Utils.enumOptions(SecurityProtocol.class)),
Importance.MEDIUM,
SECURITY_PROTOCOL_DOC)
.withClientSslSupport()
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 5a217705290..b1dd2d16c32 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -575,7 +575,8 @@ public class ConsumerConfig extends AbstractConfig {
.define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
Type.STRING,
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
-
in(Utils.enumOptions(SecurityProtocol.class)),
+ ConfigDef.CaseInsensitiveValidString
+
.in(Utils.enumOptions(SecurityProtocol.class)),
Importance.MEDIUM,
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
.withClientSslSupport()
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index a3ae15a3baf..83f4fec99df 100644
---
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -449,7 +449,8 @@ public class ProducerConfig extends AbstractConfig {
.define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
Type.STRING,
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
-
in(Utils.enumOptions(SecurityProtocol.class)),
+ ConfigDef.CaseInsensitiveValidString
+
.in(Utils.enumOptions(SecurityProtocol.class)),
Importance.MEDIUM,
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
.define(SECURITY_PROVIDERS_CONFIG,
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
index 163b9cf1180..661a74ea740 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
@@ -27,6 +28,7 @@ import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.Locale;
import java.util.Map;
import java.util.Properties;
@@ -145,4 +147,15 @@ public class ConsumerConfigTest {
ConfigException ce = assertThrows(ConfigException.class, () -> new
ConsumerConfig(configs));
assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
}
+
+ @Test
+ public void testCaseInsensitiveSecurityProtocol() {
+ final String saslSslLowerCase =
SecurityProtocol.SASL_SSL.name.toLowerCase(Locale.ROOT);
+ final Map<String, Object> configs = new HashMap<>();
+ configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
keyDeserializerClass);
+ configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
valueDeserializerClass);
+ configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
saslSslLowerCase);
+ final ConsumerConfig consumerConfig = new ConsumerConfig(configs);
+ assertEquals(saslSslLowerCase,
consumerConfig.originals().get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
+ }
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
index 7a9be7b32ff..d7952320e9f 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
@@ -18,12 +18,14 @@ package org.apache.kafka.clients.producer;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
+import java.util.Locale;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -95,4 +97,15 @@ public class ProducerConfigTest {
ConfigException ce = assertThrows(ConfigException.class, () -> new
ProducerConfig(configs));
assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
}
+
+ @Test
+ public void testCaseInsensitiveSecurityProtocol() {
+ final String saslSslLowerCase =
SecurityProtocol.SASL_SSL.name.toLowerCase(Locale.ROOT);
+ final Map<String, Object> configs = new HashMap<>();
+ configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
keySerializerClass);
+ configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
valueSerializerClass);
+ configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
saslSslLowerCase);
+ final ProducerConfig producerConfig = new ProducerConfig(configs);
+ assertEquals(saslSslLowerCase,
producerConfig.originals().get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
+ }
}
diff --git
a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
index 9f79ec5f7a2..e3a1fec2e7d 100644
---
a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
+++
b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
@@ -27,7 +27,7 @@ import org.apache.kafka.common.utils.Utils;
import java.util.Map;
import java.util.HashMap;
-import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
+import static
org.apache.kafka.common.config.ConfigDef.CaseInsensitiveValidString.in;
/** Configuration required for MirrorClient to talk to a given target cluster.
* <p>
diff --git
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
index d59f4bc7664..6ce6b3dbea0 100644
---
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
+++
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
@@ -31,7 +31,7 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import static
org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
import static
org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
-import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
+import static
org.apache.kafka.common.config.ConfigDef.CaseInsensitiveValidString.in;
import java.util.Map;
import java.util.HashMap;
diff --git
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
index 072b5c802d9..1300022892d 100644
---
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
+++
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
@@ -40,7 +40,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.stream.Collectors;
-import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
+import static
org.apache.kafka.common.config.ConfigDef.CaseInsensitiveValidString.in;
/** Top-level config describing replication flows between multiple Kafka
clusters.
*
diff --git
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
index ab8e33768c8..0a976fbc6d3 100644
---
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
+++
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
@@ -20,11 +20,13 @@ import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.HashMap;
import java.util.HashSet;
@@ -335,4 +337,11 @@ public class MirrorConnectorConfigTest {
assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
}
+ @Test
+ public void testCaseInsensitiveSecurityProtocol() {
+ final String saslSslLowerCase =
SecurityProtocol.SASL_SSL.name.toLowerCase(Locale.ROOT);
+ final MirrorConnectorConfig config = new
MirrorConnectorConfig(makeProps(
+ CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
saslSslLowerCase));
+ assertEquals(saslSslLowerCase,
config.originalsStrings().get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
+ }
}
diff --git
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
index e656e87ac55..84f9a1c21a4 100644
---
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
+++
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
@@ -23,8 +23,10 @@ import
org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.config.ConfigData;
import org.apache.kafka.common.metrics.FakeMetricsReporter;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.junit.jupiter.api.Test;
+import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.Collections;
@@ -352,6 +354,14 @@ public class MirrorMakerConfigTest {
assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
}
+ @Test
+ public void testCaseInsensitiveSecurityProtocol() {
+ final String saslSslLowerCase =
SecurityProtocol.SASL_SSL.name.toLowerCase(Locale.ROOT);
+ final MirrorClientConfig config = new MirrorClientConfig(makeProps(
+ CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
saslSslLowerCase));
+ assertEquals(saslSslLowerCase,
config.originalsStrings().get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
+ }
+
public static class FakeConfigProvider implements ConfigProvider {
Map<String, String> secrets = Collections.singletonMap("password",
"secret2");
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
index 3ce511cdebb..94cbf8d0aed 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
@@ -45,7 +45,7 @@ import java.util.concurrent.TimeUnit;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Range.between;
-import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
+import static
org.apache.kafka.common.config.ConfigDef.CaseInsensitiveValidString.in;
import static org.apache.kafka.common.utils.Utils.enumOptions;
import static
org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_VALIDATOR;
import static
org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_VALIDATOR;
@@ -316,7 +316,7 @@ public class DistributedConfig extends WorkerConfig {
.define(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG,
ConfigDef.Type.STRING,
EXACTLY_ONCE_SOURCE_SUPPORT_DEFAULT,
-
ConfigDef.CaseInsensitiveValidString.in(enumOptions(ExactlyOnceSourceSupport.class)),
+ in(enumOptions(ExactlyOnceSourceSupport.class)),
ConfigDef.Importance.HIGH,
EXACTLY_ONCE_SOURCE_SUPPORT_DOC)
.define(CommonClientConfigs.METADATA_MAX_AGE_CONFIG,
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedConfigTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedConfigTest.java
index 3996c9714eb..5d1920c1d4d 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedConfigTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedConfigTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.connect.runtime.distributed;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.junit.Test;
import org.mockito.MockedStatic;
@@ -31,6 +32,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Set;
@@ -413,6 +415,16 @@ public class DistributedConfigTest {
assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
}
+ @Test
+ public void testCaseInsensitiveSecurityProtocol() {
+ final String saslSslLowerCase =
SecurityProtocol.SASL_SSL.name.toLowerCase(Locale.ROOT);
+ final Map<String, String> configs = configs();
+ configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
saslSslLowerCase);
+ final DistributedConfig distributedConfig = new
DistributedConfig(configs);
+ assertEquals(saslSslLowerCase, distributedConfig.originalsStrings()
+ .get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
+ }
+
@Test
public void shouldIdentifyNeedForTransactionalLeader() {
Map<String, String> workerProps = configs();
diff --git a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
index 957cb2ce8bb..3b16eb3d2b5 100644
--- a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
@@ -231,7 +231,7 @@ object BrokerApiVersionsCommand {
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
ConfigDef.Type.STRING,
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
- in(Utils.enumOptions(classOf[SecurityProtocol]):_*),
+
ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(classOf[SecurityProtocol]):_*),
ConfigDef.Importance.MEDIUM,
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
.define(
diff --git a/docs/security.html b/docs/security.html
index 6bf15f72462..84728c3009a 100644
--- a/docs/security.html
+++ b/docs/security.html
@@ -66,7 +66,7 @@
<pre class="line-numbers"><code
class="language-text">listener.security.protocol.map=CLIENT:SSL,BROKER:PLAINTEXT</code></pre>
- <p>Possible options for the security protocol are given below:</p>
+ <p>Possible options (case-insensitive) for the security protocol are given
below:</p>
<ol>
<li>PLAINTEXT</li>
<li>SSL</li>
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 9975bd4680d..3f89de11e96 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -829,7 +829,7 @@ public class StreamsConfig extends AbstractConfig {
.define(SECURITY_PROTOCOL_CONFIG,
Type.STRING,
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
- in(Utils.enumOptions(SecurityProtocol.class)),
+
ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(SecurityProtocol.class)),
Importance.MEDIUM,
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
.define(TASK_TIMEOUT_MS_CONFIG,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 435dd249f2f..c47a035415d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
@@ -1255,6 +1256,14 @@ public class StreamsConfigTest {
);
}
+ @Test
+ public void testCaseInsensitiveSecurityProtocol() {
+ final String saslSslLowerCase =
SecurityProtocol.SASL_SSL.name.toLowerCase(Locale.ROOT);
+ props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
saslSslLowerCase);
+ final StreamsConfig config = new StreamsConfig(props);
+ assertEquals(saslSslLowerCase,
config.originalsStrings().get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
+ }
+
@Test
public void testInvalidSecurityProtocol() {
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "abc");