This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a395d060300 KAFKA-19824: New 
AllowlistConnectorClientConfigOverridePolicy (KIP-1188) (#20750)
a395d060300 is described below

commit a395d06030022ccf305f6b7803eab43a1856d5b2
Author: Mickael Maison <[email protected]>
AuthorDate: Tue Oct 28 09:19:02 2025 +0100

    KAFKA-19824: New AllowlistConnectorClientConfigOverridePolicy (KIP-1188) 
(#20750)
    
    
    Reviewers: Chris Egerton <[email protected]>, Andrew Schofield 
<[email protected]>
---
 ...lowlistConnectorClientConfigOverridePolicy.java | 64 ++++++++++++++++++
 ...incipalConnectorClientConfigOverridePolicy.java |  5 ++
 .../apache/kafka/connect/runtime/WorkerConfig.java |  8 ++-
 ...ctor.policy.ConnectorClientConfigOverridePolicy |  3 +-
 ...istConnectorClientConfigOverridePolicyTest.java | 76 ++++++++++++++++++++++
 .../ConnectorClientPolicyIntegrationTest.java      | 40 +++++++++++-
 .../kafka/connect/runtime/AbstractHerderTest.java  | 46 +++++++++++++
 docs/upgrade.html                                  |  7 ++
 8 files changed, 244 insertions(+), 5 deletions(-)

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/AllowlistConnectorClientConfigOverridePolicy.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/AllowlistConnectorClientConfigOverridePolicy.java
new file mode 100644
index 00000000000..80d89db73e8
--- /dev/null
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/AllowlistConnectorClientConfigOverridePolicy.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.connector.policy;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigValue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Allows only client configurations specified via 
<code>connector.client.config.override.allowlist</code> to be
+ * overridden by connectors. By default, 
<code>connector.client.config.override.allowlist</code> is empty so connectors
+ * can't override any client configurations.
+ */
+public class AllowlistConnectorClientConfigOverridePolicy extends 
AbstractConnectorClientConfigOverridePolicy {
+
+    public static final String ALLOWLIST_CONFIG = 
"connector.client.config.override.allowlist";
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(AllowlistConnectorClientConfigOverridePolicy.class);
+    private static final List<String> ALLOWLIST_CONFIG_DEFAULT = List.of();
+    private static final String ALLOWLIST_CONFIG_DOC = "List of client 
configurations that can be overridden by " +
+            "connectors. If empty, connectors can't override any client 
configurations.";
+    private static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(ALLOWLIST_CONFIG, ConfigDef.Type.LIST, 
ALLOWLIST_CONFIG_DEFAULT, ConfigDef.Importance.MEDIUM, ALLOWLIST_CONFIG_DOC);
+
+    private List<String> allowlist = ALLOWLIST_CONFIG_DEFAULT;
+
+    @Override
+    protected String policyName() {
+        return "Allowlist";
+    }
+
+    @Override
+    protected boolean isAllowed(ConfigValue configValue) {
+        return allowlist.contains(configValue.name());
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        AbstractConfig config = new AbstractConfig(CONFIG_DEF, configs);
+        allowlist = config.getList(ALLOWLIST_CONFIG);
+        LOGGER.info("Setting up Allowlist policy for 
ConnectorClientConfigOverride. This will allow the following client 
configurations"
+                + " to be overridden. {}", allowlist);
+    }
+}
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/PrincipalConnectorClientConfigOverridePolicy.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/PrincipalConnectorClientConfigOverridePolicy.java
index 8ec04f97261..0c7bdad2659 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/PrincipalConnectorClientConfigOverridePolicy.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/PrincipalConnectorClientConfigOverridePolicy.java
@@ -32,7 +32,9 @@ import java.util.stream.Stream;
 /**
  * Allows all {@code sasl} configurations to be overridden via the connector 
configs by setting {@code connector.client.config.override.policy} to
  * {@code Principal}. This allows to set a principal per connector.
+ * @deprecated Use {@link AllowlistConnectorClientConfigOverridePolicy} 
instead.
  */
+@Deprecated(since = " 4.2", forRemoval = true)
 public class PrincipalConnectorClientConfigOverridePolicy extends 
AbstractConnectorClientConfigOverridePolicy {
     private static final Logger log = 
LoggerFactory.getLogger(PrincipalConnectorClientConfigOverridePolicy.class);
 
@@ -52,6 +54,9 @@ public class PrincipalConnectorClientConfigOverridePolicy 
extends AbstractConnec
 
     @Override
     public void configure(Map<String, ?> configs) {
+        log.warn("The Principal ConnectorClientConfigOverridePolicy is 
deprecated, use the Allowlist policy instead. "
+                 + "To replicate the Principal policy behavior, set the 
connector.client.config.override.allowlist configuration to \"{}\"",
+                 String.join(",", ALLOWED_CONFIG));
         log.info("Setting up Principal policy for 
ConnectorClientConfigOverride. This will allow `sasl` client configuration to 
be "
                  + "overridden.");
     }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
index 8d953d7ded3..35326c03ae3 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.Utils;
+import 
org.apache.kafka.connect.connector.policy.AllowlistConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.isolation.PluginDiscoveryMode;
 import org.apache.kafka.connect.runtime.rest.RestServerConfig;
@@ -158,9 +159,10 @@ public class WorkerConfig extends AbstractConfig {
     public static final String CONNECTOR_CLIENT_POLICY_CLASS_CONFIG = 
"connector.client.config.override.policy";
     public static final String CONNECTOR_CLIENT_POLICY_CLASS_DOC =
         "Class name or alias of implementation of 
<code>ConnectorClientConfigOverridePolicy</code>. Defines what client 
configurations can be "
-        + "overridden by the connector. The default implementation is 
<code>All</code>, meaning connector configurations can override all client 
properties. "
-        + "The other possible policies in the framework include 
<code>None</code> to disallow connectors from overriding client properties, "
-        + "and <code>Principal</code> to allow connectors to override only 
client principals.";
+        + "overridden by the connector. The default policy is 
<code>All</code>, meaning connector configurations can override all client 
properties. "
+        + "The other possible policies in the framework include 
<code>Allowlist</code> to specify allowed configurations via "
+        + "<code>" + 
AllowlistConnectorClientConfigOverridePolicy.ALLOWLIST_CONFIG + "</code>, 
<code>None</code> to disallow connectors from overriding "
+        + "client properties, and <code>Principal</code> (now deprecated) to 
allow connectors to override only client principals.";
     public static final String CONNECTOR_CLIENT_POLICY_CLASS_DEFAULT = "All";
 
 
diff --git 
a/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy
 
b/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy
index 8b76ce452b6..beb6f23be60 100644
--- 
a/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy
+++ 
b/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy
@@ -15,4 +15,5 @@
 
 
org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy
 
org.apache.kafka.connect.connector.policy.PrincipalConnectorClientConfigOverridePolicy
-org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy
\ No newline at end of file
+org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy
+org.apache.kafka.connect.connector.policy.AllowlistConnectorClientConfigOverridePolicy
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/connector/policy/AllowlistConnectorClientConfigOverridePolicyTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/connector/policy/AllowlistConnectorClientConfigOverridePolicyTest.java
new file mode 100644
index 00000000000..3326a5744c0
--- /dev/null
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/connector/policy/AllowlistConnectorClientConfigOverridePolicyTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.connector.policy;
+
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Stream;
+
+public class AllowlistConnectorClientConfigOverridePolicyTest extends 
BaseConnectorClientConfigOverridePolicyTest {
+
+    private static final List<String> ALL_CONFIGS = Stream.of(
+                    ProducerConfig.configNames(),
+                    ConsumerConfig.configNames(),
+                    AdminClientConfig.configNames())
+            .flatMap(Collection::stream)
+            .toList();
+
+    private AllowlistConnectorClientConfigOverridePolicy policy;
+
+    @BeforeEach
+    public void setUp() {
+        policy = new AllowlistConnectorClientConfigOverridePolicy();
+    }
+
+    @Override
+    protected ConnectorClientConfigOverridePolicy policyToTest() {
+        return policy;
+    }
+
+    @Test
+    public void testDenyAllByDefault() {
+        for (String config : ALL_CONFIGS) {
+            testInvalidOverride(Map.of(config, new Object()));
+        }
+    }
+
+    @Test
+    public void testAllowConfigs() {
+        Set<String> allowedConfigs = Set.of(
+                ProducerConfig.ACKS_CONFIG,
+                ConsumerConfig.CLIENT_ID_CONFIG,
+                AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG
+        );
+        
policy.configure(Map.of(AllowlistConnectorClientConfigOverridePolicy.ALLOWLIST_CONFIG,
 String.join(",", allowedConfigs)));
+        for (String config : ALL_CONFIGS) {
+            if (!allowedConfigs.contains(config)) {
+                testInvalidOverride(Map.of(config, new Object()));
+            } else {
+                testValidOverride(Map.of(config, new Object()));
+            }
+        }
+    }
+}
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java
index c42402eea2e..36a14f75440 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java
@@ -19,12 +19,14 @@ package org.apache.kafka.connect.integration;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.config.SaslConfigs;
+import 
org.apache.kafka.connect.connector.policy.AllowlistConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
 import org.apache.kafka.connect.storage.StringConverter;
 import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
 
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 
@@ -48,6 +50,13 @@ public class ConnectorClientPolicyIntegrationTest {
     private static final int NUM_WORKERS = 1;
     private static final String CONNECTOR_NAME = "simple-conn";
 
+    private Map<String, String> workerConfigs;
+
+    @BeforeEach
+    public void setup() {
+        workerConfigs = Map.of();
+    }
+
     @Test
     public void testCreateWithOverridesForNonePolicy() {
         Map<String, String> props = basicConnectorConfig();
@@ -93,9 +102,38 @@ public class ConnectorClientPolicyIntegrationTest {
         assertPassCreateConnector(null, props);
     }
 
+    @Test
+    public void testCreateWithoutOverridesForAllowlistPolicy() throws 
Exception {
+        // set up props for the sink connector
+        Map<String, String> props = basicConnectorConfig();
+        assertPassCreateConnector("Allowlist", props);
+    }
+
+    @Test
+    public void testCreateWithNotAllowedOverridesForAllowlistPolicy() {
+        workerConfigs = Map.of(
+                AllowlistConnectorClientConfigOverridePolicy.ALLOWLIST_CONFIG, 
CommonClientConfigs.CLIENT_RACK_CONFIG
+        );
+        // set up props for the sink connector
+        Map<String, String> props = basicConnectorConfig();
+        props.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + 
CommonClientConfigs.CLIENT_ID_CONFIG, "test");
+        assertFailCreateConnector("Allowlist", props);
+    }
+
+    @Test
+    public void testCreateWithAllowedOverridesForAllowlistPolicy() throws 
Exception {
+        workerConfigs = Map.of(
+                AllowlistConnectorClientConfigOverridePolicy.ALLOWLIST_CONFIG, 
CommonClientConfigs.CLIENT_ID_CONFIG
+        );
+        // set up props for the sink connector
+        Map<String, String> props = basicConnectorConfig();
+        props.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + 
CommonClientConfigs.CLIENT_ID_CONFIG, "test");
+        assertPassCreateConnector("Allowlist", props);
+    }
+
     private EmbeddedConnectCluster connectClusterWithPolicy(String policy) {
         // setup Connect worker properties
-        Map<String, String> workerProps = new HashMap<>();
+        Map<String, String> workerProps = new HashMap<>(workerConfigs);
         workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, 
String.valueOf(5_000));
         if (policy != null) {
             workerProps.put(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG, 
policy);
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index 956153db5c6..02caa21812f 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -32,6 +32,7 @@ import 
org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBea
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.connector.Connector;
 import 
org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
+import 
org.apache.kafka.connect.connector.policy.AllowlistConnectorClientConfigOverridePolicy;
 import 
org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
 import 
org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy;
 import 
org.apache.kafka.connect.connector.policy.PrincipalConnectorClientConfigOverridePolicy;
@@ -696,6 +697,7 @@ public class AbstractHerderTest {
         return new PluginDesc(SampleTransformation.class, "1.0", 
PluginType.TRANSFORMATION, classLoader);
     }
 
+    @SuppressWarnings("removal")
     @Test
     public void testConfigValidationPrincipalOnlyOverride() {
         final Class<? extends Connector> connectorClass = 
SampleSourceConnector.class;
@@ -788,6 +790,50 @@ public class AbstractHerderTest {
         verifyValidationIsolation();
     }
 
+    @Test
+    public void testConfigValidationAllowlistOverride() {
+        final Class<? extends Connector> connectorClass = 
SampleSourceConnector.class;
+        AllowlistConnectorClientConfigOverridePolicy policy = new 
AllowlistConnectorClientConfigOverridePolicy();
+        
policy.configure(Map.of(AllowlistConnectorClientConfigOverridePolicy.ALLOWLIST_CONFIG,
 "acks"));
+        AbstractHerder herder = createConfigValidationHerder(connectorClass, 
policy);
+
+        Map<String, String> config = new HashMap<>();
+        config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, 
connectorClass.getName());
+        config.put(ConnectorConfig.NAME_CONFIG, "connector-name");
+        config.put("required", "value"); // connector required config
+        String ackConfigKey = producerOverrideKey(ProducerConfig.ACKS_CONFIG);
+        String saslConfigKey = 
producerOverrideKey(SaslConfigs.SASL_JAAS_CONFIG);
+        config.put(ackConfigKey, "none");
+        config.put(saslConfigKey, "jaas_config");
+
+        ConfigInfos result = herder.validateConnectorConfig(config, s -> null, 
false);
+        assertEquals(ConnectorType.SOURCE, herder.connectorType(config));
+
+        // We expect there to be errors due to sasl.jaas.config not being 
allowed. Note that these assertions depend heavily on
+        // the config fields for SourceConnectorConfig, but we expect these to 
change rarely.
+        assertEquals(SampleSourceConnector.class.getName(), result.name());
+        // Each transform also gets its own group
+        List<String> expectedGroups = List.of(
+                ConnectorConfig.COMMON_GROUP,
+                ConnectorConfig.TRANSFORMS_GROUP,
+                ConnectorConfig.PREDICATES_GROUP,
+                ConnectorConfig.ERROR_GROUP,
+                SourceConnectorConfig.TOPIC_CREATION_GROUP,
+                SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_GROUP,
+                SourceConnectorConfig.OFFSETS_TOPIC_GROUP
+        );
+        assertEquals(expectedGroups, result.groups());
+        assertEquals(1, result.errorCount());
+        // Base connector config has 19 fields, connector's configs add 7, and 
2 producer overrides
+        assertEquals(28, result.configs().size());
+        assertTrue(result.configs().stream().anyMatch(
+                configInfo -> 
ackConfigKey.equals(configInfo.configValue().name()) && 
configInfo.configValue().errors().isEmpty()));
+        assertTrue(result.configs().stream().anyMatch(
+                configInfo -> 
saslConfigKey.equals(configInfo.configValue().name()) && 
!configInfo.configValue().errors().isEmpty()));
+
+        verifyValidationIsolation();
+    }
+
     static final class TestClientConfigOverridePolicy extends 
AllConnectorClientConfigOverridePolicy implements Monitorable {
 
         private static MetricName metricName = null;
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 80343474d70..c0d22cd0fbf 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -200,6 +200,13 @@
         <code>KafkaStreams#close(org.apache.kafka.streams.CloseOptions)</code>.
         For further details, please refer to <a 
href="https://cwiki.apache.org/confluence/x/QAq9F">KIP-1153</a>.
     </li>
+    <li>
+        A new implementation of 
<code>ConnectorClientConfigOverridePolicy</code>, 
<code>AllowlistConnectorClientConfigOverridePolicy</code>,
+        has been added. This enables specifying the configurations that 
connectors can override via 
<code>connector.client.config.override.allowlist</code>.
+        From Kafka 5.0.0, this will be the default <a 
href="documentation/#connectconfigs_connector.client.config.override.policy">connector.client.config.override.policy</a>
+        policy. The <code>PrincipalConnectorClientConfigOverridePolicy</code> 
policy is now deprecated and will be removed in Kafka 5.0.0.
+        For further details, please refer to <a 
href="https://cwiki.apache.org/confluence/x/2IkvFg">KIP-1188</a>.
+    </li>
 </ul>
 
 <h4><a id="upgrade_4_1_0" href="#upgrade_4_1_0">Upgrading to 4.1.0</a></h4>

Reply via email to