Repository: kafka Updated Branches: refs/heads/trunk b1691cf49 -> 89c67727c
KAFKA-3506: Kafka Connect restart APIs Author: Jason Gustafson <[email protected]> Reviewers: Ewen Cheslack-Postava <[email protected]> Closes #1189 from hachikuji/KAFKA-3506 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/89c67727 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/89c67727 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/89c67727 Branch: refs/heads/trunk Commit: 89c67727c2793bf56b0b005a7d758beebedb5aed Parents: b1691cf Author: Jason Gustafson <[email protected]> Authored: Mon Apr 18 10:50:58 2016 -0700 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Mon Apr 18 10:50:58 2016 -0700 ---------------------------------------------------------------------- .../apache/kafka/connect/runtime/Herder.java | 18 +- .../apache/kafka/connect/runtime/Worker.java | 7 + .../runtime/distributed/DistributedHerder.java | 91 +++++- .../distributed/NotAssignedException.java | 29 ++ .../runtime/distributed/NotLeaderException.java | 21 +- .../distributed/RequestTargetException.java | 47 +++ .../distributed/StaleConfigException.java | 27 ++ .../runtime/distributed/WorkerCoordinator.java | 59 ++++ .../runtime/distributed/WorkerGroupMember.java | 9 + .../kafka/connect/runtime/rest/RestServer.java | 9 +- .../rest/resources/ConnectorsResource.java | 134 ++++++--- .../runtime/standalone/StandaloneHerder.java | 60 +++- .../distributed/DistributedHerderTest.java | 301 +++++++++++++++++++ .../rest/resources/ConnectorsResourceTest.java | 143 +++++++-- .../standalone/StandaloneHerderTest.java | 154 ++++++++++ 15 files changed, 1009 insertions(+), 100 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/89c67727/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java ---------------------------------------------------------------------- diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java index 3ea4a81..cce100e 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java @@ -61,9 +61,9 @@ public interface Herder { * from the current configuration. However, note * * @returns A list of connector names - * @throws org.apache.kafka.connect.runtime.distributed.NotLeaderException if this node can not resolve the request + * @throws org.apache.kafka.connect.runtime.distributed.RequestTargetException if this node can not resolve the request * (e.g., because it has not joined the cluster or does not have configs in sync with the group) and it is - * also not the leader + * not the leader or the task owner (e.g., task restart must be handled by the worker which owns the task) * @throws org.apache.kafka.connect.errors.ConnectException if this node is the leader, but still cannot resolve the * request (e.g., it is not in sync with other worker's config state) */ @@ -135,6 +135,20 @@ public interface Herder { */ ConfigInfos validateConfigs(String connType, Map<String, String> connectorConfig); + /** + * Restart the task with the given id. + * @param id id of the task + * @param cb callback to invoke upon completion + */ + void restartTask(ConnectorTaskId id, Callback<Void> cb); + + /** + * Restart the connector. 
+ * @param connName name of the connector + * @param cb callback to invoke upon completion + */ + void restartConnector(String connName, Callback<Void> cb); + class Created<T> { private final boolean created; private final T result; http://git-wip-us.apache.org/repos/asf/kafka/blob/89c67727/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index e1a806a..22843d3 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -242,6 +242,9 @@ public class Worker { return names.substring(0, names.toString().length() - 2); } + public boolean ownsTask(ConnectorTaskId taskId) { + return tasks.containsKey(taskId); + } private static Connector instantiateConnector(Class<? 
extends Connector> connClass) { try { @@ -415,6 +418,10 @@ public class Worker { return workerId; } + public boolean ownsConnector(String connName) { + return this.connectors.containsKey(connName); + } + private static class WorkerConnector { private final String connName; private final ConnectorStatus.Listener lifecycleListener; http://git-wip-us.apache.org/repos/asf/kafka/blob/89c67727/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index 2fc8297..24d548d 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -545,6 +545,79 @@ public class DistributedHerder extends AbstractHerder implements Runnable { } @Override + public synchronized void restartConnector(final String connName, final Callback<Void> callback) { + addRequest(new Callable<Void>() { + @Override + public Void call() throws Exception { + // if config is out of sync, then a rebalance is likely to begin shortly, so rather than risking + // a stale response, we return an error and let the user retry + if (!isConfigSynced()) { + throw new StaleConfigException("Cannot complete request because config is out of sync"); + } + + if (!configState.connectors().contains(connName)) { + callback.onCompletion(new NotFoundException("Unknown connector: " + connName), null); + return null; + } + + if (worker.ownsConnector(connName)) { + try { + worker.stopConnector(connName); + startConnector(connName); + callback.onCompletion(null, null); + } catch (Throwable t) { + callback.onCompletion(t, null); + } + } else if (isLeader()) { + 
callback.onCompletion(new NotAssignedException("Cannot restart connector since it is not assigned to this member", member.ownerUrl(connName)), null); + } else { + callback.onCompletion(new NotLeaderException("Cannot restart connector since it is not assigned to this member", leaderUrl()), null); + } + return null; + } + }, forwardErrorCallback(callback)); + } + + @Override + public synchronized void restartTask(final ConnectorTaskId id, final Callback<Void> callback) { + addRequest(new Callable<Void>() { + @Override + public Void call() throws Exception { + // if config is out of sync, then a rebalance is likely to begin shortly, so rather than risking + // a stale response, we return an error and let the user retry + if (!isConfigSynced()) { + throw new StaleConfigException("Cannot complete request because config is out of sync"); + } + + if (!configState.connectors().contains(id.connector())) { + callback.onCompletion(new NotFoundException("Unknown connector: " + id.connector()), null); + return null; + } + + if (configState.taskConfig(id) == null) { + callback.onCompletion(new NotFoundException("Unknown task: " + id), null); + return null; + } + + if (worker.ownsTask(id)) { + try { + worker.stopAndAwaitTask(id); + startTask(id); + callback.onCompletion(null, null); + } catch (Throwable t) { + callback.onCompletion(t, null); + } + } else if (isLeader()) { + callback.onCompletion(new NotAssignedException("Cannot restart task since it is not assigned to this member", member.ownerUrl(id)), null); + } else { + callback.onCompletion(new NotLeaderException("Cannot restart task since it is not assigned to this member", leaderUrl()), null); + } + return null; + } + }, forwardErrorCallback(callback)); + } + + @Override public int generation() { return generation; } @@ -679,10 +752,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { } for (ConnectorTaskId taskId : assignment.tasks()) { try { - log.info("Starting task {}", taskId); - Map<String, 
String> configs = configState.taskConfig(taskId); - TaskConfig taskConfig = new TaskConfig(configs); - worker.startTask(taskId, taskConfig, this); + startTask(taskId); } catch (ConfigException e) { log.error("Couldn't instantiate task " + taskId + " because it has an invalid task " + "configuration. This task will not execute until reconfigured.", e); @@ -691,6 +761,13 @@ public class DistributedHerder extends AbstractHerder implements Runnable { log.info("Finished starting connectors and tasks"); } + private void startTask(ConnectorTaskId taskId) { + log.info("Starting task {}", taskId); + Map<String, String> configs = configState.taskConfig(taskId); + TaskConfig taskConfig = new TaskConfig(configs); + worker.startTask(taskId, taskConfig, this); + } + // Helper for starting a connector with the given name, which will extract & parse the config, generate connector // context and add to the worker. This needs to be called from within the main worker thread for this herder. private void startConnector(String connectorName) { @@ -791,10 +868,14 @@ public class DistributedHerder extends AbstractHerder implements Runnable { } } + private boolean isConfigSynced() { + return assignment != null && configState.offset() == assignment.offset(); + } + // Common handling for requests that get config data. Checks if we are in sync with the current config, which allows // us to answer requests directly. If we are not, handles invoking the callback with the appropriate error. 
private boolean checkConfigSynced(Callback<?> callback) { - if (assignment == null || configState.offset() != assignment.offset()) { + if (!isConfigSynced()) { if (!isLeader()) callback.onCompletion(new NotLeaderException("Cannot get config data because config is not in sync and this is not the leader", leaderUrl()), null); else http://git-wip-us.apache.org/repos/asf/kafka/blob/89c67727/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/NotAssignedException.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/NotAssignedException.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/NotAssignedException.java new file mode 100644 index 0000000..a4211cc --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/NotAssignedException.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ +package org.apache.kafka.connect.runtime.distributed; + +/** + * Thrown when a request intended for the owner of a task or connector is received by a worker which doesn't + * own it (typically the leader). 
+ */ +public class NotAssignedException extends RequestTargetException { + + public NotAssignedException(String message, String ownerUrl) { + super(message, ownerUrl); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/89c67727/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/NotLeaderException.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/NotLeaderException.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/NotLeaderException.java index 5f94b53..9340eda 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/NotLeaderException.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/NotLeaderException.java @@ -17,31 +17,14 @@ package org.apache.kafka.connect.runtime.distributed; -import org.apache.kafka.connect.errors.ConnectException; - /** * Indicates an operation was not permitted because it can only be performed on the leader and this worker is not currently * the leader. 
*/ -public class NotLeaderException extends ConnectException { - private final String leaderUrl; +public class NotLeaderException extends RequestTargetException { public NotLeaderException(String msg, String leaderUrl) { - super(msg); - this.leaderUrl = leaderUrl; - } - - public NotLeaderException(String msg, String leaderUrl, Throwable throwable) { - super(msg, throwable); - this.leaderUrl = leaderUrl; + super(msg, leaderUrl); } - public NotLeaderException(String leaderUrl, Throwable throwable) { - super(throwable); - this.leaderUrl = leaderUrl; - } - - public String leaderUrl() { - return leaderUrl; - } } http://git-wip-us.apache.org/repos/asf/kafka/blob/89c67727/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/RequestTargetException.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/RequestTargetException.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/RequestTargetException.java new file mode 100644 index 0000000..42a5f5d --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/RequestTargetException.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + **/ +package org.apache.kafka.connect.runtime.distributed; + +import org.apache.kafka.connect.errors.ConnectException; + +/** + * Raised when a request has been received by a worker which cannot handle it, + * but can forward it to the right target + */ +public class RequestTargetException extends ConnectException { + private final String forwardUrl; + + public RequestTargetException(String s, String forwardUrl) { + super(s); + this.forwardUrl = forwardUrl; + } + + public RequestTargetException(String s, Throwable throwable, String forwardUrl) { + super(s, throwable); + this.forwardUrl = forwardUrl; + } + + public RequestTargetException(Throwable throwable, String forwardUrl) { + super(throwable); + this.forwardUrl = forwardUrl; + } + + public String forwardUrl() { + return forwardUrl; + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/89c67727/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/StaleConfigException.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/StaleConfigException.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/StaleConfigException.java new file mode 100644 index 0000000..c615b37 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/StaleConfigException.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ +package org.apache.kafka.connect.runtime.distributed; + +import org.apache.kafka.connect.errors.ConnectException; + +public class StaleConfigException extends ConnectException { + + public StaleConfigException(String s) { + super(s); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/89c67727/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java index fa50fbf..d5802c6 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java @@ -54,6 +54,7 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos private final WorkerCoordinatorMetrics sensors; private ClusterConfigState configSnapshot; private final WorkerRebalanceListener listener; + private LeaderState leaderState; private boolean rejoinRequested; @@ -198,6 +199,8 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos } } + this.leaderState = new LeaderState(allConfigs, connectorAssignments, taskAssignments); + return fillAssignmentsAndSerialize(allConfigs.keySet(), ConnectProtocol.Assignment.NO_ERROR, leaderId, allConfigs.get(leaderId).url(), 
maxOffset, connectorAssignments, taskAssignments); } @@ -228,6 +231,7 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos @Override protected void onJoinPrepare(int generation, String memberId) { + this.leaderState = null; log.debug("Revoking previous assignment {}", assignmentSnapshot); if (assignmentSnapshot != null && !assignmentSnapshot.failed()) listener.onRevoked(assignmentSnapshot.leader(), assignmentSnapshot.connectors(), assignmentSnapshot.tasks()); @@ -247,6 +251,22 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos super.close(); } + private boolean isLeader() { + return assignmentSnapshot != null && memberId.equals(assignmentSnapshot.leader()); + } + + public String ownerUrl(String connector) { + if (needRejoin() || !isLeader()) + return null; + return leaderState.ownerUrl(connector); + } + + public String ownerUrl(ConnectorTaskId task) { + if (needRejoin() || !isLeader()) + return null; + return leaderState.ownerUrl(task); + } + private class WorkerCoordinatorMetrics { public final Metrics metrics; public final String metricGrpName; @@ -282,4 +302,43 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos return res; } + private static <K, V> Map<V, K> invertAssignment(Map<K, List<V>> assignment) { + Map<V, K> inverted = new HashMap<>(); + for (Map.Entry<K, List<V>> assignmentEntry : assignment.entrySet()) { + K key = assignmentEntry.getKey(); + for (V value : assignmentEntry.getValue()) + inverted.put(value, key); + } + return inverted; + } + + private static class LeaderState { + private final Map<String, ConnectProtocol.WorkerState> allMembers; + private final Map<String, String> connectorOwners; + private final Map<ConnectorTaskId, String> taskOwners; + + public LeaderState(Map<String, ConnectProtocol.WorkerState> allMembers, + Map<String, List<String>> connectorAssignment, + Map<String, List<ConnectorTaskId>> taskAssignment) { + this.allMembers = 
allMembers; + this.connectorOwners = invertAssignment(connectorAssignment); + this.taskOwners = invertAssignment(taskAssignment); + } + + private String ownerUrl(ConnectorTaskId id) { + String ownerId = taskOwners.get(id); + if (ownerId == null) + return null; + return allMembers.get(ownerId).url(); + } + + private String ownerUrl(String connector) { + String ownerId = connectorOwners.get(connector); + if (ownerId == null) + return null; + return allMembers.get(ownerId).url(); + } + + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/89c67727/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java index 57028ef..058f171 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java @@ -33,6 +33,7 @@ import org.apache.kafka.common.network.Selector; import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.storage.KafkaConfigStorage; +import org.apache.kafka.connect.util.ConnectorTaskId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -175,6 +176,14 @@ public class WorkerGroupMember { coordinator.maybeLeaveGroup(); } + public String ownerUrl(String connector) { + return coordinator.ownerUrl(connector); + } + + public String ownerUrl(ConnectorTaskId task) { + return coordinator.ownerUrl(task); + } + private void stop(boolean swallowException) { log.trace("Stopping the Connect group member."); AtomicReference<Throwable> firstException = new AtomicReference<Throwable>(); 
http://git-wip-us.apache.org/repos/asf/kafka/blob/89c67727/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java index 1505a01..3475e1c 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java @@ -20,7 +20,6 @@ package org.apache.kafka.connect.runtime.rest; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider; - import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.runtime.Herder; import org.apache.kafka.connect.runtime.WorkerConfig; @@ -48,6 +47,9 @@ import org.glassfish.jersey.servlet.ServletContainer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.servlet.DispatcherType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.UriBuilder; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -58,10 +60,6 @@ import java.util.EnumSet; import java.util.List; import java.util.Map; -import javax.servlet.DispatcherType; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.UriBuilder; - /** * Embedded server for the REST API that provides the control plane for Kafka Connect workers. 
*/ @@ -272,4 +270,5 @@ public class RestServer { else return base + path; } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/89c67727/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java index b6e9f61..a53ed7d 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java @@ -18,10 +18,10 @@ package org.apache.kafka.connect.runtime.rest.resources; import com.fasterxml.jackson.core.type.TypeReference; - import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.Herder; -import org.apache.kafka.connect.runtime.distributed.NotLeaderException; +import org.apache.kafka.connect.runtime.distributed.StaleConfigException; +import org.apache.kafka.connect.runtime.distributed.RequestTargetException; import org.apache.kafka.connect.runtime.rest.RestServer; import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; @@ -33,14 +33,6 @@ import org.apache.kafka.connect.util.FutureCallback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.net.URI; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - import javax.servlet.ServletContext; import javax.ws.rs.Consumes; import javax.ws.rs.DELETE; @@ -50,8 +42,17 @@ import javax.ws.rs.PUT; import javax.ws.rs.Path; 
import javax.ws.rs.PathParam; import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import javax.ws.rs.core.UriBuilder; +import java.net.URI; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; @Path("/connectors") @Produces(MediaType.APPLICATION_JSON) @@ -75,16 +76,17 @@ public class ConnectorsResource { @GET @Path("/") - public Collection<String> listConnectors() throws Throwable { + public Collection<String> listConnectors(final @QueryParam("forward") Boolean forward) throws Throwable { FutureCallback<Collection<String>> cb = new FutureCallback<>(); herder.connectors(cb); return completeOrForwardRequest(cb, "/connectors", "GET", null, new TypeReference<Collection<String>>() { - }); + }, forward); } @POST @Path("/") - public Response createConnector(final CreateConnectorRequest createRequest) throws Throwable { + public Response createConnector(final @QueryParam("forward") Boolean forward, + final CreateConnectorRequest createRequest) throws Throwable { String name = createRequest.name(); Map<String, String> configs = createRequest.config(); if (!configs.containsKey(ConnectorConfig.NAME_CONFIG)) @@ -93,24 +95,26 @@ public class ConnectorsResource { FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>(); herder.putConnectorConfig(name, configs, false, cb); Herder.Created<ConnectorInfo> info = completeOrForwardRequest(cb, "/connectors", "POST", createRequest, - new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator()); + new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator(), forward); return Response.created(URI.create("/connectors/" + name)).entity(info.result()).build(); } @GET @Path("/{connector}") - public ConnectorInfo getConnector(final @PathParam("connector") 
String connector) throws Throwable { + public ConnectorInfo getConnector(final @PathParam("connector") String connector, + final @QueryParam("forward") Boolean forward) throws Throwable { FutureCallback<ConnectorInfo> cb = new FutureCallback<>(); herder.connectorInfo(connector, cb); - return completeOrForwardRequest(cb, "/connectors/" + connector, "GET", null); + return completeOrForwardRequest(cb, "/connectors/" + connector, "GET", null, forward); } @GET @Path("/{connector}/config") - public Map<String, String> getConnectorConfig(final @PathParam("connector") String connector) throws Throwable { + public Map<String, String> getConnectorConfig(final @PathParam("connector") String connector, + final @QueryParam("forward") Boolean forward) throws Throwable { FutureCallback<Map<String, String>> cb = new FutureCallback<>(); herder.connectorConfig(connector, cb); - return completeOrForwardRequest(cb, "/connectors/" + connector + "/config", "GET", null); + return completeOrForwardRequest(cb, "/connectors/" + connector + "/config", "GET", null, forward); } @GET @@ -122,11 +126,12 @@ public class ConnectorsResource { @PUT @Path("/{connector}/config") public Response putConnectorConfig(final @PathParam("connector") String connector, - final Map<String, String> connectorConfig) throws Throwable { + final @QueryParam("forward") Boolean forward, + final Map<String, String> connectorConfig) throws Throwable { FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>(); herder.putConnectorConfig(connector, connectorConfig, true, cb); Herder.Created<ConnectorInfo> createdInfo = completeOrForwardRequest(cb, "/connectors/" + connector + "/config", - "PUT", connectorConfig, new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator()); + "PUT", connectorConfig, new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator(), forward); Response.ResponseBuilder response; if (createdInfo.created()) response = 
Response.created(URI.create("/connectors/" + connector)); @@ -135,55 +140,102 @@ public class ConnectorsResource { return response.entity(createdInfo.result()).build(); } + @POST + @Path("/{connector}/restart") + public void restartConnector(final @PathParam("connector") String connector, + final @QueryParam("forward") Boolean forward) throws Throwable { + FutureCallback<Void> cb = new FutureCallback<>(); + herder.restartConnector(connector, cb); + completeOrForwardRequest(cb, "/connectors/" + connector + "/restart", "POST", null, forward); + } + @GET @Path("/{connector}/tasks") - public List<TaskInfo> getTaskConfigs(final @PathParam("connector") String connector) throws Throwable { + public List<TaskInfo> getTaskConfigs(final @PathParam("connector") String connector, + final @QueryParam("forward") Boolean forward) throws Throwable { FutureCallback<List<TaskInfo>> cb = new FutureCallback<>(); herder.taskConfigs(connector, cb); return completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "GET", null, new TypeReference<List<TaskInfo>>() { - }); + }, forward); } @POST @Path("/{connector}/tasks") public void putTaskConfigs(final @PathParam("connector") String connector, + final @QueryParam("forward") Boolean forward, final List<Map<String, String>> taskConfigs) throws Throwable { FutureCallback<Void> cb = new FutureCallback<>(); herder.putTaskConfigs(connector, taskConfigs, cb); - completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "POST", taskConfigs); + completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "POST", taskConfigs, forward); } @GET @Path("/{connector}/tasks/{task}/status") - public ConnectorStateInfo.TaskState getTaskStatus(@PathParam("connector") String connector, - @PathParam("task") Integer task) throws Throwable { + public ConnectorStateInfo.TaskState getTaskStatus(final @PathParam("connector") String connector, + final @PathParam("task") Integer task) throws Throwable { return herder.taskStatus(new 
ConnectorTaskId(connector, task)); } + @POST + @Path("/{connector}/tasks/{task}/restart") + public void restartTask(final @PathParam("connector") String connector, + final @PathParam("task") Integer task, + final @QueryParam("forward") Boolean forward) throws Throwable { + FutureCallback<Void> cb = new FutureCallback<>(); + ConnectorTaskId taskId = new ConnectorTaskId(connector, task); + herder.restartTask(taskId, cb); + completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks/" + task + "/restart", "POST", null, forward); + } + @DELETE @Path("/{connector}") - public void destroyConnector(final @PathParam("connector") String connector) throws Throwable { + public void destroyConnector(final @PathParam("connector") String connector, + final @QueryParam("forward") Boolean forward) throws Throwable { FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>(); herder.putConnectorConfig(connector, null, true, cb); - completeOrForwardRequest(cb, "/connectors/" + connector, "DELETE", null); + completeOrForwardRequest(cb, "/connectors/" + connector, "DELETE", null, forward); } // Wait for a FutureCallback to complete. If it succeeds, return the parsed response. If it fails, try to forward the // request to the leader. 
- private <T, U> T completeOrForwardRequest( - FutureCallback<T> cb, String path, String method, Object body, TypeReference<U> resultType, - Translator<T, U> translator) throws Throwable { + private <T, U> T completeOrForwardRequest(FutureCallback<T> cb, + String path, + String method, + Object body, + TypeReference<U> resultType, + Translator<T, U> translator, + Boolean forward) throws Throwable { try { return cb.get(REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS); } catch (ExecutionException e) { - if (e.getCause() instanceof NotLeaderException) { - NotLeaderException notLeaderError = (NotLeaderException) e.getCause(); - String forwardUrl = RestServer.urlJoin(notLeaderError.leaderUrl(), path); - log.debug("Forwarding request to leader: {} {} {}", forwardUrl, method, body); - return translator.translate(RestServer.httpRequest(forwardUrl, method, body, resultType)); + Throwable cause = e.getCause(); + + if (cause instanceof RequestTargetException) { + if (forward == null || forward) { + // the only time we allow recursive forwarding is when no forward flag has + // been set, which should only be seen by the first worker to handle a user request. + // this gives two total hops to resolve the request before giving up. + boolean recursiveForward = forward == null; + RequestTargetException targetException = (RequestTargetException) cause; + String forwardUrl = UriBuilder.fromUri(targetException.forwardUrl()) + .path(path) + .queryParam("forward", recursiveForward) + .build() + .toString(); + log.debug("Forwarding request {} {} {}", forwardUrl, method, body); + return translator.translate(RestServer.httpRequest(forwardUrl, method, body, resultType)); + } else { + // we should find the right target for the query within two hops, so if + // we don't, it probably means that a rebalance has taken place. + throw new ConnectRestException(Response.Status.CONFLICT.getStatusCode(), + "Cannot complete request because of a conflicting operation (e.g. 
worker rebalance)"); + } + } else if (cause instanceof StaleConfigException) { + throw new ConnectRestException(Response.Status.CONFLICT.getStatusCode(), + "Cannot complete request momentarily due to stale configuration (typically caused by a concurrent config change)"); } - throw e.getCause(); + throw cause; } catch (TimeoutException e) { // This timeout is for the operation itself. None of the timeout error codes are relevant, so internal server // error is the best option @@ -193,12 +245,14 @@ public class ConnectorsResource { } } - private <T> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method, Object body, TypeReference<T> resultType) throws Throwable { - return completeOrForwardRequest(cb, path, method, body, resultType, new IdentityTranslator<T>()); + private <T> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method, Object body, + TypeReference<T> resultType, Boolean forward) throws Throwable { + return completeOrForwardRequest(cb, path, method, body, resultType, new IdentityTranslator<T>(), forward); } - private <T> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method, Object body) throws Throwable { - return completeOrForwardRequest(cb, path, method, body, null, new IdentityTranslator<T>()); + private <T> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method, + Object body, Boolean forward) throws Throwable { + return completeOrForwardRequest(cb, path, method, body, null, new IdentityTranslator<T>(), forward); } private interface Translator<T, U> { http://git-wip-us.apache.org/repos/asf/kafka/blob/89c67727/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java index 9c48ed7..a59336a 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java @@ -202,6 +202,44 @@ public class StandaloneHerder extends AbstractHerder { throw new UnsupportedOperationException("Kafka Connect in standalone mode does not support externally setting task configurations."); } + @Override + public synchronized void restartTask(ConnectorTaskId taskId, Callback<Void> cb) { + if (!connectors.containsKey(taskId.connector())) + cb.onCompletion(new NotFoundException("Connector " + taskId.connector() + " not found", null), null); + + ConnectorState state = connectors.get(taskId.connector()); + if (state.taskConfigs.contains(taskId)) + cb.onCompletion(new NotFoundException("Task " + taskId + " not found", null), null); + + try { + worker.stopAndAwaitTask(taskId); + + Map<String, String> taskConfig = state.taskConfigs.get(taskId.task()); + worker.startTask(taskId, new TaskConfig(taskConfig), this); + + cb.onCompletion(null, null); + } catch (Exception e) { + log.error("Failed to restart task {}", taskId, e); + cb.onCompletion(e, null); + } + } + + @Override + public synchronized void restartConnector(String connName, Callback<Void> cb) { + if (!connectors.containsKey(connName)) + cb.onCompletion(new NotFoundException("Connector " + connName + " not found", null), null); + + ConnectorState state = connectors.get(connName); + try { + worker.stopConnector(connName); + worker.startConnector(state.config, new HerderConnectorContext(this, connName), this); + cb.onCompletion(null, null); + } catch (Exception e) { + log.error("Failed to restart connector {}", connName, e); + cb.onCompletion(e, null); + } + } + /** * Start a connector in the worker and record its state. 
* @param connectorProps new connector configuration @@ -234,19 +272,23 @@ public class StandaloneHerder extends AbstractHerder { int index = 0; for (Map<String, String> taskConfigMap : state.taskConfigs) { ConnectorTaskId taskId = new ConnectorTaskId(connName, index); - TaskConfig config = new TaskConfig(taskConfigMap); - try { - worker.startTask(taskId, config, this); - } catch (Throwable e) { - log.error("Failed to add task {}: ", taskId, e); - // Swallow this so we can continue updating the rest of the tasks - // FIXME what's the proper response? Kill all the tasks? Consider this the same as a task - // that died after starting successfully. - } + startTask(taskId, taskConfigMap); index++; } } + private void startTask(ConnectorTaskId taskId, Map<String, String> taskConfigMap) { + TaskConfig config = new TaskConfig(taskConfigMap); + try { + worker.startTask(taskId, config, this); + } catch (Throwable e) { + log.error("Failed to add task {}: ", taskId, e); + // Swallow this so we can continue updating the rest of the tasks + // FIXME what's the proper response? Kill all the tasks? Consider this the same as a task + // that died after starting successfully. 
+ } + } + private Set<ConnectorTaskId> tasksFor(ConnectorState state) { Set<ConnectorTaskId> tasks = new HashSet<>(); for (int i = 0; i < state.taskConfigs.size(); i++) http://git-wip-us.apache.org/repos/asf/kafka/blob/89c67727/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java index aa747f6..b667fa8 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.connect.connector.ConnectorContext; import org.apache.kafka.connect.errors.AlreadyExistsException; +import org.apache.kafka.connect.errors.NotFoundException; import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.Herder; import org.apache.kafka.connect.runtime.TaskConfig; @@ -56,10 +57,13 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; @RunWith(PowerMockRunner.class) @PrepareForTest(DistributedHerder.class) @@ -315,6 +319,303 @@ public class DistributedHerderTest { } @Test + public void testRestartConnector() throws Exception { + EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andStubReturn(TASK_CONFIGS); 
+ + // get the initial assignment + EasyMock.expect(member.memberId()).andStubReturn("leader"); + expectRebalance(1, Collections.singletonList(CONN1), Collections.<ConnectorTaskId>emptyList()); + expectPostRebalanceCatchup(SNAPSHOT); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(), + EasyMock.eq(herder)); + PowerMock.expectLastCall(); + + // now handle the connector restart + member.wakeup(); + PowerMock.expectLastCall(); + member.ensureActive(); + PowerMock.expectLastCall(); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + EasyMock.expect(worker.ownsConnector(CONN1)).andReturn(true); + + worker.stopConnector(CONN1); + PowerMock.expectLastCall(); + worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(), + EasyMock.eq(herder)); + PowerMock.expectLastCall(); + + PowerMock.replayAll(); + + herder.tick(); + FutureCallback<Void> callback = new FutureCallback<>(); + herder.restartConnector(CONN1, callback); + herder.tick(); + callback.get(1000L, TimeUnit.MILLISECONDS); + + PowerMock.verifyAll(); + } + + @Test + public void testRestartUnknownConnector() throws Exception { + // get the initial assignment + EasyMock.expect(member.memberId()).andStubReturn("leader"); + expectRebalance(1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList()); + expectPostRebalanceCatchup(SNAPSHOT); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + + // now handle the connector restart + member.wakeup(); + PowerMock.expectLastCall(); + member.ensureActive(); + PowerMock.expectLastCall(); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + PowerMock.replayAll(); + + herder.tick(); + FutureCallback<Void> callback = new FutureCallback<>(); + herder.restartConnector(CONN2, callback); + herder.tick(); + try { + callback.get(1000L, TimeUnit.MILLISECONDS); + 
fail("Expected NotLeaderException to be raised"); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof NotFoundException); + } + + PowerMock.verifyAll(); + } + + @Test + public void testRestartConnectorRedirectToLeader() throws Exception { + // get the initial assignment + EasyMock.expect(member.memberId()).andStubReturn("member"); + expectRebalance(1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList()); + expectPostRebalanceCatchup(SNAPSHOT); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + // now handle the connector restart + member.wakeup(); + PowerMock.expectLastCall(); + member.ensureActive(); + PowerMock.expectLastCall(); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + EasyMock.expect(worker.ownsConnector(CONN1)).andReturn(false); + + PowerMock.replayAll(); + + herder.tick(); + FutureCallback<Void> callback = new FutureCallback<>(); + herder.restartConnector(CONN1, callback); + herder.tick(); + + try { + callback.get(1000L, TimeUnit.MILLISECONDS); + fail("Expected NotLeaderException to be raised"); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof NotLeaderException); + } + + PowerMock.verifyAll(); + } + + @Test + public void testRestartConnectorRedirectToOwner() throws Exception { + // get the initial assignment + EasyMock.expect(member.memberId()).andStubReturn("leader"); + expectRebalance(1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList()); + expectPostRebalanceCatchup(SNAPSHOT); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + // now handle the connector restart + member.wakeup(); + PowerMock.expectLastCall(); + member.ensureActive(); + PowerMock.expectLastCall(); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + String ownerUrl = "ownerUrl"; + EasyMock.expect(worker.ownsConnector(CONN1)).andReturn(false); + EasyMock.expect(member.ownerUrl(CONN1)).andReturn(ownerUrl); + + 
PowerMock.replayAll(); + + herder.tick(); + FutureCallback<Void> callback = new FutureCallback<>(); + herder.restartConnector(CONN1, callback); + herder.tick(); + + try { + callback.get(1000L, TimeUnit.MILLISECONDS); + fail("Expected NotLeaderException to be raised"); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof NotAssignedException); + NotAssignedException notAssignedException = (NotAssignedException) e.getCause(); + assertEquals(ownerUrl, notAssignedException.forwardUrl()); + } + + PowerMock.verifyAll(); + } + + @Test + public void testRestartTask() throws Exception { + EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andStubReturn(TASK_CONFIGS); + + // get the initial assignment + EasyMock.expect(member.memberId()).andStubReturn("leader"); + expectRebalance(1, Collections.<String>emptyList(), Collections.singletonList(TASK0)); + expectPostRebalanceCatchup(SNAPSHOT); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder)); + PowerMock.expectLastCall(); + + // now handle the task restart + member.wakeup(); + PowerMock.expectLastCall(); + member.ensureActive(); + PowerMock.expectLastCall(); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + EasyMock.expect(worker.ownsTask(TASK0)).andReturn(true); + + worker.stopAndAwaitTask(TASK0); + PowerMock.expectLastCall(); + worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder)); + PowerMock.expectLastCall(); + + PowerMock.replayAll(); + + herder.tick(); + FutureCallback<Void> callback = new FutureCallback<>(); + herder.restartTask(TASK0, callback); + herder.tick(); + callback.get(1000L, TimeUnit.MILLISECONDS); + + PowerMock.verifyAll(); + } + + @Test + public void testRestartUnknownTask() throws Exception { + // get the initial assignment + EasyMock.expect(member.memberId()).andStubReturn("member"); + expectRebalance(1, 
Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList()); + expectPostRebalanceCatchup(SNAPSHOT); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + member.wakeup(); + PowerMock.expectLastCall(); + member.ensureActive(); + PowerMock.expectLastCall(); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + PowerMock.replayAll(); + + FutureCallback<Void> callback = new FutureCallback<>(); + herder.tick(); + herder.restartTask(new ConnectorTaskId("blah", 0), callback); + herder.tick(); + + try { + callback.get(1000L, TimeUnit.MILLISECONDS); + fail("Expected NotLeaderException to be raised"); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof NotFoundException); + } + + PowerMock.verifyAll(); + } + + @Test + public void testRestartTaskRedirectToLeader() throws Exception { + // get the initial assignment + EasyMock.expect(member.memberId()).andStubReturn("member"); + expectRebalance(1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList()); + expectPostRebalanceCatchup(SNAPSHOT); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + // now handle the task restart + EasyMock.expect(worker.ownsTask(TASK0)).andReturn(false); + member.wakeup(); + PowerMock.expectLastCall(); + member.ensureActive(); + PowerMock.expectLastCall(); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + PowerMock.replayAll(); + + herder.tick(); + FutureCallback<Void> callback = new FutureCallback<>(); + herder.restartTask(TASK0, callback); + herder.tick(); + + try { + callback.get(1000L, TimeUnit.MILLISECONDS); + fail("Expected NotLeaderException to be raised"); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof NotLeaderException); + } + + PowerMock.verifyAll(); + } + + @Test + public void testRestartTaskRedirectToOwner() throws Exception { + // get the initial assignment + EasyMock.expect(member.memberId()).andStubReturn("leader"); + expectRebalance(1, 
Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList()); + expectPostRebalanceCatchup(SNAPSHOT); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + // now handle the task restart + String ownerUrl = "ownerUrl"; + EasyMock.expect(worker.ownsTask(TASK0)).andReturn(false); + EasyMock.expect(member.ownerUrl(TASK0)).andReturn(ownerUrl); + member.wakeup(); + PowerMock.expectLastCall(); + member.ensureActive(); + PowerMock.expectLastCall(); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + PowerMock.replayAll(); + + herder.tick(); + FutureCallback<Void> callback = new FutureCallback<>(); + herder.restartTask(TASK0, callback); + herder.tick(); + + try { + callback.get(1000L, TimeUnit.MILLISECONDS); + fail("Expected NotLeaderException to be raised"); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof NotAssignedException); + NotAssignedException notAssignedException = (NotAssignedException) e.getCause(); + assertEquals(ownerUrl, notAssignedException.forwardUrl()); + } + + PowerMock.verifyAll(); + } + + @Test public void testConnectorConfigAdded() { // If a connector was added, we need to rebalance EasyMock.expect(member.memberId()).andStubReturn("member"); http://git-wip-us.apache.org/repos/asf/kafka/blob/89c67727/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java index 970f56c..fa7d997 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java @@ -24,6 +24,7 @@ import 
org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.NotFoundException; import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.Herder; +import org.apache.kafka.connect.runtime.distributed.NotAssignedException; import org.apache.kafka.connect.runtime.distributed.NotLeaderException; import org.apache.kafka.connect.runtime.rest.RestServer; import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; @@ -64,6 +65,7 @@ public class ConnectorsResourceTest { private static final String LEADER_URL = "http://leader:8083/"; private static final String CONNECTOR_NAME = "test"; private static final String CONNECTOR2_NAME = "test2"; + private static final Boolean FORWARD = true; private static final Map<String, String> CONNECTOR_CONFIG = new HashMap<>(); static { CONNECTOR_CONFIG.put("name", CONNECTOR_NAME); @@ -103,7 +105,7 @@ public class ConnectorsResourceTest { PowerMock.replayAll(); - Collection<String> connectors = connectorsResource.listConnectors(); + Collection<String> connectors = connectorsResource.listConnectors(FORWARD); // Ordering isn't guaranteed, compare sets assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_NAME, CONNECTOR2_NAME)), new HashSet<>(connectors)); @@ -116,13 +118,13 @@ public class ConnectorsResourceTest { herder.connectors(EasyMock.capture(cb)); expectAndCallbackNotLeaderException(cb); // Should forward request - EasyMock.expect(RestServer.httpRequest(EasyMock.eq("http://leader:8083/connectors"), EasyMock.eq("GET"), + EasyMock.expect(RestServer.httpRequest(EasyMock.eq("http://leader:8083/connectors?forward=false"), EasyMock.eq("GET"), EasyMock.isNull(), EasyMock.anyObject(TypeReference.class))) .andReturn(new RestServer.HttpResponse<>(200, new HashMap<String, List<String>>(), Arrays.asList(CONNECTOR2_NAME, CONNECTOR_NAME))); PowerMock.replayAll(); - Collection<String> connectors = connectorsResource.listConnectors(); + Collection<String> connectors = 
connectorsResource.listConnectors(FORWARD); // Ordering isn't guaranteed, compare sets assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_NAME, CONNECTOR2_NAME)), new HashSet<>(connectors)); @@ -138,7 +140,7 @@ public class ConnectorsResourceTest { PowerMock.replayAll(); // throws - connectorsResource.listConnectors(); + connectorsResource.listConnectors(FORWARD); } @Test @@ -151,7 +153,7 @@ public class ConnectorsResourceTest { PowerMock.replayAll(); - connectorsResource.createConnector(body); + connectorsResource.createConnector(FORWARD, body); PowerMock.verifyAll(); } @@ -164,12 +166,12 @@ public class ConnectorsResourceTest { herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(body.config()), EasyMock.eq(false), EasyMock.capture(cb)); expectAndCallbackNotLeaderException(cb); // Should forward request - EasyMock.expect(RestServer.httpRequest(EasyMock.eq("http://leader:8083/connectors"), EasyMock.eq("POST"), EasyMock.eq(body), EasyMock.<TypeReference>anyObject())) + EasyMock.expect(RestServer.httpRequest(EasyMock.eq("http://leader:8083/connectors?forward=false"), EasyMock.eq("POST"), EasyMock.eq(body), EasyMock.<TypeReference>anyObject())) .andReturn(new RestServer.HttpResponse<>(201, new HashMap<String, List<String>>(), new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES))); PowerMock.replayAll(); - connectorsResource.createConnector(body); + connectorsResource.createConnector(FORWARD, body); PowerMock.verifyAll(); @@ -186,7 +188,7 @@ public class ConnectorsResourceTest { PowerMock.replayAll(); - connectorsResource.createConnector(body); + connectorsResource.createConnector(FORWARD, body); PowerMock.verifyAll(); } @@ -199,7 +201,7 @@ public class ConnectorsResourceTest { PowerMock.replayAll(); - connectorsResource.destroyConnector(CONNECTOR_NAME); + connectorsResource.destroyConnector(CONNECTOR_NAME, FORWARD); PowerMock.verifyAll(); } @@ -210,12 +212,12 @@ public class ConnectorsResourceTest { 
herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.<Map<String, String>>isNull(), EasyMock.eq(true), EasyMock.capture(cb)); expectAndCallbackNotLeaderException(cb); // Should forward request - EasyMock.expect(RestServer.httpRequest("http://leader:8083/connectors/" + CONNECTOR_NAME, "DELETE", null, null)) + EasyMock.expect(RestServer.httpRequest("http://leader:8083/connectors/" + CONNECTOR_NAME + "?forward=false", "DELETE", null, null)) .andReturn(new RestServer.HttpResponse<>(204, new HashMap<String, List<String>>(), null)); PowerMock.replayAll(); - connectorsResource.destroyConnector(CONNECTOR_NAME); + connectorsResource.destroyConnector(CONNECTOR_NAME, FORWARD); PowerMock.verifyAll(); } @@ -229,7 +231,7 @@ public class ConnectorsResourceTest { PowerMock.replayAll(); - connectorsResource.destroyConnector(CONNECTOR_NAME); + connectorsResource.destroyConnector(CONNECTOR_NAME, FORWARD); PowerMock.verifyAll(); } @@ -242,7 +244,7 @@ public class ConnectorsResourceTest { PowerMock.replayAll(); - ConnectorInfo connInfo = connectorsResource.getConnector(CONNECTOR_NAME); + ConnectorInfo connInfo = connectorsResource.getConnector(CONNECTOR_NAME, FORWARD); assertEquals(new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES), connInfo); PowerMock.verifyAll(); @@ -256,7 +258,7 @@ public class ConnectorsResourceTest { PowerMock.replayAll(); - Map<String, String> connConfig = connectorsResource.getConnectorConfig(CONNECTOR_NAME); + Map<String, String> connConfig = connectorsResource.getConnectorConfig(CONNECTOR_NAME, FORWARD); assertEquals(CONNECTOR_CONFIG, connConfig); PowerMock.verifyAll(); @@ -270,7 +272,7 @@ public class ConnectorsResourceTest { PowerMock.replayAll(); - connectorsResource.getConnectorConfig(CONNECTOR_NAME); + connectorsResource.getConnectorConfig(CONNECTOR_NAME, FORWARD); PowerMock.verifyAll(); } @@ -283,7 +285,7 @@ public class ConnectorsResourceTest { PowerMock.replayAll(); - 
connectorsResource.putConnectorConfig(CONNECTOR_NAME, CONNECTOR_CONFIG); + connectorsResource.putConnectorConfig(CONNECTOR_NAME, FORWARD, CONNECTOR_CONFIG); PowerMock.verifyAll(); } @@ -296,7 +298,7 @@ public class ConnectorsResourceTest { PowerMock.replayAll(); - List<TaskInfo> taskInfos = connectorsResource.getTaskConfigs(CONNECTOR_NAME); + List<TaskInfo> taskInfos = connectorsResource.getTaskConfigs(CONNECTOR_NAME, FORWARD); assertEquals(TASK_INFOS, taskInfos); PowerMock.verifyAll(); @@ -310,7 +312,7 @@ public class ConnectorsResourceTest { PowerMock.replayAll(); - connectorsResource.getTaskConfigs(CONNECTOR_NAME); + connectorsResource.getTaskConfigs(CONNECTOR_NAME, FORWARD); PowerMock.verifyAll(); } @@ -323,7 +325,7 @@ public class ConnectorsResourceTest { PowerMock.replayAll(); - connectorsResource.putTaskConfigs(CONNECTOR_NAME, TASK_CONFIGS); + connectorsResource.putTaskConfigs(CONNECTOR_NAME, FORWARD, TASK_CONFIGS); PowerMock.verifyAll(); } @@ -336,7 +338,108 @@ public class ConnectorsResourceTest { PowerMock.replayAll(); - connectorsResource.putTaskConfigs(CONNECTOR_NAME, TASK_CONFIGS); + connectorsResource.putTaskConfigs(CONNECTOR_NAME, FORWARD, TASK_CONFIGS); + + PowerMock.verifyAll(); + } + + @Test(expected = NotFoundException.class) + public void testRestartConnectorNotFound() throws Throwable { + final Capture<Callback<Void>> cb = Capture.newInstance(); + herder.restartConnector(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb)); + expectAndCallbackException(cb, new NotFoundException("not found")); + + PowerMock.replayAll(); + + connectorsResource.restartConnector(CONNECTOR_NAME, FORWARD); + + PowerMock.verifyAll(); + } + + @Test + public void testRestartConnectorLeaderRedirect() throws Throwable { + final Capture<Callback<Void>> cb = Capture.newInstance(); + herder.restartConnector(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb)); + expectAndCallbackNotLeaderException(cb); + + 
EasyMock.expect(RestServer.httpRequest(EasyMock.eq("http://leader:8083/connectors/" + CONNECTOR_NAME + "/restart?forward=true"), + EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.<TypeReference>anyObject())) + .andReturn(new RestServer.HttpResponse<>(202, new HashMap<String, List<String>>(), null)); + + PowerMock.replayAll(); + + connectorsResource.restartConnector(CONNECTOR_NAME, null); + + PowerMock.verifyAll(); + } + + @Test + public void testRestartConnectorOwnerRedirect() throws Throwable { + final Capture<Callback<Void>> cb = Capture.newInstance(); + herder.restartConnector(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb)); + String ownerUrl = "http://owner:8083"; + expectAndCallbackException(cb, new NotAssignedException("not owner test", ownerUrl)); + + EasyMock.expect(RestServer.httpRequest(EasyMock.eq("http://owner:8083/connectors/" + CONNECTOR_NAME + "/restart?forward=false"), + EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.<TypeReference>anyObject())) + .andReturn(new RestServer.HttpResponse<>(202, new HashMap<String, List<String>>(), null)); + + PowerMock.replayAll(); + + connectorsResource.restartConnector(CONNECTOR_NAME, true); + + PowerMock.verifyAll(); + } + + @Test(expected = NotFoundException.class) + public void testRestartTaskNotFound() throws Throwable { + ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0); + final Capture<Callback<Void>> cb = Capture.newInstance(); + herder.restartTask(EasyMock.eq(taskId), EasyMock.capture(cb)); + expectAndCallbackException(cb, new NotFoundException("not found")); + + PowerMock.replayAll(); + + connectorsResource.restartTask(CONNECTOR_NAME, 0, FORWARD); + + PowerMock.verifyAll(); + } + + @Test + public void testRestartTaskLeaderRedirect() throws Throwable { + ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0); + + final Capture<Callback<Void>> cb = Capture.newInstance(); + herder.restartTask(EasyMock.eq(taskId), EasyMock.capture(cb)); + expectAndCallbackNotLeaderException(cb); + 
+ EasyMock.expect(RestServer.httpRequest(EasyMock.eq("http://leader:8083/connectors/" + CONNECTOR_NAME + "/tasks/0/restart?forward=true"), + EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.<TypeReference>anyObject())) + .andReturn(new RestServer.HttpResponse<>(202, new HashMap<String, List<String>>(), null)); + + PowerMock.replayAll(); + + connectorsResource.restartTask(CONNECTOR_NAME, 0, null); + + PowerMock.verifyAll(); + } + + @Test + public void testRestartTaskOwnerRedirect() throws Throwable { + ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0); + + final Capture<Callback<Void>> cb = Capture.newInstance(); + herder.restartTask(EasyMock.eq(taskId), EasyMock.capture(cb)); + String ownerUrl = "http://owner:8083"; + expectAndCallbackException(cb, new NotAssignedException("not owner test", ownerUrl)); + + EasyMock.expect(RestServer.httpRequest(EasyMock.eq("http://owner:8083/connectors/" + CONNECTOR_NAME + "/tasks/0/restart?forward=false"), + EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.<TypeReference>anyObject())) + .andReturn(new RestServer.HttpResponse<>(202, new HashMap<String, List<String>>(), null)); + + PowerMock.replayAll(); + + connectorsResource.restartTask(CONNECTOR_NAME, 0, true); PowerMock.verifyAll(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/89c67727/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java index 3959ff8..05a64a1 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java @@ -156,6 +156,160 @@ public class StandaloneHerderTest { 
} @Test + public void testRestartConnector() throws Exception { + expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false); + + worker.stopConnector(CONNECTOR_NAME); + EasyMock.expectLastCall(); + + worker.startConnector(EasyMock.eq(new ConnectorConfig(connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class))), + EasyMock.anyObject(HerderConnectorContext.class), EasyMock.eq(herder)); + EasyMock.expectLastCall(); + + PowerMock.replayAll(); + + herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback); + + FutureCallback<Void> cb = new FutureCallback<>(); + herder.restartConnector(CONNECTOR_NAME, cb); + cb.get(1000L, TimeUnit.MILLISECONDS); + + PowerMock.verifyAll(); + } + + @Test + public void testRestartConnectorFailureOnStop() throws Exception { + expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false); + + RuntimeException e = new RuntimeException(); + worker.stopConnector(CONNECTOR_NAME); + EasyMock.expectLastCall().andThrow(e); + + // the connector will not be started after the failure in stop + + PowerMock.replayAll(); + + herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback); + + FutureCallback<Void> cb = new FutureCallback<>(); + herder.restartConnector(CONNECTOR_NAME, cb); + try { + cb.get(1000L, TimeUnit.MILLISECONDS); + fail(); + } catch (ExecutionException exception) { + assertEquals(e, exception.getCause()); + } + + PowerMock.verifyAll(); + } + + @Test + public void testRestartConnectorFailureOnStart() throws Exception { + expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false); + + worker.stopConnector(CONNECTOR_NAME); + EasyMock.expectLastCall(); + + RuntimeException e = new RuntimeException(); + worker.startConnector(EasyMock.eq(new ConnectorConfig(connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class))), + 
EasyMock.anyObject(HerderConnectorContext.class), EasyMock.eq(herder)); + EasyMock.expectLastCall().andThrow(e); + + PowerMock.replayAll(); + + herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback); + + FutureCallback<Void> cb = new FutureCallback<>(); + herder.restartConnector(CONNECTOR_NAME, cb); + try { + cb.get(1000L, TimeUnit.MILLISECONDS); + fail(); + } catch (ExecutionException exception) { + assertEquals(e, exception.getCause()); + } + + PowerMock.verifyAll(); + } + + @Test + public void testRestartTask() throws Exception { + ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0); + expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false); + + worker.stopAndAwaitTask(taskId); + EasyMock.expectLastCall(); + + Map<String, String> generatedTaskProps = taskConfig(BogusSourceTask.class, false); + worker.startTask(taskId, new TaskConfig(generatedTaskProps), herder); + EasyMock.expectLastCall(); + + PowerMock.replayAll(); + + herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback); + + FutureCallback<Void> cb = new FutureCallback<>(); + herder.restartTask(taskId, cb); + cb.get(1000L, TimeUnit.MILLISECONDS); + + PowerMock.verifyAll(); + } + + @Test + public void testRestartTaskFailureOnStop() throws Exception { + ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0); + expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false); + + RuntimeException e = new RuntimeException(); + worker.stopAndAwaitTask(taskId); + EasyMock.expectLastCall().andThrow(e); + + // task will not be started after the failure in stop + + PowerMock.replayAll(); + + herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback); + + FutureCallback<Void> cb = new FutureCallback<>(); + herder.restartTask(taskId, cb); + try { 
+ cb.get(1000L, TimeUnit.MILLISECONDS); + fail("Expected restart callback to raise an exception"); + } catch (ExecutionException exception) { + assertEquals(e, exception.getCause()); + } + PowerMock.verifyAll(); + } + + @Test + public void testRestartTaskFailureOnStart() throws Exception { + ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0); + expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false); + + worker.stopAndAwaitTask(taskId); + EasyMock.expectLastCall(); + + RuntimeException e = new RuntimeException(); + Map<String, String> generatedTaskProps = taskConfig(BogusSourceTask.class, false); + worker.startTask(taskId, new TaskConfig(generatedTaskProps), herder); + EasyMock.expectLastCall().andThrow(e); + + PowerMock.replayAll(); + + herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback); + + FutureCallback<Void> cb = new FutureCallback<>(); + herder.restartTask(taskId, cb); + try { + cb.get(1000L, TimeUnit.MILLISECONDS); + fail("Expected restart callback to raise an exception"); + } catch (ExecutionException exception) { + assertEquals(e, exception.getCause()); + } + + PowerMock.verifyAll(); + } + + @Test public void testCreateAndStop() throws Exception { connector = PowerMock.createMock(BogusSourceConnector.class); expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false);
