gharris1727 commented on code in PR #14303:
URL: https://github.com/apache/kafka/pull/14303#discussion_r1307738833
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java:
##########
@@ -429,7 +429,9 @@ void enrich(ConfigDef newDef) {
final ConfigDef.Validator typeValidator =
ConfigDef.LambdaValidator.with(
(String name, Object value) -> {
validateProps(prefix);
- getConfigDefFromConfigProvidingClass(typeConfig,
(Class<?>) value);
+ if (value != null) {
Review Comment:
Ah, this hides the exception message from "Not a (something)" and only shows
the "Missing required configuration (name) which has no default value". I think
that is reasonable.
The other call-site for this method is ConnectorConfig.EnrichablePlugin
which swallows this error on the validate() code path and propagates them when
instantiating the ConnectorConfig. Instantiating the ConnectorConfig will also
throw the "Missing required configuration" error, so it is not necessary to
throw the error.
I think you could safely change the getConfigDefFromConfigProvidingClass
implementation to return an empty stream when the value is null, rather than
throwing an exception. I don't think this is necessary, but maybe it keeps
these two code paths more similar.
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java:
##########
@@ -90,50 +93,97 @@ public SinkConnectorConfig(Plugins plugins, Map<String,
String> props) {
* @param props sink configuration properties
*/
public static void validate(Map<String, String> props) {
- final boolean hasTopicsConfig = hasTopicsConfig(props);
- final boolean hasTopicsRegexConfig = hasTopicsRegexConfig(props);
- final boolean hasDlqTopicConfig = hasDlqTopicConfig(props);
+ validate(
+ props,
+ error -> {
+ throw new ConfigException(error.property, error.value,
error.errorMessage);
+ }
+ );
+ }
+
+ /**
+ * Perform preflight validation for the sink-specific properties for a
connector.
+ *
+ * @param props the configuration for the sink connector
+ * @param validatedConfig any already-known {@link ConfigValue validation
results} for the configuration.
+ * May be empty, but may not be null. Any
configuration errors discovered by this method will
+ * be {@link ConfigValue#addErrorMessage(String)
added} to a value in this map, adding a new
+ * entry if one for the problematic property does
not already exist.
+ */
+ public static void validate(Map<String, String> props, Map<String,
ConfigValue> validatedConfig) {
+ validate(props, error -> addErrorMessage(validatedConfig, error));
+ }
+
+ private static void validate(Map<String, String> props,
Consumer<ConfigError> onError) {
+ final String topicsList = props.get(TOPICS_CONFIG);
+ final String topicsRegex = props.get(TOPICS_REGEX_CONFIG);
+ final String dlqTopic = props.getOrDefault(DLQ_TOPIC_NAME_CONFIG,
"").trim();
+ final boolean hasTopicsConfig = !Utils.isBlank(topicsList);
+ final boolean hasTopicsRegexConfig = !Utils.isBlank(topicsRegex);
+ final boolean hasDlqTopicConfig = !Utils.isBlank(dlqTopic);
if (hasTopicsConfig && hasTopicsRegexConfig) {
- throw new ConfigException(SinkTask.TOPICS_CONFIG + " and " +
SinkTask.TOPICS_REGEX_CONFIG +
- " are mutually exclusive options, but both are set.");
+ String errorMessage = TOPICS_CONFIG + " and " +
TOPICS_REGEX_CONFIG + " are mutually exclusive options, but both are set.";
+ onError.accept(new ConfigError(TOPICS_CONFIG, topicsList,
errorMessage));
+ onError.accept(new ConfigError(TOPICS_REGEX_CONFIG, topicsRegex,
errorMessage));
}
if (!hasTopicsConfig && !hasTopicsRegexConfig) {
- throw new ConfigException("Must configure one of " +
- SinkTask.TOPICS_CONFIG + " or " +
SinkTask.TOPICS_REGEX_CONFIG);
+ String errorMessage = "Must configure one of " + TOPICS_CONFIG + "
or " + TOPICS_REGEX_CONFIG;
+ onError.accept(new ConfigError(TOPICS_CONFIG, topicsList,
errorMessage));
+ onError.accept(new ConfigError(TOPICS_REGEX_CONFIG, topicsRegex,
errorMessage));
}
if (hasDlqTopicConfig) {
- String dlqTopic = props.get(DLQ_TOPIC_NAME_CONFIG).trim();
if (hasTopicsConfig) {
List<String> topics = parseTopicsList(props);
if (topics.contains(dlqTopic)) {
- throw new ConfigException(String.format("The DLQ topic
'%s' may not be included in the list of "
- + "topics ('%s=%s') consumed by the connector",
dlqTopic, SinkTask.TOPICS_REGEX_CONFIG, topics));
+ String errorMessage = String.format(
+ "The DLQ topic '%s' may not be included in the
list of topics ('%s=%s') consumed by the connector",
+ dlqTopic, TOPICS_CONFIG, topics
+ );
+ onError.accept(new ConfigError(TOPICS_CONFIG, topicsList,
errorMessage));
}
}
if (hasTopicsRegexConfig) {
- String topicsRegexStr =
props.get(SinkTask.TOPICS_REGEX_CONFIG);
- Pattern pattern = Pattern.compile(topicsRegexStr);
+ Pattern pattern = Pattern.compile(topicsRegex);
if (pattern.matcher(dlqTopic).matches()) {
- throw new ConfigException(String.format("The DLQ topic
'%s' may not be included in the regex matching the "
- + "topics ('%s=%s') consumed by the connector",
dlqTopic, SinkTask.TOPICS_REGEX_CONFIG, topicsRegexStr));
+ String errorMessage = String.format(
+ "The DLQ topic '%s' may not be included in the
regex matching the topics ('%s=%s') consumed by the connector",
+ dlqTopic, TOPICS_REGEX_CONFIG, topicsRegex
+ );
+ onError.accept(new ConfigError(TOPICS_REGEX_CONFIG,
topicsRegex, errorMessage));
}
}
}
}
+ private static class ConfigError {
Review Comment:
WDYT about having validate(Map) call validate(Map, Map) and then inspecting
the result map for errors?
That would eliminate the need for the Consumer<ConfigError> indirection.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]