showuon commented on code in PR #14161:
URL: https://github.com/apache/kafka/pull/14161#discussion_r1300875008
##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##########
@@ -500,22 +500,29 @@ public static void validateBrokerLogConfigValues(Map<?, ?> props,
      * The default values should be extracted from the KafkaConfig.
      * @param props The properties to be validated
      */
-    private static void validateTopicLogConfigValues(Map<?, ?> props,
-                                                     boolean isRemoteLogStorageSystemEnabled) {
+    public static void validateTopicLogConfigValues(Map<?, ?> props,

Review Comment:
   I agree with @kamalcph that we actually don't want to validate anything except `validateRemoteStorageOnlyIfSystemEnabled`.

##########
core/src/main/scala/kafka/log/LogManager.scala:
##########
@@ -870,12 +870,14 @@ class LogManager(logDirs: Seq[File],
    * Update the configuration of the provided topic.
    */
   def updateTopicConfig(topic: String,
-                        newTopicConfig: Properties): Unit = {
+                        newTopicConfig: Properties,
+                        isRemoteLogStorageSystemEnabled: Boolean): Unit = {
     topicConfigUpdated(topic)
     val logs = logsByTopic(topic)
+    // Combine the default properties with the overrides in zk to create the new LogConfig
+    val newLogConfig = LogConfig.fromProps(currentDefaultConfig.originals, newTopicConfig)
+    LogConfig.validateTopicLogConfigValues(newLogConfig.values(), isRemoteLogStorageSystemEnabled, true)
     if (logs.nonEmpty) {
-      // Combine the default properties with the overrides in zk to create the new LogConfig

Review Comment:
   Why should we validate the log config when `logs` is empty?

##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##########
@@ -500,22 +500,29 @@ public static void validateBrokerLogConfigValues(Map<?, ?> props,
      * The default values should be extracted from the KafkaConfig.
      * @param props The properties to be validated
      */
-    private static void validateTopicLogConfigValues(Map<?, ?> props,
-                                                     boolean isRemoteLogStorageSystemEnabled) {
+    public static void validateTopicLogConfigValues(Map<?, ?> props,
+                                                    boolean isRemoteLogStorageSystemEnabled,
+                                                    boolean isReceivingConfigFromStore) {
         validateValues(props);
         boolean isRemoteLogStorageEnabled = (Boolean) props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
         if (isRemoteLogStorageEnabled) {
-            validateRemoteStorageOnlyIfSystemEnabled(isRemoteLogStorageSystemEnabled);
-            validateNoRemoteStorageForCompactedTopic(props);
-            validateRemoteStorageRetentionSize(props);
-            validateRemoteStorageRetentionTime(props);
+            validateRemoteStorageOnlyIfSystemEnabled(isRemoteLogStorageSystemEnabled, isReceivingConfigFromStore);
+            if (!isReceivingConfigFromStore) {
+                validateNoRemoteStorageForCompactedTopic(props);
+                validateRemoteStorageRetentionSize(props);
+                validateRemoteStorageRetentionTime(props);
+            }
         }
     }

-    private static void validateRemoteStorageOnlyIfSystemEnabled(boolean isRemoteLogStorageSystemEnabled) {
+    private static void validateRemoteStorageOnlyIfSystemEnabled(boolean isRemoteLogStorageSystemEnabled, boolean isReceivingConfigFromStore) {
         if (!isRemoteLogStorageSystemEnabled) {
-            throw new ConfigException("Tiered Storage functionality is disabled in the broker. " +
-                    "Topic cannot be configured with remote log storage.");
+            if (isReceivingConfigFromStore) {

Review Comment:
   I think having two error messages makes this clearer. After all, in the restart case the user doesn't "configure" anything, so seeing the message `Topics cannot be configured with remote log storage.` is weird.
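
To make the two-message idea concrete, here is a minimal standalone sketch, not the PR's actual code. It assumes the config keys are `remote.storage.enable` and `cleanup.policy`, uses a hypothetical local `ConfigException` in place of Kafka's class, and treats `cleanup.policy` as a single value. The wording of the restart-case message and the compacted-topic message is illustrative only.

```java
import java.util.HashMap;
import java.util.Map;

public class RemoteStorageValidationSketch {

    // Hypothetical stand-in for Kafka's ConfigException so the sketch compiles on its own.
    public static class ConfigException extends RuntimeException {
        public ConfigException(String message) {
            super(message);
        }
    }

    // Assumed key names, used here in place of the TopicConfig constants.
    static final String REMOTE_STORAGE_ENABLE = "remote.storage.enable";
    static final String CLEANUP_POLICY = "cleanup.policy";

    public static void validateTopicLogConfigValues(Map<String, Object> props,
                                                    boolean isRemoteLogStorageSystemEnabled,
                                                    boolean isReceivingConfigFromStore) {
        if (!Boolean.TRUE.equals(props.get(REMOTE_STORAGE_ENABLE))) {
            return; // nothing to check when the topic does not use remote storage
        }
        validateRemoteStorageOnlyIfSystemEnabled(isRemoteLogStorageSystemEnabled, isReceivingConfigFromStore);
        if (!isReceivingConfigFromStore) {
            // Only a user-initiated change gets the remaining checks.
            validateNoRemoteStorageForCompactedTopic(props);
        }
    }

    public static void validateRemoteStorageOnlyIfSystemEnabled(boolean isRemoteLogStorageSystemEnabled,
                                                                boolean isReceivingConfigFromStore) {
        if (isRemoteLogStorageSystemEnabled) {
            return;
        }
        if (isReceivingConfigFromStore) {
            // Restart case: the user did not configure anything just now (illustrative wording).
            throw new ConfigException("Tiered Storage functionality is disabled in the broker, " +
                    "but a stored topic config has remote log storage enabled.");
        }
        // User-initiated case: message taken from the diff under review.
        throw new ConfigException("Tiered Storage functionality is disabled in the broker. " +
                "Topic cannot be configured with remote log storage.");
    }

    public static void validateNoRemoteStorageForCompactedTopic(Map<String, Object> props) {
        // Simplification: compares a single policy value (illustrative wording).
        if ("compact".equals(props.get(CLEANUP_POLICY))) {
            throw new ConfigException("Remote log storage is unsupported for compacted topics.");
        }
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put(REMOTE_STORAGE_ENABLE, true);
        props.put(CLEANUP_POLICY, "compact");
        for (boolean fromStore : new boolean[] {true, false}) {
            try {
                validateTopicLogConfigValues(props, false, fromStore);
            } catch (ConfigException e) {
                System.out.println("fromStore=" + fromStore + ": " + e.getMessage());
            }
        }
    }
}
```

With the system disabled, the two paths report different messages. With the system enabled, a config read from the store skips the compaction and retention style checks entirely, which is the behaviour the first comment asks for.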