This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new 8b0ef93bb48 KAFKA-18499 Clean up zookeeper from LogConfig (#18583)
8b0ef93bb48 is described below
commit 8b0ef93bb48570108ad98669bb9eb247e0679abc
Author: mingdaoy <[email protected]>
AuthorDate: Sat Jan 25 22:31:46 2025 +0800
KAFKA-18499 Clean up zookeeper from LogConfig (#18583)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../main/scala/kafka/server/ConfigHandler.scala | 2 +-
.../server/ControllerConfigurationValidator.scala | 2 +-
.../test/scala/unit/kafka/log/LogConfigTest.scala | 45 ++++++++--------------
.../kafka/storage/internals/log/LogConfig.java | 23 ++---------
4 files changed, 21 insertions(+), 51 deletions(-)
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala
b/core/src/main/scala/kafka/server/ConfigHandler.scala
index eabe12d0d94..638b36b95ce 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -43,7 +43,7 @@ trait ConfigHandler {
}
/**
- * The TopicConfigHandler will process topic config changes from ZooKeeper or
the metadata log.
+ * The TopicConfigHandler will process topic config changes from the metadata
log.
* The callback provides the topic name and the full properties set.
*/
class TopicConfigHandler(private val replicaManager: ReplicaManager,
diff --git
a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
index 35c209d9ffa..f163a2739ae 100644
--- a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
+++ b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
@@ -118,7 +118,7 @@ class ControllerConfigurationValidator(kafkaConfig:
KafkaConfig) extends Configu
nullTopicConfigs.mkString(","))
}
LogConfig.validate(oldConfigs, properties,
kafkaConfig.extractLogConfigMap,
- kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(),
false)
+ kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
case BROKER => validateBrokerName(resource.name())
case CLIENT_METRICS =>
val properties = new Properties()
diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
index 3a0a450f05a..1e26d653bbc 100644
--- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
@@ -293,7 +293,7 @@ class LogConfigTest {
props.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG,
localRetentionMs.toString)
props.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG,
localRetentionBytes.toString)
assertThrows(classOf[ConfigException],
- () => LogConfig.validate(Collections.emptyMap(), props,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true))
+ () => LogConfig.validate(Collections.emptyMap(), props,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
}
@Test
@@ -305,17 +305,17 @@ class LogConfigTest {
val logProps = new Properties()
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG,
TopicConfig.CLEANUP_POLICY_DELETE)
logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
- LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)
+ LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG,
TopicConfig.CLEANUP_POLICY_COMPACT)
assertThrows(classOf[ConfigException],
- () => LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true))
+ () => LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "delete,compact")
assertThrows(classOf[ConfigException],
- () => LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true))
+ () => LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact,delete")
assertThrows(classOf[ConfigException],
- () => LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true))
+ () => LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
}
@ParameterizedTest(name = "testEnableRemoteLogStorage with
sysRemoteStorageEnabled: {0}")
@@ -328,10 +328,10 @@ class LogConfigTest {
val logProps = new Properties()
logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
if (sysRemoteStorageEnabled) {
- LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)
+ LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
} else {
val message = assertThrows(classOf[ConfigException],
- () => LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true))
+ () => LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
assertTrue(message.getMessage.contains("Tiered Storage functionality is
disabled in the broker"))
}
}
@@ -348,7 +348,7 @@ class LogConfigTest {
if (wasRemoteStorageEnabled) {
val message = assertThrows(classOf[InvalidConfigurationException],
() =>
LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
"true"),
- logProps, kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), false))
+ logProps, kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
assertTrue(message.getMessage.contains("It is invalid to disable remote
storage without deleting remote data. " +
"If you want to keep the remote data and turn to read only, please set
`remote.storage.enable=true,remote.log.copy.disable=true`. " +
"If you want to disable remote storage and delete all remote data,
please set `remote.storage.enable=false,remote.log.delete.on.disable=true`."))
@@ -357,11 +357,11 @@ class LogConfigTest {
// It should be able to disable the remote log storage when delete on
disable is set to true
logProps.put(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, "true")
LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
"true"),
- logProps, kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), false)
+ logProps, kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
} else {
- LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)
+ LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
"false"), logProps,
- kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), false)
+ kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
}
}
@@ -381,11 +381,11 @@ class LogConfigTest {
if (sysRemoteStorageEnabled) {
val message = assertThrows(classOf[ConfigException],
() => LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
- kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(),
true))
+ kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG))
} else {
LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
- kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(),
true)
+ kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
}
}
@@ -405,11 +405,11 @@ class LogConfigTest {
if (sysRemoteStorageEnabled) {
val message = assertThrows(classOf[ConfigException],
() => LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
- kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(),
true))
+ kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG))
} else {
LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap,
- kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(),
true)
+ kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
}
}
@@ -447,21 +447,6 @@ class LogConfigTest {
LogConfig.validate(logProps)
}
- @ParameterizedTest
- @ValueSource(strings =
Array(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG,
TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG))
- def testInValidRemoteConfigsInZK(configKey: String): Unit = {
- val kafkaProps = TestUtils.createDummyBrokerConfig()
-
kafkaProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP,
"true")
- val kafkaConfig = KafkaConfig.fromProps(kafkaProps)
- val logProps = new Properties
- logProps.put(configKey, "true")
-
- val message = assertThrows(classOf[InvalidConfigurationException],
- () => LogConfig.validate(Collections.emptyMap(), logProps,
kafkaConfig.extractLogConfigMap, true, true))
- assertTrue(message.getMessage.contains("It is invalid to set
`remote.log.delete.on.disable` or " +
- "`remote.log.copy.disable` under Zookeeper's mode."))
- }
-
@Test
def testValidateWithMetadataVersionJbodSupport(): Unit = {
def validate(metadataVersion: MetadataVersion, jbodConfig: Boolean): Unit =
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
index f4294329f25..a74a10a2219 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
@@ -512,17 +512,12 @@ public class LogConfig extends AbstractConfig {
* @param existingConfigs The existing properties
* @param newConfigs The new properties to be
validated
* @param isRemoteLogStorageSystemEnabled true if system wise remote log
storage is enabled
- * @param fromZK true if this is a ZK cluster
*/
private static void validateTopicLogConfigValues(Map<String, String>
existingConfigs,
Map<?, ?> newConfigs,
- boolean
isRemoteLogStorageSystemEnabled,
- boolean fromZK) {
+ boolean
isRemoteLogStorageSystemEnabled) {
validateValues(newConfigs);
- if (fromZK) {
- validateNoInvalidRemoteStorageConfigsInZK(newConfigs);
- }
boolean isRemoteLogStorageEnabled = (Boolean)
newConfigs.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
if (isRemoteLogStorageEnabled) {
validateRemoteStorageOnlyIfSystemEnabled(newConfigs,
isRemoteLogStorageSystemEnabled, false);
@@ -564,15 +559,6 @@ public class LogConfig extends AbstractConfig {
}
}
- public static void validateNoInvalidRemoteStorageConfigsInZK(Map<?, ?>
newConfigs) {
- boolean isRemoteLogDeleteOnDisable = (Boolean)
Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG,
false);
- boolean isRemoteLogCopyDisabled = (Boolean)
Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG,
false);
- if (isRemoteLogDeleteOnDisable || isRemoteLogCopyDisabled) {
- throw new InvalidConfigurationException("It is invalid to set
`remote.log.delete.on.disable` or " +
- "`remote.log.copy.disable` under Zookeeper's mode.");
- }
- }
-
public static void validateRemoteStorageOnlyIfSystemEnabled(Map<?, ?>
props, boolean isRemoteLogStorageSystemEnabled, boolean
isReceivingConfigFromStore) {
boolean isRemoteLogStorageEnabled = (Boolean)
props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
if (isRemoteLogStorageEnabled && !isRemoteLogStorageSystemEnabled) {
@@ -630,14 +616,13 @@ public class LogConfig extends AbstractConfig {
* Check that the given properties contain only valid log config names and
that all values can be parsed and are valid
*/
public static void validate(Properties props) {
- validate(Collections.emptyMap(), props, Collections.emptyMap(), false,
false);
+ validate(Collections.emptyMap(), props, Collections.emptyMap(), false);
}
public static void validate(Map<String, String> existingConfigs,
Properties props,
Map<?, ?> configuredProps,
- boolean isRemoteLogStorageSystemEnabled,
- boolean fromZK) {
+ boolean isRemoteLogStorageSystemEnabled) {
validateNames(props);
if (configuredProps == null || configuredProps.isEmpty()) {
Map<?, ?> valueMaps = CONFIG.parse(props);
@@ -646,7 +631,7 @@ public class LogConfig extends AbstractConfig {
Map<Object, Object> combinedConfigs = new
HashMap<>(configuredProps);
combinedConfigs.putAll(props);
Map<?, ?> valueMaps = CONFIG.parse(combinedConfigs);
- validateTopicLogConfigValues(existingConfigs, valueMaps,
isRemoteLogStorageSystemEnabled, fromZK);
+ validateTopicLogConfigValues(existingConfigs, valueMaps,
isRemoteLogStorageSystemEnabled);
}
}