Repository: kafka Updated Branches: refs/heads/0.11.0 83edf3f6f -> c029960bf
KAFKA-5472: Eliminated duplicate group names when validating connector results Kafka Connect was adding duplicate group names in the response from the REST API's validation of connector configurations. This fixes the duplicates and maintains the order of the `ConfigDef` objects so that the `ConfigValue` results are in the same order. This is a blocker and should be merged to 0.11.0. Author: Randall Hauch <[email protected]> Reviewers: Ewen Cheslack-Postava <[email protected]> Closes #3379 from rhauch/KAFKA-5472 (cherry picked from commit de982ba3fbf99664f0aaa5aa4b72af8fd1881232) Signed-off-by: Ewen Cheslack-Postava <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c029960b Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c029960b Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c029960b Branch: refs/heads/0.11.0 Commit: c029960bf4ae2cd79b22886f4ee519c4af0bcc8b Parents: 83edf3f Author: Randall Hauch <[email protected]> Authored: Tue Jun 20 17:48:32 2017 -0700 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Tue Jun 20 17:48:41 2017 -0700 ---------------------------------------------------------------------- .../org/apache/kafka/common/config/ConfigDef.java | 5 +++-- .../apache/kafka/connect/runtime/AbstractHerder.java | 15 +++++++++------ 2 files changed, 12 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/c029960b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java index 8197b1f..2514e4f 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java +++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java @@ -26,6 +26,7 @@ import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Locale; @@ -82,13 +83,13 @@ public class ConfigDef { private Set<String> configsWithNoParent; public ConfigDef() { - configKeys = new HashMap<>(); + configKeys = new LinkedHashMap<>(); groups = new LinkedList<>(); configsWithNoParent = null; } public ConfigDef(ConfigDef base) { - configKeys = new HashMap<>(base.configKeys); + configKeys = new LinkedHashMap<>(base.configKeys); groups = new LinkedList<>(base.groups); configsWithNoParent = base.configsWithNoParent == null ? null : new HashSet<>(base.configsWithNoParent); } http://git-wip-us.apache.org/repos/asf/kafka/blob/c029960b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java index 6293b01..cfb8ae0 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java @@ -46,9 +46,12 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; /** @@ -232,11 +235,11 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con throw new BadRequestException("Connector config " + connectorConfig + " contains no connector type"); List<ConfigValue> configValues = new ArrayList<>(); - Map<String, ConfigKey> configKeys = new HashMap<>(); - List<String> allGroups = new ArrayList<>(); + Map<String, ConfigKey> configKeys = new LinkedHashMap<>(); + Set<String> allGroups = new LinkedHashSet<>(); Connector connector = getConnector(connType); - ClassLoader savedLoader = worker.getPlugins().compareAndSwapLoaders(connector); + ClassLoader savedLoader = plugins().compareAndSwapLoaders(connector); try { // do basic connector validation (name, connector type, etc.) ConfigDef basicConfigDef = (connector instanceof SourceConnector) @@ -271,10 +274,10 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con configKeys.putAll(configDef.configKeys()); allGroups.addAll(configDef.groups()); configValues.addAll(config.configValues()); - return generateResult(connType, configKeys, configValues, allGroups); + return generateResult(connType, configKeys, configValues, new ArrayList<>(allGroups)); } catch (ConfigException e) { // Basic validation must have failed. Return the result. - return generateResult(connType, configKeys, configValues, allGroups); + return generateResult(connType, configKeys, configValues, new ArrayList<>(allGroups)); } finally { Plugins.compareAndSwapLoaders(savedLoader); } @@ -353,7 +356,7 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con if (tempConnectors.containsKey(connType)) { return tempConnectors.get(connType); } else { - Connector connector = worker.getPlugins().newConnector(connType); + Connector connector = plugins().newConnector(connType); tempConnectors.put(connType, connector); return connector; }
