Repository: kafka Updated Branches: refs/heads/trunk a1eb12d7c -> c07d01722
http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigInfo.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigInfo.java new file mode 100644 index 0000000..6040563 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigInfo.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/

package org.apache.kafka.connect.runtime.rest.entities;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Objects;

/**
 * REST entity pairing the definition of one configuration key with the value
 * information reported for it. The two parts are bound to the JSON properties
 * {@code definition} and {@code value}.
 */
public class ConfigInfo {

    // Assigned once in the constructor and never reassigned.
    private final ConfigKeyInfo configKey;
    private final ConfigValueInfo configValue;

    /**
     * @param configKey   key definition, bound to the JSON property {@code definition}
     * @param configValue value information, bound to the JSON property {@code value}
     */
    @JsonCreator
    public ConfigInfo(
        @JsonProperty("definition") ConfigKeyInfo configKey,
        @JsonProperty("value") ConfigValueInfo configValue) {
        this.configKey = configKey;
        this.configValue = configValue;
    }

    /** @return the key definition passed to the constructor */
    @JsonProperty("definition")
    public ConfigKeyInfo configKey() {
        return configKey;
    }

    /** @return the value information passed to the constructor */
    @JsonProperty("value")
    public ConfigValueInfo configValue() {
        return configValue;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        ConfigInfo that = (ConfigInfo) o;
        return Objects.equals(configKey, that.configKey) &&
               Objects.equals(configValue, that.configValue);
    }

    @Override
    public int hashCode() {
        return Objects.hash(configKey, configValue);
    }

    /**
     * Renders the entity as {@code [definition,value]}.
     * String concatenation is used instead of explicit {@code toString()} calls so that
     * a null part prints as {@code null} rather than throwing, matching the null
     * tolerance of {@link #equals(Object)} and {@link #hashCode()}.
     */
    @Override
    public String toString() {
        return "[" + configKey + "," + configValue + "]";
    }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigInfos.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigInfos.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigInfos.java new file mode 100644 index 0000000..3e73983 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigInfos.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/

package org.apache.kafka.connect.runtime.rest.entities;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.List;
import java.util.Objects;

/**
 * REST entity carrying a name, an error count, a list of group names and a list of
 * {@link ConfigInfo} entries. The constructor binds them from the JSON properties
 * {@code name}, {@code error_count}, {@code groups} and {@code configs}.
 */
public class ConfigInfos {

    @JsonProperty("name")
    private final String name;

    @JsonProperty("error_count")
    private final int errorCount;

    @JsonProperty("groups")
    private final List<String> groups;

    @JsonProperty("configs")
    private final List<ConfigInfo> configs;

    /**
     * @param name       value of the JSON property {@code name}
     * @param errorCount value of the JSON property {@code error_count}
     * @param groups     value of the JSON property {@code groups}
     * @param configs    value of the JSON property {@code configs}
     */
    @JsonCreator
    public ConfigInfos(@JsonProperty("name") String name,
                       @JsonProperty("error_count") int errorCount,
                       @JsonProperty("groups") List<String> groups,
                       @JsonProperty("configs") List<ConfigInfo> configs) {
        this.name = name;
        this.groups = groups;
        this.errorCount = errorCount;
        this.configs = configs;
    }

    /** @return the name passed to the constructor */
    @JsonProperty
    public String name() {
        return name;
    }

    /** @return the group list passed to the constructor (same instance, not copied) */
    @JsonProperty
    public List<String> groups() {
        return groups;
    }

    /** @return the error count passed to the constructor */
    @JsonProperty("error_count")
    public int errorCount() {
        return errorCount;
    }

    /** @return the config entries passed to the constructor (same instance, not copied) */
    @JsonProperty("configs")
    public List<ConfigInfo> values() {
        return configs;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        ConfigInfos that = (ConfigInfos) o;
        // errorCount is a primitive: compare directly instead of boxing through Objects.equals.
        return errorCount == that.errorCount &&
               Objects.equals(name, that.name) &&
               Objects.equals(groups, that.groups) &&
               Objects.equals(configs, that.configs);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, errorCount, groups, configs);
    }

    /**
     * Renders the entity as {@code [name,errorCount,groups,configs]}.
     * Plain concatenation replaces the former {@code StringBuffer}, whose synchronization
     * is pointless for a method-local builder; the produced text is unchanged.
     */
    @Override
    public String toString() {
        return "[" + name + "," + errorCount + "," + groups + "," + configs + "]";
    }

}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigKeyInfo.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigKeyInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigKeyInfo.java new file mode 100644 index 0000000..f813709 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigKeyInfo.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + **/

