This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 38db4c46ffd3a600ebfdeea07c0ba24a67d3405e
Author: Luke Chen <[email protected]>
AuthorDate: Wed Jul 31 01:07:09 2024 +0900

    KAFKA-17205: Allow topic config validation in controller level in KRaft mode (#16693)
    
    Reviewers: Kamal Chandraprakash <[email protected]>, Christo Lolov <[email protected]>
---
 .../server/ControllerConfigurationValidator.scala  |  9 ++--
 core/src/main/scala/kafka/zk/AdminZkClient.scala   |  4 +-
 .../kafka/admin/RemoteTopicCrudTest.scala          | 20 +++++++++
 .../kafka/api/BaseAdminIntegrationTest.scala       |  2 +-
 .../test/scala/unit/kafka/log/LogConfigTest.scala  | 43 ++++++++++++++-----
 .../ControllerConfigurationValidatorTest.scala     | 49 +++++++++++++++-------
 .../controller/ConfigurationControlManager.java    | 10 +++--
 .../kafka/controller/ConfigurationValidator.java   |  9 ++--
 .../kafka/storage/internals/log/LogConfig.java     | 38 +++++++++++------
 9 files changed, 132 insertions(+), 52 deletions(-)

diff --git 
a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala 
b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
index b99065b573e..06a60e30076 100644
--- a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
+++ b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
@@ -89,14 +89,15 @@ class ControllerConfigurationValidator(kafkaConfig: 
KafkaConfig) extends Configu
 
   override def validate(
     resource: ConfigResource,
-    config: util.Map[String, String]
+    newConfigs: util.Map[String, String],
+    oldConfigs: util.Map[String, String]
   ): Unit = {
     resource.`type`() match {
       case TOPIC =>
         validateTopicName(resource.name())
         val properties = new Properties()
         val nullTopicConfigs = new mutable.ArrayBuffer[String]()
-        config.forEach((key, value) => {
+        newConfigs.forEach((key, value) => {
           if (value == null) {
             nullTopicConfigs += key
           } else {
@@ -107,12 +108,12 @@ class ControllerConfigurationValidator(kafkaConfig: 
KafkaConfig) extends Configu
           throw new InvalidConfigurationException("Null value not supported 
for topic configs: " +
             nullTopicConfigs.mkString(","))
         }
-        LogConfig.validate(properties, kafkaConfig.extractLogConfigMap,
+        LogConfig.validate(oldConfigs, properties, 
kafkaConfig.extractLogConfigMap,
           kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
       case BROKER => validateBrokerName(resource.name())
       case CLIENT_METRICS =>
         val properties = new Properties()
-        config.forEach((key, value) => properties.setProperty(key, value))
+        newConfigs.forEach((key, value) => properties.setProperty(key, value))
         ClientMetricsConfigs.validate(resource.name(), properties)
       case _ => throwExceptionForUnknownResourceType(resource)
     }
diff --git a/core/src/main/scala/kafka/zk/AdminZkClient.scala 
b/core/src/main/scala/kafka/zk/AdminZkClient.scala
index cd9153c07dc..8db20583e24 100644
--- a/core/src/main/scala/kafka/zk/AdminZkClient.scala
+++ b/core/src/main/scala/kafka/zk/AdminZkClient.scala
@@ -161,7 +161,7 @@ class AdminZkClient(zkClient: KafkaZkClient,
         partitionReplicaAssignment.keys.filter(_ >= 0).sum != sequenceSum)
         throw new InvalidReplicaAssignmentException("partitions should be a 
consecutive 0-based integer sequence")
 
-    LogConfig.validate(config,
+    LogConfig.validate(Collections.emptyMap(), config,
       kafkaConfig.map(_.extractLogConfigMap).getOrElse(Collections.emptyMap()),
       
kafkaConfig.exists(_.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
   }
@@ -479,7 +479,7 @@ class AdminZkClient(zkClient: KafkaZkClient,
     if (!zkClient.topicExists(topic))
       throw new UnknownTopicOrPartitionException(s"Topic '$topic' does not 
exist.")
     // remove the topic overrides
-    LogConfig.validate(configs,
+    LogConfig.validate(Collections.emptyMap(), configs,
       kafkaConfig.map(_.extractLogConfigMap).getOrElse(Collections.emptyMap()),
       
kafkaConfig.exists(_.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
   }
diff --git 
a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala 
b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
index f995b86b704..c0e976bf1d4 100644
--- a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
@@ -291,6 +291,26 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
       () => admin.incrementalAlterConfigs(configs).all().get(), "Invalid local 
retention size")
   }
 
+  // The remote storage config validation on controller level only works in KRaft
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft"))
+  def testUpdateTopicConfigWithDisablingRemoteStorage(quorum: String): Unit = {
+    val admin = createAdminClient()
+    val topicConfig = new Properties
+    topicConfig.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, 
"true")
+    TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, 
controllerServers, numPartitions, numReplicationFactor,
+      topicConfig = topicConfig)
+
+    val configs = new util.HashMap[ConfigResource, 
util.Collection[AlterConfigOp]]()
+    configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName),
+      util.Arrays.asList(
+        new AlterConfigOp(new 
ConfigEntry(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false"),
+          AlterConfigOp.OpType.SET),
+      ))
+    assertThrowsException(classOf[InvalidConfigurationException],
+      () => admin.incrementalAlterConfigs(configs).all().get(), "Disabling 
remote storage feature on the topic level is not supported.")
+  }
+
   @ParameterizedTest
   @ValueSource(strings = Array("zk", "kraft"))
   def testTopicDeletion(quorum: String): Unit = {
diff --git 
a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
index 1a516336745..436673806a5 100644
--- a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
@@ -32,7 +32,7 @@ import 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig
 import org.apache.kafka.security.authorizer.AclEntry
 import org.apache.kafka.server.config.{ServerConfigs, ReplicationConfigs, 
ServerLogConfigs}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach , TestInfo, Timeout}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala 
b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
index 2793da51515..4e7e4e23b38 100644
--- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
@@ -22,6 +22,7 @@ import kafka.utils.TestUtils
 import org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM
 import org.apache.kafka.common.config.ConfigDef.Type.INT
 import org.apache.kafka.common.config.{ConfigException, SslConfigs, 
TopicConfig}
+import org.apache.kafka.common.errors.InvalidConfigurationException
 import org.apache.kafka.server.common.MetadataVersion
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
@@ -299,7 +300,7 @@ class LogConfigTest {
     props.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, 
localRetentionMs.toString)
     props.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, 
localRetentionBytes.toString)
     assertThrows(classOf[ConfigException],
-      () => LogConfig.validate(props, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+      () => LogConfig.validate(Collections.emptyMap(), props, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
   }
 
   @Test
@@ -311,17 +312,17 @@ class LogConfigTest {
     val logProps = new Properties()
     logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, 
TopicConfig.CLEANUP_POLICY_DELETE)
     logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
-    LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+    LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
 
     logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, 
TopicConfig.CLEANUP_POLICY_COMPACT)
     assertThrows(classOf[ConfigException],
-      () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+      () => LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
     logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "delete,compact")
     assertThrows(classOf[ConfigException],
-      () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+      () => LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
     logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact,delete")
     assertThrows(classOf[ConfigException],
-      () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+      () => LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
   }
 
   @ParameterizedTest(name = "testEnableRemoteLogStorage with 
sysRemoteStorageEnabled: {0}")
@@ -334,14 +335,34 @@ class LogConfigTest {
     val logProps = new Properties()
     logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
     if (sysRemoteStorageEnabled) {
-      LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+      LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
     } else {
       val message = assertThrows(classOf[ConfigException],
-        () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+        () => LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
       assertTrue(message.getMessage.contains("Tiered Storage functionality is 
disabled in the broker"))
     }
   }
 
+  @ParameterizedTest(name = "testDisableRemoteLogStorage with 
wasRemoteStorageEnabled: {0}")
+  @ValueSource(booleans = Array(true, false))
+  def testDisableRemoteLogStorage(wasRemoteStorageEnabled: Boolean): Unit = {
+    val kafkaProps = TestUtils.createDummyBrokerConfig()
+    
kafkaProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, 
"true")
+    val kafkaConfig = KafkaConfig.fromProps(kafkaProps)
+
+    val logProps = new Properties()
+    logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false")
+    if (wasRemoteStorageEnabled) {
+      val message = assertThrows(classOf[InvalidConfigurationException],
+        () => 
LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
 "true"),
+          logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+      assertTrue(message.getMessage.contains("Disabling remote storage feature 
on the topic level is not supported."))
+    } else {
+      LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+      
LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
 "false"), logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+    }
+  }
+
   @ParameterizedTest(name = "testTopicCreationWithInvalidRetentionTime with 
sysRemoteStorageEnabled: {0}")
   @ValueSource(booleans = Array(true, false))
   def testTopicCreationWithInvalidRetentionTime(sysRemoteStorageEnabled: 
Boolean): Unit = {
@@ -357,10 +378,10 @@ class LogConfigTest {
     logProps.put(TopicConfig.RETENTION_MS_CONFIG, "500")
     if (sysRemoteStorageEnabled) {
       val message = assertThrows(classOf[ConfigException],
-        () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+        () => LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
       
assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG))
     } else {
-      LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+      LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
     }
   }
 
@@ -379,10 +400,10 @@ class LogConfigTest {
     logProps.put(TopicConfig.RETENTION_BYTES_CONFIG, "128")
     if (sysRemoteStorageEnabled) {
       val message = assertThrows(classOf[ConfigException],
-        () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+        () => LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
       
assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG))
     } else {
-      LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+      LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
     }
   }
 
diff --git 
a/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
 
b/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
index 00bb93811b2..250f07ca23e 100644
--- 
a/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
@@ -20,11 +20,13 @@ package kafka.server
 import kafka.utils.TestUtils
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, 
BROKER_LOGGER, CLIENT_METRICS, TOPIC}
-import org.apache.kafka.common.config.TopicConfig.{SEGMENT_BYTES_CONFIG, 
SEGMENT_JITTER_MS_CONFIG, SEGMENT_MS_CONFIG}
+import 
org.apache.kafka.common.config.TopicConfig.{REMOTE_LOG_STORAGE_ENABLE_CONFIG, 
SEGMENT_BYTES_CONFIG, SEGMENT_JITTER_MS_CONFIG, SEGMENT_MS_CONFIG}
 import org.apache.kafka.common.errors.{InvalidConfigurationException, 
InvalidRequestException, InvalidTopicException}
 import org.apache.kafka.server.metrics.ClientMetricsConfigs
 import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
 import org.junit.jupiter.api.Test
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 import java.util
 import java.util.Collections.emptyMap
@@ -37,7 +39,7 @@ class ControllerConfigurationValidatorTest {
   def testDefaultTopicResourceIsRejected(): Unit = {
     assertEquals("Default topic resources are not allowed.",
         assertThrows(classOf[InvalidRequestException], () => 
validator.validate(
-        new ConfigResource(TOPIC, ""), emptyMap())). getMessage)
+        new ConfigResource(TOPIC, ""), emptyMap(), emptyMap())). getMessage)
   }
 
   @Test
@@ -45,14 +47,14 @@ class ControllerConfigurationValidatorTest {
     assertEquals("Topic name is invalid: '(<-invalid->)' contains " +
       "one or more characters other than ASCII alphanumerics, '.', '_' and 
'-'",
         assertThrows(classOf[InvalidTopicException], () => validator.validate(
-          new ConfigResource(TOPIC, "(<-invalid->)"), emptyMap())). getMessage)
+          new ConfigResource(TOPIC, "(<-invalid->)"), emptyMap(), 
emptyMap())). getMessage)
   }
 
   @Test
   def testUnknownResourceType(): Unit = {
     assertEquals("Unknown resource type BROKER_LOGGER",
       assertThrows(classOf[InvalidRequestException], () => validator.validate(
-        new ConfigResource(BROKER_LOGGER, "foo"), emptyMap())). getMessage)
+        new ConfigResource(BROKER_LOGGER, "foo"), emptyMap(), emptyMap())). 
getMessage)
   }
 
   @Test
@@ -63,7 +65,7 @@ class ControllerConfigurationValidatorTest {
     config.put(SEGMENT_MS_CONFIG, null)
     assertEquals("Null value not supported for topic configs: 
segment.bytes,segment.ms",
       assertThrows(classOf[InvalidConfigurationException], () => 
validator.validate(
-        new ConfigResource(TOPIC, "foo"), config)). getMessage)
+        new ConfigResource(TOPIC, "foo"), config, emptyMap())). getMessage)
   }
 
   @Test
@@ -71,7 +73,7 @@ class ControllerConfigurationValidatorTest {
     val config = new util.TreeMap[String, String]()
     config.put(SEGMENT_JITTER_MS_CONFIG, "1000")
     config.put(SEGMENT_BYTES_CONFIG, "67108864")
-    validator.validate(new ConfigResource(TOPIC, "foo"), config)
+    validator.validate(new ConfigResource(TOPIC, "foo"), config, emptyMap())
   }
 
   @Test
@@ -82,7 +84,24 @@ class ControllerConfigurationValidatorTest {
     config.put("foobar", "abc")
     assertEquals("Unknown topic config name: foobar",
       assertThrows(classOf[InvalidConfigurationException], () => 
validator.validate(
-        new ConfigResource(TOPIC, "foo"), config)). getMessage)
+        new ConfigResource(TOPIC, "foo"), config, emptyMap())). getMessage)
+  }
+
+  @ParameterizedTest(name = "testDisablingRemoteStorageTopicConfig with 
wasRemoteStorageEnabled: {0}")
+  @ValueSource(booleans = Array(true, false))
+  def testDisablingRemoteStorageTopicConfig(wasRemoteStorageEnabled: Boolean): 
Unit = {
+    val config = new util.TreeMap[String, String]()
+    config.put(REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false")
+    if (wasRemoteStorageEnabled) {
+      assertEquals("Disabling remote storage feature on the topic level is not 
supported.",
+        assertThrows(classOf[InvalidConfigurationException], () => 
validator.validate(
+          new ConfigResource(TOPIC, "foo"), config, 
util.Collections.singletonMap(REMOTE_LOG_STORAGE_ENABLE_CONFIG, 
"true"))).getMessage)
+    } else {
+      validator.validate(
+        new ConfigResource(TOPIC, "foo"), config, util.Collections.emptyMap())
+      validator.validate(
+        new ConfigResource(TOPIC, "foo"), config, 
util.Collections.singletonMap(REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false"))
+    }
   }
 
   @Test
@@ -91,7 +110,7 @@ class ControllerConfigurationValidatorTest {
     config.put(SEGMENT_JITTER_MS_CONFIG, "1000")
     assertEquals("Unable to parse broker name as a base 10 number.",
       assertThrows(classOf[InvalidRequestException], () => validator.validate(
-        new ConfigResource(BROKER, "blah"), config)). getMessage)
+        new ConfigResource(BROKER, "blah"), config, emptyMap())). getMessage)
   }
 
   @Test
@@ -100,7 +119,7 @@ class ControllerConfigurationValidatorTest {
     config.put(SEGMENT_JITTER_MS_CONFIG, "1000")
     assertEquals("Invalid negative broker ID.",
       assertThrows(classOf[InvalidRequestException], () => validator.validate(
-        new ConfigResource(BROKER, "-1"), config)). getMessage)
+        new ConfigResource(BROKER, "-1"), config, emptyMap())). getMessage)
   }
 
   @Test
@@ -111,7 +130,7 @@ class ControllerConfigurationValidatorTest {
     config.put(ClientMetricsConfigs.CLIENT_MATCH_PATTERN, 
"client_instance_id=b69cc35a-7a54-4790-aa69-cc2bd4ee4538,client_id=1" +
       
",client_software_name=apache-kafka-java,client_software_version=2.8.0-SNAPSHOT,client_source_address=127.0.0.1,"
 +
       "client_source_port=1234")
-    validator.validate(new ConfigResource(CLIENT_METRICS, "subscription-1"), 
config)
+    validator.validate(new ConfigResource(CLIENT_METRICS, "subscription-1"), 
config, emptyMap())
   }
 
   @Test
@@ -119,7 +138,7 @@ class ControllerConfigurationValidatorTest {
     val config = new util.TreeMap[String, String]()
     assertEquals("Subscription name can't be empty",
       assertThrows(classOf[InvalidRequestException], () => validator.validate(
-        new ConfigResource(CLIENT_METRICS, ""), config)). getMessage)
+        new ConfigResource(CLIENT_METRICS, ""), config, emptyMap())). 
getMessage)
   }
 
   @Test
