This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.2 by this push:
new a1861ad7391 KAFKA-19875 Duplicated topic config prevents broker start
(#20960)
a1861ad7391 is described below
commit a1861ad73912f4703d1136aa76d0c1f6a15e7bbd
Author: Ken Huang <[email protected]>
AuthorDate: Wed Nov 26 09:22:07 2025 +0800
KAFKA-19875 Duplicated topic config prevents broker start (#20960)
Ignore duplicated values in a config of List type instead of failing for
backward compatibility.
Reviewers: Jun Rao <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../org/apache/kafka/common/config/ConfigDef.java | 16 ++++-
.../config/internals/BrokerSecurityConfigs.java | 8 +--
.../apache/kafka/common/config/ConfigDefTest.java | 79 ++++++++++++++++------
.../mirror/DefaultConfigPropertyFilter.java | 3 +-
.../kafka/connect/mirror/DefaultTopicFilter.java | 4 +-
...lowlistConnectorClientConfigOverridePolicy.java | 9 ++-
.../test/scala/unit/kafka/log/LogConfigTest.scala | 2 +-
.../scala/unit/kafka/server/KafkaConfigTest.scala | 12 +---
docs/upgrade.html | 13 ++--
9 files changed, 101 insertions(+), 45 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index ee2f8c2cfd9..22b00fa0c3d 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -19,6 +19,9 @@ package org.apache.kafka.common.config;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -81,7 +84,7 @@ import java.util.stream.Collectors;
public class ConfigDef {
private static final Pattern COMMA_WITH_WHITESPACE =
Pattern.compile("\\s*,\\s*");
-
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ConfigDef.class);
/**
* A unique Java object which represents the lack of a default value.
*/
@@ -536,6 +539,14 @@ public class ConfigDef {
// otherwise assign setting its default value
parsedValue = key.defaultValue;
}
+ if (key.validator instanceof ValidList && parsedValue instanceof List)
{
+ List<?> originalListValue = (List<?>) parsedValue;
+ parsedValue =
originalListValue.stream().distinct().collect(Collectors.toList());
+ if (originalListValue.size() != ((List<?>) parsedValue).size()) {
+ LOGGER.warn("Configuration key \"{}\" contains duplicate
values. Duplicates will be removed. The original value " +
+ "is: {}, the updated value is: {}", key.name,
originalListValue, parsedValue);
+ }
+ }
if (key.validator != null) {
key.validator.ensureValid(key.name, parsedValue);
}
@@ -1070,8 +1081,7 @@ public class ConfigDef {
}
public String toString() {
- return validString + (isEmptyAllowed ? " (empty config allowed)" :
" (empty not allowed)") +
- (isNullAllowed ? " (null config allowed)" : " (null not
allowed)");
+ return !validString.validStrings.isEmpty() ?
validString.toString() : "";
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
b/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
index 48f3948ef9d..c5289676439 100644
---
a/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
+++
b/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
@@ -169,14 +169,14 @@ public class BrokerSecurityConfigs {
.define(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, STRING,
SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, MEDIUM,
SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
.define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG,
STRING, SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM, LOW,
SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC)
.define(SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG,
STRING, null, LOW, SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_DOC)
- .define(SslConfigs.SSL_CIPHER_SUITES_CONFIG, LIST,
Collections.emptyList(), MEDIUM, SslConfigs.SSL_CIPHER_SUITES_DOC)
+ .define(SslConfigs.SSL_CIPHER_SUITES_CONFIG, LIST, List.of(),
ConfigDef.ValidList.anyNonDuplicateValues(true, false), MEDIUM,
SslConfigs.SSL_CIPHER_SUITES_DOC)
.define(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG, CLASS, null,
LOW, SslConfigs.SSL_ENGINE_FACTORY_CLASS_DOC)
// Sasl Configuration
.define(BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG,
STRING, SaslConfigs.DEFAULT_SASL_MECHANISM, MEDIUM,
BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_DOC)
- .define(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG,
LIST, BrokerSecurityConfigs.DEFAULT_SASL_ENABLED_MECHANISMS, MEDIUM,
BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_DOC)
+ .define(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG,
LIST, BrokerSecurityConfigs.DEFAULT_SASL_ENABLED_MECHANISMS,
ConfigDef.ValidList.anyNonDuplicateValues(true, false), MEDIUM,
BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_DOC)
.define(BrokerSecurityConfigs.SASL_SERVER_CALLBACK_HANDLER_CLASS_CONFIG, CLASS,
null, MEDIUM, BrokerSecurityConfigs.SASL_SERVER_CALLBACK_HANDLER_CLASS_DOC)
-
.define(BrokerSecurityConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_CONFIG,
LIST, BrokerSecurityConfigs.DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES,
MEDIUM, BrokerSecurityConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_DOC)
+
.define(BrokerSecurityConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_CONFIG,
LIST, BrokerSecurityConfigs.DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES,
ConfigDef.ValidList.anyNonDuplicateValues(true, false), MEDIUM,
BrokerSecurityConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_DOC)
.define(SaslConfigs.SASL_JAAS_CONFIG, PASSWORD, null, MEDIUM,
SaslConfigs.SASL_JAAS_CONFIG_DOC)
.define(SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS, CLASS,
null, MEDIUM, SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS_DOC)
.define(SaslConfigs.SASL_LOGIN_CLASS, CLASS, null, MEDIUM,
SaslConfigs.SASL_LOGIN_CLASS_DOC)
@@ -218,6 +218,6 @@ public class BrokerSecurityConfigs {
.define(SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS, LONG,
SaslConfigs.DEFAULT_SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS, LOW,
SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS_DOC)
.define(SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS, LONG,
SaslConfigs.DEFAULT_SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS, LOW,
SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS_DOC)
.define(SaslConfigs.SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS, INT,
SaslConfigs.DEFAULT_SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS, LOW,
SaslConfigs.SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS_DOC)
- .define(SaslConfigs.SASL_OAUTHBEARER_EXPECTED_AUDIENCE, LIST,
List.of(), LOW, SaslConfigs.SASL_OAUTHBEARER_EXPECTED_AUDIENCE_DOC)
+ .define(SaslConfigs.SASL_OAUTHBEARER_EXPECTED_AUDIENCE, LIST,
List.of(), ConfigDef.ValidList.anyNonDuplicateValues(true, false), LOW,
SaslConfigs.SASL_OAUTHBEARER_EXPECTED_AUDIENCE_DOC)
.define(SaslConfigs.SASL_OAUTHBEARER_EXPECTED_ISSUER, STRING,
null, LOW, SaslConfigs.SASL_OAUTHBEARER_EXPECTED_ISSUER_DOC);
}
diff --git
a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
index c6c2390b07c..c4200e82b0f 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
@@ -760,34 +760,55 @@ public class ConfigDefTest {
@Test
public void testListValidatorAnyNonDuplicateValues() {
- ConfigDef.ValidList allowAnyNonDuplicateValues =
ConfigDef.ValidList.anyNonDuplicateValues(true, true);
- assertDoesNotThrow(() ->
allowAnyNonDuplicateValues.ensureValid("test.config", List.of("a", "b", "c")));
- assertDoesNotThrow(() ->
allowAnyNonDuplicateValues.ensureValid("test.config", List.of()));
- assertDoesNotThrow(() ->
allowAnyNonDuplicateValues.ensureValid("test.config", null));
- ConfigException exception1 = assertThrows(ConfigException.class, () ->
allowAnyNonDuplicateValues.ensureValid("test.config", List.of("a", "a")));
+ ConfigDef.ValidList allowAnyNonDuplicateValuesAndEmptyListAndNull =
ConfigDef.ValidList.anyNonDuplicateValues(true, true);
+ assertDoesNotThrow(() ->
allowAnyNonDuplicateValuesAndEmptyListAndNull.ensureValid("test.config",
List.of("a", "b", "c")));
+ assertDoesNotThrow(() ->
allowAnyNonDuplicateValuesAndEmptyListAndNull.ensureValid("test.config",
List.of()));
+ assertDoesNotThrow(() ->
allowAnyNonDuplicateValuesAndEmptyListAndNull.ensureValid("test.config", null));
+ ConfigException exception1 = assertThrows(ConfigException.class, () ->
allowAnyNonDuplicateValuesAndEmptyListAndNull.ensureValid("test.config",
List.of("a", "a")));
assertEquals("Configuration 'test.config' values must not be
duplicated.", exception1.getMessage());
- ConfigException exception2 = assertThrows(ConfigException.class, () ->
allowAnyNonDuplicateValues.ensureValid("test.config", List.of("")));
+ ConfigException exception2 = assertThrows(ConfigException.class, () ->
allowAnyNonDuplicateValuesAndEmptyListAndNull.ensureValid("test.config",
List.of("")));
assertEquals("Configuration 'test.config' values must not be empty.",
exception2.getMessage());
-
+ ConfigException exception3 = assertThrows(ConfigException.class, () ->
allowAnyNonDuplicateValuesAndEmptyListAndNull.ensureValid("test.config",
List.of("a", "", "b")));
+ assertEquals("Configuration 'test.config' values must not be empty.",
exception3.getMessage());
+
ConfigDef.ValidList allowAnyNonDuplicateValuesAndNull =
ConfigDef.ValidList.anyNonDuplicateValues(false, true);
assertDoesNotThrow(() ->
allowAnyNonDuplicateValuesAndNull.ensureValid("test.config", List.of("a", "b",
"c")));
assertDoesNotThrow(() ->
allowAnyNonDuplicateValuesAndNull.ensureValid("test.config", null));
- ConfigException exception3 = assertThrows(ConfigException.class, () ->
allowAnyNonDuplicateValuesAndNull.ensureValid("test.config", List.of()));
- assertEquals("Configuration 'test.config' must not be empty. Valid
values include: any non-empty value", exception3.getMessage());
- ConfigException exception4 = assertThrows(ConfigException.class, () ->
allowAnyNonDuplicateValuesAndNull.ensureValid("test.config", List.of("a",
"a")));
- assertEquals("Configuration 'test.config' values must not be
duplicated.", exception4.getMessage());
- ConfigException exception5 = assertThrows(ConfigException.class, () ->
allowAnyNonDuplicateValuesAndNull.ensureValid("test.config", List.of("")));
- assertEquals("Configuration 'test.config' values must not be empty.",
exception5.getMessage());
+ ConfigException exception4 = assertThrows(ConfigException.class, () ->
allowAnyNonDuplicateValuesAndNull.ensureValid("test.config", List.of()));
+ assertEquals("Configuration 'test.config' must not be empty. Valid
values include: any non-empty value", exception4.getMessage());
+ ConfigException exception5 = assertThrows(ConfigException.class, () ->
allowAnyNonDuplicateValuesAndNull.ensureValid("test.config", List.of("a",
"a")));
+ assertEquals("Configuration 'test.config' values must not be
duplicated.", exception5.getMessage());
+ ConfigException exception6 = assertThrows(ConfigException.class, () ->
allowAnyNonDuplicateValuesAndNull.ensureValid("test.config", List.of("")));
+ assertEquals("Configuration 'test.config' values must not be empty.",
exception6.getMessage());
+ ConfigException exception7 = assertThrows(ConfigException.class, () ->
allowAnyNonDuplicateValuesAndNull.ensureValid("test.config", List.of("a", "",
"b")));
+ assertEquals("Configuration 'test.config' values must not be empty.",
exception7.getMessage());
+
ConfigDef.ValidList allowAnyNonDuplicateValuesAndEmptyList =
ConfigDef.ValidList.anyNonDuplicateValues(true, false);
assertDoesNotThrow(() ->
allowAnyNonDuplicateValuesAndEmptyList.ensureValid("test.config", List.of("a",
"b", "c")));
assertDoesNotThrow(() ->
allowAnyNonDuplicateValuesAndEmptyList.ensureValid("test.config", List.of()));
- ConfigException exception6 = assertThrows(ConfigException.class, () ->
allowAnyNonDuplicateValuesAndEmptyList.ensureValid("test.config", null));
- assertEquals("Configuration 'test.config' values must not be null.",
exception6.getMessage());
- ConfigException exception7 = assertThrows(ConfigException.class, () ->
allowAnyNonDuplicateValuesAndEmptyList.ensureValid("test.config", List.of("a",
"a")));
- assertEquals("Configuration 'test.config' values must not be
duplicated.", exception7.getMessage());
- ConfigException exception8 = assertThrows(ConfigException.class, () ->
allowAnyNonDuplicateValuesAndEmptyList.ensureValid("test.config", List.of("")));
- assertEquals("Configuration 'test.config' values must not be empty.",
exception8.getMessage());
+ ConfigException exception8 = assertThrows(ConfigException.class, () ->
allowAnyNonDuplicateValuesAndEmptyList.ensureValid("test.config", null));
+ assertEquals("Configuration 'test.config' values must not be null.",
exception8.getMessage());
+ ConfigException exception9 = assertThrows(ConfigException.class, () ->
allowAnyNonDuplicateValuesAndEmptyList.ensureValid("test.config", List.of("a",
"a")));
+ assertEquals("Configuration 'test.config' values must not be
duplicated.", exception9.getMessage());
+ ConfigException exception10 = assertThrows(ConfigException.class, ()
-> allowAnyNonDuplicateValuesAndEmptyList.ensureValid("test.config",
List.of("")));
+ assertEquals("Configuration 'test.config' values must not be empty.",
exception10.getMessage());
+ ConfigException exception11 = assertThrows(ConfigException.class, ()
-> allowAnyNonDuplicateValuesAndEmptyList.ensureValid("test.config",
List.of("a", "", "b")));
+ assertEquals("Configuration 'test.config' values must not be empty.",
exception11.getMessage());
+
+
+ ConfigDef.ValidList allowAnyNonDuplicateValues =
ConfigDef.ValidList.anyNonDuplicateValues(false, false);
+ assertDoesNotThrow(() ->
allowAnyNonDuplicateValues.ensureValid("test.config", List.of("a", "b", "c")));
+ ConfigException exception12 = assertThrows(ConfigException.class, ()
-> allowAnyNonDuplicateValues.ensureValid("test.config", null));
+ assertEquals("Configuration 'test.config' values must not be null.",
exception12.getMessage());
+ ConfigException exception13 = assertThrows(ConfigException.class, ()
-> allowAnyNonDuplicateValues.ensureValid("test.config", List.of()));
+ assertEquals("Configuration 'test.config' must not be empty. Valid
values include: any non-empty value", exception13.getMessage());
+ ConfigException exception14 = assertThrows(ConfigException.class, ()
-> allowAnyNonDuplicateValues.ensureValid("test.config", List.of("a", "a")));
+ assertEquals("Configuration 'test.config' values must not be
duplicated.", exception14.getMessage());
+ ConfigException exception15 = assertThrows(ConfigException.class, ()
-> allowAnyNonDuplicateValues.ensureValid("test.config", List.of("")));
+ assertEquals("Configuration 'test.config' values must not be empty.",
exception15.getMessage());
+ ConfigException exception16 = assertThrows(ConfigException.class, ()
-> allowAnyNonDuplicateValues.ensureValid("test.config", List.of("a", "",
"b")));
+ assertEquals("Configuration 'test.config' values must not be empty.",
exception16.getMessage());
}
@Test
@@ -813,4 +834,24 @@ public class ConfigDefTest {
ConfigException exception7 = assertThrows(ConfigException.class, () ->
notAllowEmptyValidator.ensureValid("test.config", List.of("a", "a")));
assertEquals("Configuration 'test.config' values must not be
duplicated.", exception7.getMessage());
}
+
+ @Test
+ public void testParsedValueWillRemoveDuplicatesInValidList() {
+ ConfigDef def = new ConfigDef()
+ .define(
+ "list",
+ Type.LIST,
+ List.of(),
+ ConfigDef.ValidList.anyNonDuplicateValues(true, true),
+ Importance.HIGH,
+ "list doc"
+ );
+
+ Map<String, String> props = new HashMap<>();
+ props.put("list", "a,b,c,a,b");
+
+ Map<String, Object> parsed = def.parse(props);
+ List<String> expectedList = List.of("a", "b", "c");
+ assertEquals(expectedList, parsed.get("list"));
+ }
}
diff --git
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java
index c5695204962..2dd99ad0cb7 100644
---
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java
+++
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java
@@ -26,7 +26,7 @@ import java.util.regex.Pattern;
/** Filters excluded property names or regexes. */
public class DefaultConfigPropertyFilter implements ConfigPropertyFilter {
-
+
public static final String CONFIG_PROPERTIES_EXCLUDE_CONFIG =
"config.properties.exclude";
public static final String USE_DEFAULTS_FROM = "use.defaults.from";
private static final String USE_DEFAULTS_FROM_DOC = "Which cluster's
defaults (source or target) to use "
@@ -71,6 +71,7 @@ public class DefaultConfigPropertyFilter implements
ConfigPropertyFilter {
.define(CONFIG_PROPERTIES_EXCLUDE_CONFIG,
Type.LIST,
CONFIG_PROPERTIES_EXCLUDE_DEFAULT,
+ ConfigDef.ValidList.anyNonDuplicateValues(true, false),
Importance.HIGH,
CONFIG_PROPERTIES_EXCLUDE_DOC)
.define(USE_DEFAULTS_FROM,
diff --git
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultTopicFilter.java
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultTopicFilter.java
index 95414685ba7..b5c3f8ad584 100644
---
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultTopicFilter.java
+++
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultTopicFilter.java
@@ -26,7 +26,7 @@ import java.util.regex.Pattern;
/** Uses an include and exclude pattern. */
public class DefaultTopicFilter implements TopicFilter {
-
+
public static final String TOPICS_INCLUDE_CONFIG = "topics";
private static final String TOPICS_INCLUDE_DOC = "List of topics and/or
regexes to replicate.";
public static final String TOPICS_INCLUDE_DEFAULT = ".*";
@@ -64,11 +64,13 @@ public class DefaultTopicFilter implements TopicFilter {
.define(TOPICS_INCLUDE_CONFIG,
Type.LIST,
TOPICS_INCLUDE_DEFAULT,
+ ConfigDef.ValidList.anyNonDuplicateValues(true, false),
Importance.HIGH,
TOPICS_INCLUDE_DOC)
.define(TOPICS_EXCLUDE_CONFIG,
Type.LIST,
TOPICS_EXCLUDE_DEFAULT,
+ ConfigDef.ValidList.anyNonDuplicateValues(true, false),
Importance.HIGH,
TOPICS_EXCLUDE_DOC);
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/AllowlistConnectorClientConfigOverridePolicy.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/AllowlistConnectorClientConfigOverridePolicy.java
index 80d89db73e8..0eed36ecee5 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/AllowlistConnectorClientConfigOverridePolicy.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/AllowlistConnectorClientConfigOverridePolicy.java
@@ -40,7 +40,14 @@ public class AllowlistConnectorClientConfigOverridePolicy
extends AbstractConnec
private static final String ALLOWLIST_CONFIG_DOC = "List of client
configurations that can be overridden by " +
"connectors. If empty, connectors can't override any client
configurations.";
private static final ConfigDef CONFIG_DEF = new ConfigDef()
- .define(ALLOWLIST_CONFIG, ConfigDef.Type.LIST,
ALLOWLIST_CONFIG_DEFAULT, ConfigDef.Importance.MEDIUM, ALLOWLIST_CONFIG_DOC);
+ .define(
+ ALLOWLIST_CONFIG,
+ ConfigDef.Type.LIST,
+ ALLOWLIST_CONFIG_DEFAULT,
+ ConfigDef.ValidList.anyNonDuplicateValues(true, false),
+ ConfigDef.Importance.MEDIUM,
+ ALLOWLIST_CONFIG_DOC
+ );
private List<String> allowlist = ALLOWLIST_CONFIG_DEFAULT;
diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
index e942e7e3380..98a49070913 100644
--- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
@@ -294,7 +294,7 @@ class LogConfigTest {
assertThrows(classOf[ConfigException], () => validateCleanupPolicy())
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact,delete")
assertThrows(classOf[ConfigException], () => validateCleanupPolicy())
- logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "delete")
+ logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "delete,delete,delete")
validateCleanupPolicy()
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "")
validateCleanupPolicy()
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index c30cc514d44..dd80b50f9c2 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -221,10 +221,6 @@ class KafkaConfigTest {
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG,
"HOST://localhost:9091,LB://localhost:9092")
props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG,
"HOST://localhost:9091,LB://localhost:9091")
KafkaConfig.fromProps(props)
-
- // but not duplicate names
- props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG,
"HOST://localhost:9091,HOST://localhost:9091")
- assertBadConfigContainingMessage(props, "Configuration
'advertised.listeners' values must not be duplicated.")
}
@Test
@@ -248,10 +244,6 @@ class KafkaConfigTest {
caught = assertThrows(classOf[IllegalArgumentException], () =>
KafkaConfig.fromProps(props))
assertTrue(caught.getMessage.contains("If you have two listeners on the
same port then one needs to be IPv4 and the other IPv6"))
- props.put(SocketServerConfigs.LISTENERS_CONFIG,
"PLAINTEXT://127.0.0.1:9092,PLAINTEXT://127.0.0.1:9092")
- val exception = assertThrows(classOf[ConfigException], () =>
KafkaConfig.fromProps(props))
- assertTrue(exception.getMessage.contains("values must not be duplicated."))
-
props.put(SocketServerConfigs.LISTENERS_CONFIG,
"PLAINTEXT://127.0.0.1:9092,SSL://127.0.0.1:9092,SASL_SSL://127.0.0.1:9092")
caught = assertThrows(classOf[IllegalArgumentException], () =>
KafkaConfig.fromProps(props))
assertTrue(caught.getMessage.contains("Each listener must have a different
port"))
@@ -302,7 +294,7 @@ class KafkaConfigTest {
props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "2")
props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093")
- assertBadConfigContainingMessage(props,
+ assertBadConfigContainingMessage(props,
"Missing required configuration \"controller.listener.names\" which has
no default value.")
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL")
@@ -323,7 +315,7 @@ class KafkaConfigTest {
props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093")
assertFalse(isValidKafkaConfig(props))
- assertBadConfigContainingMessage(props,
+ assertBadConfigContainingMessage(props,
"Missing required configuration \"controller.listener.names\" which has
no default value.")
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL")
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 55544356f7f..fe1b8b80d49 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -141,11 +141,14 @@
<li>
LIST-type configurations now enforce stricter validation:
<ul>
- <li>Null values are no longer accepted for most LIST-type
configurations, except those that explicitly
- allow a null default value or where a null value has a
well-defined semantic meaning.</li>
- <li>Duplicate entries within the same list are no longer
permitted.</li>
- <li>Empty lists are no longer allowed, except in
configurations where an empty list has a well-defined
- semantic meaning.</li>
+ <li>Null values are no longer accepted for most LIST-type
configurations. Exceptions apply only to
+ configurations that have null as their default value,
as users cannot explicitly assign null values
+ in configuration files or through the API.</li>
+ <li>Most LIST-type configurations no longer accept
duplicate entries, except in cases where duplicates
+ are explicitly supported. For backward compatibility,
if users configure duplicate entries when they
+ are not accepted, duplicate entries will be ignored
and a warning will be logged.</li>
+ <li>For certain configurations, an empty list causes the
system to malfunction. Therefore, empty
+ lists are no longer allowed for those
configurations.</li>
</ul>
</li>
<li>