This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.2 by this push:
     new a1861ad7391 KAFKA-19875 Duplicated topic config prevents broker start 
(#20960)
a1861ad7391 is described below

commit a1861ad73912f4703d1136aa76d0c1f6a15e7bbd
Author: Ken Huang <[email protected]>
AuthorDate: Wed Nov 26 09:22:07 2025 +0800

    KAFKA-19875 Duplicated topic config prevents broker start (#20960)
    
    Ignore duplicated values in a config of List type instead of failing for
    backward compatibility.
    
    Reviewers: Jun Rao <[email protected]>, Chia-Ping Tsai
    <[email protected]>
---
 .../org/apache/kafka/common/config/ConfigDef.java  | 16 ++++-
 .../config/internals/BrokerSecurityConfigs.java    |  8 +--
 .../apache/kafka/common/config/ConfigDefTest.java  | 79 ++++++++++++++++------
 .../mirror/DefaultConfigPropertyFilter.java        |  3 +-
 .../kafka/connect/mirror/DefaultTopicFilter.java   |  4 +-
 ...lowlistConnectorClientConfigOverridePolicy.java |  9 ++-
 .../test/scala/unit/kafka/log/LogConfigTest.scala  |  2 +-
 .../scala/unit/kafka/server/KafkaConfigTest.scala  | 12 +---
 docs/upgrade.html                                  | 13 ++--
 9 files changed, 101 insertions(+), 45 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java 
b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index ee2f8c2cfd9..22b00fa0c3d 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -19,6 +19,9 @@ package org.apache.kafka.common.config;
 import org.apache.kafka.common.config.types.Password;
 import org.apache.kafka.common.utils.Utils;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -81,7 +84,7 @@ import java.util.stream.Collectors;
 public class ConfigDef {
 
     private static final Pattern COMMA_WITH_WHITESPACE = 
Pattern.compile("\\s*,\\s*");
-
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ConfigDef.class);
     /**
      * A unique Java object which represents the lack of a default value.
      */
@@ -536,6 +539,14 @@ public class ConfigDef {
             // otherwise assign setting its default value
             parsedValue = key.defaultValue;
         }
+        if (key.validator instanceof ValidList && parsedValue instanceof List) 
{
+            List<?> originalListValue = (List<?>) parsedValue;
+            parsedValue = 
originalListValue.stream().distinct().collect(Collectors.toList());
+            if (originalListValue.size() != ((List<?>) parsedValue).size()) {
+                LOGGER.warn("Configuration key \"{}\" contains duplicate 
values. Duplicates will be removed. The original value " +
+                        "is: {}, the updated value is: {}", key.name, 
originalListValue, parsedValue);
+            }
+        }
         if (key.validator != null) {
             key.validator.ensureValid(key.name, parsedValue);
         }
@@ -1070,8 +1081,7 @@ public class ConfigDef {
         }
 
         public String toString() {
-            return validString + (isEmptyAllowed ? " (empty config allowed)" : 
" (empty not allowed)") +
-                    (isNullAllowed ? " (null config allowed)" : " (null not 
allowed)");
+            return !validString.validStrings.isEmpty() ? 
validString.toString() : "";
         }
     }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
 
b/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
index 48f3948ef9d..c5289676439 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
@@ -169,14 +169,14 @@ public class BrokerSecurityConfigs {
             .define(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, STRING, 
SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, MEDIUM, 
SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
             .define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, 
STRING, SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM, LOW, 
SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC)
             .define(SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG, 
STRING, null, LOW, SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_DOC)
-            .define(SslConfigs.SSL_CIPHER_SUITES_CONFIG, LIST, 
Collections.emptyList(), MEDIUM, SslConfigs.SSL_CIPHER_SUITES_DOC)
+            .define(SslConfigs.SSL_CIPHER_SUITES_CONFIG, LIST, List.of(), 
ConfigDef.ValidList.anyNonDuplicateValues(true, false), MEDIUM, 
SslConfigs.SSL_CIPHER_SUITES_DOC)
             .define(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG, CLASS, null, 
LOW, SslConfigs.SSL_ENGINE_FACTORY_CLASS_DOC)
 
             // Sasl Configuration
             
.define(BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG, 
STRING, SaslConfigs.DEFAULT_SASL_MECHANISM, MEDIUM, 
BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_DOC)
-            .define(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, 
LIST, BrokerSecurityConfigs.DEFAULT_SASL_ENABLED_MECHANISMS, MEDIUM, 
BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_DOC)
+            .define(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, 
LIST, BrokerSecurityConfigs.DEFAULT_SASL_ENABLED_MECHANISMS, 
ConfigDef.ValidList.anyNonDuplicateValues(true, false), MEDIUM, 
BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_DOC)
             
.define(BrokerSecurityConfigs.SASL_SERVER_CALLBACK_HANDLER_CLASS_CONFIG, CLASS, 
null, MEDIUM, BrokerSecurityConfigs.SASL_SERVER_CALLBACK_HANDLER_CLASS_DOC)
-            
.define(BrokerSecurityConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_CONFIG, 
LIST, BrokerSecurityConfigs.DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES, 
MEDIUM, BrokerSecurityConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_DOC)
+            
.define(BrokerSecurityConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_CONFIG, 
LIST, BrokerSecurityConfigs.DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES, 
ConfigDef.ValidList.anyNonDuplicateValues(true, false), MEDIUM, 
BrokerSecurityConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_DOC)
             .define(SaslConfigs.SASL_JAAS_CONFIG, PASSWORD, null, MEDIUM, 
SaslConfigs.SASL_JAAS_CONFIG_DOC)
             .define(SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS, CLASS, 
null, MEDIUM, SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS_DOC)
             .define(SaslConfigs.SASL_LOGIN_CLASS, CLASS, null, MEDIUM, 
SaslConfigs.SASL_LOGIN_CLASS_DOC)
@@ -218,6 +218,6 @@ public class BrokerSecurityConfigs {
             
.define(SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS, LONG, 
SaslConfigs.DEFAULT_SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS, LOW, 
SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS_DOC)
             
.define(SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS, LONG, 
SaslConfigs.DEFAULT_SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS, LOW, 
SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS_DOC)
             .define(SaslConfigs.SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS, INT, 
SaslConfigs.DEFAULT_SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS, LOW, 
SaslConfigs.SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS_DOC)
-            .define(SaslConfigs.SASL_OAUTHBEARER_EXPECTED_AUDIENCE, LIST, 
List.of(), LOW, SaslConfigs.SASL_OAUTHBEARER_EXPECTED_AUDIENCE_DOC)
+            .define(SaslConfigs.SASL_OAUTHBEARER_EXPECTED_AUDIENCE, LIST, 
List.of(), ConfigDef.ValidList.anyNonDuplicateValues(true, false), LOW, 
SaslConfigs.SASL_OAUTHBEARER_EXPECTED_AUDIENCE_DOC)
             .define(SaslConfigs.SASL_OAUTHBEARER_EXPECTED_ISSUER, STRING, 
null, LOW, SaslConfigs.SASL_OAUTHBEARER_EXPECTED_ISSUER_DOC);
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java 
b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
index c6c2390b07c..c4200e82b0f 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
@@ -760,34 +760,55 @@ public class ConfigDefTest {
 
     @Test
     public void testListValidatorAnyNonDuplicateValues() {
-        ConfigDef.ValidList allowAnyNonDuplicateValues = 
ConfigDef.ValidList.anyNonDuplicateValues(true, true);
-        assertDoesNotThrow(() -> 
allowAnyNonDuplicateValues.ensureValid("test.config", List.of("a", "b", "c")));
-        assertDoesNotThrow(() -> 
allowAnyNonDuplicateValues.ensureValid("test.config", List.of()));
-        assertDoesNotThrow(() -> 
allowAnyNonDuplicateValues.ensureValid("test.config", null));
-        ConfigException exception1 = assertThrows(ConfigException.class, () -> 
allowAnyNonDuplicateValues.ensureValid("test.config", List.of("a", "a")));
+        ConfigDef.ValidList allowAnyNonDuplicateValuesAndEmptyListAndNull = 
ConfigDef.ValidList.anyNonDuplicateValues(true, true);
+        assertDoesNotThrow(() -> 
allowAnyNonDuplicateValuesAndEmptyListAndNull.ensureValid("test.config", 
List.of("a", "b", "c")));
+        assertDoesNotThrow(() -> 
allowAnyNonDuplicateValuesAndEmptyListAndNull.ensureValid("test.config", 
List.of()));
+        assertDoesNotThrow(() -> 
allowAnyNonDuplicateValuesAndEmptyListAndNull.ensureValid("test.config", null));
+        ConfigException exception1 = assertThrows(ConfigException.class, () -> 
allowAnyNonDuplicateValuesAndEmptyListAndNull.ensureValid("test.config", 
List.of("a", "a")));
         assertEquals("Configuration 'test.config' values must not be 
duplicated.", exception1.getMessage());
-        ConfigException exception2 = assertThrows(ConfigException.class, () -> 
allowAnyNonDuplicateValues.ensureValid("test.config", List.of("")));
+        ConfigException exception2 = assertThrows(ConfigException.class, () -> 
allowAnyNonDuplicateValuesAndEmptyListAndNull.ensureValid("test.config", 
List.of("")));
         assertEquals("Configuration 'test.config' values must not be empty.", 
exception2.getMessage());
-        
+        ConfigException exception3 = assertThrows(ConfigException.class, () -> 
allowAnyNonDuplicateValuesAndEmptyListAndNull.ensureValid("test.config", 
List.of("a", "", "b")));
+        assertEquals("Configuration 'test.config' values must not be empty.", 
exception3.getMessage());
+
         ConfigDef.ValidList allowAnyNonDuplicateValuesAndNull = 
ConfigDef.ValidList.anyNonDuplicateValues(false, true);
         assertDoesNotThrow(() -> 
allowAnyNonDuplicateValuesAndNull.ensureValid("test.config", List.of("a", "b", 
"c")));
         assertDoesNotThrow(() -> 
allowAnyNonDuplicateValuesAndNull.ensureValid("test.config", null));
-        ConfigException exception3 = assertThrows(ConfigException.class, () -> 
allowAnyNonDuplicateValuesAndNull.ensureValid("test.config", List.of()));
-        assertEquals("Configuration 'test.config' must not be empty. Valid 
values include: any non-empty value", exception3.getMessage());
-        ConfigException exception4 = assertThrows(ConfigException.class, () -> 
allowAnyNonDuplicateValuesAndNull.ensureValid("test.config", List.of("a", 
"a")));
-        assertEquals("Configuration 'test.config' values must not be 
duplicated.", exception4.getMessage());
-        ConfigException exception5 = assertThrows(ConfigException.class, () -> 
allowAnyNonDuplicateValuesAndNull.ensureValid("test.config", List.of("")));
-        assertEquals("Configuration 'test.config' values must not be empty.", 
exception5.getMessage());
+        ConfigException exception4 = assertThrows(ConfigException.class, () -> 
allowAnyNonDuplicateValuesAndNull.ensureValid("test.config", List.of()));
+        assertEquals("Configuration 'test.config' must not be empty. Valid 
values include: any non-empty value", exception4.getMessage());
+        ConfigException exception5 = assertThrows(ConfigException.class, () -> 
allowAnyNonDuplicateValuesAndNull.ensureValid("test.config", List.of("a", 
"a")));
+        assertEquals("Configuration 'test.config' values must not be 
duplicated.", exception5.getMessage());
+        ConfigException exception6 = assertThrows(ConfigException.class, () -> 
allowAnyNonDuplicateValuesAndNull.ensureValid("test.config", List.of("")));
+        assertEquals("Configuration 'test.config' values must not be empty.", 
exception6.getMessage());
+        ConfigException exception7 = assertThrows(ConfigException.class, () -> 
allowAnyNonDuplicateValuesAndNull.ensureValid("test.config", List.of("a", "", 
"b")));
+        assertEquals("Configuration 'test.config' values must not be empty.", 
exception7.getMessage());
+
 
         ConfigDef.ValidList allowAnyNonDuplicateValuesAndEmptyList = 
ConfigDef.ValidList.anyNonDuplicateValues(true, false);
         assertDoesNotThrow(() -> 
allowAnyNonDuplicateValuesAndEmptyList.ensureValid("test.config", List.of("a", 
"b", "c")));
         assertDoesNotThrow(() -> 
allowAnyNonDuplicateValuesAndEmptyList.ensureValid("test.config", List.of()));
-        ConfigException exception6 = assertThrows(ConfigException.class, () -> 
allowAnyNonDuplicateValuesAndEmptyList.ensureValid("test.config", null));
-        assertEquals("Configuration 'test.config' values must not be null.", 
exception6.getMessage());
-        ConfigException exception7 = assertThrows(ConfigException.class, () -> 
allowAnyNonDuplicateValuesAndEmptyList.ensureValid("test.config", List.of("a", 
"a")));
-        assertEquals("Configuration 'test.config' values must not be 
duplicated.", exception7.getMessage());
-        ConfigException exception8 = assertThrows(ConfigException.class, () -> 
allowAnyNonDuplicateValuesAndEmptyList.ensureValid("test.config", List.of("")));
-        assertEquals("Configuration 'test.config' values must not be empty.", 
exception8.getMessage());
+        ConfigException exception8 = assertThrows(ConfigException.class, () -> 
allowAnyNonDuplicateValuesAndEmptyList.ensureValid("test.config", null));
+        assertEquals("Configuration 'test.config' values must not be null.", 
exception8.getMessage());
+        ConfigException exception9 = assertThrows(ConfigException.class, () -> 
allowAnyNonDuplicateValuesAndEmptyList.ensureValid("test.config", List.of("a", 
"a")));
+        assertEquals("Configuration 'test.config' values must not be 
duplicated.", exception9.getMessage());
+        ConfigException exception10 = assertThrows(ConfigException.class, () 
-> allowAnyNonDuplicateValuesAndEmptyList.ensureValid("test.config", 
List.of("")));
+        assertEquals("Configuration 'test.config' values must not be empty.", 
exception10.getMessage());
+        ConfigException exception11 = assertThrows(ConfigException.class, () 
-> allowAnyNonDuplicateValuesAndEmptyList.ensureValid("test.config", 
List.of("a", "", "b")));
+        assertEquals("Configuration 'test.config' values must not be empty.", 
exception11.getMessage());
+
+
+        ConfigDef.ValidList allowAnyNonDuplicateValues = 
ConfigDef.ValidList.anyNonDuplicateValues(false, false);
+        assertDoesNotThrow(() -> 
allowAnyNonDuplicateValues.ensureValid("test.config", List.of("a", "b", "c")));
+        ConfigException exception12 = assertThrows(ConfigException.class, () 
-> allowAnyNonDuplicateValues.ensureValid("test.config", null));
+        assertEquals("Configuration 'test.config' values must not be null.", 
exception12.getMessage());
+        ConfigException exception13 = assertThrows(ConfigException.class, () 
-> allowAnyNonDuplicateValues.ensureValid("test.config", List.of()));
+        assertEquals("Configuration 'test.config' must not be empty. Valid 
values include: any non-empty value", exception13.getMessage());
+        ConfigException exception14 = assertThrows(ConfigException.class, () 
-> allowAnyNonDuplicateValues.ensureValid("test.config", List.of("a", "a")));
+        assertEquals("Configuration 'test.config' values must not be 
duplicated.", exception14.getMessage());
+        ConfigException exception15 = assertThrows(ConfigException.class, () 
-> allowAnyNonDuplicateValues.ensureValid("test.config", List.of("")));
+        assertEquals("Configuration 'test.config' values must not be empty.", 
exception15.getMessage());
+        ConfigException exception16 = assertThrows(ConfigException.class, () 
-> allowAnyNonDuplicateValues.ensureValid("test.config", List.of("a", "", 
"b")));
+        assertEquals("Configuration 'test.config' values must not be empty.", 
exception16.getMessage());
     }
 
     @Test
@@ -813,4 +834,24 @@ public class ConfigDefTest {
         ConfigException exception7 = assertThrows(ConfigException.class, () -> 
notAllowEmptyValidator.ensureValid("test.config", List.of("a", "a")));
         assertEquals("Configuration 'test.config' values must not be 
duplicated.", exception7.getMessage());
     }
+
+    @Test
+    public void testParsedValueWillRemoveDuplicatesInValidList() {
+        ConfigDef def = new ConfigDef()
+            .define(
+                "list",
+                Type.LIST,
+                List.of(),
+                ConfigDef.ValidList.anyNonDuplicateValues(true, true),
+                Importance.HIGH,
+                "list doc"
+            );
+
+        Map<String, String> props = new HashMap<>();
+        props.put("list", "a,b,c,a,b");
+
+        Map<String, Object> parsed = def.parse(props);
+        List<String> expectedList = List.of("a", "b", "c");
+        assertEquals(expectedList, parsed.get("list"));
+    }
 }
diff --git 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java
 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java
index c5695204962..2dd99ad0cb7 100644
--- 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java
+++ 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java
@@ -26,7 +26,7 @@ import java.util.regex.Pattern;
 
 /** Filters excluded property names or regexes. */
 public class DefaultConfigPropertyFilter implements ConfigPropertyFilter {
-    
+
     public static final String CONFIG_PROPERTIES_EXCLUDE_CONFIG = 
"config.properties.exclude";
     public static final String USE_DEFAULTS_FROM = "use.defaults.from";
     private static final String USE_DEFAULTS_FROM_DOC = "Which cluster's 
defaults (source or target) to use "
@@ -71,6 +71,7 @@ public class DefaultConfigPropertyFilter implements 
ConfigPropertyFilter {
             .define(CONFIG_PROPERTIES_EXCLUDE_CONFIG,
                     Type.LIST,
                     CONFIG_PROPERTIES_EXCLUDE_DEFAULT,
+                    ConfigDef.ValidList.anyNonDuplicateValues(true, false),
                     Importance.HIGH,
                     CONFIG_PROPERTIES_EXCLUDE_DOC)
             .define(USE_DEFAULTS_FROM,
diff --git 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultTopicFilter.java
 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultTopicFilter.java
index 95414685ba7..b5c3f8ad584 100644
--- 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultTopicFilter.java
+++ 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultTopicFilter.java
@@ -26,7 +26,7 @@ import java.util.regex.Pattern;
 
 /** Uses an include and exclude pattern. */
 public class DefaultTopicFilter implements TopicFilter {
-    
+
     public static final String TOPICS_INCLUDE_CONFIG = "topics";
     private static final String TOPICS_INCLUDE_DOC = "List of topics and/or 
regexes to replicate.";
     public static final String TOPICS_INCLUDE_DEFAULT = ".*";
@@ -64,11 +64,13 @@ public class DefaultTopicFilter implements TopicFilter {
             .define(TOPICS_INCLUDE_CONFIG,
                     Type.LIST,
                     TOPICS_INCLUDE_DEFAULT,
+                    ConfigDef.ValidList.anyNonDuplicateValues(true, false),
                     Importance.HIGH,
                     TOPICS_INCLUDE_DOC)
             .define(TOPICS_EXCLUDE_CONFIG,
                     Type.LIST,
                     TOPICS_EXCLUDE_DEFAULT,
+                    ConfigDef.ValidList.anyNonDuplicateValues(true, false),
                     Importance.HIGH,
                     TOPICS_EXCLUDE_DOC);
 
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/AllowlistConnectorClientConfigOverridePolicy.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/AllowlistConnectorClientConfigOverridePolicy.java
index 80d89db73e8..0eed36ecee5 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/AllowlistConnectorClientConfigOverridePolicy.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/AllowlistConnectorClientConfigOverridePolicy.java
@@ -40,7 +40,14 @@ public class AllowlistConnectorClientConfigOverridePolicy 
extends AbstractConnec
     private static final String ALLOWLIST_CONFIG_DOC = "List of client 
configurations that can be overridden by " +
             "connectors. If empty, connectors can't override any client 
configurations.";
     private static final ConfigDef CONFIG_DEF = new ConfigDef()
-            .define(ALLOWLIST_CONFIG, ConfigDef.Type.LIST, 
ALLOWLIST_CONFIG_DEFAULT, ConfigDef.Importance.MEDIUM, ALLOWLIST_CONFIG_DOC);
+            .define(
+                ALLOWLIST_CONFIG,
+                ConfigDef.Type.LIST,
+                ALLOWLIST_CONFIG_DEFAULT,
+                ConfigDef.ValidList.anyNonDuplicateValues(true, false),
+                ConfigDef.Importance.MEDIUM,
+                ALLOWLIST_CONFIG_DOC
+            );
 
     private List<String> allowlist = ALLOWLIST_CONFIG_DEFAULT;
 
diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala 
b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
index e942e7e3380..98a49070913 100644
--- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
@@ -294,7 +294,7 @@ class LogConfigTest {
     assertThrows(classOf[ConfigException], () => validateCleanupPolicy())
     logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact,delete")
     assertThrows(classOf[ConfigException], () => validateCleanupPolicy())
-    logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "delete")
+    logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "delete,delete,delete")
     validateCleanupPolicy()
     logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "")
     validateCleanupPolicy()
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index c30cc514d44..dd80b50f9c2 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -221,10 +221,6 @@ class KafkaConfigTest {
     props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, 
"HOST://localhost:9091,LB://localhost:9092")
     props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, 
"HOST://localhost:9091,LB://localhost:9091")
     KafkaConfig.fromProps(props)
-
-    // but not duplicate names
-    props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, 
"HOST://localhost:9091,HOST://localhost:9091")
-    assertBadConfigContainingMessage(props, "Configuration 
'advertised.listeners' values must not be duplicated.")
   }
 
   @Test
@@ -248,10 +244,6 @@ class KafkaConfigTest {
     caught = assertThrows(classOf[IllegalArgumentException], () => 
KafkaConfig.fromProps(props))
     assertTrue(caught.getMessage.contains("If you have two listeners on the 
same port then one needs to be IPv4 and the other IPv6"))
 
-    props.put(SocketServerConfigs.LISTENERS_CONFIG, 
"PLAINTEXT://127.0.0.1:9092,PLAINTEXT://127.0.0.1:9092")
-    val exception = assertThrows(classOf[ConfigException], () => 
KafkaConfig.fromProps(props))
-    assertTrue(exception.getMessage.contains("values must not be duplicated."))
-
     props.put(SocketServerConfigs.LISTENERS_CONFIG, 
"PLAINTEXT://127.0.0.1:9092,SSL://127.0.0.1:9092,SASL_SSL://127.0.0.1:9092")
     caught = assertThrows(classOf[IllegalArgumentException], () => 
KafkaConfig.fromProps(props))
     assertTrue(caught.getMessage.contains("Each listener must have a different 
port"))
@@ -302,7 +294,7 @@ class KafkaConfigTest {
     props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "2")
     props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093")
 
-    assertBadConfigContainingMessage(props, 
+    assertBadConfigContainingMessage(props,
       "Missing required configuration \"controller.listener.names\" which has 
no default value.")
 
     props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL")
@@ -323,7 +315,7 @@ class KafkaConfigTest {
     props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093")
 
     assertFalse(isValidKafkaConfig(props))
-    assertBadConfigContainingMessage(props, 
+    assertBadConfigContainingMessage(props,
       "Missing required configuration \"controller.listener.names\" which has 
no default value.")
 
     props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL")
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 55544356f7f..fe1b8b80d49 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -141,11 +141,14 @@
             <li>
                 LIST-type configurations now enforce stricter validation:
                 <ul>
-                    <li>Null values are no longer accepted for most LIST-type 
configurations, except those that explicitly
-                        allow a null default value or where a null value has a 
well-defined semantic meaning.</li>
-                    <li>Duplicate entries within the same list are no longer 
permitted.</li>
-                    <li>Empty lists are no longer allowed, except in 
configurations where an empty list has a well-defined
-                        semantic meaning.</li>
+                    <li>Null values are no longer accepted for most LIST-type 
configurations. Exceptions apply only to
+                        configurations that have null as their default value, 
as users cannot explicitly assign null values
+                        in configuration files or through the API.</li>
+                    <li>Most LIST-type configurations no longer accept 
duplicate entries, except in cases where duplicates
+                        are explicitly supported. For backward compatibility, 
if users configure duplicate entries when they
+                        are not accepted, duplicate entries will be ignored 
and a warning will be logged.</li>
+                    <li>For certain configurations, an empty list causes the 
system to malfunction. Therefore, empty
+                        lists are no longer allowed for those 
configurations.</li>
                 </ul>
             </li>
             <li>

Reply via email to