This is an automated email from the ASF dual-hosted git repository.
divijv pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 005416879e4 KAFKA-15053: Use case insensitive validator for
security.protocol config (#13831)
005416879e4 is described below
commit 005416879e4c180e8e9dea94a08e57521ac8cf1d
Author: Bo Gao <[email protected]>
AuthorDate: Thu Jun 29 01:13:21 2023 -0700
KAFKA-15053: Use case insensitive validator for security.protocol config
(#13831)
Fixes the regression described in KAFKA-15053, in which security.protocol only
accepted uppercase values such as PLAINTEXT, SSL, SASL_PLAINTEXT and SASL_SSL.
With this fix the value is validated case-insensitively, so both upper-case and
lower-case values are supported (e.g. PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL,
plaintext, ssl, sasl_plaintext, sasl_ssl).
Reviewers: Chris Egerton <[email protected]>, Divij Vaidya <[email protected]>
---
.../org/apache/kafka/clients/admin/AdminClientConfig.java | 3 ++-
.../org/apache/kafka/clients/consumer/ConsumerConfig.java | 3 ++-
.../org/apache/kafka/clients/producer/ProducerConfig.java | 3 ++-
.../apache/kafka/clients/consumer/ConsumerConfigTest.java | 13 +++++++++++++
.../apache/kafka/clients/producer/ProducerConfigTest.java | 13 +++++++++++++
.../org/apache/kafka/connect/mirror/MirrorClientConfig.java | 2 +-
.../apache/kafka/connect/mirror/MirrorConnectorConfig.java | 2 +-
.../org/apache/kafka/connect/mirror/MirrorMakerConfig.java | 2 +-
.../kafka/connect/mirror/MirrorConnectorConfigTest.java | 10 ++++++++++
.../apache/kafka/connect/mirror/MirrorMakerConfigTest.java | 10 ++++++++++
.../connect/runtime/distributed/DistributedConfig.java | 4 ++--
.../connect/runtime/distributed/DistributedConfigTest.java | 12 ++++++++++++
.../main/scala/kafka/admin/BrokerApiVersionsCommand.scala | 2 +-
docs/security.html | 2 +-
.../main/java/org/apache/kafka/streams/StreamsConfig.java | 2 +-
.../java/org/apache/kafka/streams/StreamsConfigTest.java | 9 +++++++++
16 files changed, 81 insertions(+), 11 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
index dbbdd517ff8..19a759e4bdc 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
@@ -223,7 +223,8 @@ public class AdminClientConfig extends AbstractConfig {
.define(SECURITY_PROTOCOL_CONFIG,
Type.STRING,
DEFAULT_SECURITY_PROTOCOL,
-
in(Utils.enumOptions(SecurityProtocol.class)),
+ ConfigDef.CaseInsensitiveValidString
+
.in(Utils.enumOptions(SecurityProtocol.class)),
Importance.MEDIUM,
SECURITY_PROTOCOL_DOC)
.withClientSslSupport()
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 51c5a35bcf8..ec94748221b 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -590,7 +590,8 @@ public class ConsumerConfig extends AbstractConfig {
.define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
Type.STRING,
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
-
in(Utils.enumOptions(SecurityProtocol.class)),
+ ConfigDef.CaseInsensitiveValidString
+
.in(Utils.enumOptions(SecurityProtocol.class)),
Importance.MEDIUM,
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
.withClientSslSupport()
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 045b6b9e051..0b2b9af1d7c 100644
---
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -459,7 +459,8 @@ public class ProducerConfig extends AbstractConfig {
.define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
Type.STRING,
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
-
in(Utils.enumOptions(SecurityProtocol.class)),
+ ConfigDef.CaseInsensitiveValidString
+
.in(Utils.enumOptions(SecurityProtocol.class)),
Importance.MEDIUM,
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
.define(SECURITY_PROVIDERS_CONFIG,
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
index 163b9cf1180..661a74ea740 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
@@ -27,6 +28,7 @@ import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.Locale;
import java.util.Map;
import java.util.Properties;
@@ -145,4 +147,15 @@ public class ConsumerConfigTest {
ConfigException ce = assertThrows(ConfigException.class, () -> new
ConsumerConfig(configs));
assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
}
+
+ @Test
+ public void testCaseInsensitiveSecurityProtocol() {
+ final String saslSslLowerCase =
SecurityProtocol.SASL_SSL.name.toLowerCase(Locale.ROOT);
+ final Map<String, Object> configs = new HashMap<>();
+ configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
keyDeserializerClass);
+ configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
valueDeserializerClass);
+ configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
saslSslLowerCase);
+ final ConsumerConfig consumerConfig = new ConsumerConfig(configs);
+ assertEquals(saslSslLowerCase,
consumerConfig.originals().get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
+ }
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
index 7a9be7b32ff..d7952320e9f 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
@@ -18,12 +18,14 @@ package org.apache.kafka.clients.producer;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
+import java.util.Locale;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -95,4 +97,15 @@ public class ProducerConfigTest {
ConfigException ce = assertThrows(ConfigException.class, () -> new
ProducerConfig(configs));
assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
}
+
+ @Test
+ public void testCaseInsensitiveSecurityProtocol() {
+ final String saslSslLowerCase =
SecurityProtocol.SASL_SSL.name.toLowerCase(Locale.ROOT);
+ final Map<String, Object> configs = new HashMap<>();
+ configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
keySerializerClass);
+ configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
valueSerializerClass);
+ configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
saslSslLowerCase);
+ final ProducerConfig producerConfig = new ProducerConfig(configs);
+ assertEquals(saslSslLowerCase,
producerConfig.originals().get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
+ }
}
diff --git
a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
index 910ee206a5f..bf1bd4826e1 100644
---
a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
+++
b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
@@ -29,7 +29,7 @@ import org.apache.kafka.common.utils.Utils;
import java.util.Map;
import java.util.HashMap;
-import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
+import static
org.apache.kafka.common.config.ConfigDef.CaseInsensitiveValidString.in;
/** Configuration required for MirrorClient to talk to a given target cluster.
* <p>
diff --git
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
index 2df67074878..917e5d5afd5 100644
---
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
+++
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
@@ -29,7 +29,7 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import static
org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
import static
org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
-import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
+import static
org.apache.kafka.common.config.ConfigDef.CaseInsensitiveValidString.in;
import java.util.Map;
import java.util.HashMap;
diff --git
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
index 4f1efeb6bfe..46cfda9ac77 100644
---
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
+++
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
@@ -42,7 +42,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.stream.Collectors;
-import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
+import static
org.apache.kafka.common.config.ConfigDef.CaseInsensitiveValidString.in;
/** Top-level config describing replication flows between multiple Kafka
clusters.
* <p>
diff --git
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
index c00004d75e9..3840c49114f 100644
---
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
+++
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
@@ -19,9 +19,11 @@ package org.apache.kafka.connect.mirror;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.test.MockMetricsReporter;
import org.junit.jupiter.api.Test;
+import java.util.Locale;
import java.util.Map;
import java.util.HashMap;
@@ -175,6 +177,14 @@ public class MirrorConnectorConfigTest {
assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
}
+ @Test
+ public void testCaseInsensitiveSecurityProtocol() {
+ final String saslSslLowerCase =
SecurityProtocol.SASL_SSL.name.toLowerCase(Locale.ROOT);
+ final TestMirrorConnectorConfig config = new
TestMirrorConnectorConfig(makeProps(
+ CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
saslSslLowerCase));
+ assertEquals(saslSslLowerCase,
config.originalsStrings().get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
+ }
+
@Test
@SuppressWarnings("deprecation")
public void testMetricsReporters() {
diff --git
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
index 2611c67ad2f..9e4a8c62321 100644
---
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
+++
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
@@ -25,8 +25,10 @@ import
org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.config.ConfigData;
import org.apache.kafka.common.metrics.FakeMetricsReporter;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.junit.jupiter.api.Test;
+import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.Collections;
@@ -363,6 +365,14 @@ public class MirrorMakerConfigTest {
assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
}
+ @Test
+ public void testCaseInsensitiveSecurityProtocol() {
+ final String saslSslLowerCase =
SecurityProtocol.SASL_SSL.name.toLowerCase(Locale.ROOT);
+ final MirrorClientConfig config = new MirrorClientConfig(makeProps(
+ CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
saslSslLowerCase));
+ assertEquals(saslSslLowerCase,
config.originalsStrings().get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
+ }
+
@Test
public void testAllConfigNames() {
MirrorMakerConfig mirrorConfig = new MirrorMakerConfig(makeProps(
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
index feae00628a8..7850eb3b10d 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
@@ -44,7 +44,7 @@ import java.util.concurrent.TimeUnit;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Range.between;
-import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
+import static
org.apache.kafka.common.config.ConfigDef.CaseInsensitiveValidString.in;
import static org.apache.kafka.common.utils.Utils.enumOptions;
import static
org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_VALIDATOR;
import static
org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_VALIDATOR;
@@ -320,7 +320,7 @@ public class DistributedConfig extends WorkerConfig {
.define(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG,
ConfigDef.Type.STRING,
EXACTLY_ONCE_SOURCE_SUPPORT_DEFAULT,
-
ConfigDef.CaseInsensitiveValidString.in(enumOptions(ExactlyOnceSourceSupport.class)),
+ in(enumOptions(ExactlyOnceSourceSupport.class)),
ConfigDef.Importance.HIGH,
EXACTLY_ONCE_SOURCE_SUPPORT_DOC)
.define(CommonClientConfigs.METADATA_MAX_AGE_CONFIG,
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedConfigTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedConfigTest.java
index dd274411c8c..565e5ca8ea1 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedConfigTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedConfigTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.connect.runtime.distributed;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.junit.Test;
import javax.crypto.KeyGenerator;
@@ -30,6 +31,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Set;
@@ -408,6 +410,16 @@ public class DistributedConfigTest {
assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
}
+ @Test
+ public void testCaseInsensitiveSecurityProtocol() {
+ final String saslSslLowerCase =
SecurityProtocol.SASL_SSL.name.toLowerCase(Locale.ROOT);
+ final Map<String, String> configs = configs();
+ configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
saslSslLowerCase);
+ final DistributedConfig distributedConfig = new
DistributedConfig(configs);
+ assertEquals(saslSslLowerCase, distributedConfig.originalsStrings()
+ .get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
+ }
+
@Test
public void shouldIdentifyNeedForTransactionalLeader() {
Map<String, String> workerProps = configs();
diff --git a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
index c882a88cc1a..bcce48f19a9 100644
--- a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
@@ -228,7 +228,7 @@ object BrokerApiVersionsCommand {
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
ConfigDef.Type.STRING,
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
- in(Utils.enumOptions(classOf[SecurityProtocol]):_*),
+
ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(classOf[SecurityProtocol]):_*),
ConfigDef.Importance.MEDIUM,
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
.define(
diff --git a/docs/security.html b/docs/security.html
index a6cece897c0..3018c768bd7 100644
--- a/docs/security.html
+++ b/docs/security.html
@@ -66,7 +66,7 @@
<pre class="line-numbers"><code
class="language-text">listener.security.protocol.map=CLIENT:SSL,BROKER:PLAINTEXT</code></pre>
- <p>Possible options for the security protocol are given below:</p>
+ <p>Possible options (case-insensitive) for the security protocol are given
below:</p>
<ol>
<li>PLAINTEXT</li>
<li>SSL</li>
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 6eb34dfd9dc..26d90be11df 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -904,7 +904,7 @@ public class StreamsConfig extends AbstractConfig {
.define(SECURITY_PROTOCOL_CONFIG,
Type.STRING,
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
- in(Utils.enumOptions(SecurityProtocol.class)),
+
ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(SecurityProtocol.class)),
Importance.MEDIUM,
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
.define(TASK_TIMEOUT_MS_CONFIG,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 331358877f4..07e62ce96ed 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
@@ -1288,6 +1289,14 @@ public class StreamsConfigTest {
assertEquals(getTotalCacheSize(config), 10 * 1024 * 1024);
}
+ @Test
+ public void testCaseInsensitiveSecurityProtocol() {
+ final String saslSslLowerCase =
SecurityProtocol.SASL_SSL.name.toLowerCase(Locale.ROOT);
+ props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
saslSslLowerCase);
+ final StreamsConfig config = new StreamsConfig(props);
+ assertEquals(saslSslLowerCase,
config.originalsStrings().get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
+ }
+
@Test
public void testInvalidSecurityProtocol() {
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "abc");