divijvaidya commented on code in PR #14176:
URL: https://github.com/apache/kafka/pull/14176#discussion_r1291213288


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##########
@@ -459,49 +463,53 @@ public static void validateValues(Map<?, ?> props) {
         long maxCompactionLag = (Long) 
props.get(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG);
         if (minCompactionLag > maxCompactionLag) {
             throw new InvalidConfigurationException("conflict topic config 
setting "
-                + TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + 
minCompactionLag + ") > "
-                + TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + 
maxCompactionLag + ")");
+                    + TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + 
minCompactionLag + ") > "
+                    + TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + 
maxCompactionLag + ")");
         }
+    }
 
-        if (props.containsKey(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG)) {
-            boolean isRemoteStorageEnabled = (Boolean) 
props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
-            String cleanupPolicy = 
props.get(TopicConfig.CLEANUP_POLICY_CONFIG).toString().toLowerCase(Locale.getDefault());
-            if (isRemoteStorageEnabled && 
cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT)) {
-                throw new ConfigException("Remote log storage is unsupported 
for the compacted topics");
-            }
+    public static void validateValuesInBroker(Map<?, ?> props) {
+        validateValues(props);
+        Boolean isRemoteLogStorageSystemEnabled =
+                (Boolean) 
props.get(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP);
+        Boolean isRemoteStorageEnabled = (Boolean) 
props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
+        if (!isRemoteLogStorageSystemEnabled && isRemoteStorageEnabled) {
+            throw new ConfigException("Tiered Storage functionality is 
disabled in the broker. " +
+                    "Topic cannot be configured with remote log storage.");
         }
 
-        if (props.containsKey(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG)) {
-            Long retentionBytes = (Long) 
props.get(TopicConfig.RETENTION_BYTES_CONFIG);
-            Long localLogRetentionBytes = (Long) 
props.get(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG);
-            if (retentionBytes > -1 && localLogRetentionBytes != -2) {
-                if (localLogRetentionBytes == -1) {
-                    String message = String.format("Value must not be -1 as %s 
value is set as %d.",
-                            TopicConfig.RETENTION_BYTES_CONFIG, 
retentionBytes);
-                    throw new 
ConfigException(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, 
localLogRetentionBytes, message);
-                }
-                if (localLogRetentionBytes > retentionBytes) {
-                    String message = String.format("Value must not be more 
than %s property value: %d",
-                            TopicConfig.RETENTION_BYTES_CONFIG, 
retentionBytes);
-                    throw new 
ConfigException(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, 
localLogRetentionBytes, message);
-                }
+        String cleanupPolicy = 
props.get(TopicConfig.CLEANUP_POLICY_CONFIG).toString().toLowerCase(Locale.getDefault());
+        if (isRemoteStorageEnabled && 
cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT)) {
+            throw new ConfigException("Remote log storage is unsupported for 
the compacted topics");
+        }

Review Comment:
   @kamalcph, are we planning to address this comment in this PR? I would prefer 
that we do.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to