package org.apache.kafka.connect.runtime.rest.entities;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.List;
import java.util.Objects;

/**
 * Immutable REST entity describing one configuration key: its name, type, whether it
 * is required, default value, importance, documentation, group, order within the
 * group, width, display name and dependents. All values are supplied through the
 * {@link JsonCreator} constructor and exposed through annotated accessors.
 */
public class ConfigKeyInfo {

    private final String name;
    private final String type;
    private final boolean required;
    private final Object defaultValue;
    private final String importance;
    private final String documentation;
    private final String group;
    private final int orderInGroup;
    private final String width;
    private final String displayName;
    private final List<String> dependents;

    /**
     * @param name          JSON property {@code name}
     * @param type          JSON property {@code type}
     * @param required      JSON property {@code required}
     * @param defaultValue  JSON property {@code default_value}
     * @param importance    JSON property {@code importance}
     * @param documentation JSON property {@code documentation}
     * @param group         JSON property {@code group}
     * @param orderInGroup  JSON property {@code order_in_group}
     * @param width         JSON property {@code width}
     * @param displayName   JSON property {@code display_name}
     * @param dependents    JSON property {@code dependents}
     */
    @JsonCreator
    public ConfigKeyInfo(@JsonProperty("name") String name,
                         @JsonProperty("type") String type,
                         @JsonProperty("required") boolean required,
                         @JsonProperty("default_value") Object defaultValue,
                         @JsonProperty("importance") String importance,
                         @JsonProperty("documentation") String documentation,
                         @JsonProperty("group") String group,
                         @JsonProperty("order_in_group") int orderInGroup,
                         @JsonProperty("width") String width,
                         @JsonProperty("display_name") String displayName,
                         @JsonProperty("dependents") List<String> dependents) {
        this.name = name;
        this.type = type;
        this.required = required;
        this.defaultValue = defaultValue;
        this.importance = importance;
        this.documentation = documentation;
        this.group = group;
        this.orderInGroup = orderInGroup;
        this.width = width;
        this.displayName = displayName;
        this.dependents = dependents;
    }

    @JsonProperty
    public String name() {
        return name;
    }

    @JsonProperty
    public String type() {
        return type;
    }

    @JsonProperty
    public boolean required() {
        return required;
    }

    @JsonProperty("default_value")
    public Object defaultValue() {
        return defaultValue;
    }

    @JsonProperty
    public String documentation() {
        return documentation;
    }

    @JsonProperty
    public String group() {
        return group;
    }

    // NOTE(review): this accessor is annotated "order" while the constructor binds the same
    // field from "order_in_group". The names are left untouched because renaming either side
    // changes the JSON wire format; confirm whether the asymmetry is intended.
    @JsonProperty("order")
    public int orderInGroup() {
        return orderInGroup;
    }

    @JsonProperty
    public String width() {
        return width;
    }

    @JsonProperty
    public String importance() {
        return importance;
    }

    @JsonProperty("display_name")
    public String displayName() {
        return displayName;
    }

    /** @return the dependents list passed to the constructor (same instance, not copied) */
    @JsonProperty
    public List<String> dependents() {
        return dependents;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        ConfigKeyInfo that = (ConfigKeyInfo) o;
        // Primitives are compared directly; Objects.equals would box them first.
        return required == that.required &&
               orderInGroup == that.orderInGroup &&
               Objects.equals(name, that.name) &&
               Objects.equals(type, that.type) &&
               Objects.equals(defaultValue, that.defaultValue) &&
               Objects.equals(importance, that.importance) &&
               Objects.equals(documentation, that.documentation) &&
               Objects.equals(group, that.group) &&
               Objects.equals(width, that.width) &&
               Objects.equals(displayName, that.displayName) &&
               Objects.equals(dependents, that.dependents);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, type, required, defaultValue, importance, documentation, group, orderInGroup, width, displayName, dependents);
    }

    /**
     * Renders all fields, comma separated and in constructor order, inside square brackets.
     * Uses plain concatenation instead of the former synchronized {@code StringBuffer};
     * the produced text is unchanged.
     */
    @Override
    public String toString() {
        return "[" + name
            + "," + type
            + "," + required
            + "," + defaultValue
            + "," + importance
            + "," + documentation
            + "," + group
            + "," + orderInGroup
            + "," + width
            + "," + displayName
            + "," + dependents
            + "]";
    }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigValueInfo.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigValueInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigValueInfo.java new file mode 100644 index 0000000..51e7ee5 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigValueInfo.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/

