This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.1 by this push:
new f068a79 KAFKA-13490: Fix createTopics and incrementalAlterConfigs for
KRaft mode #11416
f068a79 is described below
commit f068a7919e7943c9c3d920d8b20956b9f21608fb
Author: Colin P. Mccabe <[email protected]>
AuthorDate: Mon Oct 18 13:11:53 2021 -0700
KAFKA-13490: Fix createTopics and incrementalAlterConfigs for KRaft mode #11416
For CreateTopics, fix a bug where, if one topic in a batch failed to be
created, every topic in the batch would fail with the same error code.
Make the error message for TOPIC_ALREADY_EXISTS consistent with the
ZK-based code by including the topic name.
For IncrementalAlterConfigs, check that topic configurations are valid
before allowing them to be set. (This also applies to newly created
topics.) IncrementalAlterConfigs should also ignore non-null payloads for
DELETE operations. Previously we returned an error in these cases, which
is not compatible with the old ZK-based code: it ignores the payload in
these cases.
Reviewers: José Armando García Sancio <[email protected]>, Jason Gustafson
<[email protected]>
---
.../server/ControllerConfigurationValidator.scala | 57 +++++++++++++++++
.../main/scala/kafka/server/ControllerServer.scala | 1 +
.../ControllerConfigurationValidatorTest.scala | 71 ++++++++++++++++++++++
.../controller/ConfigurationControlManager.java | 36 +++++------
.../kafka/controller/ConfigurationValidator.java | 35 +++++++++++
.../apache/kafka/controller/QuorumController.java | 13 +++-
.../controller/ReplicationControlManager.java | 16 ++---
.../ConfigurationControlManagerTest.java | 40 ++++++++----
.../controller/ReplicationControlManagerTest.java | 5 +-
9 files changed, 232 insertions(+), 42 deletions(-)
diff --git
a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
new file mode 100644
index 0000000..dfb78b2
--- /dev/null
+++ b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util
+import java.util.Properties
+
+import kafka.log.LogConfig
+import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, TOPIC}
+import org.apache.kafka.controller.ConfigurationValidator
+import org.apache.kafka.common.errors.InvalidRequestException
+
+import scala.collection.mutable
+
+class ControllerConfigurationValidator extends ConfigurationValidator {
+ override def validate(resource: ConfigResource, config: util.Map[String,
String]): Unit = {
+ resource.`type`() match {
+ case TOPIC =>
+ val properties = new Properties()
+ val nullTopicConfigs = new mutable.ArrayBuffer[String]()
+ config.entrySet().forEach(e => {
+ if (e.getValue() == null) {
+ nullTopicConfigs += e.getKey()
+ } else {
+ properties.setProperty(e.getKey(), e.getValue())
+ }
+ })
+ if (nullTopicConfigs.nonEmpty) {
+ throw new InvalidRequestException("Null value not supported for
topic configs : " +
+ nullTopicConfigs.mkString(","))
+ }
+ LogConfig.validate(properties)
+ case BROKER =>
+ // TODO: add broker configuration validation
+ case _ =>
+ // Note: we should never handle BROKER_LOGGER resources here, since
changes to
+ // those resources are not persisted in the metadata.
+ throw new InvalidRequestException(s"Unknown resource type
${resource.`type`}")
+ }
+ }
+}
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala
b/core/src/main/scala/kafka/server/ControllerServer.scala
index 137d727..ede71d4 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -173,6 +173,7 @@ class ControllerServer(
setMetrics(new
QuorumControllerMetrics(KafkaYammerMetrics.defaultRegistry())).
setCreateTopicPolicy(createTopicPolicy.asJava).
setAlterConfigPolicy(alterConfigPolicy.asJava).
+ setConfigurationValidator(new ControllerConfigurationValidator()).
build()
quotaManagers = QuotaFactory.instantiate(config, metrics, time,
threadNamePrefix.getOrElse(""))
diff --git
a/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
b/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
new file mode 100644
index 0000000..3c85299
--- /dev/null
+++
b/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.server
+
+import java.util.TreeMap
+import java.util.Collections.emptyMap
+
+import org.junit.jupiter.api.Test
+import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.config.ConfigResource.Type.{BROKER_LOGGER,
TOPIC}
+import org.apache.kafka.common.config.TopicConfig.{SEGMENT_BYTES_CONFIG,
SEGMENT_JITTER_MS_CONFIG, SEGMENT_MS_CONFIG}
+import org.apache.kafka.common.errors.{InvalidConfigurationException,
InvalidRequestException}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
+
+class ControllerConfigurationValidatorTest {
+ @Test
+ def testUnknownResourceType(): Unit = {
+ val validator = new ControllerConfigurationValidator()
+ assertEquals("Unknown resource type BROKER_LOGGER",
+ assertThrows(classOf[InvalidRequestException], () => validator.validate(
+ new ConfigResource(BROKER_LOGGER, "foo"), emptyMap())). getMessage())
+ }
+
+ @Test
+ def testNullTopicConfigValue(): Unit = {
+ val validator = new ControllerConfigurationValidator()
+ val config = new TreeMap[String, String]()
+ config.put(SEGMENT_JITTER_MS_CONFIG, "10")
+ config.put(SEGMENT_BYTES_CONFIG, null)
+ config.put(SEGMENT_MS_CONFIG, null)
+ assertEquals("Null value not supported for topic configs :
segment.bytes,segment.ms",
+ assertThrows(classOf[InvalidRequestException], () => validator.validate(
+ new ConfigResource(TOPIC, "foo"), config)). getMessage())
+ }
+
+ @Test
+ def testValidTopicConfig(): Unit = {
+ val validator = new ControllerConfigurationValidator()
+ val config = new TreeMap[String, String]()
+ config.put(SEGMENT_JITTER_MS_CONFIG, "1000")
+ config.put(SEGMENT_BYTES_CONFIG, "67108864")
+ validator.validate(new ConfigResource(TOPIC, "foo"), config)
+ }
+
+ @Test
+ def testInvalidTopicConfig(): Unit = {
+ val validator = new ControllerConfigurationValidator()
+ val config = new TreeMap[String, String]()
+ config.put(SEGMENT_JITTER_MS_CONFIG, "1000")
+ config.put(SEGMENT_BYTES_CONFIG, "67108864")
+ config.put("foobar", "abc")
+ assertEquals("Unknown topic config name: foobar",
+ assertThrows(classOf[InvalidConfigurationException], () =>
validator.validate(
+ new ConfigResource(TOPIC, "foo"), config)). getMessage())
+ }
+}
\ No newline at end of file
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
index a305719..83f1cbf 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
@@ -20,9 +20,9 @@ package org.apache.kafka.controller;
import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
import org.apache.kafka.common.config.ConfigDef.ConfigKey;
import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigResource.Type;
import org.apache.kafka.common.config.ConfigResource;
-import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.protocol.Errors;
@@ -49,6 +49,7 @@ import java.util.Optional;
import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.APPEND;
import static
org.apache.kafka.common.metadata.MetadataRecordType.CONFIG_RECORD;
+import static org.apache.kafka.common.protocol.Errors.INVALID_CONFIG;
public class ConfigurationControlManager {
@@ -56,17 +57,20 @@ public class ConfigurationControlManager {
private final SnapshotRegistry snapshotRegistry;
private final Map<ConfigResource.Type, ConfigDef> configDefs;
private final Optional<AlterConfigPolicy> alterConfigPolicy;
+ private final ConfigurationValidator validator;
private final TimelineHashMap<ConfigResource, TimelineHashMap<String,
String>> configData;
ConfigurationControlManager(LogContext logContext,
SnapshotRegistry snapshotRegistry,
Map<ConfigResource.Type, ConfigDef> configDefs,
- Optional<AlterConfigPolicy> alterConfigPolicy)
{
+ Optional<AlterConfigPolicy> alterConfigPolicy,
+ ConfigurationValidator validator) {
this.log = logContext.logger(ConfigurationControlManager.class);
this.snapshotRegistry = snapshotRegistry;
this.configDefs = configDefs;
this.configData = new TimelineHashMap<>(snapshotRegistry, 0);
this.alterConfigPolicy = alterConfigPolicy;
+ this.validator = validator;
}
/**
@@ -122,19 +126,13 @@ public class ConfigurationControlManager {
newValue = opValue;
break;
case DELETE:
- if (opValue != null) {
- outputResults.put(configResource, new ApiError(
- Errors.INVALID_REQUEST, "A DELETE op was given
with a " +
- "non-null value."));
- return;
- }
newValue = null;
break;
case APPEND:
case SUBTRACT:
if (!isSplittable(configResource.type(), key)) {
outputResults.put(configResource, new ApiError(
- Errors.INVALID_CONFIG, "Can't " + opType + " to " +
+ INVALID_CONFIG, "Can't " + opType + " to " +
"key " + key + " because its type is not LIST."));
return;
}
@@ -157,7 +155,7 @@ public class ConfigurationControlManager {
setValue(newValue),
CONFIG_RECORD.highestSupportedVersion()));
}
}
- error = checkAlterConfigPolicy(configResource, newRecords);
+ error = validateAlterConfig(configResource, newRecords);
if (error.isFailure()) {
outputResults.put(configResource, error);
return;
@@ -166,9 +164,8 @@ public class ConfigurationControlManager {
outputResults.put(configResource, ApiError.NONE);
}
- private ApiError checkAlterConfigPolicy(ConfigResource configResource,
- List<ApiMessageAndVersion>
newRecords) {
- if (!alterConfigPolicy.isPresent()) return ApiError.NONE;
+ private ApiError validateAlterConfig(ConfigResource configResource,
+ List<ApiMessageAndVersion>
newRecords) {
Map<String, String> newConfigs = new HashMap<>();
TimelineHashMap<String, String> existingConfigs =
configData.get(configResource);
if (existingConfigs != null) newConfigs.putAll(existingConfigs);
@@ -181,9 +178,14 @@ public class ConfigurationControlManager {
}
}
try {
- alterConfigPolicy.get().validate(new
RequestMetadata(configResource, newConfigs));
- } catch (PolicyViolationException e) {
- return new ApiError(Errors.POLICY_VIOLATION, e.getMessage());
+ validator.validate(configResource, newConfigs);
+ if (alterConfigPolicy.isPresent()) {
+ alterConfigPolicy.get().validate(new
RequestMetadata(configResource, newConfigs));
+ }
+ } catch (ConfigException e) {
+ return new ApiError(INVALID_CONFIG, e.getMessage());
+ } catch (Throwable e) {
+ return ApiError.fromThrowable(e);
}
return ApiError.NONE;
}
@@ -246,7 +248,7 @@ public class ConfigurationControlManager {
setValue(null), CONFIG_RECORD.highestSupportedVersion()));
}
}
- error = checkAlterConfigPolicy(configResource, newRecords);
+ error = validateAlterConfig(configResource, newRecords);
if (error.isFailure()) {
outputResults.put(configResource, error);
return;
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationValidator.java
b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationValidator.java
new file mode 100644
index 0000000..b14580a
--- /dev/null
+++
b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationValidator.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.config.ConfigResource;
+
+import java.util.Map;
+
+
+public interface ConfigurationValidator {
+ ConfigurationValidator NO_OP = (__, ___) -> { };
+
+ /**
+ * Throws an ApiException if a configuration is invalid for the given
resource.
+ *
+ * @param resource The configuration resource.
+ * @param config The new configuration.
+ */
+ void validate(ConfigResource resource, Map<String, String> config);
+}
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 1a1c5d0..16b3ab3 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -140,6 +140,7 @@ public final class QuorumController implements Controller {
private ControllerMetrics controllerMetrics = null;
private Optional<CreateTopicPolicy> createTopicPolicy =
Optional.empty();
private Optional<AlterConfigPolicy> alterConfigPolicy =
Optional.empty();
+ private ConfigurationValidator configurationValidator =
ConfigurationValidator.NO_OP;
public Builder(int nodeId) {
this.nodeId = nodeId;
@@ -215,6 +216,11 @@ public final class QuorumController implements Controller {
return this;
}
+ public Builder setConfigurationValidator(ConfigurationValidator
configurationValidator) {
+ this.configurationValidator = configurationValidator;
+ return this;
+ }
+
@SuppressWarnings("unchecked")
public QuorumController build() throws Exception {
if (raftClient == null) {
@@ -237,7 +243,7 @@ public final class QuorumController implements Controller {
raftClient, supportedFeatures, defaultReplicationFactor,
defaultNumPartitions, replicaPlacer,
snapshotMaxNewRecordBytes,
sessionTimeoutNs, controllerMetrics, createTopicPolicy,
- alterConfigPolicy);
+ alterConfigPolicy, configurationValidator);
} catch (Exception e) {
Utils.closeQuietly(queue, "event queue");
throw e;
@@ -1114,7 +1120,8 @@ public final class QuorumController implements Controller
{
long sessionTimeoutNs,
ControllerMetrics controllerMetrics,
Optional<CreateTopicPolicy> createTopicPolicy,
- Optional<AlterConfigPolicy> alterConfigPolicy) {
+ Optional<AlterConfigPolicy> alterConfigPolicy,
+ ConfigurationValidator configurationValidator) {
this.logContext = logContext;
this.log = logContext.logger(QuorumController.class);
this.nodeId = nodeId;
@@ -1124,7 +1131,7 @@ public final class QuorumController implements Controller
{
this.snapshotRegistry = new SnapshotRegistry(logContext);
this.purgatory = new ControllerPurgatory();
this.configurationControl = new ConfigurationControlManager(logContext,
- snapshotRegistry, configDefs, alterConfigPolicy);
+ snapshotRegistry, configDefs, alterConfigPolicy,
configurationValidator);
this.clientQuotaControlManager = new
ClientQuotaControlManager(snapshotRegistry);
this.clusterControl = new ClusterControlManager(logContext, time,
snapshotRegistry, sessionTimeoutNs, replicaPlacer,
controllerMetrics);
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 349a6b6..5462dea 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -369,7 +369,8 @@ public class ReplicationControlManager {
// Identify topics that already exist and mark them with the
appropriate error
request.topics().stream().filter(creatableTopic ->
topicsByName.containsKey(creatableTopic.name()))
- .forEach(t -> topicErrors.put(t.name(), new
ApiError(Errors.TOPIC_ALREADY_EXISTS)));
+ .forEach(t -> topicErrors.put(t.name(), new
ApiError(Errors.TOPIC_ALREADY_EXISTS,
+ "Topic '" + t.name() + "' already exists.")));
// Verify that the configurations for the new topics are OK, and
figure out what
// ConfigRecords should be created.
@@ -388,7 +389,12 @@ public class ReplicationControlManager {
Map<String, CreatableTopicResult> successes = new HashMap<>();
for (CreatableTopic topic : request.topics()) {
if (topicErrors.containsKey(topic.name())) continue;
- ApiError error = createTopic(topic, records, successes);
+ ApiError error;
+ try {
+ error = createTopic(topic, records, successes);
+ } catch (ApiException e) {
+ error = ApiError.fromThrowable(e);
+ }
if (error.isFailure()) {
topicErrors.put(topic.name(), error);
}
@@ -472,11 +478,7 @@ public class ReplicationControlManager {
if (error.isFailure()) return error;
} else if (topic.replicationFactor() < -1 || topic.replicationFactor()
== 0) {
return new ApiError(Errors.INVALID_REPLICATION_FACTOR,
- "Replication factor was set to an invalid non-positive
value.");
- } else if (!topic.assignments().isEmpty()) {
- return new ApiError(INVALID_REQUEST,
- "Replication factor was not set to -1 but a manual partition "
+
- "assignment was specified.");
+ "Replication factor must be larger than 0, or -1 to use the
default value.");
} else if (topic.numPartitions() < -1 || topic.numPartitions() == 0) {
return new ApiError(Errors.INVALID_PARTITIONS,
"Number of partitions was set to an invalid non-positive
value.");
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
index b2be6eb..f84b12e 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
@@ -47,6 +47,7 @@ import static java.util.Arrays.asList;
import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.APPEND;
import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.DELETE;
import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
+import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SUBTRACT;
import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
import static org.apache.kafka.common.config.ConfigResource.Type.BROKER_LOGGER;
import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
@@ -93,7 +94,7 @@ public class ConfigurationControlManagerTest {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new
LogContext());
ConfigurationControlManager manager =
new ConfigurationControlManager(new LogContext(),
snapshotRegistry, CONFIGS,
- Optional.empty());
+ Optional.empty(), ConfigurationValidator.NO_OP);
assertEquals(Collections.emptyMap(), manager.getConfigs(BROKER0));
manager.replay(new ConfigRecord().
setResourceType(BROKER.id()).setResourceName("0").
@@ -152,17 +153,29 @@ public class ConfigurationControlManagerTest {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new
LogContext());
ConfigurationControlManager manager =
new ConfigurationControlManager(new LogContext(),
snapshotRegistry, CONFIGS,
- Optional.empty());
+ Optional.empty(), ConfigurationValidator.NO_OP);
+
+ ControllerResult<Map<ConfigResource, ApiError>> result = manager.
+ incrementalAlterConfigs(toMap(entry(BROKER0, toMap(
+ entry("baz", entry(SUBTRACT, "abc")),
+ entry("quux", entry(SET, "abc")))),
+ entry(MYTOPIC, toMap(entry("abc", entry(APPEND, "123"))))));
+
assertEquals(ControllerResult.atomicOf(Collections.singletonList(new
ApiMessageAndVersion(
new
ConfigRecord().setResourceType(TOPIC.id()).setResourceName("mytopic").
setName("abc").setValue("123"), (short) 0)),
- toMap(entry(BROKER0, new ApiError(Errors.INVALID_REQUEST,
- "A DELETE op was given with a non-null value.")),
- entry(MYTOPIC, ApiError.NONE))),
- manager.incrementalAlterConfigs(toMap(entry(BROKER0, toMap(
- entry("foo.bar", entry(DELETE, "abc")),
- entry("quux", entry(SET, "abc")))),
- entry(MYTOPIC, toMap(entry("abc", entry(APPEND, "123")))))));
+ toMap(entry(BROKER0, new ApiError(Errors.INVALID_CONFIG,
+ "Can't SUBTRACT to key baz because its type is not
LIST.")),
+ entry(MYTOPIC, ApiError.NONE))), result);
+
+ RecordTestUtils.replayAll(manager, result.records());
+
+ assertEquals(ControllerResult.atomicOf(Collections.singletonList(new
ApiMessageAndVersion(
+ new
ConfigRecord().setResourceType(TOPIC.id()).setResourceName("mytopic").
+ setName("abc").setValue(null), (short) 0)),
+ toMap(entry(MYTOPIC, ApiError.NONE))),
+ manager.incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap(
+ entry("abc", entry(DELETE, "xyz")))))));
}
private static class MockAlterConfigsPolicy implements AlterConfigPolicy {
@@ -206,7 +219,8 @@ public class ConfigurationControlManagerTest {
new RequestMetadata(BROKER0, toMap(entry("foo.bar", "123"),
entry("quux", "456")))));
ConfigurationControlManager manager = new ConfigurationControlManager(
- new LogContext(), snapshotRegistry, CONFIGS, Optional.of(policy));
+ new LogContext(), snapshotRegistry, CONFIGS, Optional.of(policy),
+ ConfigurationValidator.NO_OP);
assertEquals(ControllerResult.atomicOf(asList(new ApiMessageAndVersion(
new
ConfigRecord().setResourceType(BROKER.id()).setResourceName("0").
@@ -231,7 +245,7 @@ public class ConfigurationControlManagerTest {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new
LogContext());
ConfigurationControlManager manager =
new ConfigurationControlManager(new LogContext(),
snapshotRegistry, CONFIGS,
- Optional.empty());
+ Optional.empty(), ConfigurationValidator.NO_OP);
assertTrue(manager.isSplittable(BROKER, "foo.bar"));
assertFalse(manager.isSplittable(BROKER, "baz"));
assertFalse(manager.isSplittable(BROKER, "foo.baz.quux"));
@@ -244,7 +258,7 @@ public class ConfigurationControlManagerTest {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new
LogContext());
ConfigurationControlManager manager =
new ConfigurationControlManager(new LogContext(),
snapshotRegistry, CONFIGS,
- Optional.empty());
+ Optional.empty(), ConfigurationValidator.NO_OP);
assertEquals("1", manager.getConfigValueDefault(BROKER, "foo.bar"));
assertEquals(null, manager.getConfigValueDefault(BROKER,
"foo.baz.quux"));
assertEquals(null, manager.getConfigValueDefault(TOPIC, "abc"));
@@ -256,7 +270,7 @@ public class ConfigurationControlManagerTest {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new
LogContext());
ConfigurationControlManager manager =
new ConfigurationControlManager(new LogContext(),
snapshotRegistry, CONFIGS,
- Optional.empty());
+ Optional.empty(), ConfigurationValidator.NO_OP);
List<ApiMessageAndVersion> expectedRecords1 = asList(
new ApiMessageAndVersion(new ConfigRecord().
setResourceType(TOPIC.id()).setResourceName("mytopic").
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 6543073..94fbe7c 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -137,7 +137,8 @@ public class ReplicationControlManagerTest {
logContext, time, snapshotRegistry,
TimeUnit.MILLISECONDS.convert(BROKER_SESSION_TIMEOUT_MS, TimeUnit.NANOSECONDS),
new StripedReplicaPlacer(random), metrics);
final ConfigurationControlManager configurationControl = new
ConfigurationControlManager(
- new LogContext(), snapshotRegistry, Collections.emptyMap(),
Optional.empty());
+ new LogContext(), snapshotRegistry, Collections.emptyMap(),
Optional.empty(),
+ (__, ___) -> { });
final ReplicationControlManager replicationControl;
void replay(List<ApiMessageAndVersion> records) throws Exception {
@@ -415,7 +416,7 @@ public class ReplicationControlManagerTest {
CreateTopicsResponseData expectedResponse3 = new
CreateTopicsResponseData();
expectedResponse3.topics().add(new
CreatableTopicResult().setName("foo").
setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code()).
-
setErrorMessage(Errors.TOPIC_ALREADY_EXISTS.exception().getMessage()));
+ setErrorMessage("Topic 'foo' already exists."));
assertEquals(expectedResponse3, result3.response());
Uuid fooId = result2.response().topics().find("foo").topicId();
RecordTestUtils.assertBatchIteratorContains(asList(