This is an automated email from the ASF dual-hosted git repository.

divijv pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 041afb73ecb KAFKA-15053: Use case insensitive validator for 
security.protocol config (#13831)
041afb73ecb is described below

commit 041afb73ecb081592cd2f9dcf7964545df7c606b
Author: Bo Gao <[email protected]>
AuthorDate: Thu Jun 29 01:13:21 2023 -0700

    KAFKA-15053: Use case insensitive validator for security.protocol config 
(#13831)
    
    Fixed a regression, described in KAFKA-15053, in which security.protocol 
accepted only uppercase values such as PLAINTEXT, SSL, SASL_PLAINTEXT and 
SASL_SSL. With this fix, both lowercase and uppercase values are supported 
(e.g. PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL, plaintext, ssl, 
sasl_plaintext, sasl_ssl).
    
    Reviewers: Chris Egerton <[email protected]>, Divij Vaidya <[email protected]>
---
 .../org/apache/kafka/clients/admin/AdminClientConfig.java   |  3 ++-
 .../org/apache/kafka/clients/consumer/ConsumerConfig.java   |  3 ++-
 .../org/apache/kafka/clients/producer/ProducerConfig.java   |  3 ++-
 .../apache/kafka/clients/consumer/ConsumerConfigTest.java   | 13 +++++++++++++
 .../apache/kafka/clients/producer/ProducerConfigTest.java   | 13 +++++++++++++
 .../org/apache/kafka/connect/mirror/MirrorClientConfig.java |  2 +-
 .../apache/kafka/connect/mirror/MirrorConnectorConfig.java  |  2 +-
 .../org/apache/kafka/connect/mirror/MirrorMakerConfig.java  |  2 +-
 .../kafka/connect/mirror/MirrorConnectorConfigTest.java     |  9 +++++++++
 .../apache/kafka/connect/mirror/MirrorMakerConfigTest.java  | 10 ++++++++++
 .../connect/runtime/distributed/DistributedConfig.java      |  4 ++--
 .../connect/runtime/distributed/DistributedConfigTest.java  | 12 ++++++++++++
 .../main/scala/kafka/admin/BrokerApiVersionsCommand.scala   |  2 +-
 docs/security.html                                          |  2 +-
 .../main/java/org/apache/kafka/streams/StreamsConfig.java   |  2 +-
 .../java/org/apache/kafka/streams/StreamsConfigTest.java    |  9 +++++++++
 16 files changed, 80 insertions(+), 11 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
index 37af3864103..9dfc32398ab 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
@@ -214,7 +214,8 @@ public class AdminClientConfig extends AbstractConfig {
                                 .define(SECURITY_PROTOCOL_CONFIG,
                                         Type.STRING,
                                         DEFAULT_SECURITY_PROTOCOL,
-                                        
in(Utils.enumOptions(SecurityProtocol.class)),
+                                        ConfigDef.CaseInsensitiveValidString
+                                                
.in(Utils.enumOptions(SecurityProtocol.class)),
                                         Importance.MEDIUM,
                                         SECURITY_PROTOCOL_DOC)
                                 .withClientSslSupport()
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 5a217705290..b1dd2d16c32 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -575,7 +575,8 @@ public class ConsumerConfig extends AbstractConfig {
                                 
.define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
                                         Type.STRING,
                                         
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
-                                        
in(Utils.enumOptions(SecurityProtocol.class)),
+                                        ConfigDef.CaseInsensitiveValidString
+                                                
.in(Utils.enumOptions(SecurityProtocol.class)),
                                         Importance.MEDIUM,
                                         
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
                                 .withClientSslSupport()
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index a3ae15a3baf..83f4fec99df 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -449,7 +449,8 @@ public class ProducerConfig extends AbstractConfig {
                                 
.define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
                                         Type.STRING,
                                         
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
-                                        
in(Utils.enumOptions(SecurityProtocol.class)),
+                                        ConfigDef.CaseInsensitiveValidString
+                                                
.in(Utils.enumOptions(SecurityProtocol.class)),
                                         Importance.MEDIUM,
                                         
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
                                 .define(SECURITY_PROVIDERS_CONFIG,
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
index 163b9cf1180..661a74ea740 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
@@ -27,6 +28,7 @@ import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Properties;
 
@@ -145,4 +147,15 @@ public class ConsumerConfigTest {
         ConfigException ce = assertThrows(ConfigException.class, () -> new 
ConsumerConfig(configs));
         
assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
     }
+
+    @Test
+    public void testCaseInsensitiveSecurityProtocol() {
+        final String saslSslLowerCase = 
SecurityProtocol.SASL_SSL.name.toLowerCase(Locale.ROOT);
+        final Map<String, Object> configs = new HashMap<>();
+        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
keyDeserializerClass);
+        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
valueDeserializerClass);
+        configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
saslSslLowerCase);
+        final ConsumerConfig consumerConfig = new ConsumerConfig(configs);
+        assertEquals(saslSslLowerCase, 
consumerConfig.originals().get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
+    }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
index 7a9be7b32ff..d7952320e9f 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
@@ -18,12 +18,14 @@ package org.apache.kafka.clients.producer;
 
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.junit.jupiter.api.Test;
 
 import java.util.HashMap;
+import java.util.Locale;
 import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -95,4 +97,15 @@ public class ProducerConfigTest {
         ConfigException ce = assertThrows(ConfigException.class, () -> new 
ProducerConfig(configs));
         
assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
     }
+
+    @Test
+    public void testCaseInsensitiveSecurityProtocol() {
+        final String saslSslLowerCase = 
SecurityProtocol.SASL_SSL.name.toLowerCase(Locale.ROOT);
+        final Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
keySerializerClass);
+        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
valueSerializerClass);
+        configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
saslSslLowerCase);
+        final ProducerConfig producerConfig = new ProducerConfig(configs);
+        assertEquals(saslSslLowerCase, 
producerConfig.originals().get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
+    }
 }
diff --git 
a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
 
b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
index 9f79ec5f7a2..e3a1fec2e7d 100644
--- 
a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
+++ 
b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
@@ -27,7 +27,7 @@ import org.apache.kafka.common.utils.Utils;
 import java.util.Map;
 import java.util.HashMap;
 
-import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
+import static 
org.apache.kafka.common.config.ConfigDef.CaseInsensitiveValidString.in;
 
 /** Configuration required for MirrorClient to talk to a given target cluster.
  *  <p>
diff --git 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
index d59f4bc7664..6ce6b3dbea0 100644
--- 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
+++ 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
@@ -31,7 +31,7 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
-import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
+import static 
org.apache.kafka.common.config.ConfigDef.CaseInsensitiveValidString.in;
 
 import java.util.Map;
 import java.util.HashMap;
diff --git 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
index 072b5c802d9..1300022892d 100644
--- 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
+++ 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
@@ -40,7 +40,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.stream.Collectors;
 
-import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
+import static 
org.apache.kafka.common.config.ConfigDef.CaseInsensitiveValidString.in;
 
 /** Top-level config describing replication flows between multiple Kafka 
clusters.
  *
diff --git 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
index ab8e33768c8..0a976fbc6d3 100644
--- 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
+++ 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
@@ -20,11 +20,13 @@ import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -335,4 +337,11 @@ public class MirrorConnectorConfigTest {
         
assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
     }
 
+    @Test
+    public void testCaseInsensitiveSecurityProtocol() {
+        final String saslSslLowerCase = 
SecurityProtocol.SASL_SSL.name.toLowerCase(Locale.ROOT);
+        final MirrorConnectorConfig config = new 
MirrorConnectorConfig(makeProps(
+                CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
saslSslLowerCase));
+        assertEquals(saslSslLowerCase, 
config.originalsStrings().get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
+    }
 }
diff --git 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
index e656e87ac55..84f9a1c21a4 100644
--- 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
+++ 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
@@ -23,8 +23,10 @@ import 
org.apache.kafka.common.config.provider.ConfigProvider;
 import org.apache.kafka.common.config.ConfigData;
 import org.apache.kafka.common.metrics.FakeMetricsReporter;
 
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.junit.jupiter.api.Test;
 
+import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import java.util.Collections;
@@ -352,6 +354,14 @@ public class MirrorMakerConfigTest {
         
assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
     }
 
+    @Test
+    public void testCaseInsensitiveSecurityProtocol() {
+        final String saslSslLowerCase = 
SecurityProtocol.SASL_SSL.name.toLowerCase(Locale.ROOT);
+        final MirrorClientConfig config = new MirrorClientConfig(makeProps(
+                CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
saslSslLowerCase));
+        assertEquals(saslSslLowerCase, 
config.originalsStrings().get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
+    }
+
     public static class FakeConfigProvider implements ConfigProvider {
 
         Map<String, String> secrets = Collections.singletonMap("password", 
"secret2");
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
index 3ce511cdebb..94cbf8d0aed 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
@@ -45,7 +45,7 @@ import java.util.concurrent.TimeUnit;
 
 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
 import static org.apache.kafka.common.config.ConfigDef.Range.between;
-import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
+import static 
org.apache.kafka.common.config.ConfigDef.CaseInsensitiveValidString.in;
 import static org.apache.kafka.common.utils.Utils.enumOptions;
 import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_VALIDATOR;
 import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_VALIDATOR;
@@ -316,7 +316,7 @@ public class DistributedConfig extends WorkerConfig {
             .define(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG,
                     ConfigDef.Type.STRING,
                     EXACTLY_ONCE_SOURCE_SUPPORT_DEFAULT,
-                    
ConfigDef.CaseInsensitiveValidString.in(enumOptions(ExactlyOnceSourceSupport.class)),
+                    in(enumOptions(ExactlyOnceSourceSupport.class)),
                     ConfigDef.Importance.HIGH,
                     EXACTLY_ONCE_SOURCE_SUPPORT_DOC)
             .define(CommonClientConfigs.METADATA_MAX_AGE_CONFIG,
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedConfigTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedConfigTest.java
index 3996c9714eb..5d1920c1d4d 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedConfigTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedConfigTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.connect.runtime.distributed;
 
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.junit.Test;
 import org.mockito.MockedStatic;
 
@@ -31,6 +32,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 
@@ -413,6 +415,16 @@ public class DistributedConfigTest {
         
assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
     }
 
+    @Test
+    public void testCaseInsensitiveSecurityProtocol() {
+        final String saslSslLowerCase = 
SecurityProtocol.SASL_SSL.name.toLowerCase(Locale.ROOT);
+        final Map<String, String> configs = configs();
+        configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
saslSslLowerCase);
+        final DistributedConfig distributedConfig = new 
DistributedConfig(configs);
+        assertEquals(saslSslLowerCase, distributedConfig.originalsStrings()
+                .get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
+    }
+
     @Test
     public void shouldIdentifyNeedForTransactionalLeader() {
         Map<String, String> workerProps = configs();
diff --git a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala 
b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
index 957cb2ce8bb..3b16eb3d2b5 100644
--- a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
@@ -231,7 +231,7 @@ object BrokerApiVersionsCommand {
           CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
           ConfigDef.Type.STRING,
           CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
-          in(Utils.enumOptions(classOf[SecurityProtocol]):_*),
+          
ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(classOf[SecurityProtocol]):_*),
           ConfigDef.Importance.MEDIUM,
           CommonClientConfigs.SECURITY_PROTOCOL_DOC)
         .define(
diff --git a/docs/security.html b/docs/security.html
index 6bf15f72462..84728c3009a 100644
--- a/docs/security.html
+++ b/docs/security.html
@@ -66,7 +66,7 @@
     
     <pre class="line-numbers"><code 
class="language-text">listener.security.protocol.map=CLIENT:SSL,BROKER:PLAINTEXT</code></pre>
            
-    <p>Possible options for the security protocol are given below:</p>
+    <p>Possible options (case-insensitive) for the security protocol are given 
below:</p>
     <ol>
       <li>PLAINTEXT</li>
       <li>SSL</li>
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 9975bd4680d..3f89de11e96 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -829,7 +829,7 @@ public class StreamsConfig extends AbstractConfig {
             .define(SECURITY_PROTOCOL_CONFIG,
                     Type.STRING,
                     CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
-                    in(Utils.enumOptions(SecurityProtocol.class)),
+                    
ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(SecurityProtocol.class)),
                     Importance.MEDIUM,
                     CommonClientConfigs.SECURITY_PROTOCOL_DOC)
             .define(TASK_TIMEOUT_MS_CONFIG,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java 
b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 435dd249f2f..c47a035415d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
@@ -1255,6 +1256,14 @@ public class StreamsConfigTest {
         );
     }
 
+    @Test
+    public void testCaseInsensitiveSecurityProtocol() {
+        final String saslSslLowerCase = 
SecurityProtocol.SASL_SSL.name.toLowerCase(Locale.ROOT);
+        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
saslSslLowerCase);
+        final StreamsConfig config = new StreamsConfig(props);
+        assertEquals(saslSslLowerCase, 
config.originalsStrings().get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
+    }
+
     @Test
     public void testInvalidSecurityProtocol() {
         props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "abc");

Reply via email to