package org.apache.kafka.connect.runtime.rest.entities;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.List;
import java.util.Objects;

/**
 * REST entity describing the value side of one configuration entry: its name, the
 * value itself, recommended values, error messages and a visibility flag. The
 * constructor binds them from the JSON properties {@code name}, {@code value},
 * {@code recommended_values}, {@code errors} and {@code visible}.
 */
public class ConfigValueInfo {
    // Assigned once in the constructor and never reassigned.
    private final String name;
    private final Object value;
    private final List<Object> recommendedValues;
    private final List<String> errors;
    private final boolean visible;

    /**
     * @param name              JSON property {@code name}
     * @param value             JSON property {@code value}
     * @param recommendedValues JSON property {@code recommended_values}
     * @param errors            JSON property {@code errors}
     * @param visible           JSON property {@code visible}
     */
    @JsonCreator
    public ConfigValueInfo(
        @JsonProperty("name") String name,
        @JsonProperty("value") Object value,
        @JsonProperty("recommended_values") List<Object> recommendedValues,
        @JsonProperty("errors") List<String> errors,
        @JsonProperty("visible") boolean visible) {
        this.name = name;
        this.value = value;
        this.recommendedValues = recommendedValues;
        this.errors = errors;
        this.visible = visible;
    }

    @JsonProperty
    public String name() {
        return name;
    }

    @JsonProperty
    public Object value() {
        return value;
    }

    /** @return the recommended values passed to the constructor (same instance, not copied) */
    @JsonProperty("recommended_values")
    public List<Object> recommendedValues() {
        return recommendedValues;
    }

    /** @return the error list passed to the constructor (same instance, not copied) */
    @JsonProperty
    public List<String> errors() {
        return errors;
    }

