This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a395d060300 KAFKA-19824: New
AllowlistConnectorClientConfigOverridePolicy (KIP-1188) (#20750)
a395d060300 is described below
commit a395d06030022ccf305f6b7803eab43a1856d5b2
Author: Mickael Maison <[email protected]>
AuthorDate: Tue Oct 28 09:19:02 2025 +0100
KAFKA-19824: New AllowlistConnectorClientConfigOverridePolicy (KIP-1188)
(#20750)
Reviewers: Chris Egerton <[email protected]>, Andrew Schofield
<[email protected]>
---
...lowlistConnectorClientConfigOverridePolicy.java | 64 ++++++++++++++++++
...incipalConnectorClientConfigOverridePolicy.java | 5 ++
.../apache/kafka/connect/runtime/WorkerConfig.java | 8 ++-
...ctor.policy.ConnectorClientConfigOverridePolicy | 3 +-
...istConnectorClientConfigOverridePolicyTest.java | 76 ++++++++++++++++++++++
.../ConnectorClientPolicyIntegrationTest.java | 40 +++++++++++-
.../kafka/connect/runtime/AbstractHerderTest.java | 46 +++++++++++++
docs/upgrade.html | 7 ++
8 files changed, 244 insertions(+), 5 deletions(-)
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/AllowlistConnectorClientConfigOverridePolicy.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/AllowlistConnectorClientConfigOverridePolicy.java
new file mode 100644
index 00000000000..80d89db73e8
--- /dev/null
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/AllowlistConnectorClientConfigOverridePolicy.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.connector.policy;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigValue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Allows only client configurations specified via <code>connector.client.config.override.allowlist</code> to be
+ * overridden by connectors. By default, <code>connector.client.config.override.allowlist</code> is empty so connectors
+ * can't override any client configurations.
+ */
+public class AllowlistConnectorClientConfigOverridePolicy extends
AbstractConnectorClientConfigOverridePolicy {
+
+ public static final String ALLOWLIST_CONFIG =
"connector.client.config.override.allowlist";
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(AllowlistConnectorClientConfigOverridePolicy.class);
+ private static final List<String> ALLOWLIST_CONFIG_DEFAULT = List.of();
+ private static final String ALLOWLIST_CONFIG_DOC = "List of client
configurations that can be overridden by " +
+ "connectors. If empty, connectors can't override any client
configurations.";
+ private static final ConfigDef CONFIG_DEF = new ConfigDef()
+ .define(ALLOWLIST_CONFIG, ConfigDef.Type.LIST,
ALLOWLIST_CONFIG_DEFAULT, ConfigDef.Importance.MEDIUM, ALLOWLIST_CONFIG_DOC);
+
+ private List<String> allowlist = ALLOWLIST_CONFIG_DEFAULT;
+
+ @Override
+ protected String policyName() {
+ return "Allowlist";
+ }
+
+ @Override
+ protected boolean isAllowed(ConfigValue configValue) {
+ return allowlist.contains(configValue.name());
+ }
+
+ @Override
+ public void configure(Map<String, ?> configs) {
+ AbstractConfig config = new AbstractConfig(CONFIG_DEF, configs);
+ allowlist = config.getList(ALLOWLIST_CONFIG);
+ LOGGER.info("Setting up Allowlist policy for
ConnectorClientConfigOverride. This will allow the following client
configurations"
+ + " to be overridden. {}", allowlist);
+ }
+}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/PrincipalConnectorClientConfigOverridePolicy.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/PrincipalConnectorClientConfigOverridePolicy.java
index 8ec04f97261..0c7bdad2659 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/PrincipalConnectorClientConfigOverridePolicy.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/PrincipalConnectorClientConfigOverridePolicy.java
@@ -32,7 +32,9 @@ import java.util.stream.Stream;
/**
* Allows all {@code sasl} configurations to be overridden via the connector
configs by setting {@code connector.client.config.override.policy} to
* {@code Principal}. This allows to set a principal per connector.
+ * @deprecated Use {@link AllowlistConnectorClientConfigOverridePolicy} instead.
*/
+@Deprecated(since = "4.2", forRemoval = true)
public class PrincipalConnectorClientConfigOverridePolicy extends
AbstractConnectorClientConfigOverridePolicy {
private static final Logger log =
LoggerFactory.getLogger(PrincipalConnectorClientConfigOverridePolicy.class);
@@ -52,6 +54,9 @@ public class PrincipalConnectorClientConfigOverridePolicy
extends AbstractConnec
@Override
public void configure(Map<String, ?> configs) {
+ log.warn("The Principal ConnectorClientConfigOverridePolicy is
deprecated, use the Allowlist policy instead. "
+ + "To replicate the Principal policy behavior, set the
connector.client.config.override.allowlist configuration to \"{}\"",
+ String.join(",", ALLOWED_CONFIG));
log.info("Setting up Principal policy for
ConnectorClientConfigOverride. This will allow `sasl` client configuration to
be "
+ "overridden.");
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
index 8d953d7ded3..35326c03ae3 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Utils;
+import
org.apache.kafka.connect.connector.policy.AllowlistConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.isolation.PluginDiscoveryMode;
import org.apache.kafka.connect.runtime.rest.RestServerConfig;
@@ -158,9 +159,10 @@ public class WorkerConfig extends AbstractConfig {
public static final String CONNECTOR_CLIENT_POLICY_CLASS_CONFIG =
"connector.client.config.override.policy";
public static final String CONNECTOR_CLIENT_POLICY_CLASS_DOC =
"Class name or alias of implementation of
<code>ConnectorClientConfigOverridePolicy</code>. Defines what client
configurations can be "
- + "overridden by the connector. The default implementation is
<code>All</code>, meaning connector configurations can override all client
properties. "
- + "The other possible policies in the framework include
<code>None</code> to disallow connectors from overriding client properties, "
- + "and <code>Principal</code> to allow connectors to override only
client principals.";
+ + "overridden by the connector. The default policy is
<code>All</code>, meaning connector configurations can override all client
properties. "
+ + "The other possible policies in the framework include
<code>Allowlist</code> to specify allowed configurations via "
+ + "<code>" +
AllowlistConnectorClientConfigOverridePolicy.ALLOWLIST_CONFIG + "</code>,
<code>None</code> to disallow connectors from overriding "
+ + "client properties, and <code>Principal</code> (now deprecated) to
allow connectors to override only client principals.";
public static final String CONNECTOR_CLIENT_POLICY_CLASS_DEFAULT = "All";
diff --git
a/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy
b/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy
index 8b76ce452b6..beb6f23be60 100644
---
a/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy
+++
b/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy
@@ -15,4 +15,5 @@
org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy
org.apache.kafka.connect.connector.policy.PrincipalConnectorClientConfigOverridePolicy
-org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy
\ No newline at end of file
+org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy
+org.apache.kafka.connect.connector.policy.AllowlistConnectorClientConfigOverridePolicy
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/connector/policy/AllowlistConnectorClientConfigOverridePolicyTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/connector/policy/AllowlistConnectorClientConfigOverridePolicyTest.java
new file mode 100644
index 00000000000..3326a5744c0
--- /dev/null
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/connector/policy/AllowlistConnectorClientConfigOverridePolicyTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.connector.policy;
+
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Stream;
+
+public class AllowlistConnectorClientConfigOverridePolicyTest extends
BaseConnectorClientConfigOverridePolicyTest {
+
+ private static final List<String> ALL_CONFIGS = Stream.of(
+ ProducerConfig.configNames(),
+ ConsumerConfig.configNames(),
+ AdminClientConfig.configNames())
+ .flatMap(Collection::stream)
+ .toList();
+
+ private AllowlistConnectorClientConfigOverridePolicy policy;
+
+ @BeforeEach
+ public void setUp() {
+ policy = new AllowlistConnectorClientConfigOverridePolicy();
+ }
+
+ @Override
+ protected ConnectorClientConfigOverridePolicy policyToTest() {
+ return policy;
+ }
+
+ @Test
+ public void testDenyAllByDefault() {
+ for (String config : ALL_CONFIGS) {
+ testInvalidOverride(Map.of(config, new Object()));
+ }
+ }
+
+ @Test
+ public void testAllowConfigs() {
+ Set<String> allowedConfigs = Set.of(
+ ProducerConfig.ACKS_CONFIG,
+ ConsumerConfig.CLIENT_ID_CONFIG,
+ AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG
+ );
+
policy.configure(Map.of(AllowlistConnectorClientConfigOverridePolicy.ALLOWLIST_CONFIG,
String.join(",", allowedConfigs)));
+ for (String config : ALL_CONFIGS) {
+ if (!allowedConfigs.contains(config)) {
+ testInvalidOverride(Map.of(config, new Object()));
+ } else {
+ testValidOverride(Map.of(config, new Object()));
+ }
+ }
+ }
+}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java
index c42402eea2e..36a14f75440 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java
@@ -19,12 +19,14 @@ package org.apache.kafka.connect.integration;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SaslConfigs;
+import
org.apache.kafka.connect.connector.policy.AllowlistConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
@@ -48,6 +50,13 @@ public class ConnectorClientPolicyIntegrationTest {
private static final int NUM_WORKERS = 1;
private static final String CONNECTOR_NAME = "simple-conn";
+ private Map<String, String> workerConfigs;
+
+ @BeforeEach
+ public void setup() {
+ workerConfigs = Map.of();
+ }
+
@Test
public void testCreateWithOverridesForNonePolicy() {
Map<String, String> props = basicConnectorConfig();
@@ -93,9 +102,38 @@ public class ConnectorClientPolicyIntegrationTest {
assertPassCreateConnector(null, props);
}
+ @Test
+ public void testCreateWithoutOverridesForAllowlistPolicy() throws
Exception {
+ // set up props for the sink connector
+ Map<String, String> props = basicConnectorConfig();
+ assertPassCreateConnector("Allowlist", props);
+ }
+
+ @Test
+ public void testCreateWithNotAllowedOverridesForAllowlistPolicy() {
+ workerConfigs = Map.of(
+ AllowlistConnectorClientConfigOverridePolicy.ALLOWLIST_CONFIG,
CommonClientConfigs.CLIENT_RACK_CONFIG
+ );
+ // set up props for the sink connector
+ Map<String, String> props = basicConnectorConfig();
+ props.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX +
CommonClientConfigs.CLIENT_ID_CONFIG, "test");
+ assertFailCreateConnector("Allowlist", props);
+ }
+
+ @Test
+ public void testCreateWithAllowedOverridesForAllowlistPolicy() throws
Exception {
+ workerConfigs = Map.of(
+ AllowlistConnectorClientConfigOverridePolicy.ALLOWLIST_CONFIG,
CommonClientConfigs.CLIENT_ID_CONFIG
+ );
+ // set up props for the sink connector
+ Map<String, String> props = basicConnectorConfig();
+ props.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX +
CommonClientConfigs.CLIENT_ID_CONFIG, "test");
+ assertPassCreateConnector("Allowlist", props);
+ }
+
private EmbeddedConnectCluster connectClusterWithPolicy(String policy) {
// setup Connect worker properties
- Map<String, String> workerProps = new HashMap<>();
+ Map<String, String> workerProps = new HashMap<>(workerConfigs);
workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG,
String.valueOf(5_000));
if (policy != null) {
workerProps.put(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG,
policy);
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index 956153db5c6..02caa21812f 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -32,6 +32,7 @@ import
org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBea
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.connector.Connector;
import
org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
+import
org.apache.kafka.connect.connector.policy.AllowlistConnectorClientConfigOverridePolicy;
import
org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import
org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy;
import
org.apache.kafka.connect.connector.policy.PrincipalConnectorClientConfigOverridePolicy;
@@ -696,6 +697,7 @@ public class AbstractHerderTest {
return new PluginDesc(SampleTransformation.class, "1.0",
PluginType.TRANSFORMATION, classLoader);
}
+ @SuppressWarnings("removal")
@Test
public void testConfigValidationPrincipalOnlyOverride() {
final Class<? extends Connector> connectorClass =
SampleSourceConnector.class;
@@ -788,6 +790,50 @@ public class AbstractHerderTest {
verifyValidationIsolation();
}
+ @Test
+ public void testConfigValidationAllowlistOverride() {
+ final Class<? extends Connector> connectorClass =
SampleSourceConnector.class;
+ AllowlistConnectorClientConfigOverridePolicy policy = new
AllowlistConnectorClientConfigOverridePolicy();
+
policy.configure(Map.of(AllowlistConnectorClientConfigOverridePolicy.ALLOWLIST_CONFIG,
"acks"));
+ AbstractHerder herder = createConfigValidationHerder(connectorClass,
policy);
+
+ Map<String, String> config = new HashMap<>();
+ config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG,
connectorClass.getName());
+ config.put(ConnectorConfig.NAME_CONFIG, "connector-name");
+ config.put("required", "value"); // connector required config
+ String ackConfigKey = producerOverrideKey(ProducerConfig.ACKS_CONFIG);
+ String saslConfigKey =
producerOverrideKey(SaslConfigs.SASL_JAAS_CONFIG);
+ config.put(ackConfigKey, "none");
+ config.put(saslConfigKey, "jaas_config");
+
+ ConfigInfos result = herder.validateConnectorConfig(config, s -> null,
false);
+ assertEquals(ConnectorType.SOURCE, herder.connectorType(config));
+
+ // We expect there to be errors due to sasl.jaas.config not being allowed. Note that these assertions depend heavily on
+ // the config fields for SourceConnectorConfig, but we expect these to change rarely.
+ assertEquals(SampleSourceConnector.class.getName(), result.name());
+ // Each transform also gets its own group
+ List<String> expectedGroups = List.of(
+ ConnectorConfig.COMMON_GROUP,
+ ConnectorConfig.TRANSFORMS_GROUP,
+ ConnectorConfig.PREDICATES_GROUP,
+ ConnectorConfig.ERROR_GROUP,
+ SourceConnectorConfig.TOPIC_CREATION_GROUP,
+ SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_GROUP,
+ SourceConnectorConfig.OFFSETS_TOPIC_GROUP
+ );
+ assertEquals(expectedGroups, result.groups());
+ assertEquals(1, result.errorCount());
+ // Base connector config has 19 fields, connector's configs add 7, and 2 producer overrides
+ assertEquals(28, result.configs().size());
+ assertTrue(result.configs().stream().anyMatch(
+ configInfo ->
ackConfigKey.equals(configInfo.configValue().name()) &&
configInfo.configValue().errors().isEmpty()));
+ assertTrue(result.configs().stream().anyMatch(
+ configInfo ->
saslConfigKey.equals(configInfo.configValue().name()) &&
!configInfo.configValue().errors().isEmpty()));
+
+ verifyValidationIsolation();
+ }
+
static final class TestClientConfigOverridePolicy extends
AllConnectorClientConfigOverridePolicy implements Monitorable {
private static MetricName metricName = null;
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 80343474d70..c0d22cd0fbf 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -200,6 +200,13 @@
<code>KafkaStreams#close(org.apache.kafka.streams.CloseOptions)</code>.
For further details, please refer to <a
href="https://cwiki.apache.org/confluence/x/QAq9F">KIP-1153</a>.
</li>
+ <li>
+ A new implementation of
<code>ConnectorClientConfigOverridePolicy</code>,
<code>AllowlistConnectorClientConfigOverridePolicy</code>,
+ has been added. This enables specifying the configurations that
connectors can override via
<code>connector.client.config.override.allowlist</code>.
+ From Kafka 5.0.0, this will be the default <a
href="documentation/#connectconfigs_connector.client.config.override.policy">connector.client.config.override.policy</a>
+ policy. The <code>PrincipalConnectorClientConfigOverridePolicy</code>
policy is now deprecated and will be removed in Kafka 5.0.0.
+ For further details, please refer to <a
href="https://cwiki.apache.org/confluence/x/2IkvFg">KIP-1188</a>.
+ </li>
</ul>
<h4><a id="upgrade_4_1_0" href="#upgrade_4_1_0">Upgrading to 4.1.0</a></h4>