showuon commented on code in PR #14176:
URL: https://github.com/apache/kafka/pull/14176#discussion_r1293269490
##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##########
@@ -265,7 +266,10 @@ public Optional<String> serverConfigName(String
configName) {
.define(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, LONG,
DEFAULT_LOCAL_RETENTION_MS, atLeast(-2), MEDIUM,
TopicConfig.LOCAL_LOG_RETENTION_MS_DOC)
.define(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, LONG,
DEFAULT_LOCAL_RETENTION_BYTES, atLeast(-2), MEDIUM,
- TopicConfig.LOCAL_LOG_RETENTION_BYTES_DOC);
+ TopicConfig.LOCAL_LOG_RETENTION_BYTES_DOC)
+
.define(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, BOOLEAN,
+
RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_STORAGE_SYSTEM_ENABLE, null, MEDIUM,
+
RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_DOC);
Review Comment:
Will this appear in the official docs under the `Topic Config` section? If so,
maybe use `defineInternal` instead?
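
To illustrate the distinction I have in mind, here is a toy model (not Kafka's
actual `ConfigDef`; the class `ToyConfigDef` and its methods are hypothetical
and only mimic the idea): a key registered as internal is still a known config
name, but is skipped when the documentation list is generated.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a config definition registry. It only models the
// "internal keys are hidden from generated docs" behaviour discussed above.
final class ToyConfigDef {
    private static final class Key {
        final String doc;
        final boolean internal;

        Key(String doc, boolean internal) {
            this.doc = doc;
            this.internal = internal;
        }
    }

    // Insertion-ordered so the generated doc list is deterministic.
    private final Map<String, Key> keys = new LinkedHashMap<>();

    // Registers a public key that shows up in the generated docs.
    ToyConfigDef define(String name, String doc) {
        keys.put(name, new Key(doc, false));
        return this;
    }

    // Registers a key that is recognised but hidden from the generated docs.
    ToyConfigDef defineInternal(String name) {
        keys.put(name, new Key("", true));
        return this;
    }

    // True for both public and internal keys.
    boolean isKnown(String name) {
        return keys.containsKey(name);
    }

    // Names that would be listed in the documentation.
    List<String> documentedNames() {
        List<String> names = new ArrayList<>();
        for (Map.Entry<String, Key> e : keys.entrySet()) {
            if (!e.getValue().internal) {
                names.add(e.getKey());
            }
        }
        return names;
    }
}
```

With `local.retention.bytes` added via `define` and
`remote.log.storage.system.enable` added via `defineInternal`, both names pass
`isKnown`, but only the first is returned by `documentedNames()`.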
##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##########
@@ -454,54 +458,101 @@ public static void validateNames(Properties props) {
throw new InvalidConfigurationException("Unknown topic config
name: " + name);
}
+ /**
+ * Validates the values of the given properties. Can be called by both
client and server.
+ * The `props` supplied should contain all the LogConfig properties and
the default values are extracted from the
+ * LogConfig class.
+ * @param props The properties to be validated
+ */
public static void validateValues(Map<?, ?> props) {
long minCompactionLag = (Long)
props.get(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG);
long maxCompactionLag = (Long)
props.get(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG);
if (minCompactionLag > maxCompactionLag) {
throw new InvalidConfigurationException("conflict topic config
setting "
- + TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" +
minCompactionLag + ") > "
- + TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" +
maxCompactionLag + ")");
+ + TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" +
minCompactionLag + ") > "
+ + TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" +
maxCompactionLag + ")");
}
+ }
- if (props.containsKey(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG)) {
- boolean isRemoteStorageEnabled = (Boolean)
props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
- String cleanupPolicy =
props.get(TopicConfig.CLEANUP_POLICY_CONFIG).toString().toLowerCase(Locale.getDefault());
- if (isRemoteStorageEnabled &&
cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT)) {
- throw new ConfigException("Remote log storage is unsupported
for the compacted topics");
- }
+ /**
+ * Validates the default values of the LogConfig. Should be called only by
the broker.
+ * The `props` supplied should contain all the LogConfig properties except
+ * TopicConfig#REMOTE_LOG_STORAGE_ENABLE_CONFIG and the default values
should be extracted from the KafkaConfig.
+ * @param props The properties to be validated
+ */
+ public static void validateDefaultValuesInBroker(Map<?, ?> props) {
Review Comment:
I don't think this validates the `default` values in the broker. It validates
the broker configs, including any user-overridden configs. Is that right?
Maybe `validateConfiguredValuesInBroker`?
##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##########
@@ -265,7 +266,10 @@ public Optional<String> serverConfigName(String
configName) {
.define(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, LONG,
DEFAULT_LOCAL_RETENTION_MS, atLeast(-2), MEDIUM,
TopicConfig.LOCAL_LOG_RETENTION_MS_DOC)
.define(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, LONG,
DEFAULT_LOCAL_RETENTION_BYTES, atLeast(-2), MEDIUM,
- TopicConfig.LOCAL_LOG_RETENTION_BYTES_DOC);
+ TopicConfig.LOCAL_LOG_RETENTION_BYTES_DOC)
+
.define(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, BOOLEAN,
+
RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_STORAGE_SYSTEM_ENABLE, null, MEDIUM,
+
RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_DOC);
Review Comment:
Also, we should add a comment explaining why we add this `broker-level` config
to LogConfig.
##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##########
@@ -454,54 +458,101 @@ public static void validateNames(Properties props) {
throw new InvalidConfigurationException("Unknown topic config
name: " + name);
}
+ /**
+ * Validates the values of the given properties. Can be called by both
client and server.
+ * The `props` supplied should contain all the LogConfig properties and
the default values are extracted from the
+ * LogConfig class.
+ * @param props The properties to be validated
+ */
public static void validateValues(Map<?, ?> props) {
long minCompactionLag = (Long)
props.get(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG);
long maxCompactionLag = (Long)
props.get(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG);
if (minCompactionLag > maxCompactionLag) {
throw new InvalidConfigurationException("conflict topic config
setting "
- + TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" +
minCompactionLag + ") > "
- + TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" +
maxCompactionLag + ")");
+ + TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" +
minCompactionLag + ") > "
+ + TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" +
maxCompactionLag + ")");
}
+ }
- if (props.containsKey(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG)) {
- boolean isRemoteStorageEnabled = (Boolean)
props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
- String cleanupPolicy =
props.get(TopicConfig.CLEANUP_POLICY_CONFIG).toString().toLowerCase(Locale.getDefault());
- if (isRemoteStorageEnabled &&
cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT)) {
- throw new ConfigException("Remote log storage is unsupported
for the compacted topics");
- }
+ /**
+ * Validates the default values of the LogConfig. Should be called only by
the broker.
+ * The `props` supplied should contain all the LogConfig properties except
+ * TopicConfig#REMOTE_LOG_STORAGE_ENABLE_CONFIG and the default values
should be extracted from the KafkaConfig.
Review Comment:
From what I saw, we do include
`TopicConfig#REMOTE_LOG_STORAGE_ENABLE_CONFIG` in the `props` parameter. What
does this comment mean?
Maybe what you want to say is that in the `validateDefaultValuesInBroker`
method, `props` doesn't contain any `topic-level` configs, only broker-level
configs. Is that right?
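
To make the question concrete, here is a rough stdlib-only sketch of the check
that the hunk above removes from `validateValues`. It is not the PR's code:
the class name is hypothetical, `IllegalArgumentException` stands in for
`ConfigException`, the keys are written as string literals, and `Locale.ROOT`
replaces the `Locale.getDefault()` used in the diff. The point is that the
check is a no-op whenever `props` lacks the topic-level key.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical sketch; mirrors the removed remote-storage/compaction check.
final class RemoteStorageValidationSketch {
    static final String REMOTE_STORAGE_ENABLE = "remote.storage.enable"; // topic-level key
    static final String CLEANUP_POLICY = "cleanup.policy";               // topic-level key

    // Rejects remote storage on compacted topics. Does nothing if the
    // topic-level enable key is absent or not set to true.
    static void validate(Map<String, ?> props) {
        if (!props.containsKey(REMOTE_STORAGE_ENABLE)) {
            return; // e.g. props that hold only broker-level configs
        }
        boolean enabled = Boolean.TRUE.equals(props.get(REMOTE_STORAGE_ENABLE));
        String cleanupPolicy = String.valueOf(props.get(CLEANUP_POLICY))
                .toLowerCase(Locale.ROOT);
        if (enabled && cleanupPolicy.contains("compact")) {
            throw new IllegalArgumentException(
                    "Remote log storage is unsupported for the compacted topics");
        }
    }
}
```

If `props` really contains only broker-level keys such as
`remote.log.storage.system.enable`, `validate` returns without checking
anything, which is why the Javadoc wording matters here.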
##########
core/src/main/scala/kafka/server/ControllerServer.scala:
##########
@@ -231,7 +231,7 @@ class ControllerServer(
setMetrics(quorumControllerMetrics).
setCreateTopicPolicy(createTopicPolicy.asJava).
setAlterConfigPolicy(alterConfigPolicy.asJava).
- setConfigurationValidator(new ControllerConfigurationValidator()).
+ setConfigurationValidator(new
ControllerConfigurationValidator(sharedServer.brokerConfig)).
Review Comment:
I think the scenario from @divijvaidya is too complicated. I don't think we
have any other config validation like this (where broker 1 has a different
config from broker 2). IMO, this PR already adds validation for it, and for
the edge case we can still fail the request with clear logs, which should be
good enough. WDYT?