Repository: kafka Updated Branches: refs/heads/trunk 065ddf901 -> 5236bf60d
KAFKA-3526: Return string instead of object in ConfigKeyInfo and ConfigValueInfo Author: Liquan Pei <[email protected]> Reviewers: Ewen Cheslack-Postava <[email protected]> Closes #1200 from Ishiihara/config-string Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5236bf60 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5236bf60 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5236bf60 Branch: refs/heads/trunk Commit: 5236bf60debbb0c08010315a92dd3fbfa482e871 Parents: 065ddf9 Author: Liquan Pei <[email protected]> Authored: Fri Apr 15 15:51:31 2016 -0700 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Fri Apr 15 15:51:31 2016 -0700 ---------------------------------------------------------------------- .../apache/kafka/common/config/ConfigDef.java | 35 +++++++++-- .../kafka/common/config/ConfigDefTest.java | 3 +- .../kafka/connect/runtime/AbstractHerder.java | 40 +++++++++---- .../runtime/rest/entities/ConfigKeyInfo.java | 6 +- .../runtime/rest/entities/ConfigValueInfo.java | 12 ++-- .../resources/ConnectorPluginsResourceTest.java | 63 ++++++++++++++++---- 6 files changed, 120 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/5236bf60/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java index 881cb0b..1df55d9 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java +++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java @@ -538,19 +538,18 @@ public class ConfigDef { } } + @SuppressWarnings("unchecked") private void validate(String name, Map<String, Object> parsed, Map<String, ConfigValue> configs) { if (!configKeys.containsKey(name)) { return; } ConfigKey key = configKeys.get(name); ConfigValue config = configs.get(name); - Object value = parsed.get(name); List<Object> recommendedValues; if (key.recommender != null) { try { recommendedValues = key.recommender.validValues(name, parsed); List<Object> originalRecommendedValues = config.recommendedValues(); - if (!originalRecommendedValues.isEmpty()) { Set<Object> originalRecommendedValueSet = new HashSet<>(originalRecommendedValues); Iterator<Object> it = recommendedValues.iterator(); @@ -562,9 +561,6 @@ public class ConfigDef { } } config.recommendedValues(recommendedValues); - if (value != null && !recommendedValues.isEmpty() && !recommendedValues.contains(value)) { - config.addErrorMessage("Invalid value for configuration " + key.name); - } config.visible(key.recommender.visible(name, parsed)); } catch (ConfigException e) { config.addErrorMessage(e.getMessage()); @@ -676,6 +672,35 @@ public class ConfigDef { } } + public static String convertToString(Object parsedValue, Type type) { + if (parsedValue == null) { + return null; + } + + if (type == null) { + return parsedValue.toString(); + } + + switch (type) { + case BOOLEAN: + case SHORT: + case INT: + case LONG: + case DOUBLE: + case STRING: + case PASSWORD: + return parsedValue.toString(); + case LIST: + List<?> valueList = (List<?>) parsedValue; + return Utils.join(valueList, ","); + case CLASS: + Class<?> clazz = (Class<?>) parsedValue; + return clazz.getCanonicalName(); + default: + throw new IllegalStateException("Unknown type."); + } + } + /** * The config types */ http://git-wip-us.apache.org/repos/asf/kafka/blob/5236bf60/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java index 022fb6b..e20e422 100644 --- a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java +++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java @@ -236,12 +236,11 @@ public class ConfigDefTest { Map<String, ConfigValue> expected = new HashMap<>(); String errorMessageB = "Missing required configuration \"b\" which has no default value."; String errorMessageC = "Missing required configuration \"c\" which has no default value."; - String errorMessageD = "Invalid value for configuration d"; ConfigValue configA = new ConfigValue("a", 1, Arrays.<Object>asList(1, 2, 3), Collections.<String>emptyList()); ConfigValue configB = new ConfigValue("b", null, Arrays.<Object>asList(4, 5), Arrays.asList(errorMessageB, errorMessageB)); ConfigValue configC = new ConfigValue("c", null, Arrays.<Object>asList(4, 5), Arrays.asList(errorMessageC)); - ConfigValue configD = new ConfigValue("d", 10, Arrays.<Object>asList(1, 2, 3), Arrays.asList(errorMessageD)); + ConfigValue configD = new ConfigValue("d", 10, Arrays.<Object>asList(1, 2, 3), Collections.<String>emptyList()); expected.put("a", configA); expected.put("b", configB); http://git-wip-us.apache.org/repos/asf/kafka/blob/5236bf60/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java index a97c4db..1d87d60 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java @@ -19,6 +19,7 @@ package org.apache.kafka.connect.runtime; import org.apache.kafka.common.config.Config; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.ConfigKey; +import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.config.ConfigValue; import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.errors.NotFoundException; @@ -230,16 +231,18 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con configValueMap.put(configName, configValue); if (!configKeys.containsKey(configName)) { configValue.addErrorMessage("Configuration is not defined: " + configName); - configInfoList.add(new ConfigInfo(null, convertConfigValue(configValue))); + configInfoList.add(new ConfigInfo(null, convertConfigValue(configValue, null))); } } - for (String configName: configKeys.keySet()) { - ConfigKeyInfo configKeyInfo = convertConfigKey(configKeys.get(configName)); + for (Map.Entry<String, ConfigKey> entry : configKeys.entrySet()) { + String configName = entry.getKey(); + ConfigKeyInfo configKeyInfo = convertConfigKey(entry.getValue()); + Type type = entry.getValue().type; ConfigValueInfo configValueInfo = null; if (configValueMap.containsKey(configName)) { ConfigValue configValue = configValueMap.get(configName); - configValueInfo = convertConfigValue(configValue); + configValueInfo = convertConfigValue(configValue, type); errorCount += configValue.errorMessages().size(); } configInfoList.add(new ConfigInfo(configKeyInfo, configValueInfo)); @@ -249,11 +252,16 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con private static ConfigKeyInfo convertConfigKey(ConfigKey configKey) { String name = configKey.name; - String type = configKey.type.name(); - Object defaultValue = configKey.defaultValue; + Type type = configKey.type; + String typeName = configKey.type.name(); + boolean required = false; - if (defaultValue == ConfigDef.NO_DEFAULT_VALUE) { + String defaultValue; + if (configKey.defaultValue == ConfigDef.NO_DEFAULT_VALUE) { + defaultValue = (String) configKey.defaultValue; required = true; + } else { + defaultValue = ConfigDef.convertToString(configKey.defaultValue, type); } String importance = configKey.importance.name(); String documentation = configKey.documentation; @@ -262,11 +270,23 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con String width = configKey.width.name(); String displayName = configKey.displayName; List<String> dependents = configKey.dependents; - return new ConfigKeyInfo(name, type, required, defaultValue, importance, documentation, group, orderInGroup, width, displayName, dependents); + return new ConfigKeyInfo(name, typeName, required, defaultValue, importance, documentation, group, orderInGroup, width, displayName, dependents); } - private static ConfigValueInfo convertConfigValue(ConfigValue configValue) { - return new ConfigValueInfo(configValue.name(), configValue.value(), configValue.recommendedValues(), configValue.errorMessages(), configValue.visible()); + private static ConfigValueInfo convertConfigValue(ConfigValue configValue, Type type) { + String value = ConfigDef.convertToString(configValue.value(), type); + List<String> recommendedValues = new LinkedList<>(); + + if (type == Type.LIST) { + for (Object object: configValue.recommendedValues()) { + recommendedValues.add(ConfigDef.convertToString(object, Type.STRING)); + } + } else { + for (Object object : configValue.recommendedValues()) { + recommendedValues.add(ConfigDef.convertToString(object, type)); + } + } + return new ConfigValueInfo(configValue.name(), value, recommendedValues, configValue.errorMessages(), configValue.visible()); } private Connector getConnector(String connType) { http://git-wip-us.apache.org/repos/asf/kafka/blob/5236bf60/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigKeyInfo.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigKeyInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigKeyInfo.java index f813709..ead24c5 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigKeyInfo.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigKeyInfo.java @@ -28,7 +28,7 @@ public class ConfigKeyInfo { private final String name; private final String type; private final boolean required; - private final Object defaultValue; + private final String defaultValue; private final String importance; private final String documentation; private final String group; @@ -41,7 +41,7 @@ public class ConfigKeyInfo { public ConfigKeyInfo(@JsonProperty("name") String name, @JsonProperty("type") String type, @JsonProperty("required") boolean required, - @JsonProperty("default_value") Object defaultValue, + @JsonProperty("default_value") String defaultValue, @JsonProperty("importance") String importance, @JsonProperty("documentation") String documentation, @JsonProperty("group") String group, @@ -78,7 +78,7 @@ public class ConfigKeyInfo { } @JsonProperty("default_value") - public Object defaultValue() { + public String defaultValue() { return defaultValue; } http://git-wip-us.apache.org/repos/asf/kafka/blob/5236bf60/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigValueInfo.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigValueInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigValueInfo.java index 51e7ee5..a6ae006 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigValueInfo.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigValueInfo.java @@ -25,16 +25,16 @@ import java.util.Objects; public class ConfigValueInfo { private String name; - private Object value; - private List<Object> recommendedValues; + private String value; + private List<String> recommendedValues; private List<String> errors; private boolean visible; @JsonCreator public ConfigValueInfo( @JsonProperty("name") String name, - @JsonProperty("value") Object value, - @JsonProperty("recommended_values") List<Object> recommendedValues, + @JsonProperty("value") String value, + @JsonProperty("recommended_values") List<String> recommendedValues, @JsonProperty("errors") List<String> errors, @JsonProperty("visible") boolean visible) { this.name = name; @@ -50,12 +50,12 @@ public class ConfigValueInfo { } @JsonProperty - public Object value() { + public String value() { return value; } @JsonProperty("recommended_values") - public List<Object> recommendedValues() { + public List<String> recommendedValues() { return recommendedValues; } http://git-wip-us.apache.org/repos/asf/kafka/blob/5236bf60/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java index 1049e7e..732db3d 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java @@ -21,8 +21,10 @@ import com.fasterxml.jackson.core.type.TypeReference; import org.apache.kafka.common.config.Config; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Recommender; import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.common.config.ConfigDef.Width; import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.runtime.AbstractHerder; @@ -48,6 +50,7 @@ import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -68,7 +71,8 @@ public class ConnectorPluginsResourceTest { private static Map<String, String> props = new HashMap<>(); static { props.put("test.string.config", "testString"); - props.put("test.int.config", "10"); + props.put("test.int.config", "1"); + props.put("test.list.config", "a,b"); } private static final ConfigInfos CONFIG_INFOS; @@ -76,22 +80,27 @@ public class ConnectorPluginsResourceTest { static { List<ConfigInfo> configs = new LinkedList<>(); - ConfigKeyInfo configKeyInfo = new ConfigKeyInfo("test.string.config", "STRING", true, "", "HIGH", "Test configuration for string type.", null, -1, "NONE", "test.string.config", new LinkedList<String>()); - ConfigValueInfo configValueInfo = new ConfigValueInfo("test.string.config", "testString", Collections.<Object>emptyList(), Collections.<String>emptyList(), true); + ConfigKeyInfo configKeyInfo = new ConfigKeyInfo("test.string.config", "STRING", true, "", "HIGH", "Test configuration for string type.", null, -1, "NONE", "test.string.config", Collections.<String>emptyList()); + ConfigValueInfo configValueInfo = new ConfigValueInfo("test.string.config", "testString", Collections.<String>emptyList(), Collections.<String>emptyList(), true); ConfigInfo configInfo = new ConfigInfo(configKeyInfo, configValueInfo); configs.add(configInfo); - configKeyInfo = new ConfigKeyInfo("test.int.config", "INT", true, "", "MEDIUM", "Test configuration for integer type.", null, -1, "NONE", "test.int.config", new LinkedList<String>()); - configValueInfo = new ConfigValueInfo("test.int.config", 10, Collections.<Object>emptyList(), Collections.<String>emptyList(), true); + configKeyInfo = new ConfigKeyInfo("test.int.config", "INT", true, "", "MEDIUM", "Test configuration for integer type.", "Test", 1, "MEDIUM", "test.int.config", Collections.<String>emptyList()); + configValueInfo = new ConfigValueInfo("test.int.config", "1", Arrays.asList("1", "2", "3"), Collections.<String>emptyList(), true); configInfo = new ConfigInfo(configKeyInfo, configValueInfo); configs.add(configInfo); - configKeyInfo = new ConfigKeyInfo("test.string.config.default", "STRING", false, "", "LOW", "Test configuration with default value.", null, -1, "NONE", "test.string.config.default", new LinkedList<String>()); - configValueInfo = new ConfigValueInfo("test.string.config.default", "", Collections.<Object>emptyList(), Collections.<String>emptyList(), true); + configKeyInfo = new ConfigKeyInfo("test.string.config.default", "STRING", false, "", "LOW", "Test configuration with default value.", null, -1, "NONE", "test.string.config.default", Collections.<String>emptyList()); + configValueInfo = new ConfigValueInfo("test.string.config.default", "", Collections.<String>emptyList(), Collections.<String>emptyList(), true); configInfo = new ConfigInfo(configKeyInfo, configValueInfo); configs.add(configInfo); - CONFIG_INFOS = new ConfigInfos(ConnectorPluginsResourceTestConnector.class.getName(), 0, Collections.<String>emptyList(), configs); + configKeyInfo = new ConfigKeyInfo("test.list.config", "LIST", true, "", "HIGH", "Test configuration for list type.", "Test", 2, "LONG", "test.list.config", Collections.<String>emptyList()); + configValueInfo = new ConfigValueInfo("test.list.config", "a,b", Arrays.asList("a", "b", "c"), Collections.<String>emptyList(), true); + configInfo = new ConfigInfo(configKeyInfo, configValueInfo); + configs.add(configInfo); + + CONFIG_INFOS = new ConfigInfos(ConnectorPluginsResourceTestConnector.class.getName(), 0, Collections.singletonList("Test"), configs); } @Mock @@ -143,14 +152,17 @@ public class ConnectorPluginsResourceTest { /* Name here needs to be unique as we are testing the aliasing mechanism */ public static class ConnectorPluginsResourceTestConnector extends Connector { - public static final String TEST_STRING_CONFIG = "test.string.config"; - public static final String TEST_INT_CONFIG = "test.int.config"; - public static final String TEST_STRING_CONFIG_DEFAULT = "test.string.config.default"; + private static final String TEST_STRING_CONFIG = "test.string.config"; + private static final String TEST_INT_CONFIG = "test.int.config"; + private static final String TEST_STRING_CONFIG_DEFAULT = "test.string.config.default"; + private static final String TEST_LIST_CONFIG = "test.list.config"; + private static final String GROUP = "Test"; private static final ConfigDef CONFIG_DEF = new ConfigDef() .define(TEST_STRING_CONFIG, Type.STRING, Importance.HIGH, "Test configuration for string type.") - .define(TEST_INT_CONFIG, Type.INT, Importance.MEDIUM, "Test configuration for integer type.") - .define(TEST_STRING_CONFIG_DEFAULT, Type.STRING, "", Importance.LOW, "Test configuration with default value."); + .define(TEST_INT_CONFIG, Type.INT, Importance.MEDIUM, "Test configuration for integer type.", GROUP, 1, Width.MEDIUM, TEST_INT_CONFIG, new IntegerRecommender()) + .define(TEST_STRING_CONFIG_DEFAULT, Type.STRING, "", Importance.LOW, "Test configuration with default value.") + .define(TEST_LIST_CONFIG, Type.LIST, Importance.HIGH, "Test configuration for list type.", GROUP, 2, Width.LONG, TEST_LIST_CONFIG, new ListRecommender()); @Override public String version() { @@ -182,4 +194,29 @@ public class ConnectorPluginsResourceTest { return CONFIG_DEF; } } + + private static class IntegerRecommender implements Recommender { + + @Override + public List<Object> validValues(String name, Map<String, Object> parsedConfig) { + return Arrays.<Object>asList(1, 2, 3); + } + + @Override + public boolean visible(String name, Map<String, Object> parsedConfig) { + return true; + } + } + + private static class ListRecommender implements Recommender { + @Override + public List<Object> validValues(String name, Map<String, Object> parsedConfig) { + return Arrays.<Object>asList("a", "b", "c"); + } + + @Override + public boolean visible(String name, Map<String, Object> parsedConfig) { + return true; + } + } }
