This is an automated email from the ASF dual-hosted git repository. showuon pushed a commit to branch 3.9 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 38db4c46ffd3a600ebfdeea07c0ba24a67d3405e Author: Luke Chen <[email protected]> AuthorDate: Wed Jul 31 01:07:09 2024 +0900 KAFKA-17205: Allow topic config validation in controller level in KRaft mode (#16693) Reviewers: Kamal Chandraprakash <[email protected]>, Christo Lolov <[email protected]> --- .../server/ControllerConfigurationValidator.scala | 9 ++-- core/src/main/scala/kafka/zk/AdminZkClient.scala | 4 +- .../kafka/admin/RemoteTopicCrudTest.scala | 20 +++++++++ .../kafka/api/BaseAdminIntegrationTest.scala | 2 +- .../test/scala/unit/kafka/log/LogConfigTest.scala | 43 ++++++++++++++----- .../ControllerConfigurationValidatorTest.scala | 49 +++++++++++++++------- .../controller/ConfigurationControlManager.java | 10 +++-- .../kafka/controller/ConfigurationValidator.java | 9 ++-- .../kafka/storage/internals/log/LogConfig.java | 38 +++++++++++------ 9 files changed, 132 insertions(+), 52 deletions(-) diff --git a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala index b99065b573e..06a60e30076 100644 --- a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala +++ b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala @@ -89,14 +89,15 @@ class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends Configu override def validate( resource: ConfigResource, - config: util.Map[String, String] + newConfigs: util.Map[String, String], + oldConfigs: util.Map[String, String] ): Unit = { resource.`type`() match { case TOPIC => validateTopicName(resource.name()) val properties = new Properties() val nullTopicConfigs = new mutable.ArrayBuffer[String]() - config.forEach((key, value) => { + newConfigs.forEach((key, value) => { if (value == null) { nullTopicConfigs += key } else { @@ -107,12 +108,12 @@ class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends Configu throw new InvalidConfigurationException("Null value not supported for topic configs: " + nullTopicConfigs.mkString(",")) } - LogConfig.validate(properties, kafkaConfig.extractLogConfigMap, + LogConfig.validate(oldConfigs, properties, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) case BROKER => validateBrokerName(resource.name()) case CLIENT_METRICS => val properties = new Properties() - config.forEach((key, value) => properties.setProperty(key, value)) + newConfigs.forEach((key, value) => properties.setProperty(key, value)) ClientMetricsConfigs.validate(resource.name(), properties) case _ => throwExceptionForUnknownResourceType(resource) } diff --git a/core/src/main/scala/kafka/zk/AdminZkClient.scala b/core/src/main/scala/kafka/zk/AdminZkClient.scala index cd9153c07dc..8db20583e24 100644 --- a/core/src/main/scala/kafka/zk/AdminZkClient.scala +++ b/core/src/main/scala/kafka/zk/AdminZkClient.scala @@ -161,7 +161,7 @@ class AdminZkClient(zkClient: KafkaZkClient, partitionReplicaAssignment.keys.filter(_ >= 0).sum != sequenceSum) throw new InvalidReplicaAssignmentException("partitions should be a consecutive 0-based integer sequence") - LogConfig.validate(config, + LogConfig.validate(Collections.emptyMap(), config, kafkaConfig.map(_.extractLogConfigMap).getOrElse(Collections.emptyMap()), kafkaConfig.exists(_.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) } @@ -479,7 +479,7 @@ class AdminZkClient(zkClient: KafkaZkClient, if (!zkClient.topicExists(topic)) throw new UnknownTopicOrPartitionException(s"Topic '$topic' does not exist.") // remove the topic overrides - LogConfig.validate(configs, + LogConfig.validate(Collections.emptyMap(), configs, kafkaConfig.map(_.extractLogConfigMap).getOrElse(Collections.emptyMap()), kafkaConfig.exists(_.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) } diff --git a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala index f995b86b704..c0e976bf1d4 100644 --- a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala +++ b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala @@ -291,6 +291,26 @@ class RemoteTopicCrudTest extends IntegrationTestHarness { () => admin.incrementalAlterConfigs(configs).all().get(), "Invalid local retention size") } + // The remote storage config validation on controller level only works in KRaft + @ParameterizedTest + @ValueSource(strings = Array("kraft")) + def testUpdateTopicConfigWithDisablingRemoteStorage(quorum: String): Unit = { + val admin = createAdminClient() + val topicConfig = new Properties + topicConfig.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") + TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor, + topicConfig = topicConfig) + + val configs = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]]() + configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName), + util.Arrays.asList( + new AlterConfigOp(new ConfigEntry(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false"), + AlterConfigOp.OpType.SET), + )) + assertThrowsException(classOf[InvalidConfigurationException], + () => admin.incrementalAlterConfigs(configs).all().get(), "Disabling remote storage feature on the topic level is not supported.") + } + @ParameterizedTest @ValueSource(strings = Array("zk", "kraft")) def testTopicDeletion(quorum: String): Unit = { diff --git a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala index 1a516336745..436673806a5 100644 --- a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala @@ -32,7 +32,7 @@ import org.apache.kafka.coordinator.group.GroupCoordinatorConfig import org.apache.kafka.security.authorizer.AclEntry import org.apache.kafka.server.config.{ServerConfigs, ReplicationConfigs, ServerLogConfigs} import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.{AfterEach, BeforeEach , TestInfo, Timeout} +import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala index 2793da51515..4e7e4e23b38 100644 --- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala @@ -22,6 +22,7 @@ import kafka.utils.TestUtils import org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM import org.apache.kafka.common.config.ConfigDef.Type.INT import org.apache.kafka.common.config.{ConfigException, SslConfigs, TopicConfig} +import org.apache.kafka.common.errors.InvalidConfigurationException import org.apache.kafka.server.common.MetadataVersion import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test @@ -299,7 +300,7 @@ class LogConfigTest { props.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, localRetentionMs.toString) props.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, localRetentionBytes.toString) assertThrows(classOf[ConfigException], - () => LogConfig.validate(props, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) + () => LogConfig.validate(Collections.emptyMap(), props, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) } @Test @@ -311,17 +312,17 @@ class LogConfigTest { val logProps = new Properties() logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE) logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") - LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) + LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT) assertThrows(classOf[ConfigException], - () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) + () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "delete,compact") assertThrows(classOf[ConfigException], - () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) + () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact,delete") assertThrows(classOf[ConfigException], - () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) + () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) } @ParameterizedTest(name = "testEnableRemoteLogStorage with sysRemoteStorageEnabled: {0}") @@ -334,14 +335,34 @@ class LogConfigTest { val logProps = new Properties() logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") if (sysRemoteStorageEnabled) { - LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) + LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) } else { val message = assertThrows(classOf[ConfigException], - () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) + () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) assertTrue(message.getMessage.contains("Tiered Storage functionality is disabled in the broker")) } } + @ParameterizedTest(name = "testDisableRemoteLogStorage with wasRemoteStorageEnabled: {0}") + @ValueSource(booleans = Array(true, false)) + def testDisableRemoteLogStorage(wasRemoteStorageEnabled: Boolean): Unit = { + val kafkaProps = TestUtils.createDummyBrokerConfig() + kafkaProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true") + val kafkaConfig = KafkaConfig.fromProps(kafkaProps) + + val logProps = new Properties() + logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false") + if (wasRemoteStorageEnabled) { + val message = assertThrows(classOf[InvalidConfigurationException], + () => LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"), + logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) + assertTrue(message.getMessage.contains("Disabling remote storage feature on the topic level is not supported.")) + } else { + LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) + LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false"), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) + } + } + @ParameterizedTest(name = "testTopicCreationWithInvalidRetentionTime with sysRemoteStorageEnabled: {0}") @ValueSource(booleans = Array(true, false)) def testTopicCreationWithInvalidRetentionTime(sysRemoteStorageEnabled: Boolean): Unit = { @@ -357,10 +378,10 @@ class LogConfigTest { logProps.put(TopicConfig.RETENTION_MS_CONFIG, "500") if (sysRemoteStorageEnabled) { val message = assertThrows(classOf[ConfigException], - () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) + () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG)) } else { - LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) + LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) } } @@ -379,10 +400,10 @@ class LogConfigTest { logProps.put(TopicConfig.RETENTION_BYTES_CONFIG, "128") if (sysRemoteStorageEnabled) { val message = assertThrows(classOf[ConfigException], - () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) + () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG)) } else { - LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) + LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) } } diff --git a/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala b/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala index 00bb93811b2..250f07ca23e 100644 --- a/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala +++ b/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala @@ -20,11 +20,13 @@ package kafka.server import kafka.utils.TestUtils import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, BROKER_LOGGER, CLIENT_METRICS, TOPIC} -import org.apache.kafka.common.config.TopicConfig.{SEGMENT_BYTES_CONFIG, SEGMENT_JITTER_MS_CONFIG, SEGMENT_MS_CONFIG} +import org.apache.kafka.common.config.TopicConfig.{REMOTE_LOG_STORAGE_ENABLE_CONFIG, SEGMENT_BYTES_CONFIG, SEGMENT_JITTER_MS_CONFIG, SEGMENT_MS_CONFIG} import org.apache.kafka.common.errors.{InvalidConfigurationException, InvalidRequestException, InvalidTopicException} import org.apache.kafka.server.metrics.ClientMetricsConfigs import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows} import org.junit.jupiter.api.Test +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource import java.util import java.util.Collections.emptyMap @@ -37,7 +39,7 @@ class ControllerConfigurationValidatorTest { def testDefaultTopicResourceIsRejected(): Unit = { assertEquals("Default topic resources are not allowed.", assertThrows(classOf[InvalidRequestException], () => validator.validate( - new ConfigResource(TOPIC, ""), emptyMap())). getMessage) + new ConfigResource(TOPIC, ""), emptyMap(), emptyMap())). getMessage) } @Test @@ -45,14 +47,14 @@ class ControllerConfigurationValidatorTest { assertEquals("Topic name is invalid: '(<-invalid->)' contains " + "one or more characters other than ASCII alphanumerics, '.', '_' and '-'", assertThrows(classOf[InvalidTopicException], () => validator.validate( - new ConfigResource(TOPIC, "(<-invalid->)"), emptyMap())). getMessage) + new ConfigResource(TOPIC, "(<-invalid->)"), emptyMap(), emptyMap())). getMessage) } @Test def testUnknownResourceType(): Unit = { assertEquals("Unknown resource type BROKER_LOGGER", assertThrows(classOf[InvalidRequestException], () => validator.validate( - new ConfigResource(BROKER_LOGGER, "foo"), emptyMap())). getMessage) + new ConfigResource(BROKER_LOGGER, "foo"), emptyMap(), emptyMap())). getMessage) } @Test @@ -63,7 +65,7 @@ class ControllerConfigurationValidatorTest { config.put(SEGMENT_MS_CONFIG, null) assertEquals("Null value not supported for topic configs: segment.bytes,segment.ms", assertThrows(classOf[InvalidConfigurationException], () => validator.validate( - new ConfigResource(TOPIC, "foo"), config)). getMessage) + new ConfigResource(TOPIC, "foo"), config, emptyMap())). getMessage) } @Test @@ -71,7 +73,7 @@ class ControllerConfigurationValidatorTest { val config = new util.TreeMap[String, String]() config.put(SEGMENT_JITTER_MS_CONFIG, "1000") config.put(SEGMENT_BYTES_CONFIG, "67108864") - validator.validate(new ConfigResource(TOPIC, "foo"), config) + validator.validate(new ConfigResource(TOPIC, "foo"), config, emptyMap()) } @Test @@ -82,7 +84,24 @@ class ControllerConfigurationValidatorTest { config.put("foobar", "abc") assertEquals("Unknown topic config name: foobar", assertThrows(classOf[InvalidConfigurationException], () => validator.validate( - new ConfigResource(TOPIC, "foo"), config)). getMessage) + new ConfigResource(TOPIC, "foo"), config, emptyMap())). getMessage) + } + + @ParameterizedTest(name = "testDisablingRemoteStorageTopicConfig with wasRemoteStorageEnabled: {0}") + @ValueSource(booleans = Array(true, false)) + def testDisablingRemoteStorageTopicConfig(wasRemoteStorageEnabled: Boolean): Unit = { + val config = new util.TreeMap[String, String]() + config.put(REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false") + if (wasRemoteStorageEnabled) { + assertEquals("Disabling remote storage feature on the topic level is not supported.", + assertThrows(classOf[InvalidConfigurationException], () => validator.validate( + new ConfigResource(TOPIC, "foo"), config, util.Collections.singletonMap(REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"))).getMessage) + } else { + validator.validate( + new ConfigResource(TOPIC, "foo"), config, util.Collections.emptyMap()) + validator.validate( + new ConfigResource(TOPIC, "foo"), config, util.Collections.singletonMap(REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false")) + } } @Test @@ -91,7 +110,7 @@ class ControllerConfigurationValidatorTest { config.put(SEGMENT_JITTER_MS_CONFIG, "1000") assertEquals("Unable to parse broker name as a base 10 number.", assertThrows(classOf[InvalidRequestException], () => validator.validate( - new ConfigResource(BROKER, "blah"), config)). getMessage) + new ConfigResource(BROKER, "blah"), config, emptyMap())). getMessage) } @Test @@ -100,7 +119,7 @@ class ControllerConfigurationValidatorTest { config.put(SEGMENT_JITTER_MS_CONFIG, "1000") assertEquals("Invalid negative broker ID.", assertThrows(classOf[InvalidRequestException], () => validator.validate( - new ConfigResource(BROKER, "-1"), config)). getMessage) + new ConfigResource(BROKER, "-1"), config, emptyMap())). getMessage) } @Test @@ -111,7 +130,7 @@ class ControllerConfigurationValidatorTest { config.put(ClientMetricsConfigs.CLIENT_MATCH_PATTERN, "client_instance_id=b69cc35a-7a54-4790-aa69-cc2bd4ee4538,client_id=1" + ",client_software_name=apache-kafka-java,client_software_version=2.8.0-SNAPSHOT,client_source_address=127.0.0.1," + "client_source_port=1234") - validator.validate(new ConfigResource(CLIENT_METRICS, "subscription-1"), config) + validator.validate(new ConfigResource(CLIENT_METRICS, "subscription-1"), config, emptyMap()) } @Test @@ -119,7 +138,7 @@ class ControllerConfigurationValidatorTest { val config = new util.TreeMap[String, String]() assertEquals("Subscription name can't be empty", assertThrows(classOf[InvalidRequestException], () => validator.validate( - new ConfigResource(CLIENT_METRICS, ""), config)). getMessage) + new ConfigResource(CLIENT_METRICS, ""), config, emptyMap())). getMessage) } @Test @@ -128,12 +147,12 @@ class ControllerConfigurationValidatorTest { config.put(ClientMetricsConfigs.PUSH_INTERVAL_MS, "10") assertEquals("Invalid value 10 for interval.ms, interval must be between 100 and 3600000 (1 hour)", assertThrows(classOf[InvalidRequestException], () => validator.validate( - new ConfigResource(CLIENT_METRICS, "subscription-1"), config)). getMessage) + new ConfigResource(CLIENT_METRICS, "subscription-1"), config, emptyMap())). getMessage) config.put(ClientMetricsConfigs.PUSH_INTERVAL_MS, "3600001") assertEquals("Invalid value 3600001 for interval.ms, interval must be between 100 and 3600000 (1 hour)", assertThrows(classOf[InvalidRequestException], () => validator.validate( - new ConfigResource(CLIENT_METRICS, "subscription-1"), config)). getMessage) + new ConfigResource(CLIENT_METRICS, "subscription-1"), config, emptyMap())). getMessage) } @Test @@ -142,7 +161,7 @@ class ControllerConfigurationValidatorTest { config.put("random", "10") assertEquals("Unknown client metrics configuration: random", assertThrows(classOf[InvalidRequestException], () => validator.validate( - new ConfigResource(CLIENT_METRICS, "subscription-1"), config)). getMessage) + new ConfigResource(CLIENT_METRICS, "subscription-1"), config, emptyMap())). getMessage) } @Test @@ -151,6 +170,6 @@ class ControllerConfigurationValidatorTest { config.put(ClientMetricsConfigs.CLIENT_MATCH_PATTERN, "10") assertEquals("Illegal client matching pattern: 10", assertThrows(classOf[InvalidConfigurationException], () => validator.validate( - new ConfigResource(CLIENT_METRICS, "subscription-1"), config)). getMessage) + new ConfigResource(CLIENT_METRICS, "subscription-1"), config, emptyMap())). getMessage) } } diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java index 4ea7d9757d9..b10614cc2e8 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java @@ -269,9 +269,13 @@ public class ConfigurationControlManager { List<ApiMessageAndVersion> recordsImplicitlyDeleted, boolean newlyCreatedResource) { Map<String, String> allConfigs = new HashMap<>(); + Map<String, String> existingConfigsMap = new HashMap<>(); Map<String, String> alteredConfigsForAlterConfigPolicyCheck = new HashMap<>(); - TimelineHashMap<String, String> existingConfigs = configData.get(configResource); - if (existingConfigs != null) allConfigs.putAll(existingConfigs); + TimelineHashMap<String, String> existingConfigsSnapshot = configData.get(configResource); + if (existingConfigsSnapshot != null) { + allConfigs.putAll(existingConfigsSnapshot); + existingConfigsMap.putAll(existingConfigsSnapshot); + } for (ApiMessageAndVersion newRecord : recordsExplicitlyAltered) { ConfigRecord configRecord = (ConfigRecord) newRecord.message(); if (configRecord.value() == null) { @@ -288,7 +292,7 @@ public class ConfigurationControlManager { // in the list passed to the policy in order to maintain backwards compatibility } try { - validator.validate(configResource, allConfigs); + validator.validate(configResource, allConfigs, existingConfigsMap); if (!newlyCreatedResource) { existenceChecker.accept(configResource); } diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationValidator.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationValidator.java index 7e8f505f40b..c23c64d8c49 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationValidator.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationValidator.java @@ -28,7 +28,7 @@ public interface ConfigurationValidator { public void validate(ConfigResource resource) { } @Override - public void validate(ConfigResource resource, Map<String, String> config) { } + public void validate(ConfigResource resource, Map<String, String> newConfigs, Map<String, String> existingConfigs) { } }; /** @@ -41,8 +41,9 @@ public interface ConfigurationValidator { /** * Throws an ApiException if a configuration is invalid for the given resource. * - * @param resource The configuration resource. - * @param config The new configuration. + * @param resource The configuration resource. + * @param newConfigs The new configuration. + * @param existingConfigs The existing configuration. */ - void validate(ConfigResource resource, Map<String, String> config); + void validate(ConfigResource resource, Map<String, String> newConfigs, Map<String, String> existingConfigs); } diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java index 430486bdcad..9cf4a71e38e 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java @@ -608,19 +608,32 @@ public class LogConfig extends AbstractConfig { /** * Validates the values of the given properties. Should be called only by the broker. - * The `props` supplied contains the topic-level configs, + * The `newConfigs` supplied contains the topic-level configs, * The default values should be extracted from the KafkaConfig. - * @param props The properties to be validated + * @param existingConfigs The existing properties + * @param newConfigs The new properties to be validated + * @param isRemoteLogStorageSystemEnabled true if system wise remote log storage is enabled */ - private static void validateTopicLogConfigValues(Map<?, ?> props, + private static void validateTopicLogConfigValues(Map<String, String> existingConfigs, + Map<?, ?> newConfigs, boolean isRemoteLogStorageSystemEnabled) { - validateValues(props); - boolean isRemoteLogStorageEnabled = (Boolean) props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG); + validateValues(newConfigs); + boolean isRemoteLogStorageEnabled = (Boolean) newConfigs.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG); if (isRemoteLogStorageEnabled) { - validateRemoteStorageOnlyIfSystemEnabled(props, isRemoteLogStorageSystemEnabled, false); - validateNoRemoteStorageForCompactedTopic(props); - validateRemoteStorageRetentionSize(props); - validateRemoteStorageRetentionTime(props); + validateRemoteStorageOnlyIfSystemEnabled(newConfigs, isRemoteLogStorageSystemEnabled, false); + validateNoRemoteStorageForCompactedTopic(newConfigs); + validateRemoteStorageRetentionSize(newConfigs); + validateRemoteStorageRetentionTime(newConfigs); + } else { + // The new config "remote.storage.enable" is false, validate if it's turning from true to false + validateNotTurningOffRemoteStorage(existingConfigs); + } + } + + public static void validateNotTurningOffRemoteStorage(Map<String, String> existingConfigs) { + boolean wasRemoteLogEnabledBeforeUpdate = Boolean.parseBoolean(existingConfigs.getOrDefault(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false")); + if (wasRemoteLogEnabledBeforeUpdate) { + throw new InvalidConfigurationException("Disabling remote storage feature on the topic level is not supported."); } } @@ -681,10 +694,11 @@ public class LogConfig extends AbstractConfig { * Check that the given properties contain only valid log config names and that all values can be parsed and are valid */ public static void validate(Properties props) { - validate(props, Collections.emptyMap(), false); + validate(Collections.emptyMap(), props, Collections.emptyMap(), false); } - public static void validate(Properties props, + public static void validate(Map<String, String> existingConfigs, + Properties props, Map<?, ?> configuredProps, boolean isRemoteLogStorageSystemEnabled) { validateNames(props); @@ -695,7 +709,7 @@ public class LogConfig extends AbstractConfig { Map<Object, Object> combinedConfigs = new HashMap<>(configuredProps); combinedConfigs.putAll(props); Map<?, ?> valueMaps = CONFIG.parse(combinedConfigs); - validateTopicLogConfigValues(valueMaps, isRemoteLogStorageSystemEnabled); + validateTopicLogConfigValues(existingConfigs, valueMaps, isRemoteLogStorageSystemEnabled); } }
