This is an automated email from the ASF dual-hosted git repository.

divijv pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.4 by this push:
     new a25cc752a42 KAFKA-15053: Use case insensitive validator for 
security.protocol config (#13831)
a25cc752a42 is described below

commit a25cc752a42780a9c042e8a556c393dfc9ab8a21
Author: Bo Gao <[email protected]>
AuthorDate: Thu Jun 29 01:13:21 2023 -0700

    KAFKA-15053: Use case insensitive validator for security.protocol config 
(#13831)
    
    Fixed a regression described in KAFKA-15053 that security.protocol only 
allows uppercase values like PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. With 
this fix, both lower case and upper case values will be supported (e.g. 
PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL, plaintext, ssl, sasl_plaintext, 
sasl_ssl)
    
    Reviewers: Chris Egerton <[email protected]>, Divij Vaidya <[email protected]>
---
 .../org/apache/kafka/clients/admin/AdminClientConfig.java   |  3 ++-
 .../org/apache/kafka/clients/consumer/ConsumerConfig.java   |  3 ++-
 .../org/apache/kafka/clients/producer/ProducerConfig.java   |  3 ++-
 .../apache/kafka/clients/consumer/ConsumerConfigTest.java   | 13 +++++++++++++
 .../apache/kafka/clients/producer/ProducerConfigTest.java   | 13 +++++++++++++
 .../org/apache/kafka/connect/mirror/MirrorClientConfig.java |  2 +-
 .../apache/kafka/connect/mirror/MirrorConnectorConfig.java  |  2 +-
 .../org/apache/kafka/connect/mirror/MirrorMakerConfig.java  |  2 +-
 .../kafka/connect/mirror/MirrorConnectorConfigTest.java     | 10 ++++++++++
 .../apache/kafka/connect/mirror/MirrorMakerConfigTest.java  | 10 ++++++++++
 .../connect/runtime/distributed/DistributedConfig.java      |  4 ++--
 .../connect/runtime/distributed/DistributedConfigTest.java  | 12 ++++++++++++
 .../main/scala/kafka/admin/BrokerApiVersionsCommand.scala   |  2 +-
 docs/security.html                                          |  2 +-
 .../main/java/org/apache/kafka/streams/StreamsConfig.java   |  2 +-
 .../java/org/apache/kafka/streams/StreamsConfigTest.java    |  9 +++++++++
 16 files changed, 81 insertions(+), 11 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
index 0afcd709f04..ccf1b9f595b 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
@@ -223,7 +223,8 @@ public class AdminClientConfig extends AbstractConfig {
                                 .define(SECURITY_PROTOCOL_CONFIG,
                                         Type.STRING,
                                         DEFAULT_SECURITY_PROTOCOL,
-                                        
in(Utils.enumOptions(SecurityProtocol.class)),
+                                        ConfigDef.CaseInsensitiveValidString
+                                                
.in(Utils.enumOptions(SecurityProtocol.class)),
                                         Importance.MEDIUM,
                                         SECURITY_PROTOCOL_DOC)
                                 .withClientSslSupport()
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index f8bc97ec8a3..be25eba6f2f 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -586,7 +586,8 @@ public class ConsumerConfig extends AbstractConfig {
                                 
.define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
                                         Type.STRING,
                                         
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
-                                        
in(Utils.enumOptions(SecurityProtocol.class)),
+                                        ConfigDef.CaseInsensitiveValidString
+                                                
.in(Utils.enumOptions(SecurityProtocol.class)),
                                         Importance.MEDIUM,
                                         
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
                                 .withClientSslSupport()
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index e5ad8664126..f18949a5b33 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -458,7 +458,8 @@ public class ProducerConfig extends AbstractConfig {
                                 
.define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
                                         Type.STRING,
                                         
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
-                                        
in(Utils.enumOptions(SecurityProtocol.class)),
+                                        ConfigDef.CaseInsensitiveValidString
+                                                
.in(Utils.enumOptions(SecurityProtocol.class)),
                                         Importance.MEDIUM,
                                         
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
                                 .define(SECURITY_PROVIDERS_CONFIG,
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
index 163b9cf1180..661a74ea740 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
@@ -27,6 +28,7 @@ import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Properties;
 
@@ -145,4 +147,15 @@ public class ConsumerConfigTest {
         ConfigException ce = assertThrows(ConfigException.class, () -> new 
ConsumerConfig(configs));
         
assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
     }
+
+    @Test
+    public void testCaseInsensitiveSecurityProtocol() {
+        final String saslSslLowerCase = 
SecurityProtocol.SASL_SSL.name.toLowerCase(Locale.ROOT);
+        final Map<String, Object> configs = new HashMap<>();
+        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
keyDeserializerClass);
+        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
valueDeserializerClass);
+        configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
saslSslLowerCase);
+        final ConsumerConfig consumerConfig = new ConsumerConfig(configs);
+        assertEquals(saslSslLowerCase, 
consumerConfig.originals().get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
+    }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
index 7a9be7b32ff..d7952320e9f 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
@@ -18,12 +18,14 @@ package org.apache.kafka.clients.producer;
 
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.junit.jupiter.api.Test;
 
 import java.util.HashMap;
+import java.util.Locale;
 import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -95,4 +97,15 @@ public class ProducerConfigTest {
         ConfigException ce = assertThrows(ConfigException.class, () -> new 
ProducerConfig(configs));
         
assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
     }
+
+    @Test
+    public void testCaseInsensitiveSecurityProtocol() {
+        final String saslSslLowerCase = 
SecurityProtocol.SASL_SSL.name.toLowerCase(Locale.ROOT);
+        final Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
keySerializerClass);
+        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
valueSerializerClass);
+        configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
saslSslLowerCase);
+        final ProducerConfig producerConfig = new ProducerConfig(configs);
+        assertEquals(saslSslLowerCase, 
producerConfig.originals().get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
+    }
 }
diff --git 
a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
 
b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
index 910ee206a5f..bf1bd4826e1 100644
--- 
a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
+++ 
b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
@@ -29,7 +29,7 @@ import org.apache.kafka.common.utils.Utils;
 import java.util.Map;
 import java.util.HashMap;
 
-import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
+import static 
org.apache.kafka.common.config.ConfigDef.CaseInsensitiveValidString.in;
 
 /** Configuration required for MirrorClient to talk to a given target cluster.
  *  <p>
diff --git 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
index 195d309b74b..f8bcda5dce9 100644
--- 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
+++ 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
@@ -29,7 +29,7 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
-import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
+import static 
org.apache.kafka.common.config.ConfigDef.CaseInsensitiveValidString.in;
 
 import java.util.Map;
 import java.util.HashMap;
diff --git 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
index 303f784324c..5f03693018f 100644
--- 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
+++ 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
@@ -41,7 +41,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.stream.Collectors;
 
-import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
+import static 
org.apache.kafka.common.config.ConfigDef.CaseInsensitiveValidString.in;
 
 /** Top-level config describing replication flows between multiple Kafka 
clusters.
  *
diff --git 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
index dbf255d8bb4..426b9e02b82 100644
--- 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
+++ 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
@@ -19,9 +19,11 @@ package org.apache.kafka.connect.mirror;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.test.MockMetricsReporter;
 import org.junit.jupiter.api.Test;
 
+import java.util.Locale;
 import java.util.Map;
 import java.util.HashMap;
 
@@ -163,6 +165,14 @@ public class MirrorConnectorConfigTest {
         
assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
     }
 
+    @Test
+    public void testCaseInsensitiveSecurityProtocol() {
+        final String saslSslLowerCase = 
SecurityProtocol.SASL_SSL.name.toLowerCase(Locale.ROOT);
+        final TestMirrorConnectorConfig config = new 
TestMirrorConnectorConfig(makeProps(
+                CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
saslSslLowerCase));
+        assertEquals(saslSslLowerCase, 
config.originalsStrings().get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
+    }
+
     @Test
     @SuppressWarnings("deprecation")
     public void testMetricsReporters() {
diff --git 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
index 62e94c5975c..bf19fd5e71b 100644
--- 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
+++ 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
@@ -25,8 +25,10 @@ import 
org.apache.kafka.common.config.provider.ConfigProvider;
 import org.apache.kafka.common.config.ConfigData;
 import org.apache.kafka.common.metrics.FakeMetricsReporter;
 
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.junit.jupiter.api.Test;
 
+import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import java.util.Collections;
@@ -361,6 +363,14 @@ public class MirrorMakerConfigTest {
         
assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
     }
 
+    @Test
+    public void testCaseInsensitiveSecurityProtocol() {
+        final String saslSslLowerCase = 
SecurityProtocol.SASL_SSL.name.toLowerCase(Locale.ROOT);
+        final MirrorClientConfig config = new MirrorClientConfig(makeProps(
+                CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
saslSslLowerCase));
+        assertEquals(saslSslLowerCase, 
config.originalsStrings().get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
+    }
+
     @Test
     public void testAllConfigNames() {
         MirrorMakerConfig mirrorConfig = new MirrorMakerConfig(makeProps(
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
index 8c75f3a4076..4391ef2e190 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
@@ -44,7 +44,7 @@ import java.util.concurrent.TimeUnit;
 
 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
 import static org.apache.kafka.common.config.ConfigDef.Range.between;
-import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
+import static 
org.apache.kafka.common.config.ConfigDef.CaseInsensitiveValidString.in;
 import static org.apache.kafka.common.utils.Utils.enumOptions;
 import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_VALIDATOR;
 import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_VALIDATOR;
@@ -317,7 +317,7 @@ public class DistributedConfig extends WorkerConfig {
             .define(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG,
                     ConfigDef.Type.STRING,
                     EXACTLY_ONCE_SOURCE_SUPPORT_DEFAULT,
-                    
ConfigDef.CaseInsensitiveValidString.in(enumOptions(ExactlyOnceSourceSupport.class)),
+                    in(enumOptions(ExactlyOnceSourceSupport.class)),
                     ConfigDef.Importance.HIGH,
                     EXACTLY_ONCE_SOURCE_SUPPORT_DOC)
             .define(CommonClientConfigs.METADATA_MAX_AGE_CONFIG,
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedConfigTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedConfigTest.java
index 122575e3a24..ca6dc3334d9 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedConfigTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedConfigTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.connect.runtime.distributed;
 
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.junit.Test;
 
 import javax.crypto.KeyGenerator;
@@ -30,6 +31,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 
@@ -408,6 +410,16 @@ public class DistributedConfigTest {
         
assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
     }
 
+    @Test
+    public void testCaseInsensitiveSecurityProtocol() {
+        final String saslSslLowerCase = 
SecurityProtocol.SASL_SSL.name.toLowerCase(Locale.ROOT);
+        final Map<String, String> configs = configs();
+        configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
saslSslLowerCase);
+        final DistributedConfig distributedConfig = new 
DistributedConfig(configs);
+        assertEquals(saslSslLowerCase, distributedConfig.originalsStrings()
+                .get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
+    }
+
     @Test
     public void shouldIdentifyNeedForTransactionalLeader() {
         Map<String, String> workerProps = configs();
diff --git a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala 
b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
index ea6ffeea65b..6d9793bd840 100644
--- a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
@@ -229,7 +229,7 @@ object BrokerApiVersionsCommand {
           CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
           ConfigDef.Type.STRING,
           CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
-          in(Utils.enumOptions(classOf[SecurityProtocol]):_*),
+          
ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(classOf[SecurityProtocol]):_*),
           ConfigDef.Importance.MEDIUM,
           CommonClientConfigs.SECURITY_PROTOCOL_DOC)
         .define(
diff --git a/docs/security.html b/docs/security.html
index cbfff2858d3..fd9c2bd8a90 100644
--- a/docs/security.html
+++ b/docs/security.html
@@ -66,7 +66,7 @@
     
     <pre class="line-numbers"><code 
class="language-text">listener.security.protocol.map=CLIENT:SSL,BROKER:PLAINTEXT</code></pre>
            
-    <p>Possible options for the security protocol are given below:</p>
+    <p>Possible options (case-insensitive) for the security protocol are given 
below:</p>
     <ol>
       <li>PLAINTEXT</li>
       <li>SSL</li>
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index a440b1bdd62..474de54bcbe 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -888,7 +888,7 @@ public class StreamsConfig extends AbstractConfig {
             .define(SECURITY_PROTOCOL_CONFIG,
                     Type.STRING,
                     CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
-                    in(Utils.enumOptions(SecurityProtocol.class)),
+                    
ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(SecurityProtocol.class)),
                     Importance.MEDIUM,
                     CommonClientConfigs.SECURITY_PROTOCOL_DOC)
             .define(TASK_TIMEOUT_MS_CONFIG,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java 
b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index f0c0ce74ccd..d1d63181c07 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
@@ -1287,6 +1288,14 @@ public class StreamsConfigTest {
         assertEquals(getTotalCacheSize(config), 10 * 1024 * 1024);
     }
 
+    @Test
+    public void testCaseInsensitiveSecurityProtocol() {
+        final String saslSslLowerCase = 
SecurityProtocol.SASL_SSL.name.toLowerCase(Locale.ROOT);
+        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
saslSslLowerCase);
+        final StreamsConfig config = new StreamsConfig(props);
+        assertEquals(saslSslLowerCase, 
config.originalsStrings().get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
+    }
+
     @Test
     public void testInvalidSecurityProtocol() {
         props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "abc");

Reply via email to