divijvaidya commented on code in PR #14176: URL: https://github.com/apache/kafka/pull/14176#discussion_r1291222788
########## storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java: ########## @@ -459,49 +463,53 @@ public static void validateValues(Map<?, ?> props) { long maxCompactionLag = (Long) props.get(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG); if (minCompactionLag > maxCompactionLag) { throw new InvalidConfigurationException("conflict topic config setting " - + TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + minCompactionLag + ") > " - + TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + maxCompactionLag + ")"); + + TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + minCompactionLag + ") > " + + TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + maxCompactionLag + ")"); } + } - if (props.containsKey(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG)) { - boolean isRemoteStorageEnabled = (Boolean) props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG); - String cleanupPolicy = props.get(TopicConfig.CLEANUP_POLICY_CONFIG).toString().toLowerCase(Locale.getDefault()); - if (isRemoteStorageEnabled && cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT)) { - throw new ConfigException("Remote log storage is unsupported for the compacted topics"); - } + public static void validateValuesInBroker(Map<?, ?> props) { + validateValues(props); + Boolean isRemoteLogStorageSystemEnabled = + (Boolean) props.get(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP); + Boolean isRemoteStorageEnabled = (Boolean) props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG); + if (!isRemoteLogStorageSystemEnabled && isRemoteStorageEnabled) { + throw new ConfigException("Tiered Storage functionality is disabled in the broker. " + + "Topic cannot be configured with remote log storage."); } - if (props.containsKey(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG)) { - Long retentionBytes = (Long) props.get(TopicConfig.RETENTION_BYTES_CONFIG); - Long localLogRetentionBytes = (Long) props.get(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG); - if (retentionBytes > -1 && localLogRetentionBytes != -2) { - if (localLogRetentionBytes == -1) { - String message = String.format("Value must not be -1 as %s value is set as %d.", - TopicConfig.RETENTION_BYTES_CONFIG, retentionBytes); - throw new ConfigException(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, localLogRetentionBytes, message); - } - if (localLogRetentionBytes > retentionBytes) { - String message = String.format("Value must not be more than %s property value: %d", - TopicConfig.RETENTION_BYTES_CONFIG, retentionBytes); - throw new ConfigException(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, localLogRetentionBytes, message); - } + String cleanupPolicy = props.get(TopicConfig.CLEANUP_POLICY_CONFIG).toString().toLowerCase(Locale.getDefault()); + if (isRemoteStorageEnabled && cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT)) { + throw new ConfigException("Remote log storage is unsupported for the compacted topics"); + } + + Long retentionBytes = (Long) props.get(TopicConfig.RETENTION_BYTES_CONFIG); + Long localLogRetentionBytes = (Long) props.get(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG); + if (isRemoteStorageEnabled && retentionBytes > -1 && localLogRetentionBytes != -2) { + if (localLogRetentionBytes == -1) { + String message = String.format("Value must not be -1 as %s value is set as %d.", + TopicConfig.RETENTION_BYTES_CONFIG, retentionBytes); + throw new ConfigException(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, localLogRetentionBytes, message); + } + if (localLogRetentionBytes > retentionBytes) { + String message = String.format("Value must not be more than %s property value: %d", + TopicConfig.RETENTION_BYTES_CONFIG, retentionBytes); + throw new ConfigException(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, localLogRetentionBytes, message); } Review Comment: I should have been more clear. LogConfig.validateValues() is called on both client as well as server. It actually also called when we create LogManager e.g. at https://github.com/apache/kafka/blob/f137da04fa71734d176e19f5800622f4b4dfdb66/core/src/main/scala/kafka/log/LogManager.scala#L1379 My point in suggesting to add this in `validateValues()` was that it is the method which is called by other places in the code (example above) when they want to do a validation on values associated with a topic. Now, we could change all calls in broker to `validateValues()` to use `validateValuesInBroker()` OR we encapsulate some of the code which depends on topic level validation (not the ones where assertion depends on both broker and topic config) to `validateValues()`. If we don't make this change, the above LogManager code will not be validating assertions associated with topic level configuration of `retentionBytes` which is a miss?! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org