    @JsonProperty
    public boolean visible() {
        return visible;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        ConfigValueInfo that = (ConfigValueInfo) o;
        // visible is a primitive: compare directly instead of boxing through Objects.equals.
        return visible == that.visible &&
               Objects.equals(name, that.name) &&
               Objects.equals(value, that.value) &&
               Objects.equals(recommendedValues, that.recommendedValues) &&
               Objects.equals(errors, that.errors);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, value, recommendedValues, errors, visible);
    }

    /**
     * Renders the entity as {@code [name,value,recommendedValues,errors,visible]}.
     * Uses plain concatenation instead of the former synchronized {@code StringBuffer};
     * the produced text is unchanged.
     */
    @Override
    public String toString() {
        return "[" + name + "," + value + "," + recommendedValues + "," + errors + "," + visible + "]";
    }

}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java index 8daae05..9567ef9 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java @@ -19,6 +19,7 @@ package org.apache.kafka.connect.runtime.rest.entities; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; + import org.apache.kafka.connect.util.ConnectorTaskId; import java.util.ArrayList; @@ -34,13 +35,15 @@ public class ConnectorInfo { private final List<ConnectorTaskId> tasks; @JsonCreator - public ConnectorInfo(@JsonProperty("name") String name, @JsonProperty("config") Map<String, String> config, + public ConnectorInfo(@JsonProperty("name") String name, + @JsonProperty("config") Map<String, String> config, @JsonProperty("tasks") List<ConnectorTaskId> tasks) { this.name = name; this.config = config; this.tasks = tasks; } + @JsonProperty public String name() { return name; http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java new file mode 100644 index 
0000000..8439707 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/

package org.apache.kafka.connect.runtime.rest.resources;

import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;

import java.util.Map;

import javax.ws.rs.Consumes;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

/**
 * JAX-RS resource rooted at {@code /connector-plugins} that consumes and produces JSON.
 * Its single endpoint hands a connector configuration to the {@link Herder} for validation.
 */
@Path("/connector-plugins")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public class ConnectorPluginsResource {

    private final Herder herder;

    /**
     * @param herder the herder every validation request is delegated to
     */
    public ConnectorPluginsResource(Herder herder) {
        this.herder = herder;
    }

    /**
     * Handles {@code PUT /connector-plugins/{connectorType}/config/validate}.
     *
     * @param connType        value of the {@code connectorType} path segment
     * @param connectorConfig request body as a string-to-string map
     * @return whatever {@code herder.validateConfigs} returns for these arguments
     * @throws Throwable anything raised by the herder is propagated unchanged
     */
    @PUT
    @Path("/{connectorType}/config/validate")
    public ConfigInfos validateConfigs(
        @PathParam("connectorType") final String connType,
        final Map<String, String> connectorConfig) throws Throwable {
        final ConfigInfos validated = herder.validateConfigs(connType, connectorConfig);
        return validated;
    }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java index d0d940b..b6e9f61 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java @@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime.rest.resources; import com.fasterxml.jackson.core.type.TypeReference; + import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.Herder; import org.apache.kafka.connect.runtime.distributed.NotLeaderException; @@ -32,6 +33,14 @@ import org.apache.kafka.connect.util.FutureCallback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.net.URI; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + import javax.servlet.ServletContext; import javax.ws.rs.Consumes; import javax.ws.rs.DELETE; @@ -43,13 +52,6 @@ import javax.ws.rs.PathParam; import javax.ws.rs.Produces; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; -import java.net.URI; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; @Path("/connectors") @Produces(MediaType.APPLICATION_JSON) 
http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java index 707470f..9c48ed7 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java @@ -49,7 +49,6 @@ import java.util.Set; public class StandaloneHerder extends AbstractHerder { private static final Logger log = LoggerFactory.getLogger(StandaloneHerder.class); - private final Worker worker; private HashMap<String, ConnectorState> connectors = new HashMap<>(); public StandaloneHerder(Worker worker) { @@ -60,8 +59,7 @@ public class StandaloneHerder extends AbstractHerder { StandaloneHerder(String workerId, Worker worker, StatusBackingStore statusBackingStore) { - super(statusBackingStore, workerId); - this.worker = worker; + super(worker, statusBackingStore, workerId); } public synchronized void start() { http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java index 0ab64fd..c2515a0 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java @@ -17,6 +17,7 @@ package org.apache.kafka.connect.tools; +import 
org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.source.SourceConnector; @@ -61,4 +62,9 @@ public class VerifiableSinkConnector extends SourceConnector { @Override public void stop() { } + + @Override + public ConfigDef config() { + return new ConfigDef(); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java index 5f9afd5..b18db6e 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java @@ -17,6 +17,7 @@ package org.apache.kafka.connect.tools; +import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.source.SourceConnector; @@ -61,4 +62,9 @@ public class VerifiableSourceConnector extends SourceConnector { @Override public void stop() { } + + @Override + public ConfigDef config() { + return new ConfigDef(); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java index f17023c..1dc5784 100644 --- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java @@ -34,6 +34,7 @@ public class AbstractHerderTest extends EasyMockSupport { @Test public void connectorStatus() { + Worker worker = null; String workerId = "workerId"; String connector = "connector"; int generation = 5; @@ -42,8 +43,8 @@ public class AbstractHerderTest extends EasyMockSupport { StatusBackingStore store = strictMock(StatusBackingStore.class); AbstractHerder herder = partialMockBuilder(AbstractHerder.class) - .withConstructor(StatusBackingStore.class, String.class) - .withArgs(store, workerId) + .withConstructor(Worker.class, StatusBackingStore.class, String.class) + .withArgs(worker, store, workerId) .addMockedMethod("generation") .createMock(); @@ -76,14 +77,15 @@ public class AbstractHerderTest extends EasyMockSupport { @Test public void taskStatus() { + Worker worker = null; ConnectorTaskId taskId = new ConnectorTaskId("connector", 0); String workerId = "workerId"; StatusBackingStore store = strictMock(StatusBackingStore.class); AbstractHerder herder = partialMockBuilder(AbstractHerder.class) - .withConstructor(StatusBackingStore.class, String.class) - .withArgs(store, workerId) + .withConstructor(Worker.class, StatusBackingStore.class, String.class) + .withArgs(worker, store, workerId) .addMockedMethod("generation") .createMock(); http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java index 67d3fdc..557d789 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java +++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime; import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.connector.ConnectorContext; @@ -466,7 +467,11 @@ public class WorkerTest extends ThreadedTest { /* Name here needs to be unique as we are testing the aliasing mechanism */ - private static class WorkerTestConnector extends Connector { + public static class WorkerTestConnector extends Connector { + + private static final ConfigDef CONFIG_DEF = new ConfigDef() + .define("configName", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Test configName."); + @Override public String version() { return "1.0"; @@ -491,6 +496,11 @@ public class WorkerTest extends ThreadedTest { public void stop() { } + + @Override + public ConfigDef config() { + return CONFIG_DEF; + } } private static class TestSourceTask extends SourceTask { http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java new file mode 100644 index 0000000..625c91f --- /dev/null +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.runtime.rest.resources; + +import com.fasterxml.jackson.core.type.TypeReference; + +import org.apache.kafka.common.config.Config; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.connect.connector.Connector; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.runtime.AbstractHerder; +import org.apache.kafka.connect.runtime.Herder; +import org.apache.kafka.connect.runtime.rest.RestServer; +import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo; +import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos; +import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo; +import org.apache.kafka.connect.runtime.rest.entities.ConfigValueInfo; +import org.easymock.EasyMock; +import org.easymock.IAnswer; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.easymock.PowerMock; +import org.powermock.api.easymock.annotation.Mock; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import 
java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+/**
+ * Test for {@code ConnectorPluginsResource.validateConfigs(String, Map)}.
+ *
+ * The {@link Herder} is a mock. Its {@code validateConfigs} call is answered by running the
+ * nested {@link ConnectorPluginsResourceTestConnector} through {@code validate(props)} and
+ * {@code AbstractHerder.generateResult(...)}; the value returned by the resource is then
+ * compared with the hand-built {@link #CONFIG_INFOS} fixture.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(RestServer.class)
+@PowerMockIgnore("javax.management.*")
+public class ConnectorPluginsResourceTest {
+    /**
+     * Connector properties submitted for validation. Values are given for the two configs
+     * that the test connector defines without a default; {@code test.string.config.default}
+     * is deliberately left out.
+     */
+    private static Map<String, String> props = new HashMap<>();
+    static {
+        props.put("test.string.config", "testString");
+        props.put("test.int.config", "10");
+    }
+    /**
+     * Expected validation result: one {@link ConfigInfo} (key info plus value info) for each of
+     * the three configs defined by the test connector. The test reads it back through
+     * {@code name()}, {@code errorCount()}, {@code groups()} and {@code values()}.
+     *
+     * NOTE(review): the meaning of the positional constructor arguments of ConfigKeyInfo,
+     * ConfigValueInfo and ConfigInfos is not visible in this file; confirm against those classes.
+     */
+    private static final ConfigInfos CONFIG_INFOS;
+    static {
+        List<ConfigInfo> configs = new LinkedList<>();
+        // test.string.config: value mirrors the "testString" entry in props.
+        ConfigKeyInfo configKeyInfo = new ConfigKeyInfo("test.string.config", "STRING", true, "", "HIGH", "Test configuration for string type.", null, -1, "NONE", "test.string.config", new LinkedList<String>());
+        ConfigValueInfo configValueInfo = new ConfigValueInfo("test.string.config", "testString", Collections.<Object>emptyList(), Collections.<String>emptyList(), true);
+        ConfigInfo configInfo = new ConfigInfo(configKeyInfo, configValueInfo);
+        configs.add(configInfo);
+        // test.int.config: expected value is the int 10, while props carries the string "10".
+        configKeyInfo = new ConfigKeyInfo("test.int.config", "INT", true, "", "MEDIUM", "Test configuration for integer type.", null, -1, "NONE", "test.int.config", new LinkedList<String>());
+        configValueInfo = new ConfigValueInfo("test.int.config", 10, Collections.<Object>emptyList(), Collections.<String>emptyList(), true);
+        configInfo = new ConfigInfo(configKeyInfo, configValueInfo);
+        configs.add(configInfo);
+        // test.string.config.default: not present in props; expected value is the empty string.
+        configKeyInfo = new ConfigKeyInfo("test.string.config.default", "STRING", false, "", "LOW", "Test configuration with default value.", null, -1, "NONE", "test.string.config.default", new LinkedList<String>());
+        configValueInfo = new ConfigValueInfo("test.string.config.default", "", Collections.<Object>emptyList(), Collections.<String>emptyList(), true);
+        configInfo = new ConfigInfo(configKeyInfo, configValueInfo);
+        configs.add(configInfo);
+
+        CONFIG_INFOS = new ConfigInfos(ConnectorPluginsResourceTestConnector.class.getName(), 0, Collections.<String>emptyList(), configs);
+    }
+    /** Herder mock handed to the resource under test. */
+    @Mock
+    private Herder herder;
+    private ConnectorPluginsResource connectorPluginsResource;
+    /**
+     * Registers a static mock for {@code RestServer.httpRequest(String, String, Object, TypeReference)}
+     * and builds the resource under test around the mocked herder.
+     *
+     * @throws NoSuchMethodException if the reflective lookup of {@code httpRequest} fails
+     */
+    @Before
+    public void setUp() throws NoSuchMethodException {
+        PowerMock.mockStatic(RestServer.class,
+                             RestServer.class.getMethod("httpRequest", String.class, String.class, Object.class, TypeReference.class));
+        connectorPluginsResource = new ConnectorPluginsResource(herder);
+    }
+    /**
+     * Validates {@link #props} for the test connector through the resource and checks name,
+     * error count, groups and values against {@link #CONFIG_INFOS}. Values are compared as
+     * sets, so their order does not matter.
+     */
+    @Test
+    public void testValidateConfig() throws Throwable {
+        herder.validateConfigs(EasyMock.eq(ConnectorPluginsResourceTestConnector.class.getName()), EasyMock.eq(props));
+        // Answer the call recorded above with a result computed from the test connector itself.
+        PowerMock.expectLastCall().andAnswer(new IAnswer<ConfigInfos>() {
+            @Override
+            public ConfigInfos answer() {
+                Config config = new ConnectorPluginsResourceTestConnector().validate(props);
+                Connector connector = new ConnectorPluginsResourceTestConnector();
+                ConfigDef configDef = connector.config();
+                return AbstractHerder.generateResult(ConnectorPluginsResourceTestConnector.class.getName(), configDef.configKeys(), config.configValues(), configDef.groups());
+            }
+        });
+        PowerMock.replayAll();
+
+        ConfigInfos configInfos = connectorPluginsResource.validateConfigs(ConnectorPluginsResourceTestConnector.class.getName(), props);
+        assertEquals(CONFIG_INFOS.name(), configInfos.name());
+        assertEquals(CONFIG_INFOS.errorCount(), configInfos.errorCount());
+        assertEquals(CONFIG_INFOS.groups(), configInfos.groups());
+        assertEquals(new HashSet<>(CONFIG_INFOS.values()), new HashSet<>(configInfos.values())); // order-insensitive comparison
+
+        PowerMock.verifyAll();
+    }
+
+    /**
+     * Stub connector whose only meaningful content is its {@link ConfigDef}.
+     *
+     * Name here needs to be unique as we are testing the aliasing mechanism.
+     */
+    public static class ConnectorPluginsResourceTestConnector extends Connector {
+
+        public static final String TEST_STRING_CONFIG = "test.string.config";
+        public static final String TEST_INT_CONFIG = "test.int.config";
+        public static final String TEST_STRING_CONFIG_DEFAULT = "test.string.config.default";
+        // Two configs defined without a default (string, int) and one string config defaulting to "".
+        private static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(TEST_STRING_CONFIG, Type.STRING, Importance.HIGH, "Test configuration for string type.")
+            .define(TEST_INT_CONFIG, Type.INT, Importance.MEDIUM, "Test configuration for integer type.")
+            .define(TEST_STRING_CONFIG_DEFAULT, Type.STRING, "", Importance.LOW, "Test configuration with default value.");
+        /** Returns the fixed version string {@code "1.0"}. */
+        @Override
+        public String version() {
+            return "1.0";
+        }
+        /** Does nothing; the supplied properties are ignored. */
+        @Override
+        public void start(Map<String, String> props) {
+            // intentionally empty
+        }
+        /** Returns {@code null}; this stub declares no task class. */
+        @Override
+        public Class<? extends Task> taskClass() {
+            return null;
+        }
+        /** Returns {@code null} regardless of {@code maxTasks}. */
+        @Override
+        public List<Map<String, String>> taskConfigs(int maxTasks) {
+            return null;
+        }
+        /** Does nothing. */
+        @Override
+        public void stop() {
+            // intentionally empty
+        }
+        /** Returns the shared {@code CONFIG_DEF} describing the three test configs. */
+        @Override
+        public ConfigDef config() {
+            return CONFIG_DEF;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java index 4659ae8..970f56c 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime.rest.resources; import com.fasterxml.jackson.core.type.TypeReference; + import org.apache.kafka.connect.errors.AlreadyExistsException; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.NotFoundException; @@ -83,7 +84,6 @@ public class ConnectorsResourceTest { TASK_INFOS.add(new TaskInfo(new ConnectorTaskId(CONNECTOR_NAME, 1), TASK_CONFIGS.get(1))); } - @Mock private Herder herder; private ConnectorsResource connectorsResource; 
@@ -172,6 +172,8 @@ public class ConnectorsResourceTest { connectorsResource.createConnector(body); PowerMock.verifyAll(); + + } @Test(expected = AlreadyExistsException.class)