@@ -128,12 +147,12 @@ class ControllerConfigurationValidatorTest {
     config.put(ClientMetricsConfigs.PUSH_INTERVAL_MS, "10")
     assertEquals("Invalid value 10 for interval.ms, interval must be between 
100 and 3600000 (1 hour)",
       assertThrows(classOf[InvalidRequestException], () => validator.validate(
-        new ConfigResource(CLIENT_METRICS, "subscription-1"), config)). 
getMessage)
+        new ConfigResource(CLIENT_METRICS, "subscription-1"), config, 
emptyMap())). getMessage)
 
     config.put(ClientMetricsConfigs.PUSH_INTERVAL_MS, "3600001")
     assertEquals("Invalid value 3600001 for interval.ms, interval must be 
between 100 and 3600000 (1 hour)",
       assertThrows(classOf[InvalidRequestException], () => validator.validate(
-        new ConfigResource(CLIENT_METRICS, "subscription-1"), config)). 
getMessage)
+        new ConfigResource(CLIENT_METRICS, "subscription-1"), config, 
emptyMap())). getMessage)
   }
 
   @Test
@@ -142,7 +161,7 @@ class ControllerConfigurationValidatorTest {
     config.put("random", "10")
     assertEquals("Unknown client metrics configuration: random",
       assertThrows(classOf[InvalidRequestException], () => validator.validate(
-        new ConfigResource(CLIENT_METRICS, "subscription-1"), config)). 
getMessage)
+        new ConfigResource(CLIENT_METRICS, "subscription-1"), config, 
emptyMap())). getMessage)
   }
 
   @Test
