divijvaidya commented on code in PR #14176: URL: https://github.com/apache/kafka/pull/14176#discussion_r1291213288
########## storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java: ########## @@ -459,49 +463,53 @@ public static void validateValues(Map<?, ?> props) { long maxCompactionLag = (Long) props.get(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG); if (minCompactionLag > maxCompactionLag) { throw new InvalidConfigurationException("conflict topic config setting " - + TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + minCompactionLag + ") > " - + TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + maxCompactionLag + ")"); + + TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + minCompactionLag + ") > " + + TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + maxCompactionLag + ")"); } + } - if (props.containsKey(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG)) { - boolean isRemoteStorageEnabled = (Boolean) props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG); - String cleanupPolicy = props.get(TopicConfig.CLEANUP_POLICY_CONFIG).toString().toLowerCase(Locale.getDefault()); - if (isRemoteStorageEnabled && cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT)) { - throw new ConfigException("Remote log storage is unsupported for the compacted topics"); - } + public static void validateValuesInBroker(Map<?, ?> props) { + validateValues(props); + Boolean isRemoteLogStorageSystemEnabled = + (Boolean) props.get(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP); + Boolean isRemoteStorageEnabled = (Boolean) props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG); + if (!isRemoteLogStorageSystemEnabled && isRemoteStorageEnabled) { + throw new ConfigException("Tiered Storage functionality is disabled in the broker. " + + "Topic cannot be configured with remote log storage."); } - if (props.containsKey(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG)) { - Long retentionBytes = (Long) props.get(TopicConfig.RETENTION_BYTES_CONFIG); - Long localLogRetentionBytes = (Long) props.get(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG); - if (retentionBytes > -1 && localLogRetentionBytes != -2) { - if (localLogRetentionBytes == -1) { - String message = String.format("Value must not be -1 as %s value is set as %d.", - TopicConfig.RETENTION_BYTES_CONFIG, retentionBytes); - throw new ConfigException(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, localLogRetentionBytes, message); - } - if (localLogRetentionBytes > retentionBytes) { - String message = String.format("Value must not be more than %s property value: %d", - TopicConfig.RETENTION_BYTES_CONFIG, retentionBytes); - throw new ConfigException(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, localLogRetentionBytes, message); - } + String cleanupPolicy = props.get(TopicConfig.CLEANUP_POLICY_CONFIG).toString().toLowerCase(Locale.getDefault()); + if (isRemoteStorageEnabled && cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT)) { + throw new ConfigException("Remote log storage is unsupported for the compacted topics"); + } Review Comment: @kamalcph are we planning to work on this comment in this PR? I would prefer that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org