gharris1727 commented on code in PR #14303:
URL: https://github.com/apache/kafka/pull/14303#discussion_r1307738833


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java:
##########
@@ -429,7 +429,9 @@ void enrich(ConfigDef newDef) {
                 final ConfigDef.Validator typeValidator = ConfigDef.LambdaValidator.with(
                     (String name, Object value) -> {
                         validateProps(prefix);
-                        getConfigDefFromConfigProvidingClass(typeConfig, (Class<?>) value);
+                        if (value != null) {

Review Comment:
   Ah, this hides the "Not a (something)" exception message and only shows the 
"Missing required configuration (name) which has no default value" message. I 
think that is reasonable.
   
   The other call-site for this method is ConnectorConfig.EnrichablePlugin, 
which swallows this error on the validate() code path and propagates it when 
instantiating the ConnectorConfig. Instantiating the ConnectorConfig will also 
throw the "Missing required configuration" error, so it is not necessary to 
throw the "Not a (something)" error as well.
   
   I think you could safely change the getConfigDefFromConfigProvidingClass 
implementation to return an empty stream when the value is null, rather than 
throwing an exception. I don't think this is necessary, but maybe it keeps 
these two code paths more similar.
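
   A minimal sketch of that null-tolerant variant, for illustration only. 
`NullTolerantLookupSketch` and `configKeysFor` are hypothetical stand-ins and 
not the real `getConfigDefFromConfigProvidingClass` signature; the `String` 
elements stand in for whatever the real method yields.

```java
import java.util.stream.Stream;

// Hypothetical stand-in, not Kafka code: shows "return an empty stream on
// null" instead of throwing a "Not a (something)" exception.
final class NullTolerantLookupSketch {

    // Assumption: the real method derives config definitions from the class;
    // here a single placeholder string is produced per non-null class.
    static Stream<String> configKeysFor(String typeConfig, Class<?> cls) {
        if (cls == null) {
            // The missing value is reported elsewhere as
            // "Missing required configuration", so nothing is raised here.
            return Stream.empty();
        }
        return Stream.of(typeConfig + " -> " + cls.getSimpleName());
    }

    public static void main(String[] args) {
        // Null class: no entries, no exception.
        System.out.println(configKeysFor("transforms.a.type", null).count());
        // Non-null class: one placeholder entry.
        configKeysFor("transforms.a.type", String.class).forEach(System.out::println);
    }
}
```

   With this shape, the validator lambda would not need its own null check, 
since a null value would simply contribute no extra config keys.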



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java:
##########
@@ -90,50 +93,97 @@ public SinkConnectorConfig(Plugins plugins, Map<String, String> props) {
      * @param props sink configuration properties
      */
     public static void validate(Map<String, String> props) {
-        final boolean hasTopicsConfig = hasTopicsConfig(props);
-        final boolean hasTopicsRegexConfig = hasTopicsRegexConfig(props);
-        final boolean hasDlqTopicConfig = hasDlqTopicConfig(props);
+        validate(
+                props,
+                error -> {
+                    throw new ConfigException(error.property, error.value, error.errorMessage);
+                }
+        );
+    }
+
+    /**
+     * Perform preflight validation for the sink-specific properties for a connector.
+     *
+     * @param props           the configuration for the sink connector
+     * @param validatedConfig any already-known {@link ConfigValue validation results} for the configuration.
+     *                        May be empty, but may not be null. Any configuration errors discovered by this method will
+     *                        be {@link ConfigValue#addErrorMessage(String) added} to a value in this map, adding a new
+     *                        entry if one for the problematic property does not already exist.
+     */
+    public static void validate(Map<String, String> props, Map<String, ConfigValue> validatedConfig) {
+        validate(props, error -> addErrorMessage(validatedConfig, error));
+    }
+
+    private static void validate(Map<String, String> props, Consumer<ConfigError> onError) {
+        final String topicsList = props.get(TOPICS_CONFIG);
+        final String topicsRegex = props.get(TOPICS_REGEX_CONFIG);
+        final String dlqTopic = props.getOrDefault(DLQ_TOPIC_NAME_CONFIG, "").trim();
+        final boolean hasTopicsConfig = !Utils.isBlank(topicsList);
+        final boolean hasTopicsRegexConfig = !Utils.isBlank(topicsRegex);
+        final boolean hasDlqTopicConfig = !Utils.isBlank(dlqTopic);
 
         if (hasTopicsConfig && hasTopicsRegexConfig) {
-            throw new ConfigException(SinkTask.TOPICS_CONFIG + " and " + SinkTask.TOPICS_REGEX_CONFIG +
-                " are mutually exclusive options, but both are set.");
+            String errorMessage = TOPICS_CONFIG + " and " + TOPICS_REGEX_CONFIG + " are mutually exclusive options, but both are set.";
+            onError.accept(new ConfigError(TOPICS_CONFIG, topicsList, errorMessage));
+            onError.accept(new ConfigError(TOPICS_REGEX_CONFIG, topicsRegex, errorMessage));
         }
 
         if (!hasTopicsConfig && !hasTopicsRegexConfig) {
-            throw new ConfigException("Must configure one of " +
-                SinkTask.TOPICS_CONFIG + " or " + SinkTask.TOPICS_REGEX_CONFIG);
+            String errorMessage = "Must configure one of " + TOPICS_CONFIG + " or " + TOPICS_REGEX_CONFIG;
+            onError.accept(new ConfigError(TOPICS_CONFIG, topicsList, errorMessage));
+            onError.accept(new ConfigError(TOPICS_REGEX_CONFIG, topicsRegex, errorMessage));
         }
 
         if (hasDlqTopicConfig) {
-            String dlqTopic = props.get(DLQ_TOPIC_NAME_CONFIG).trim();
             if (hasTopicsConfig) {
                 List<String> topics = parseTopicsList(props);
                 if (topics.contains(dlqTopic)) {
-                    throw new ConfigException(String.format("The DLQ topic '%s' may not be included in the list of "
-                            + "topics ('%s=%s') consumed by the connector", dlqTopic, SinkTask.TOPICS_REGEX_CONFIG, topics));
+                    String errorMessage = String.format(
+                            "The DLQ topic '%s' may not be included in the list of topics ('%s=%s') consumed by the connector",
+                            dlqTopic, TOPICS_CONFIG, topics
+                    );
+                    onError.accept(new ConfigError(TOPICS_CONFIG, topicsList, errorMessage));
                 }
             }
             if (hasTopicsRegexConfig) {
-                String topicsRegexStr = props.get(SinkTask.TOPICS_REGEX_CONFIG);
-                Pattern pattern = Pattern.compile(topicsRegexStr);
+                Pattern pattern = Pattern.compile(topicsRegex);
                 if (pattern.matcher(dlqTopic).matches()) {
-                    throw new ConfigException(String.format("The DLQ topic '%s' may not be included in the regex matching the "
-                            + "topics ('%s=%s') consumed by the connector", dlqTopic, SinkTask.TOPICS_REGEX_CONFIG, topicsRegexStr));
+                    String errorMessage = String.format(
+                            "The DLQ topic '%s' may not be included in the regex matching the topics ('%s=%s') consumed by the connector",
+                            dlqTopic, TOPICS_REGEX_CONFIG, topicsRegex
+                    );
+                    onError.accept(new ConfigError(TOPICS_REGEX_CONFIG, topicsRegex, errorMessage));
                 }
             }
         }
     }
 
+    private static class ConfigError {

Review Comment:
   WDYT about having validate(Map) call validate(Map, Map) and then inspecting 
the result map for errors?
   That would eliminate the need for the Consumer<ConfigError> indirection.
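
   Roughly what I have in mind, as a sketch under assumptions rather than a 
drop-in patch. `ValidateDelegationSketch` and `Result` are hypothetical 
stand-ins (`Result` plays the role of `ConfigValue`), only the two topics 
checks are reproduced, and `IllegalArgumentException` stands in for 
`ConfigException` so the example has no Kafka dependency.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in, not Kafka code: the throwing overload delegates to
// the collecting overload and then inspects the result map for errors.
final class ValidateDelegationSketch {
    static final String TOPICS = "topics";
    static final String TOPICS_REGEX = "topics.regex";

    // Minimal stand-in for ConfigValue: a property name plus its errors.
    static final class Result {
        final String name;
        final List<String> errors = new ArrayList<>();
        Result(String name) { this.name = name; }
    }

    // Collecting overload: records every problem in the map, never throws.
    static void validate(Map<String, String> props, Map<String, Result> validated) {
        boolean hasTopics = !isBlank(props.get(TOPICS));
        boolean hasRegex = !isBlank(props.get(TOPICS_REGEX));
        if (hasTopics && hasRegex) {
            String msg = TOPICS + " and " + TOPICS_REGEX
                    + " are mutually exclusive options, but both are set.";
            addError(validated, TOPICS, msg);
            addError(validated, TOPICS_REGEX, msg);
        }
        if (!hasTopics && !hasRegex) {
            String msg = "Must configure one of " + TOPICS + " or " + TOPICS_REGEX;
            addError(validated, TOPICS, msg);
            addError(validated, TOPICS_REGEX, msg);
        }
    }

    // Throwing overload: runs the collecting one on a fresh map and fails on
    // the first recorded error, so no Consumer callback is needed.
    static void validate(Map<String, String> props) {
        Map<String, Result> validated = new LinkedHashMap<>();
        validate(props, validated);
        for (Result result : validated.values()) {
            if (!result.errors.isEmpty()) {
                throw new IllegalArgumentException(
                        "Invalid configuration " + result.name + ": " + result.errors.get(0));
            }
        }
    }

    private static void addError(Map<String, Result> validated, String name, String msg) {
        // Adds a new entry if the property has no result yet.
        validated.computeIfAbsent(name, Result::new).errors.add(msg);
    }

    private static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }
}
```

   One trade-off: the throwing path reports whichever error was recorded 
first for the first affected property, which would need to match the 
exception that callers of validate(Map) see today.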


