divijvaidya commented on code in PR #14176:
URL: https://github.com/apache/kafka/pull/14176#discussion_r1288894765
##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##########
@@ -459,49 +463,53 @@ public static void validateValues(Map<?, ?> props) {
long maxCompactionLag = (Long)
props.get(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG);
if (minCompactionLag > maxCompactionLag) {
throw new InvalidConfigurationException("conflict topic config
setting "
- + TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" +
minCompactionLag + ") > "
- + TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" +
maxCompactionLag + ")");
+ + TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" +
minCompactionLag + ") > "
+ + TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" +
maxCompactionLag + ")");
}
+ }
- if (props.containsKey(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG)) {
- boolean isRemoteStorageEnabled = (Boolean)
props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
- String cleanupPolicy =
props.get(TopicConfig.CLEANUP_POLICY_CONFIG).toString().toLowerCase(Locale.getDefault());
- if (isRemoteStorageEnabled &&
cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT)) {
- throw new ConfigException("Remote log storage is unsupported
for the compacted topics");
- }
+ public static void validateValuesInBroker(Map<?, ?> props) {
+ validateValues(props);
+ Boolean isRemoteLogStorageSystemEnabled =
+ (Boolean)
props.get(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP);
+ Boolean isRemoteStorageEnabled = (Boolean)
props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
+ if (!isRemoteLogStorageSystemEnabled && isRemoteStorageEnabled) {
+ throw new ConfigException("Tiered Storage functionality is
disabled in the broker. " +
+ "Topic cannot be configured with remote log storage.");
}
- if (props.containsKey(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG)) {
- Long retentionBytes = (Long)
props.get(TopicConfig.RETENTION_BYTES_CONFIG);
- Long localLogRetentionBytes = (Long)
props.get(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG);
- if (retentionBytes > -1 && localLogRetentionBytes != -2) {
- if (localLogRetentionBytes == -1) {
- String message = String.format("Value must not be -1 as %s
value is set as %d.",
- TopicConfig.RETENTION_BYTES_CONFIG,
retentionBytes);
- throw new
ConfigException(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG,
localLogRetentionBytes, message);
- }
- if (localLogRetentionBytes > retentionBytes) {
- String message = String.format("Value must not be more
than %s property value: %d",
- TopicConfig.RETENTION_BYTES_CONFIG,
retentionBytes);
- throw new
ConfigException(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG,
localLogRetentionBytes, message);
- }
+ String cleanupPolicy =
props.get(TopicConfig.CLEANUP_POLICY_CONFIG).toString().toLowerCase(Locale.getDefault());
+ if (isRemoteStorageEnabled &&
cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT)) {
+ throw new ConfigException("Remote log storage is unsupported for
the compacted topics");
+ }
Review Comment:
This could be extracted into a separate function, e.g. `validateNoRemoteStorageForCompactedTopic`.
##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##########
@@ -459,49 +463,53 @@ public static void validateValues(Map<?, ?> props) {
long maxCompactionLag = (Long)
props.get(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG);
if (minCompactionLag > maxCompactionLag) {
throw new InvalidConfigurationException("conflict topic config
setting "
- + TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" +
minCompactionLag + ") > "
- + TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" +
maxCompactionLag + ")");
+ + TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" +
minCompactionLag + ") > "
+ + TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" +
maxCompactionLag + ")");
}
+ }
- if (props.containsKey(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG)) {
- boolean isRemoteStorageEnabled = (Boolean)
props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
- String cleanupPolicy =
props.get(TopicConfig.CLEANUP_POLICY_CONFIG).toString().toLowerCase(Locale.getDefault());
- if (isRemoteStorageEnabled &&
cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT)) {
- throw new ConfigException("Remote log storage is unsupported
for the compacted topics");
- }
+ public static void validateValuesInBroker(Map<?, ?> props) {
+ validateValues(props);
+ Boolean isRemoteLogStorageSystemEnabled =
+ (Boolean)
props.get(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP);
+ Boolean isRemoteStorageEnabled = (Boolean)
props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
+ if (!isRemoteLogStorageSystemEnabled && isRemoteStorageEnabled) {
+ throw new ConfigException("Tiered Storage functionality is
disabled in the broker. " +
+ "Topic cannot be configured with remote log storage.");
}
- if (props.containsKey(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG)) {
- Long retentionBytes = (Long)
props.get(TopicConfig.RETENTION_BYTES_CONFIG);
- Long localLogRetentionBytes = (Long)
props.get(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG);
- if (retentionBytes > -1 && localLogRetentionBytes != -2) {
- if (localLogRetentionBytes == -1) {
- String message = String.format("Value must not be -1 as %s
value is set as %d.",
- TopicConfig.RETENTION_BYTES_CONFIG,
retentionBytes);
- throw new
ConfigException(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG,
localLogRetentionBytes, message);
- }
- if (localLogRetentionBytes > retentionBytes) {
- String message = String.format("Value must not be more
than %s property value: %d",
- TopicConfig.RETENTION_BYTES_CONFIG,
retentionBytes);
- throw new
ConfigException(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG,
localLogRetentionBytes, message);
- }
+ String cleanupPolicy =
props.get(TopicConfig.CLEANUP_POLICY_CONFIG).toString().toLowerCase(Locale.getDefault());
+ if (isRemoteStorageEnabled &&
cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT)) {
+ throw new ConfigException("Remote log storage is unsupported for
the compacted topics");
+ }
+
+ Long retentionBytes = (Long)
props.get(TopicConfig.RETENTION_BYTES_CONFIG);
+ Long localLogRetentionBytes = (Long)
props.get(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG);
+ if (isRemoteStorageEnabled && retentionBytes > -1 &&
localLogRetentionBytes != -2) {
+ if (localLogRetentionBytes == -1) {
+ String message = String.format("Value must not be -1 as %s
value is set as %d.",
+ TopicConfig.RETENTION_BYTES_CONFIG, retentionBytes);
+ throw new
ConfigException(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG,
localLogRetentionBytes, message);
+ }
+ if (localLogRetentionBytes > retentionBytes) {
+ String message = String.format("Value must not be more than %s
property value: %d",
+ TopicConfig.RETENTION_BYTES_CONFIG, retentionBytes);
+ throw new
ConfigException(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG,
localLogRetentionBytes, message);
}
Review Comment:
Should this be in `validateValues()`? I would expect only the assertions
that depend on broker-level config to live outside `validateValues`. One
advantage of adding these static assertions to `validateValues` is that they
are checked on the client side, which saves an extra server call if the
configuration is not correct.
Also, please extract this into a separate function.
##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##########
@@ -459,49 +463,53 @@ public static void validateValues(Map<?, ?> props) {
long maxCompactionLag = (Long)
props.get(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG);
if (minCompactionLag > maxCompactionLag) {
throw new InvalidConfigurationException("conflict topic config
setting "
- + TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" +
minCompactionLag + ") > "
- + TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" +
maxCompactionLag + ")");
+ + TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" +
minCompactionLag + ") > "
+ + TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" +
maxCompactionLag + ")");
}
+ }
- if (props.containsKey(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG)) {
- boolean isRemoteStorageEnabled = (Boolean)
props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
- String cleanupPolicy =
props.get(TopicConfig.CLEANUP_POLICY_CONFIG).toString().toLowerCase(Locale.getDefault());
- if (isRemoteStorageEnabled &&
cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT)) {
- throw new ConfigException("Remote log storage is unsupported
for the compacted topics");
- }
+ public static void validateValuesInBroker(Map<?, ?> props) {
+ validateValues(props);
+ Boolean isRemoteLogStorageSystemEnabled =
+ (Boolean)
props.get(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP);
+ Boolean isRemoteStorageEnabled = (Boolean)
props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
+ if (!isRemoteLogStorageSystemEnabled && isRemoteStorageEnabled) {
+ throw new ConfigException("Tiered Storage functionality is
disabled in the broker. " +
+ "Topic cannot be configured with remote log storage.");
}
Review Comment:
This could be extracted into a separate function, e.g. `validateRemoteStorageOnlyIfSystemEnabled`.
##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##########
@@ -459,49 +463,53 @@ public static void validateValues(Map<?, ?> props) {
long maxCompactionLag = (Long)
props.get(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG);
if (minCompactionLag > maxCompactionLag) {
throw new InvalidConfigurationException("conflict topic config
setting "
- + TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" +
minCompactionLag + ") > "
- + TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" +
maxCompactionLag + ")");
+ + TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" +
minCompactionLag + ") > "
+ + TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" +
maxCompactionLag + ")");
}
+ }
- if (props.containsKey(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG)) {
- boolean isRemoteStorageEnabled = (Boolean)
props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
- String cleanupPolicy =
props.get(TopicConfig.CLEANUP_POLICY_CONFIG).toString().toLowerCase(Locale.getDefault());
- if (isRemoteStorageEnabled &&
cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT)) {
- throw new ConfigException("Remote log storage is unsupported
for the compacted topics");
- }
+ public static void validateValuesInBroker(Map<?, ?> props) {
+ validateValues(props);
+ Boolean isRemoteLogStorageSystemEnabled =
+ (Boolean)
props.get(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP);
+ Boolean isRemoteStorageEnabled = (Boolean)
props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
Review Comment:
Could these be null? How do we handle the case where
`TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG` is not defined for a topic?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]