This is an automated email from the ASF dual-hosted git repository.
clolov pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1b11fef5bb9 KAFKA-17205: Allow topic config validation in controller
level in KRaft mode (#16693)
1b11fef5bb9 is described below
commit 1b11fef5bb9b753a2ef8916efdf3f66167e66976
Author: Luke Chen <[email protected]>
AuthorDate: Wed Jul 31 01:07:09 2024 +0900
KAFKA-17205: Allow topic config validation in controller level in KRaft
mode (#16693)
Reviewers: Kamal Chandraprakash <[email protected]>, Christo
Lolov <[email protected]>
---
.../server/ControllerConfigurationValidator.scala | 9 ++--
core/src/main/scala/kafka/zk/AdminZkClient.scala | 4 +-
.../kafka/admin/RemoteTopicCrudTest.scala | 20 +++++++++
.../kafka/api/BaseAdminIntegrationTest.scala | 2 +-
.../test/scala/unit/kafka/log/LogConfigTest.scala | 43 ++++++++++++++-----
.../ControllerConfigurationValidatorTest.scala | 49 +++++++++++++++-------
.../controller/ConfigurationControlManager.java | 10 +++--
.../kafka/controller/ConfigurationValidator.java | 9 ++--
.../kafka/storage/internals/log/LogConfig.java | 38 +++++++++++------
9 files changed, 132 insertions(+), 52 deletions(-)
diff --git
a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
index b99065b573e..06a60e30076 100644
--- a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
+++ b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
@@ -89,14 +89,15 @@ class ControllerConfigurationValidator(kafkaConfig:
KafkaConfig) extends Configu
override def validate(
resource: ConfigResource,
- config: util.Map[String, String]
+ newConfigs: util.Map[String, String],
+ oldConfigs: util.Map[String, String]
): Unit = {
resource.`type`() match {
case TOPIC =>
validateTopicName(resource.name())
val properties = new Properties()
val nullTopicConfigs = new mutable.ArrayBuffer[String]()
- config.forEach((key, value) => {
+ newConfigs.forEach((key, value) => {
if (value == null) {
nullTopicConfigs += key
} else {
@@ -107,12 +108,12 @@ class ControllerConfigurationValidator(kafkaConfig:
KafkaConfig) extends Configu
throw new InvalidConfigurationException("Null value not supported
for topic configs: " +
nullTopicConfigs.mkString(","))
}
- LogConfig.validate(properties, kafkaConfig.extractLogConfigMap,
+ LogConfig.validate(oldConfigs, properties,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
case BROKER => validateBrokerName(resource.name())
case CLIENT_METRICS =>
val properties = new Properties()
- config.forEach((key, value) => properties.setProperty(key, value))
+ newConfigs.forEach((key, value) => properties.setProperty(key, value))
ClientMetricsConfigs.validate(resource.name(), properties)
case _ => throwExceptionForUnknownResourceType(resource)
}
diff --git a/core/src/main/scala/kafka/zk/AdminZkClient.scala
b/core/src/main/scala/kafka/zk/AdminZkClient.scala
index cd9153c07dc..8db20583e24 100644
--- a/core/src/main/scala/kafka/zk/AdminZkClient.scala
+++ b/core/src/main/scala/kafka/zk/AdminZkClient.scala
@@ -161,7 +161,7 @@ class AdminZkClient(zkClient: KafkaZkClient,
partitionReplicaAssignment.keys.filter(_ >= 0).sum != sequenceSum)
throw new InvalidReplicaAssignmentException("partitions should be a
consecutive 0-based integer sequence")
- LogConfig.validate(config,
+ LogConfig.validate(Collections.emptyMap(), config,
kafkaConfig.map(_.extractLogConfigMap).getOrElse(Collections.emptyMap()),
kafkaConfig.exists(_.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
}
@@ -479,7 +479,7 @@ class AdminZkClient(zkClient: KafkaZkClient,
if (!zkClient.topicExists(topic))
throw new UnknownTopicOrPartitionException(s"Topic '$topic' does not
exist.")
// remove the topic overrides
- LogConfig.validate(configs,
+ LogConfig.validate(Collections.emptyMap(), configs,
kafkaConfig.map(_.extractLogConfigMap).getOrElse(Collections.emptyMap()),
kafkaConfig.exists(_.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
}
diff --git
a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
index e362832d638..462698734a0 100644
--- a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
@@ -291,6 +291,26 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
() => admin.incrementalAlterConfigs(configs).all().get(), "Invalid local
retention size")
}
+ // Controller-level validation of the remote storage config is only available
in KRaft mode
+ @ParameterizedTest
+ @ValueSource(strings = Array("kraft"))
+ def testUpdateTopicConfigWithDisablingRemoteStorage(quorum: String): Unit = {
+ val admin = createAdminClient()
+ val topicConfig = new Properties
+ topicConfig.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
"true")
+ TestUtils.createTopicWithAdmin(admin, testTopicName, brokers,
controllerServers, numPartitions, numReplicationFactor,
+ topicConfig = topicConfig)
+
+ val configs = new util.HashMap[ConfigResource,
util.Collection[AlterConfigOp]]()
+ configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName),
+ util.Arrays.asList(
+ new AlterConfigOp(new
ConfigEntry(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false"),
+ AlterConfigOp.OpType.SET),
+ ))
+ assertThrowsException(classOf[InvalidConfigurationException],
+ () => admin.incrementalAlterConfigs(configs).all().get(), "Disabling
remote storage feature on the topic level is not supported.")
+ }
+
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testTopicDeletion(quorum: String): Unit = {
diff --git
a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
index 1a516336745..436673806a5 100644
--- a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
@@ -32,7 +32,7 @@ import
org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.security.authorizer.AclEntry
import org.apache.kafka.server.config.{ServerConfigs, ReplicationConfigs,
ServerLogConfigs}
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach , TestInfo, Timeout}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
index 2793da51515..4e7e4e23b38 100644
--- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
@@ -22,6 +22,7 @@ import kafka.utils.TestUtils
import org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM
import org.apache.kafka.common.config.ConfigDef.Type.INT
import org.apache.kafka.common.config.{ConfigException, SslConfigs,
TopicConfig}
+import org.apache.kafka.common.errors.InvalidConfigurationException
import org.apache.kafka.server.common.MetadataVersion
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
@@ -299,7 +300,7 @@ class LogConfigTest {
props.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG,
localRetentionMs.toString)
props.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG,
localRetentionBytes.toString)
assertThrows(classOf[ConfigException],
- () => LogConfig.validate(props, kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+ () => LogConfig.validate(Collections.emptyMap(), props,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
}
@Test
@@ -311,17 +312,17 @@ class LogConfigTest {
val logProps = new Properties()
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG,
TopicConfig.CLEANUP_POLICY_DELETE)
logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
- LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+ LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG,
TopicConfig.CLEANUP_POLICY_COMPACT)
assertThrows(classOf[ConfigException],
- () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+ () => LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "delete,compact")
assertThrows(classOf[ConfigException],
- () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+ () => LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact,delete")
assertThrows(classOf[ConfigException],
- () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+ () => LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
}
@ParameterizedTest(name = "testEnableRemoteLogStorage with
sysRemoteStorageEnabled: {0}")
@@ -334,14 +335,34 @@ class LogConfigTest {
val logProps = new Properties()
logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
if (sysRemoteStorageEnabled) {
- LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+ LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
} else {
val message = assertThrows(classOf[ConfigException],
- () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+ () => LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
assertTrue(message.getMessage.contains("Tiered Storage functionality is
disabled in the broker"))
}
}
+ @ParameterizedTest(name = "testDisableRemoteLogStorage with
wasRemoteStorageEnabled: {0}")
+ @ValueSource(booleans = Array(true, false))
+ def testDisableRemoteLogStorage(wasRemoteStorageEnabled: Boolean): Unit = {
+ val kafkaProps = TestUtils.createDummyBrokerConfig()
+
kafkaProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP,
"true")
+ val kafkaConfig = KafkaConfig.fromProps(kafkaProps)
+
+ val logProps = new Properties()
+ logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false")
+ if (wasRemoteStorageEnabled) {
+ val message = assertThrows(classOf[InvalidConfigurationException],
+ () =>
LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
"true"),
+ logProps, kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+ assertTrue(message.getMessage.contains("Disabling remote storage feature
on the topic level is not supported."))
+ } else {
+ LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+
LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
"false"), logProps, kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+ }
+ }
+
@ParameterizedTest(name = "testTopicCreationWithInvalidRetentionTime with
sysRemoteStorageEnabled: {0}")
@ValueSource(booleans = Array(true, false))
def testTopicCreationWithInvalidRetentionTime(sysRemoteStorageEnabled:
Boolean): Unit = {
@@ -357,10 +378,10 @@ class LogConfigTest {
logProps.put(TopicConfig.RETENTION_MS_CONFIG, "500")
if (sysRemoteStorageEnabled) {
val message = assertThrows(classOf[ConfigException],
- () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+ () => LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG))
} else {
- LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+ LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
}
}
@@ -379,10 +400,10 @@ class LogConfigTest {
logProps.put(TopicConfig.RETENTION_BYTES_CONFIG, "128")
if (sysRemoteStorageEnabled) {
val message = assertThrows(classOf[ConfigException],
- () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+ () => LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG))
} else {
- LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+ LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
}
}
diff --git
a/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
b/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
index 00bb93811b2..250f07ca23e 100644
---
a/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
+++
b/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
@@ -20,11 +20,13 @@ package kafka.server
import kafka.utils.TestUtils
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.ConfigResource.Type.{BROKER,
BROKER_LOGGER, CLIENT_METRICS, TOPIC}
-import org.apache.kafka.common.config.TopicConfig.{SEGMENT_BYTES_CONFIG,
SEGMENT_JITTER_MS_CONFIG, SEGMENT_MS_CONFIG}
+import
org.apache.kafka.common.config.TopicConfig.{REMOTE_LOG_STORAGE_ENABLE_CONFIG,
SEGMENT_BYTES_CONFIG, SEGMENT_JITTER_MS_CONFIG, SEGMENT_MS_CONFIG}
import org.apache.kafka.common.errors.{InvalidConfigurationException,
InvalidRequestException, InvalidTopicException}
import org.apache.kafka.server.metrics.ClientMetricsConfigs
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
import org.junit.jupiter.api.Test
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
import java.util
import java.util.Collections.emptyMap
@@ -37,7 +39,7 @@ class ControllerConfigurationValidatorTest {
def testDefaultTopicResourceIsRejected(): Unit = {
assertEquals("Default topic resources are not allowed.",
assertThrows(classOf[InvalidRequestException], () =>
validator.validate(
- new ConfigResource(TOPIC, ""), emptyMap())). getMessage)
+ new ConfigResource(TOPIC, ""), emptyMap(), emptyMap())). getMessage)
}
@Test
@@ -45,14 +47,14 @@ class ControllerConfigurationValidatorTest {
assertEquals("Topic name is invalid: '(<-invalid->)' contains " +
"one or more characters other than ASCII alphanumerics, '.', '_' and
'-'",
assertThrows(classOf[InvalidTopicException], () => validator.validate(
- new ConfigResource(TOPIC, "(<-invalid->)"), emptyMap())). getMessage)
+ new ConfigResource(TOPIC, "(<-invalid->)"), emptyMap(),
emptyMap())). getMessage)
}
@Test
def testUnknownResourceType(): Unit = {
assertEquals("Unknown resource type BROKER_LOGGER",
assertThrows(classOf[InvalidRequestException], () => validator.validate(
- new ConfigResource(BROKER_LOGGER, "foo"), emptyMap())). getMessage)
+ new ConfigResource(BROKER_LOGGER, "foo"), emptyMap(), emptyMap())).
getMessage)
}
@Test
@@ -63,7 +65,7 @@ class ControllerConfigurationValidatorTest {
config.put(SEGMENT_MS_CONFIG, null)
assertEquals("Null value not supported for topic configs:
segment.bytes,segment.ms",
assertThrows(classOf[InvalidConfigurationException], () =>
validator.validate(
- new ConfigResource(TOPIC, "foo"), config)). getMessage)
+ new ConfigResource(TOPIC, "foo"), config, emptyMap())). getMessage)
}
@Test
@@ -71,7 +73,7 @@ class ControllerConfigurationValidatorTest {
val config = new util.TreeMap[String, String]()
config.put(SEGMENT_JITTER_MS_CONFIG, "1000")
config.put(SEGMENT_BYTES_CONFIG, "67108864")
- validator.validate(new ConfigResource(TOPIC, "foo"), config)
+ validator.validate(new ConfigResource(TOPIC, "foo"), config, emptyMap())
}
@Test
@@ -82,7 +84,24 @@ class ControllerConfigurationValidatorTest {
config.put("foobar", "abc")
assertEquals("Unknown topic config name: foobar",
assertThrows(classOf[InvalidConfigurationException], () =>
validator.validate(
- new ConfigResource(TOPIC, "foo"), config)). getMessage)
+ new ConfigResource(TOPIC, "foo"), config, emptyMap())). getMessage)
+ }
+
+ @ParameterizedTest(name = "testDisablingRemoteStorageTopicConfig with
wasRemoteStorageEnabled: {0}")
+ @ValueSource(booleans = Array(true, false))
+ def testDisablingRemoteStorageTopicConfig(wasRemoteStorageEnabled: Boolean):
Unit = {
+ val config = new util.TreeMap[String, String]()
+ config.put(REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false")
+ if (wasRemoteStorageEnabled) {
+ assertEquals("Disabling remote storage feature on the topic level is not
supported.",
+ assertThrows(classOf[InvalidConfigurationException], () =>
validator.validate(
+ new ConfigResource(TOPIC, "foo"), config,
util.Collections.singletonMap(REMOTE_LOG_STORAGE_ENABLE_CONFIG,
"true"))).getMessage)
+ } else {
+ validator.validate(
+ new ConfigResource(TOPIC, "foo"), config, util.Collections.emptyMap())
+ validator.validate(
+ new ConfigResource(TOPIC, "foo"), config,
util.Collections.singletonMap(REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false"))
+ }
}
@Test
@@ -91,7 +110,7 @@ class ControllerConfigurationValidatorTest {
config.put(SEGMENT_JITTER_MS_CONFIG, "1000")
assertEquals("Unable to parse broker name as a base 10 number.",
assertThrows(classOf[InvalidRequestException], () => validator.validate(
- new ConfigResource(BROKER, "blah"), config)). getMessage)
+ new ConfigResource(BROKER, "blah"), config, emptyMap())). getMessage)
}
@Test
@@ -100,7 +119,7 @@ class ControllerConfigurationValidatorTest {
config.put(SEGMENT_JITTER_MS_CONFIG, "1000")
assertEquals("Invalid negative broker ID.",
assertThrows(classOf[InvalidRequestException], () => validator.validate(
- new ConfigResource(BROKER, "-1"), config)). getMessage)
+ new ConfigResource(BROKER, "-1"), config, emptyMap())). getMessage)
}
@Test
@@ -111,7 +130,7 @@ class ControllerConfigurationValidatorTest {
config.put(ClientMetricsConfigs.CLIENT_MATCH_PATTERN,
"client_instance_id=b69cc35a-7a54-4790-aa69-cc2bd4ee4538,client_id=1" +
",client_software_name=apache-kafka-java,client_software_version=2.8.0-SNAPSHOT,client_source_address=127.0.0.1,"
+
"client_source_port=1234")
- validator.validate(new ConfigResource(CLIENT_METRICS, "subscription-1"),
config)
+ validator.validate(new ConfigResource(CLIENT_METRICS, "subscription-1"),
config, emptyMap())
}
@Test
@@ -119,7 +138,7 @@ class ControllerConfigurationValidatorTest {
val config = new util.TreeMap[String, String]()
assertEquals("Subscription name can't be empty",
assertThrows(classOf[InvalidRequestException], () => validator.validate(
- new ConfigResource(CLIENT_METRICS, ""), config)). getMessage)
+ new ConfigResource(CLIENT_METRICS, ""), config, emptyMap())).
getMessage)
}
@Test
@@ -128,12 +147,12 @@ class ControllerConfigurationValidatorTest {
config.put(ClientMetricsConfigs.PUSH_INTERVAL_MS, "10")
assertEquals("Invalid value 10 for interval.ms, interval must be between
100 and 3600000 (1 hour)",
assertThrows(classOf[InvalidRequestException], () => validator.validate(
- new ConfigResource(CLIENT_METRICS, "subscription-1"), config)).
getMessage)
+ new ConfigResource(CLIENT_METRICS, "subscription-1"), config,
emptyMap())). getMessage)
config.put(ClientMetricsConfigs.PUSH_INTERVAL_MS, "3600001")
assertEquals("Invalid value 3600001 for interval.ms, interval must be
between 100 and 3600000 (1 hour)",
assertThrows(classOf[InvalidRequestException], () => validator.validate(
- new ConfigResource(CLIENT_METRICS, "subscription-1"), config)).
getMessage)
+ new ConfigResource(CLIENT_METRICS, "subscription-1"), config,
emptyMap())). getMessage)
}
@Test
@@ -142,7 +161,7 @@ class ControllerConfigurationValidatorTest {
config.put("random", "10")
assertEquals("Unknown client metrics configuration: random",
assertThrows(classOf[InvalidRequestException], () => validator.validate(
- new ConfigResource(CLIENT_METRICS, "subscription-1"), config)).
getMessage)
+ new ConfigResource(CLIENT_METRICS, "subscription-1"), config,
emptyMap())). getMessage)
}
@Test
@@ -151,6 +170,6 @@ class ControllerConfigurationValidatorTest {
config.put(ClientMetricsConfigs.CLIENT_MATCH_PATTERN, "10")
assertEquals("Illegal client matching pattern: 10",
assertThrows(classOf[InvalidConfigurationException], () =>
validator.validate(
- new ConfigResource(CLIENT_METRICS, "subscription-1"), config)).
getMessage)
+ new ConfigResource(CLIENT_METRICS, "subscription-1"), config,
emptyMap())). getMessage)
}
}
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
index 4ea7d9757d9..b10614cc2e8 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
@@ -269,9 +269,13 @@ public class ConfigurationControlManager {
List<ApiMessageAndVersion>
recordsImplicitlyDeleted,
boolean newlyCreatedResource) {
Map<String, String> allConfigs = new HashMap<>();
+ Map<String, String> existingConfigsMap = new HashMap<>();
Map<String, String> alteredConfigsForAlterConfigPolicyCheck = new
HashMap<>();
- TimelineHashMap<String, String> existingConfigs =
configData.get(configResource);
- if (existingConfigs != null) allConfigs.putAll(existingConfigs);
+ TimelineHashMap<String, String> existingConfigsSnapshot =
configData.get(configResource);
+ if (existingConfigsSnapshot != null) {
+ allConfigs.putAll(existingConfigsSnapshot);
+ existingConfigsMap.putAll(existingConfigsSnapshot);
+ }
for (ApiMessageAndVersion newRecord : recordsExplicitlyAltered) {
ConfigRecord configRecord = (ConfigRecord) newRecord.message();
if (configRecord.value() == null) {
@@ -288,7 +292,7 @@ public class ConfigurationControlManager {
// in the list passed to the policy in order to maintain backwards
compatibility
}
try {
- validator.validate(configResource, allConfigs);
+ validator.validate(configResource, allConfigs, existingConfigsMap);
if (!newlyCreatedResource) {
existenceChecker.accept(configResource);
}
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationValidator.java
b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationValidator.java
index 7e8f505f40b..c23c64d8c49 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationValidator.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationValidator.java
@@ -28,7 +28,7 @@ public interface ConfigurationValidator {
public void validate(ConfigResource resource) { }
@Override
- public void validate(ConfigResource resource, Map<String, String>
config) { }
+ public void validate(ConfigResource resource, Map<String, String>
newConfigs, Map<String, String> existingConfigs) { }
};
/**
@@ -41,8 +41,9 @@ public interface ConfigurationValidator {
/**
* Throws an ApiException if a configuration is invalid for the given
resource.
*
- * @param resource The configuration resource.
- * @param config The new configuration.
+ * @param resource The configuration resource.
+ * @param newConfigs The new configuration.
+ * @param existingConfigs The existing configuration.
*/
- void validate(ConfigResource resource, Map<String, String> config);
+ void validate(ConfigResource resource, Map<String, String> newConfigs,
Map<String, String> existingConfigs);
}
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
index 430486bdcad..9cf4a71e38e 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
@@ -608,19 +608,32 @@ public class LogConfig extends AbstractConfig {
/**
* Validates the values of the given properties. Should be called only by
the broker.
- * The `props` supplied contains the topic-level configs,
+ * The `newConfigs` supplied contains the topic-level configs,
* The default values should be extracted from the KafkaConfig.
- * @param props The properties to be validated
+ * @param existingConfigs The existing properties
+ * @param newConfigs The new properties to be
validated
+ * @param isRemoteLogStorageSystemEnabled true if system-wide remote log
storage is enabled
*/
- private static void validateTopicLogConfigValues(Map<?, ?> props,
+ private static void validateTopicLogConfigValues(Map<String, String>
existingConfigs,
+ Map<?, ?> newConfigs,
boolean
isRemoteLogStorageSystemEnabled) {
- validateValues(props);
- boolean isRemoteLogStorageEnabled = (Boolean)
props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
+ validateValues(newConfigs);
+ boolean isRemoteLogStorageEnabled = (Boolean)
newConfigs.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
if (isRemoteLogStorageEnabled) {
- validateRemoteStorageOnlyIfSystemEnabled(props,
isRemoteLogStorageSystemEnabled, false);
- validateNoRemoteStorageForCompactedTopic(props);
- validateRemoteStorageRetentionSize(props);
- validateRemoteStorageRetentionTime(props);
+ validateRemoteStorageOnlyIfSystemEnabled(newConfigs,
isRemoteLogStorageSystemEnabled, false);
+ validateNoRemoteStorageForCompactedTopic(newConfigs);
+ validateRemoteStorageRetentionSize(newConfigs);
+ validateRemoteStorageRetentionTime(newConfigs);
+ } else {
+ // The new "remote.storage.enable" value is false; reject the change if
the existing value is true (i.e. it is being turned from true to false)
+ validateNotTurningOffRemoteStorage(existingConfigs);
+ }
+ }
+
+ public static void validateNotTurningOffRemoteStorage(Map<String, String>
existingConfigs) {
+ boolean wasRemoteLogEnabledBeforeUpdate =
Boolean.parseBoolean(existingConfigs.getOrDefault(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
"false"));
+ if (wasRemoteLogEnabledBeforeUpdate) {
+ throw new InvalidConfigurationException("Disabling remote storage
feature on the topic level is not supported.");
}
}
@@ -681,10 +694,11 @@ public class LogConfig extends AbstractConfig {
* Check that the given properties contain only valid log config names and
that all values can be parsed and are valid
*/
public static void validate(Properties props) {
- validate(props, Collections.emptyMap(), false);
+ validate(Collections.emptyMap(), props, Collections.emptyMap(), false);
}
- public static void validate(Properties props,
+ public static void validate(Map<String, String> existingConfigs,
+ Properties props,
Map<?, ?> configuredProps,
boolean isRemoteLogStorageSystemEnabled) {
validateNames(props);
@@ -695,7 +709,7 @@ public class LogConfig extends AbstractConfig {
Map<Object, Object> combinedConfigs = new
HashMap<>(configuredProps);
combinedConfigs.putAll(props);
Map<?, ?> valueMaps = CONFIG.parse(combinedConfigs);
- validateTopicLogConfigValues(valueMaps,
isRemoteLogStorageSystemEnabled);
+ validateTopicLogConfigValues(existingConfigs, valueMaps,
isRemoteLogStorageSystemEnabled);
}
}