@@ -151,6 +170,6 @@ class ControllerConfigurationValidatorTest {
     config.put(ClientMetricsConfigs.CLIENT_MATCH_PATTERN, "10")
     assertEquals("Illegal client matching pattern: 10",
       assertThrows(classOf[InvalidConfigurationException], () => 
validator.validate(
-        new ConfigResource(CLIENT_METRICS, "subscription-1"), config)). 
getMessage)
+        new ConfigResource(CLIENT_METRICS, "subscription-1"), config, 
emptyMap())). getMessage)
   }
 }
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
 
b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
index 4ea7d9757d9..b10614cc2e8 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
@@ -269,9 +269,13 @@ public class ConfigurationControlManager {
                                          List<ApiMessageAndVersion> 
recordsImplicitlyDeleted,
                                          boolean newlyCreatedResource) {
         Map<String, String> allConfigs = new HashMap<>();
+        Map<String, String> existingConfigsMap = new HashMap<>();
         Map<String, String> alteredConfigsForAlterConfigPolicyCheck = new 
HashMap<>();
-        TimelineHashMap<String, String> existingConfigs = 
configData.get(configResource);
-        if (existingConfigs != null) allConfigs.putAll(existingConfigs);
+        TimelineHashMap<String, String> existingConfigsSnapshot = 
configData.get(configResource);
+        if (existingConfigsSnapshot != null) {
+            allConfigs.putAll(existingConfigsSnapshot);
+            existingConfigsMap.putAll(existingConfigsSnapshot);
+        }
         for (ApiMessageAndVersion newRecord : recordsExplicitlyAltered) {
             ConfigRecord configRecord = (ConfigRecord) newRecord.message();
             if (configRecord.value() == null) {
@@ -288,7 +292,7 @@ public class ConfigurationControlManager {
             // in the list passed to the policy in order to maintain backwards 
compatibility
         }
         try {
-            validator.validate(configResource, allConfigs);
+            validator.validate(configResource, allConfigs, existingConfigsMap);
             if (!newlyCreatedResource) {
                 existenceChecker.accept(configResource);
             }
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationValidator.java
 
b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationValidator.java
index 7e8f505f40b..c23c64d8c49 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationValidator.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationValidator.java
@@ -28,7 +28,7 @@ public interface ConfigurationValidator {
         public void validate(ConfigResource resource) { }
 
         @Override
-        public void validate(ConfigResource resource, Map<String, String> 
config) { }
+        public void validate(ConfigResource resource, Map<String, String> 
newConfigs, Map<String, String> existingConfigs) { }
     };
 
     /**
@@ -41,8 +41,9 @@ public interface ConfigurationValidator {
     /**
      * Throws an ApiException if a configuration is invalid for the given 
resource.
      *
-     * @param resource      The configuration resource.
-     * @param config        The new configuration.
+     * @param resource               The configuration resource.
+     * @param newConfigs             The new configuration.
+     * @param existingConfigs        The existing configuration.
      */
-    void validate(ConfigResource resource, Map<String, String> config);
+    void validate(ConfigResource resource, Map<String, String> newConfigs, 
Map<String, String> existingConfigs);
 }
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
index 430486bdcad..9cf4a71e38e 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
@@ -608,19 +608,32 @@ public class LogConfig extends AbstractConfig {
 
     /**
      * Validates the values of the given properties. Should be called only by the broker.
-     * The `props` supplied contains the topic-level configs,
+     * The `newConfigs` supplied contains the topic-level configs,
      * The default values should be extracted from the KafkaConfig.
-     * @param props The properties to be validated
+     * @param existingConfigs                   The existing properties
+     * @param newConfigs                        The new properties to be validated
+     * @param isRemoteLogStorageSystemEnabled   true if system wise remote log storage is enabled
      */
-    private static void validateTopicLogConfigValues(Map<?, ?> props,
+    private static void validateTopicLogConfigValues(Map<String, String> 
existingConfigs,
+                                                     Map<?, ?> newConfigs,
                                                      boolean 
isRemoteLogStorageSystemEnabled) {
-        validateValues(props);
-        boolean isRemoteLogStorageEnabled = (Boolean) 
props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
+        validateValues(newConfigs);
+        boolean isRemoteLogStorageEnabled = (Boolean) 
newConfigs.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
         if (isRemoteLogStorageEnabled) {
-            validateRemoteStorageOnlyIfSystemEnabled(props, 
isRemoteLogStorageSystemEnabled, false);
-            validateNoRemoteStorageForCompactedTopic(props);
-            validateRemoteStorageRetentionSize(props);
-            validateRemoteStorageRetentionTime(props);
+            validateRemoteStorageOnlyIfSystemEnabled(newConfigs, 
isRemoteLogStorageSystemEnabled, false);
+            validateNoRemoteStorageForCompactedTopic(newConfigs);
+            validateRemoteStorageRetentionSize(newConfigs);
+            validateRemoteStorageRetentionTime(newConfigs);
+        } else {
+            // The new config "remote.storage.enable" is false, validate if it's turning from true to false
+            validateNotTurningOffRemoteStorage(existingConfigs);
+        }
+    }
+
+    public static void validateNotTurningOffRemoteStorage(Map<String, String> 
existingConfigs) {
+        boolean wasRemoteLogEnabledBeforeUpdate = 
Boolean.parseBoolean(existingConfigs.getOrDefault(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
 "false"));
+        if (wasRemoteLogEnabledBeforeUpdate) {
+            throw new InvalidConfigurationException("Disabling remote storage 
feature on the topic level is not supported.");
         }
     }
 
@@ -681,10 +694,11 @@ public class LogConfig extends AbstractConfig {
      * Check that the given properties contain only valid log config names and 
that all values can be parsed and are valid
      */
     public static void validate(Properties props) {
-        validate(props, Collections.emptyMap(), false);
+        validate(Collections.emptyMap(), props, Collections.emptyMap(), false);
     }
 
-    public static void validate(Properties props,
+    public static void validate(Map<String, String> existingConfigs,
+                                Properties props,
                                 Map<?, ?> configuredProps,
                                 boolean isRemoteLogStorageSystemEnabled) {
         validateNames(props);
@@ -695,7 +709,7 @@ public class LogConfig extends AbstractConfig {
             Map<Object, Object> combinedConfigs = new 
HashMap<>(configuredProps);
             combinedConfigs.putAll(props);
             Map<?, ?> valueMaps = CONFIG.parse(combinedConfigs);
-            validateTopicLogConfigValues(valueMaps, 
isRemoteLogStorageSystemEnabled);
+            validateTopicLogConfigValues(existingConfigs, valueMaps, 
isRemoteLogStorageSystemEnabled);
         }
     }
 

Reply via email to