This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 8b0ef93bb48 KAFKA-18499 Clean up zookeeper from LogConfig (#18583)
8b0ef93bb48 is described below

commit 8b0ef93bb48570108ad98669bb9eb247e0679abc
Author: mingdaoy <[email protected]>
AuthorDate: Sat Jan 25 22:31:46 2025 +0800

    KAFKA-18499 Clean up zookeeper from LogConfig (#18583)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../main/scala/kafka/server/ConfigHandler.scala    |  2 +-
 .../server/ControllerConfigurationValidator.scala  |  2 +-
 .../test/scala/unit/kafka/log/LogConfigTest.scala  | 45 ++++++++--------------
 .../kafka/storage/internals/log/LogConfig.java     | 23 ++---------
 4 files changed, 21 insertions(+), 51 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala 
b/core/src/main/scala/kafka/server/ConfigHandler.scala
index eabe12d0d94..638b36b95ce 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -43,7 +43,7 @@ trait ConfigHandler {
 }
 
 /**
-  * The TopicConfigHandler will process topic config changes from ZooKeeper or 
the metadata log.
+  * The TopicConfigHandler will process topic config changes from the metadata 
log.
   * The callback provides the topic name and the full properties set.
   */
 class TopicConfigHandler(private val replicaManager: ReplicaManager,
diff --git 
a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala 
b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
index 35c209d9ffa..f163a2739ae 100644
--- a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
+++ b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
@@ -118,7 +118,7 @@ class ControllerConfigurationValidator(kafkaConfig: 
KafkaConfig) extends Configu
             nullTopicConfigs.mkString(","))
         }
         LogConfig.validate(oldConfigs, properties, 
kafkaConfig.extractLogConfigMap,
-          kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), 
false)
+          kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
       case BROKER => validateBrokerName(resource.name())
       case CLIENT_METRICS =>
         val properties = new Properties()
diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala 
b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
index 3a0a450f05a..1e26d653bbc 100644
--- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
@@ -293,7 +293,7 @@ class LogConfigTest {
     props.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, 
localRetentionMs.toString)
     props.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, 
localRetentionBytes.toString)
     assertThrows(classOf[ConfigException],
-      () => LogConfig.validate(Collections.emptyMap(), props, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true))
+      () => LogConfig.validate(Collections.emptyMap(), props, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
   }
 
   @Test
@@ -305,17 +305,17 @@ class LogConfigTest {
     val logProps = new Properties()
     logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, 
TopicConfig.CLEANUP_POLICY_DELETE)
     logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
-    LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)
+    LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
 
     logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, 
TopicConfig.CLEANUP_POLICY_COMPACT)
     assertThrows(classOf[ConfigException],
-      () => LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true))
+      () => LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
     logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "delete,compact")
     assertThrows(classOf[ConfigException],
-      () => LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true))
+      () => LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
     logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact,delete")
     assertThrows(classOf[ConfigException],
-      () => LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true))
+      () => LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
   }
 
   @ParameterizedTest(name = "testEnableRemoteLogStorage with 
sysRemoteStorageEnabled: {0}")
@@ -328,10 +328,10 @@ class LogConfigTest {
     val logProps = new Properties()
     logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
     if (sysRemoteStorageEnabled) {
-      LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)
+      LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
     } else {
       val message = assertThrows(classOf[ConfigException],
-        () => LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true))
+        () => LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
       assertTrue(message.getMessage.contains("Tiered Storage functionality is 
disabled in the broker"))
     }
   }
@@ -348,7 +348,7 @@ class LogConfigTest {
     if (wasRemoteStorageEnabled) {
       val message = assertThrows(classOf[InvalidConfigurationException],
         () => 
LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
 "true"),
-          logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), false))
+          logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
       assertTrue(message.getMessage.contains("It is invalid to disable remote 
storage without deleting remote data. " +
         "If you want to keep the remote data and turn to read only, please set 
`remote.storage.enable=true,remote.log.copy.disable=true`. " +
         "If you want to disable remote storage and delete all remote data, 
please set `remote.storage.enable=false,remote.log.delete.on.disable=true`."))
@@ -357,11 +357,11 @@ class LogConfigTest {
       // It should be able to disable the remote log storage when delete on 
disable is set to true
       logProps.put(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, "true")
       
LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
 "true"),
-        logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), false)
+        logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
     } else {
-      LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)
+      LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
       
LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
 "false"), logProps,
-        kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), false)
+        kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
     }
   }
 
@@ -381,11 +381,11 @@ class LogConfigTest {
     if (sysRemoteStorageEnabled) {
       val message = assertThrows(classOf[ConfigException],
         () => LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap,
-          kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), 
true))
+          kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
       
assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG))
     } else {
       LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap,
-        kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), 
true)
+        kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
     }
   }
 
@@ -405,11 +405,11 @@ class LogConfigTest {
     if (sysRemoteStorageEnabled) {
       val message = assertThrows(classOf[ConfigException],
         () => LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap,
-          kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), 
true))
+          kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
       
assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG))
     } else {
       LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap,
-        kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), 
true)
+        kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
     }
   }
 
@@ -447,21 +447,6 @@ class LogConfigTest {
     LogConfig.validate(logProps)
   }
 
-  @ParameterizedTest
-  @ValueSource(strings = 
Array(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, 
TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG))
-  def testInValidRemoteConfigsInZK(configKey: String): Unit = {
-    val kafkaProps = TestUtils.createDummyBrokerConfig()
-    
kafkaProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, 
"true")
-    val kafkaConfig = KafkaConfig.fromProps(kafkaProps)
-    val logProps = new Properties
-    logProps.put(configKey, "true")
-
-    val message = assertThrows(classOf[InvalidConfigurationException],
-      () => LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap, true, true))
-    assertTrue(message.getMessage.contains("It is invalid to set 
`remote.log.delete.on.disable` or " +
-      "`remote.log.copy.disable` under Zookeeper's mode."))
-  }
-
   @Test
   def testValidateWithMetadataVersionJbodSupport(): Unit = {
     def validate(metadataVersion: MetadataVersion, jbodConfig: Boolean): Unit =
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
index f4294329f25..a74a10a2219 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
@@ -512,17 +512,12 @@ public class LogConfig extends AbstractConfig {
      * @param existingConfigs                   The existing properties
      * @param newConfigs                        The new properties to be 
validated
      * @param isRemoteLogStorageSystemEnabled   true if system wise remote log 
storage is enabled
-     * @param fromZK                            true if this is a ZK cluster
      */
     private static void validateTopicLogConfigValues(Map<String, String> 
existingConfigs,
                                                      Map<?, ?> newConfigs,
-                                                     boolean 
isRemoteLogStorageSystemEnabled,
-                                                     boolean fromZK) {
+                                                     boolean 
isRemoteLogStorageSystemEnabled) {
         validateValues(newConfigs);
 
-        if (fromZK) {
-            validateNoInvalidRemoteStorageConfigsInZK(newConfigs);
-        }
         boolean isRemoteLogStorageEnabled = (Boolean) 
newConfigs.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
         if (isRemoteLogStorageEnabled) {
             validateRemoteStorageOnlyIfSystemEnabled(newConfigs, 
isRemoteLogStorageSystemEnabled, false);
@@ -564,15 +559,6 @@ public class LogConfig extends AbstractConfig {
         }
     }
 
-    public static void validateNoInvalidRemoteStorageConfigsInZK(Map<?, ?> 
newConfigs) {
-        boolean isRemoteLogDeleteOnDisable = (Boolean) 
Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG,
 false);
-        boolean isRemoteLogCopyDisabled = (Boolean) 
Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG,
 false);
-        if (isRemoteLogDeleteOnDisable || isRemoteLogCopyDisabled) {
-            throw new InvalidConfigurationException("It is invalid to set 
`remote.log.delete.on.disable` or " +
-                    "`remote.log.copy.disable` under Zookeeper's mode.");
-        }
-    }
-
     public static void validateRemoteStorageOnlyIfSystemEnabled(Map<?, ?> 
props, boolean isRemoteLogStorageSystemEnabled, boolean 
isReceivingConfigFromStore) {
         boolean isRemoteLogStorageEnabled = (Boolean) 
props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
         if (isRemoteLogStorageEnabled && !isRemoteLogStorageSystemEnabled) {
@@ -630,14 +616,13 @@ public class LogConfig extends AbstractConfig {
      * Check that the given properties contain only valid log config names and 
that all values can be parsed and are valid
      */
     public static void validate(Properties props) {
-        validate(Collections.emptyMap(), props, Collections.emptyMap(), false, 
false);
+        validate(Collections.emptyMap(), props, Collections.emptyMap(), false);
     }
 
     public static void validate(Map<String, String> existingConfigs,
                                 Properties props,
                                 Map<?, ?> configuredProps,
-                                boolean isRemoteLogStorageSystemEnabled,
-                                boolean fromZK) {
+                                boolean isRemoteLogStorageSystemEnabled) {
         validateNames(props);
         if (configuredProps == null || configuredProps.isEmpty()) {
             Map<?, ?> valueMaps = CONFIG.parse(props);
@@ -646,7 +631,7 @@ public class LogConfig extends AbstractConfig {
             Map<Object, Object> combinedConfigs = new 
HashMap<>(configuredProps);
             combinedConfigs.putAll(props);
             Map<?, ?> valueMaps = CONFIG.parse(combinedConfigs);
-            validateTopicLogConfigValues(existingConfigs, valueMaps, 
isRemoteLogStorageSystemEnabled, fromZK);
+            validateTopicLogConfigValues(existingConfigs, valueMaps, 
isRemoteLogStorageSystemEnabled);
         }
     }
 

Reply via email to