Repository: kafka
Updated Branches:
  refs/heads/trunk b1691cf49 -> 89c67727c


KAFKA-3506: Kafka Connect restart APIs

Author: Jason Gustafson <[email protected]>

Reviewers: Ewen Cheslack-Postava <[email protected]>

Closes #1189 from hachikuji/KAFKA-3506


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/89c67727
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/89c67727
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/89c67727

Branch: refs/heads/trunk
Commit: 89c67727c2793bf56b0b005a7d758beebedb5aed
Parents: b1691cf
Author: Jason Gustafson <[email protected]>
Authored: Mon Apr 18 10:50:58 2016 -0700
Committer: Ewen Cheslack-Postava <[email protected]>
Committed: Mon Apr 18 10:50:58 2016 -0700

----------------------------------------------------------------------
 .../apache/kafka/connect/runtime/Herder.java    |  18 +-
 .../apache/kafka/connect/runtime/Worker.java    |   7 +
 .../runtime/distributed/DistributedHerder.java  |  91 +++++-
 .../distributed/NotAssignedException.java       |  29 ++
 .../runtime/distributed/NotLeaderException.java |  21 +-
 .../distributed/RequestTargetException.java     |  47 +++
 .../distributed/StaleConfigException.java       |  27 ++
 .../runtime/distributed/WorkerCoordinator.java  |  59 ++++
 .../runtime/distributed/WorkerGroupMember.java  |   9 +
 .../kafka/connect/runtime/rest/RestServer.java  |   9 +-
 .../rest/resources/ConnectorsResource.java      | 134 ++++++---
 .../runtime/standalone/StandaloneHerder.java    |  60 +++-
 .../distributed/DistributedHerderTest.java      | 301 +++++++++++++++++++
 .../rest/resources/ConnectorsResourceTest.java  | 143 +++++++--
 .../standalone/StandaloneHerderTest.java        | 154 ++++++++++
 15 files changed, 1009 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/89c67727/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
index 3ea4a81..cce100e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
@@ -61,9 +61,9 @@ public interface Herder {
      * from the current configuration. However, note
      *
      * @returns A list of connector names
-     * @throws org.apache.kafka.connect.runtime.distributed.NotLeaderException 
if this node can not resolve the request
+     * @throws 
org.apache.kafka.connect.runtime.distributed.RequestTargetException if this 
node can not resolve the request
      *         (e.g., because it has not joined the cluster or does not have 
configs in sync with the group) and it is
-     *         also not the leader
+     *         not the leader or the task owner (e.g., task restart must be 
handled by the worker which owns the task)
      * @throws org.apache.kafka.connect.errors.ConnectException if this node 
is the leader, but still cannot resolve the
      *         request (e.g., it is not in sync with other worker's config 
state)
      */
@@ -135,6 +135,20 @@ public interface Herder {
      */
     ConfigInfos validateConfigs(String connType, Map<String, String> 
connectorConfig);
 
+    /**
+     * Restart the task with the given id.
+     * @param id id of the task
+     * @param cb callback to invoke upon completion
+     */
+    void restartTask(ConnectorTaskId id, Callback<Void> cb);
+
+    /**
+     * Restart the connector.
+     * @param connName name of the connector
+     * @param cb callback to invoke upon completion
+     */
+    void restartConnector(String connName, Callback<Void> cb);
+
     class Created<T> {
         private final boolean created;
         private final T result;

http://git-wip-us.apache.org/repos/asf/kafka/blob/89c67727/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index e1a806a..22843d3 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -242,6 +242,9 @@ public class Worker {
         return names.substring(0, names.toString().length() - 2);
     }
 
+    public boolean ownsTask(ConnectorTaskId taskId) {
+        return tasks.containsKey(taskId);
+    }
 
     private static Connector instantiateConnector(Class<? extends Connector> 
connClass) {
         try {
@@ -415,6 +418,10 @@ public class Worker {
         return workerId;
     }
 
+    public boolean ownsConnector(String connName) {
+        return this.connectors.containsKey(connName);
+    }
+
     private static class WorkerConnector  {
         private final String connName;
         private final ConnectorStatus.Listener lifecycleListener;

http://git-wip-us.apache.org/repos/asf/kafka/blob/89c67727/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 2fc8297..24d548d 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -545,6 +545,79 @@ public class DistributedHerder extends AbstractHerder 
implements Runnable {
     }
 
     @Override
+    public synchronized void restartConnector(final String connName, final 
Callback<Void> callback) {
+        addRequest(new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                // if config is out of sync, then a rebalance is likely to 
begin shortly, so rather than risking
+                // a stale response, we return an error and let the user retry
+                if (!isConfigSynced()) {
+                    throw new StaleConfigException("Cannot complete request 
because config is out of sync");
+                }
+
+                if (!configState.connectors().contains(connName)) {
+                    callback.onCompletion(new NotFoundException("Unknown 
connector: " + connName), null);
+                    return null;
+                }
+
+                if (worker.ownsConnector(connName)) {
+                    try {
+                        worker.stopConnector(connName);
+                        startConnector(connName);
+                        callback.onCompletion(null, null);
+                    } catch (Throwable t) {
+                        callback.onCompletion(t, null);
+                    }
+                } else if (isLeader()) {
+                    callback.onCompletion(new NotAssignedException("Cannot 
restart connector since it is not assigned to this member", 
member.ownerUrl(connName)), null);
+                } else {
+                    callback.onCompletion(new NotLeaderException("Cannot 
restart connector since it is not assigned to this member", leaderUrl()), null);
+                }
+                return null;
+            }
+        }, forwardErrorCallback(callback));
+    }
+
+    @Override
+    public synchronized void restartTask(final ConnectorTaskId id, final 
Callback<Void> callback) {
+        addRequest(new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                // if config is out of sync, then a rebalance is likely to 
begin shortly, so rather than risking
+                // a stale response, we return an error and let the user retry
+                if (!isConfigSynced()) {
+                    throw new StaleConfigException("Cannot complete request 
because config is out of sync");
+                }
+
+                if (!configState.connectors().contains(id.connector())) {
+                    callback.onCompletion(new NotFoundException("Unknown 
connector: " + id.connector()), null);
+                    return null;
+                }
+
+                if (configState.taskConfig(id) == null) {
+                    callback.onCompletion(new NotFoundException("Unknown task: 
" + id), null);
+                    return null;
+                }
+
+                if (worker.ownsTask(id)) {
+                    try {
+                        worker.stopAndAwaitTask(id);
+                        startTask(id);
+                        callback.onCompletion(null, null);
+                    } catch (Throwable t) {
+                        callback.onCompletion(t, null);
+                    }
+                } else if (isLeader()) {
+                    callback.onCompletion(new NotAssignedException("Cannot 
restart task since it is not assigned to this member", member.ownerUrl(id)), 
null);
+                } else {
+                    callback.onCompletion(new NotLeaderException("Cannot 
restart task since it is not assigned to this member", leaderUrl()), null);
+                }
+                return null;
+            }
+        }, forwardErrorCallback(callback));
+    }
+
+    @Override
     public int generation() {
         return generation;
     }
@@ -679,10 +752,7 @@ public class DistributedHerder extends AbstractHerder 
implements Runnable {
         }
         for (ConnectorTaskId taskId : assignment.tasks()) {
             try {
-                log.info("Starting task {}", taskId);
-                Map<String, String> configs = configState.taskConfig(taskId);
-                TaskConfig taskConfig = new TaskConfig(configs);
-                worker.startTask(taskId, taskConfig, this);
+                startTask(taskId);
             } catch (ConfigException e) {
                 log.error("Couldn't instantiate task " + taskId + " because it 
has an invalid task " +
                         "configuration. This task will not execute until 
reconfigured.", e);
@@ -691,6 +761,13 @@ public class DistributedHerder extends AbstractHerder 
implements Runnable {
         log.info("Finished starting connectors and tasks");
     }
 
+    private void startTask(ConnectorTaskId taskId) {
+        log.info("Starting task {}", taskId);
+        Map<String, String> configs = configState.taskConfig(taskId);
+        TaskConfig taskConfig = new TaskConfig(configs);
+        worker.startTask(taskId, taskConfig, this);
+    }
+
     // Helper for starting a connector with the given name, which will extract 
& parse the config, generate connector
     // context and add to the worker. This needs to be called from within the 
main worker thread for this herder.
     private void startConnector(String connectorName) {
@@ -791,10 +868,14 @@ public class DistributedHerder extends AbstractHerder 
implements Runnable {
         }
     }
 
+    private boolean isConfigSynced() {
+        return assignment != null && configState.offset() == 
assignment.offset();
+    }
+
     // Common handling for requests that get config data. Checks if we are in 
sync with the current config, which allows
     // us to answer requests directly. If we are not, handles invoking the 
callback with the appropriate error.
     private boolean checkConfigSynced(Callback<?> callback) {
-        if (assignment == null || configState.offset() != assignment.offset()) 
{
+        if (!isConfigSynced()) {
             if (!isLeader())
                 callback.onCompletion(new NotLeaderException("Cannot get 
config data because config is not in sync and this is not the leader", 
leaderUrl()), null);
             else

http://git-wip-us.apache.org/repos/asf/kafka/blob/89c67727/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/NotAssignedException.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/NotAssignedException.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/NotAssignedException.java
new file mode 100644
index 0000000..a4211cc
--- /dev/null
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/NotAssignedException.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.runtime.distributed;
+
+/**
+ * Thrown when a request intended for the owner of a task or connector is 
received by a worker which doesn't
+ * own it (typically the leader).
+ */
+public class NotAssignedException extends RequestTargetException {
+
+    public NotAssignedException(String message, String ownerUrl) {
+        super(message, ownerUrl);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/89c67727/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/NotLeaderException.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/NotLeaderException.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/NotLeaderException.java
index 5f94b53..9340eda 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/NotLeaderException.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/NotLeaderException.java
@@ -17,31 +17,14 @@
 
 package org.apache.kafka.connect.runtime.distributed;
 
-import org.apache.kafka.connect.errors.ConnectException;
-
 /**
  * Indicates an operation was not permitted because it can only be performed 
on the leader and this worker is not currently
  * the leader.
  */
-public class NotLeaderException extends ConnectException {
-    private final String leaderUrl;
+public class NotLeaderException extends RequestTargetException {
 
     public NotLeaderException(String msg, String leaderUrl) {
-        super(msg);
-        this.leaderUrl = leaderUrl;
-    }
-
-    public NotLeaderException(String msg, String leaderUrl, Throwable 
throwable) {
-        super(msg, throwable);
-        this.leaderUrl = leaderUrl;
+        super(msg, leaderUrl);
     }
 
-    public NotLeaderException(String leaderUrl, Throwable throwable) {
-        super(throwable);
-        this.leaderUrl = leaderUrl;
-    }
-
-    public String leaderUrl() {
-        return leaderUrl;
-    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/89c67727/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/RequestTargetException.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/RequestTargetException.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/RequestTargetException.java
new file mode 100644
index 0000000..42a5f5d
--- /dev/null
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/RequestTargetException.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.runtime.distributed;
+
+import org.apache.kafka.connect.errors.ConnectException;
+
+/**
+ * Raised when a request has been received by a worker which cannot handle it,
+ * but can forward it to the right target
+ */
+public class RequestTargetException extends ConnectException {
+    private final String forwardUrl;
+
+    public RequestTargetException(String s, String forwardUrl) {
+        super(s);
+        this.forwardUrl = forwardUrl;
+    }
+
+    public RequestTargetException(String s, Throwable throwable, String 
forwardUrl) {
+        super(s, throwable);
+        this.forwardUrl = forwardUrl;
+    }
+
+    public RequestTargetException(Throwable throwable, String forwardUrl) {
+        super(throwable);
+        this.forwardUrl = forwardUrl;
+    }
+
+    public String forwardUrl() {
+        return forwardUrl;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/89c67727/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/StaleConfigException.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/StaleConfigException.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/StaleConfigException.java
new file mode 100644
index 0000000..c615b37
--- /dev/null
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/StaleConfigException.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.runtime.distributed;
+
+import org.apache.kafka.connect.errors.ConnectException;
+
+public class StaleConfigException extends ConnectException {
+
+    public StaleConfigException(String s) {
+        super(s);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/89c67727/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index fa50fbf..d5802c6 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -54,6 +54,7 @@ public final class WorkerCoordinator extends 
AbstractCoordinator implements Clos
     private final WorkerCoordinatorMetrics sensors;
     private ClusterConfigState configSnapshot;
     private final WorkerRebalanceListener listener;
+    private LeaderState leaderState;
 
     private boolean rejoinRequested;
 
@@ -198,6 +199,8 @@ public final class WorkerCoordinator extends 
AbstractCoordinator implements Clos
             }
         }
 
+        this.leaderState = new LeaderState(allConfigs, connectorAssignments, 
taskAssignments);
+
         return fillAssignmentsAndSerialize(allConfigs.keySet(), 
ConnectProtocol.Assignment.NO_ERROR,
                 leaderId, allConfigs.get(leaderId).url(), maxOffset, 
connectorAssignments, taskAssignments);
     }
@@ -228,6 +231,7 @@ public final class WorkerCoordinator extends 
AbstractCoordinator implements Clos
 
     @Override
     protected void onJoinPrepare(int generation, String memberId) {
+        this.leaderState = null;
         log.debug("Revoking previous assignment {}", assignmentSnapshot);
         if (assignmentSnapshot != null && !assignmentSnapshot.failed())
             listener.onRevoked(assignmentSnapshot.leader(), 
assignmentSnapshot.connectors(), assignmentSnapshot.tasks());
@@ -247,6 +251,22 @@ public final class WorkerCoordinator extends 
AbstractCoordinator implements Clos
         super.close();
     }
 
+    private boolean isLeader() {
+        return assignmentSnapshot != null && 
memberId.equals(assignmentSnapshot.leader());
+    }
+
+    public String ownerUrl(String connector) {
+        if (needRejoin() || !isLeader())
+            return null;
+        return leaderState.ownerUrl(connector);
+    }
+
+    public String ownerUrl(ConnectorTaskId task) {
+        if (needRejoin() || !isLeader())
+            return null;
+        return leaderState.ownerUrl(task);
+    }
+
     private class WorkerCoordinatorMetrics {
         public final Metrics metrics;
         public final String metricGrpName;
@@ -282,4 +302,43 @@ public final class WorkerCoordinator extends 
AbstractCoordinator implements Clos
         return res;
     }
 
+    private static <K, V> Map<V, K> invertAssignment(Map<K, List<V>> 
assignment) {
+        Map<V, K> inverted = new HashMap<>();
+        for (Map.Entry<K, List<V>> assignmentEntry : assignment.entrySet()) {
+            K key = assignmentEntry.getKey();
+            for (V value : assignmentEntry.getValue())
+                inverted.put(value, key);
+        }
+        return inverted;
+    }
+
+    private static class LeaderState {
+        private final Map<String, ConnectProtocol.WorkerState> allMembers;
+        private final Map<String, String> connectorOwners;
+        private final Map<ConnectorTaskId, String> taskOwners;
+
+        public LeaderState(Map<String, ConnectProtocol.WorkerState> allMembers,
+                           Map<String, List<String>> connectorAssignment,
+                           Map<String, List<ConnectorTaskId>> taskAssignment) {
+            this.allMembers = allMembers;
+            this.connectorOwners = invertAssignment(connectorAssignment);
+            this.taskOwners = invertAssignment(taskAssignment);
+        }
+
+        private String ownerUrl(ConnectorTaskId id) {
+            String ownerId = taskOwners.get(id);
+            if (ownerId == null)
+                return null;
+            return allMembers.get(ownerId).url();
+        }
+
+        private String ownerUrl(String connector) {
+            String ownerId = connectorOwners.get(connector);
+            if (ownerId == null)
+                return null;
+            return allMembers.get(ownerId).url();
+        }
+
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/89c67727/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index 57028ef..058f171 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.network.Selector;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.storage.KafkaConfigStorage;
+import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -175,6 +176,14 @@ public class WorkerGroupMember {
         coordinator.maybeLeaveGroup();
     }
 
+    public String ownerUrl(String connector) {
+        return coordinator.ownerUrl(connector);
+    }
+
+    public String ownerUrl(ConnectorTaskId task) {
+        return coordinator.ownerUrl(task);
+    }
+
     private void stop(boolean swallowException) {
         log.trace("Stopping the Connect group member.");
         AtomicReference<Throwable> firstException = new 
AtomicReference<Throwable>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/89c67727/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
index 1505a01..3475e1c 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
@@ -20,7 +20,6 @@ package org.apache.kafka.connect.runtime.rest;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
-
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.WorkerConfig;
@@ -48,6 +47,9 @@ import org.glassfish.jersey.servlet.ServletContainer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.servlet.DispatcherType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -58,10 +60,6 @@ import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 
-import javax.servlet.DispatcherType;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.UriBuilder;
-
 /**
  * Embedded server for the REST API that provides the control plane for Kafka 
Connect workers.
  */
@@ -272,4 +270,5 @@ public class RestServer {
         else
             return base + path;
     }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/89c67727/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
index b6e9f61..a53ed7d 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -18,10 +18,10 @@
 package org.apache.kafka.connect.runtime.rest.resources;
 
 import com.fasterxml.jackson.core.type.TypeReference;
-
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
-import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
+import org.apache.kafka.connect.runtime.distributed.StaleConfigException;
+import org.apache.kafka.connect.runtime.distributed.RequestTargetException;
 import org.apache.kafka.connect.runtime.rest.RestServer;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
@@ -33,14 +33,6 @@ import org.apache.kafka.connect.util.FutureCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.URI;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
 import javax.servlet.ServletContext;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.DELETE;
@@ -50,8 +42,17 @@ import javax.ws.rs.PUT;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
+import java.net.URI;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 @Path("/connectors")
 @Produces(MediaType.APPLICATION_JSON)
@@ -75,16 +76,17 @@ public class ConnectorsResource {
 
     @GET
     @Path("/")
-    public Collection<String> listConnectors() throws Throwable {
+    public Collection<String> listConnectors(final @QueryParam("forward") 
Boolean forward) throws Throwable {
         FutureCallback<Collection<String>> cb = new FutureCallback<>();
         herder.connectors(cb);
         return completeOrForwardRequest(cb, "/connectors", "GET", null, new 
TypeReference<Collection<String>>() {
-        });
+        }, forward);
     }
 
     @POST
     @Path("/")
-    public Response createConnector(final CreateConnectorRequest 
createRequest) throws Throwable {
+    public Response createConnector(final @QueryParam("forward") Boolean 
forward,
+                                    final CreateConnectorRequest 
createRequest) throws Throwable {
         String name = createRequest.name();
         Map<String, String> configs = createRequest.config();
         if (!configs.containsKey(ConnectorConfig.NAME_CONFIG))
@@ -93,24 +95,26 @@ public class ConnectorsResource {
         FutureCallback<Herder.Created<ConnectorInfo>> cb = new 
FutureCallback<>();
         herder.putConnectorConfig(name, configs, false, cb);
         Herder.Created<ConnectorInfo> info = completeOrForwardRequest(cb, 
"/connectors", "POST", createRequest,
-                new TypeReference<ConnectorInfo>() { }, new 
CreatedConnectorInfoTranslator());
+                new TypeReference<ConnectorInfo>() { }, new 
CreatedConnectorInfoTranslator(), forward);
         return Response.created(URI.create("/connectors/" + 
name)).entity(info.result()).build();
     }
 
     @GET
     @Path("/{connector}")
-    public ConnectorInfo getConnector(final @PathParam("connector") String 
connector) throws Throwable {
+    public ConnectorInfo getConnector(final @PathParam("connector") String 
connector,
+                                      final @QueryParam("forward") Boolean 
forward) throws Throwable {
         FutureCallback<ConnectorInfo> cb = new FutureCallback<>();
         herder.connectorInfo(connector, cb);
-        return completeOrForwardRequest(cb, "/connectors/" + connector, "GET", 
null);
+        return completeOrForwardRequest(cb, "/connectors/" + connector, "GET", 
null, forward);
     }
 
     @GET
     @Path("/{connector}/config")
-    public Map<String, String> getConnectorConfig(final 
@PathParam("connector") String connector) throws Throwable {
+    public Map<String, String> getConnectorConfig(final 
@PathParam("connector") String connector,
+                                                  final @QueryParam("forward") 
Boolean forward) throws Throwable {
         FutureCallback<Map<String, String>> cb = new FutureCallback<>();
         herder.connectorConfig(connector, cb);
-        return completeOrForwardRequest(cb, "/connectors/" + connector + 
"/config", "GET", null);
+        return completeOrForwardRequest(cb, "/connectors/" + connector + 
"/config", "GET", null, forward);
     }
 
     @GET
@@ -122,11 +126,12 @@ public class ConnectorsResource {
     @PUT
     @Path("/{connector}/config")
     public Response putConnectorConfig(final @PathParam("connector") String 
connector,
-                                   final Map<String, String> connectorConfig) 
throws Throwable {
+                                       final @QueryParam("forward") Boolean 
forward,
+                                       final Map<String, String> 
connectorConfig) throws Throwable {
         FutureCallback<Herder.Created<ConnectorInfo>> cb = new 
FutureCallback<>();
         herder.putConnectorConfig(connector, connectorConfig, true, cb);
         Herder.Created<ConnectorInfo> createdInfo = 
completeOrForwardRequest(cb, "/connectors/" + connector + "/config",
-                "PUT", connectorConfig, new TypeReference<ConnectorInfo>() { 
}, new CreatedConnectorInfoTranslator());
+                "PUT", connectorConfig, new TypeReference<ConnectorInfo>() { 
}, new CreatedConnectorInfoTranslator(), forward);
         Response.ResponseBuilder response;
         if (createdInfo.created())
             response = Response.created(URI.create("/connectors/" + 
connector));
@@ -135,55 +140,102 @@ public class ConnectorsResource {
         return response.entity(createdInfo.result()).build();
     }
 
+    /**
+     * REST endpoint that asks the herder to restart the named connector.
+     *
+     * The herder reports the outcome through a callback; completeOrForwardRequest
+     * waits on it and, depending on the forward flag, may forward the request to
+     * another worker when this one cannot serve it.
+     *
+     * @param connector name of the connector to restart
+     * @param forward   forwarding flag taken from the "forward" query parameter; may be null
+     */
+    @POST
+    @Path("/{connector}/restart")
+    public void restartConnector(final @PathParam("connector") String connector,
+                                 final @QueryParam("forward") Boolean forward)
+            throws Throwable {
+        final String restartPath = "/connectors/" + connector + "/restart";
+        final FutureCallback<Void> restartCallback = new FutureCallback<>();
+        herder.restartConnector(connector, restartCallback);
+        completeOrForwardRequest(restartCallback, restartPath, "POST", null, forward);
+    }
+
     @GET
     @Path("/{connector}/tasks")
-    public List<TaskInfo> getTaskConfigs(final @PathParam("connector") String 
connector) throws Throwable {
+    public List<TaskInfo> getTaskConfigs(final @PathParam("connector") String 
connector,
+                                         final @QueryParam("forward") Boolean 
forward) throws Throwable {
         FutureCallback<List<TaskInfo>> cb = new FutureCallback<>();
         herder.taskConfigs(connector, cb);
         return completeOrForwardRequest(cb, "/connectors/" + connector + 
"/tasks", "GET", null, new TypeReference<List<TaskInfo>>() {
-        });
+        }, forward);
     }
 
     @POST
     @Path("/{connector}/tasks")
     public void putTaskConfigs(final @PathParam("connector") String connector,
+                               final @QueryParam("forward") Boolean forward,
                                final List<Map<String, String>> taskConfigs) 
throws Throwable {
         FutureCallback<Void> cb = new FutureCallback<>();
         herder.putTaskConfigs(connector, taskConfigs, cb);
-        completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", 
"POST", taskConfigs);
+        completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", 
"POST", taskConfigs, forward);
     }
 
     @GET
     @Path("/{connector}/tasks/{task}/status")
-    public ConnectorStateInfo.TaskState getTaskStatus(@PathParam("connector") 
String connector,
-                                                      @PathParam("task") 
Integer task) throws Throwable {
+    public ConnectorStateInfo.TaskState getTaskStatus(final 
@PathParam("connector") String connector,
+                                                      final @PathParam("task") 
Integer task) throws Throwable {
         return herder.taskStatus(new ConnectorTaskId(connector, task));
     }
 
+    /**
+     * REST endpoint that asks the herder to restart a single task of a connector.
+     *
+     * The herder reports the outcome through a callback; completeOrForwardRequest
+     * waits on it and, depending on the forward flag, may forward the request to
+     * another worker when this one cannot serve it.
+     *
+     * @param connector name of the connector that the task belongs to
+     * @param task      task number within the connector
+     * @param forward   forwarding flag taken from the "forward" query parameter; may be null
+     */
+    @POST
+    @Path("/{connector}/tasks/{task}/restart")
+    public void restartTask(final @PathParam("connector") String connector,
+                            final @PathParam("task") Integer task,
+                            final @QueryParam("forward") Boolean forward)
+            throws Throwable {
+        final FutureCallback<Void> restartCallback = new FutureCallback<>();
+        final ConnectorTaskId targetTask = new ConnectorTaskId(connector, task);
+        herder.restartTask(targetTask, restartCallback);
+        final String restartPath = "/connectors/" + connector + "/tasks/" + task + "/restart";
+        completeOrForwardRequest(restartCallback, restartPath, "POST", null, forward);
+    }
+
     @DELETE
     @Path("/{connector}")
-    public void destroyConnector(final @PathParam("connector") String 
connector) throws Throwable {
+    public void destroyConnector(final @PathParam("connector") String 
connector,
+                                 final @QueryParam("forward") Boolean forward) 
throws Throwable {
         FutureCallback<Herder.Created<ConnectorInfo>> cb = new 
FutureCallback<>();
         herder.putConnectorConfig(connector, null, true, cb);
-        completeOrForwardRequest(cb, "/connectors/" + connector, "DELETE", 
null);
+        completeOrForwardRequest(cb, "/connectors/" + connector, "DELETE", 
null, forward);
     }
 
     // Wait for a FutureCallback to complete. If it succeeds, return the 
parsed response. If it fails, try to forward the
     // request to the leader.
-    private <T, U> T completeOrForwardRequest(
-            FutureCallback<T> cb, String path, String method, Object body, 
TypeReference<U> resultType,
-            Translator<T, U> translator) throws Throwable {
+    private <T, U> T completeOrForwardRequest(FutureCallback<T> cb,
+                                              String path,
+                                              String method,
+                                              Object body,
+                                              TypeReference<U> resultType,
+                                              Translator<T, U> translator,
+                                              Boolean forward) throws 
Throwable {
         try {
             return cb.get(REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
         } catch (ExecutionException e) {
-            if (e.getCause() instanceof NotLeaderException) {
-                NotLeaderException notLeaderError = (NotLeaderException) 
e.getCause();
-                String forwardUrl = 
RestServer.urlJoin(notLeaderError.leaderUrl(), path);
-                log.debug("Forwarding request to leader: {} {} {}", 
forwardUrl, method, body);
-                return translator.translate(RestServer.httpRequest(forwardUrl, 
method, body, resultType));
+            Throwable cause = e.getCause();
+
+            if (cause instanceof RequestTargetException) {
+                if (forward == null || forward) {
+                    // the only time we allow recursive forwarding is when no 
forward flag has
+                    // been set, which should only be seen by the first worker 
to handle a user request.
+                    // this gives two total hops to resolve the request before 
giving up.
+                    boolean recursiveForward = forward == null;
+                    RequestTargetException targetException = 
(RequestTargetException) cause;
+                    String forwardUrl = 
UriBuilder.fromUri(targetException.forwardUrl())
+                            .path(path)
+                            .queryParam("forward", recursiveForward)
+                            .build()
+                            .toString();
+                    log.debug("Forwarding request {} {} {}", forwardUrl, 
method, body);
+                    return 
translator.translate(RestServer.httpRequest(forwardUrl, method, body, 
resultType));
+                } else {
+                    // we should find the right target for the query within 
two hops, so if
+                    // we don't, it probably means that a rebalance has taken 
place.
+                    throw new 
ConnectRestException(Response.Status.CONFLICT.getStatusCode(),
+                            "Cannot complete request because of a conflicting 
operation (e.g. worker rebalance)");
+                }
+            } else if (cause instanceof StaleConfigException) {
+                throw new 
ConnectRestException(Response.Status.CONFLICT.getStatusCode(),
+                        "Cannot complete request momentarily due to stale 
configuration (typically caused by a concurrent config change)");
             }
 
-            throw e.getCause();
+            throw cause;
         } catch (TimeoutException e) {
             // This timeout is for the operation itself. None of the timeout 
error codes are relevant, so internal server
             // error is the best option
@@ -193,12 +245,14 @@ public class ConnectorsResource {
         }
     }
 
-    private <T> T completeOrForwardRequest(FutureCallback<T> cb, String path, 
String method, Object body, TypeReference<T> resultType) throws Throwable {
-        return completeOrForwardRequest(cb, path, method, body, resultType, 
new IdentityTranslator<T>());
+    private <T> T completeOrForwardRequest(FutureCallback<T> cb, String path, 
String method, Object body,
+                                           TypeReference<T> resultType, 
Boolean forward) throws Throwable {
+        return completeOrForwardRequest(cb, path, method, body, resultType, 
new IdentityTranslator<T>(), forward);
     }
 
-    private <T> T completeOrForwardRequest(FutureCallback<T> cb, String path, 
String method, Object body) throws Throwable {
-        return completeOrForwardRequest(cb, path, method, body, null, new 
IdentityTranslator<T>());
+    private <T> T completeOrForwardRequest(FutureCallback<T> cb, String path, 
String method,
+                                           Object body, Boolean forward) 
throws Throwable {
+        return completeOrForwardRequest(cb, path, method, body, null, new 
IdentityTranslator<T>(), forward);
     }
 
     private interface Translator<T, U> {

http://git-wip-us.apache.org/repos/asf/kafka/blob/89c67727/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index 9c48ed7..a59336a 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -202,6 +202,44 @@ public class StandaloneHerder extends AbstractHerder {
         throw new UnsupportedOperationException("Kafka Connect in standalone 
mode does not support externally setting task configurations.");
     }
 
+    /**
+     * Restart a single task: stop it, wait for it to finish, then start it again
+     * with the task configuration currently stored for its connector.
+     *
+     * The callback receives a NotFoundException when the connector is unknown or
+     * the task number has no stored configuration, the thrown exception when
+     * stopping or starting the task fails, and (null, null) on success.
+     *
+     * @param taskId id of the task to restart
+     * @param cb     callback notified of the outcome
+     */
+    @Override
+    public synchronized void restartTask(ConnectorTaskId taskId, Callback<Void> cb) {
+        if (!connectors.containsKey(taskId.connector())) {
+            cb.onCompletion(
+                    new NotFoundException("Connector " + taskId.connector() + " not found", null),
+                    null);
+            // stop here: the connector state lookup below would return null
+            return;
+        }
+
+        ConnectorState state = connectors.get(taskId.connector());
+        // taskConfigs is indexed by task number, so a task exists only when its
+        // number is a valid index into that list
+        int taskIndex = taskId.task();
+        if (taskIndex < 0 || taskIndex >= state.taskConfigs.size()) {
+            cb.onCompletion(new NotFoundException("Task " + taskId + " not found", null), null);
+            return;
+        }
+
+        try {
+            worker.stopAndAwaitTask(taskId);
+
+            Map<String, String> taskConfig = state.taskConfigs.get(taskIndex);
+            worker.startTask(taskId, new TaskConfig(taskConfig), this);
+
+            cb.onCompletion(null, null);
+        } catch (Exception e) {
+            log.error("Failed to restart task {}", taskId, e);
+            cb.onCompletion(e, null);
+        }
+    }
+
+    /**
+     * Restart a connector: stop it on the worker, then start it again with the
+     * configuration stored in its ConnectorState.
+     *
+     * The callback receives a NotFoundException when the connector is unknown,
+     * the thrown exception when stopping or starting fails, and (null, null) on
+     * success.
+     *
+     * @param connName name of the connector to restart
+     * @param cb       callback notified of the outcome
+     */
+    @Override
+    public synchronized void restartConnector(String connName, Callback<Void> cb) {
+        if (!connectors.containsKey(connName)) {
+            cb.onCompletion(new NotFoundException("Connector " + connName + " not found", null), null);
+            // stop here: there is no stored state to restart from, and the
+            // callback must not be completed a second time
+            return;
+        }
+
+        ConnectorState state = connectors.get(connName);
+        try {
+            worker.stopConnector(connName);
+            worker.startConnector(state.config, new HerderConnectorContext(this, connName), this);
+            cb.onCompletion(null, null);
+        } catch (Exception e) {
+            log.error("Failed to restart connector {}", connName, e);
+            cb.onCompletion(e, null);
+        }
+    }
+
     /**
      * Start a connector in the worker and record its state.
      * @param connectorProps new connector configuration
@@ -234,19 +272,23 @@ public class StandaloneHerder extends AbstractHerder {
         int index = 0;
         for (Map<String, String> taskConfigMap : state.taskConfigs) {
             ConnectorTaskId taskId = new ConnectorTaskId(connName, index);
-            TaskConfig config = new TaskConfig(taskConfigMap);
-            try {
-                worker.startTask(taskId, config, this);
-            } catch (Throwable e) {
-                log.error("Failed to add task {}: ", taskId, e);
-                // Swallow this so we can continue updating the rest of the 
tasks
-                // FIXME what's the proper response? Kill all the tasks? 
Consider this the same as a task
-                // that died after starting successfully.
-            }
+            startTask(taskId, taskConfigMap);
             index++;
         }
     }
 
+    /**
+     * Start one task on the worker. A failure to start is logged and otherwise
+     * ignored so that the caller can carry on with its remaining tasks.
+     *
+     * @param taskId        id of the task to start
+     * @param taskConfigMap raw configuration properties for the task
+     */
+    private void startTask(ConnectorTaskId taskId, Map<String, String> taskConfigMap) {
+        final TaskConfig taskConfig = new TaskConfig(taskConfigMap);
+        try {
+            worker.startTask(taskId, taskConfig, this);
+        } catch (Throwable startFailure) {
+            log.error("Failed to add task {}: ", taskId, startFailure);
+            // Deliberately not rethrown, so the rest of the tasks can still be updated.
+            // FIXME what's the proper response? Kill all the tasks? Consider
+            // this the same as a task that died after starting successfully.
+        }
+    }
+
     private Set<ConnectorTaskId> tasksFor(ConnectorState state) {
         Set<ConnectorTaskId> tasks = new HashSet<>();
         for (int i = 0; i < state.taskConfigs.size(); i++)

http://git-wip-us.apache.org/repos/asf/kafka/blob/89c67727/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index aa747f6..b667fa8 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.connect.connector.ConnectorContext;
 import org.apache.kafka.connect.errors.AlreadyExistsException;
+import org.apache.kafka.connect.errors.NotFoundException;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.TaskConfig;
@@ -56,10 +57,13 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(DistributedHerder.class)
@@ -315,6 +319,303 @@ public class DistributedHerderTest {
     }
 
     @Test
+    public void testRestartConnector() throws Exception {
+        // Happy path: CONN1 is in the rebalance assignment and the worker reports
+        // owning it, so the restart is expected to stop and then start the
+        // connector, and the callback is expected to complete without an error.
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, 
null)).andStubReturn(TASK_CONFIGS);
+
+        // get the initial assignment
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        expectRebalance(1, Collections.singletonList(CONN1), 
Collections.<ConnectorTaskId>emptyList());
+        expectPostRebalanceCatchup(SNAPSHOT);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), 
EasyMock.<ConnectorContext>anyObject(),
+                EasyMock.eq(herder));
+        PowerMock.expectLastCall();
+
+        // now handle the connector restart
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        EasyMock.expect(worker.ownsConnector(CONN1)).andReturn(true);
+
+        // the restart itself: one stop followed by one start
+        worker.stopConnector(CONN1);
+        PowerMock.expectLastCall();
+        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), 
EasyMock.<ConnectorContext>anyObject(),
+                EasyMock.eq(herder));
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        // first tick handles the initial assignment; the second tick runs after
+        // the restart request has been submitted
+        herder.tick();
+        FutureCallback<Void> callback = new FutureCallback<>();
+        herder.restartConnector(CONN1, callback);
+        herder.tick();
+        // get() throws if the restart was completed with an error
+        callback.get(1000L, TimeUnit.MILLISECONDS);
+
+        PowerMock.verifyAll();
+    }
+
+    /**
+     * Restarting a connector the herder has no record of (CONN2 here, with an
+     * empty assignment) must complete the callback with a NotFoundException.
+     */
+    @Test
+    public void testRestartUnknownConnector() throws Exception {
+        // get the initial assignment
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        expectRebalance(1, Collections.<String>emptyList(),
+                Collections.<ConnectorTaskId>emptyList());
+        expectPostRebalanceCatchup(SNAPSHOT);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // now handle the connector restart
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.tick();
+        FutureCallback<Void> callback = new FutureCallback<>();
+        herder.restartConnector(CONN2, callback);
+        herder.tick();
+        try {
+            callback.get(1000L, TimeUnit.MILLISECONDS);
+            // the failure message names the exception this test actually asserts on
+            fail("Expected NotFoundException to be raised");
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof NotFoundException);
+        }
+
+        PowerMock.verifyAll();
+    }
+
+    /**
+     * With the member id stubbed as "member", an empty assignment, and the worker
+     * reporting that it does not own CONN1, a connector restart is expected to
+     * complete the callback with a NotLeaderException.
+     */
+    @Test
+    public void testRestartConnectorRedirectToLeader() throws Exception {
+        // get the initial assignment
+        EasyMock.expect(member.memberId()).andStubReturn("member");
+        expectRebalance(1, Collections.<String>emptyList(), 
Collections.<ConnectorTaskId>emptyList());
+        expectPostRebalanceCatchup(SNAPSHOT);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // now handle the connector restart
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        EasyMock.expect(worker.ownsConnector(CONN1)).andReturn(false);
+
+        PowerMock.replayAll();
+
+        herder.tick();
+        FutureCallback<Void> callback = new FutureCallback<>();
+        herder.restartConnector(CONN1, callback);
+        herder.tick();
+
+        try {
+            callback.get(1000L, TimeUnit.MILLISECONDS);
+            fail("Expected NotLeaderException to be raised");
+        } catch (ExecutionException e) {
+            // the herder's failure arrives wrapped in the ExecutionException
+            assertTrue(e.getCause() instanceof NotLeaderException);
+        }
+
+        PowerMock.verifyAll();
+    }
+
+    /**
+     * With the member id stubbed as "leader" and the worker reporting that it
+     * does not own CONN1, a connector restart must complete the callback with a
+     * NotAssignedException carrying the owner URL returned by the group member.
+     */
+    @Test
+    public void testRestartConnectorRedirectToOwner() throws Exception {
+        // get the initial assignment
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        expectRebalance(1, Collections.<String>emptyList(),
+                Collections.<ConnectorTaskId>emptyList());
+        expectPostRebalanceCatchup(SNAPSHOT);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // now handle the connector restart
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        String ownerUrl = "ownerUrl";
+        EasyMock.expect(worker.ownsConnector(CONN1)).andReturn(false);
+        EasyMock.expect(member.ownerUrl(CONN1)).andReturn(ownerUrl);
+
+        PowerMock.replayAll();
+
+        herder.tick();
+        FutureCallback<Void> callback = new FutureCallback<>();
+        herder.restartConnector(CONN1, callback);
+        herder.tick();
+
+        try {
+            callback.get(1000L, TimeUnit.MILLISECONDS);
+            // the failure message names the exception this test actually asserts on
+            fail("Expected NotAssignedException to be raised");
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof NotAssignedException);
+            NotAssignedException notAssignedException = (NotAssignedException) e.getCause();
+            assertEquals(ownerUrl, notAssignedException.forwardUrl());
+        }
+
+        PowerMock.verifyAll();
+    }
+
+    /**
+     * Happy path for a task restart: TASK0 is in the rebalance assignment and the
+     * worker reports owning it, so the restart is expected to stop-and-await the
+     * task and then start it again, completing the callback without an error.
+     */
+    @Test
+    public void testRestartTask() throws Exception {
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, 
null)).andStubReturn(TASK_CONFIGS);
+
+        // get the initial assignment
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        expectRebalance(1, Collections.<String>emptyList(), 
Collections.singletonList(TASK0));
+        expectPostRebalanceCatchup(SNAPSHOT);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+        worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), 
EasyMock.eq(herder));
+        PowerMock.expectLastCall();
+
+        // now handle the task restart
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        EasyMock.expect(worker.ownsTask(TASK0)).andReturn(true);
+
+        // the restart itself: one stop-and-await followed by one start
+        worker.stopAndAwaitTask(TASK0);
+        PowerMock.expectLastCall();
+        worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), 
EasyMock.eq(herder));
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.tick();
+        FutureCallback<Void> callback = new FutureCallback<>();
+        herder.restartTask(TASK0, callback);
+        herder.tick();
+        // get() throws if the restart was completed with an error
+        callback.get(1000L, TimeUnit.MILLISECONDS);
+
+        PowerMock.verifyAll();
+    }
+
+    /**
+     * Restarting a task of a connector the herder has no record of ("blah" here,
+     * with an empty assignment) must complete the callback with a
+     * NotFoundException.
+     */
+    @Test
+    public void testRestartUnknownTask() throws Exception {
+        // get the initial assignment
+        EasyMock.expect(member.memberId()).andStubReturn("member");
+        expectRebalance(1, Collections.<String>emptyList(),
+                Collections.<ConnectorTaskId>emptyList());
+        expectPostRebalanceCatchup(SNAPSHOT);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        FutureCallback<Void> callback = new FutureCallback<>();
+        herder.tick();
+        herder.restartTask(new ConnectorTaskId("blah", 0), callback);
+        herder.tick();
+
+        try {
+            callback.get(1000L, TimeUnit.MILLISECONDS);
+            // the failure message names the exception this test actually asserts on
+            fail("Expected NotFoundException to be raised");
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof NotFoundException);
+        }
+
+        PowerMock.verifyAll();
+    }
+
+    /**
+     * With the member id stubbed as "member", an empty assignment, and the worker
+     * reporting that it does not own TASK0, a task restart is expected to
+     * complete the callback with a NotLeaderException.
+     */
+    @Test
+    public void testRestartTaskRedirectToLeader() throws Exception {
+        // get the initial assignment
+        EasyMock.expect(member.memberId()).andStubReturn("member");
+        expectRebalance(1, Collections.<String>emptyList(), 
Collections.<ConnectorTaskId>emptyList());
+        expectPostRebalanceCatchup(SNAPSHOT);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // now handle the task restart
+        EasyMock.expect(worker.ownsTask(TASK0)).andReturn(false);
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.tick();
+        FutureCallback<Void> callback = new FutureCallback<>();
+        herder.restartTask(TASK0, callback);
+        herder.tick();
+
+        try {
+            callback.get(1000L, TimeUnit.MILLISECONDS);
+            fail("Expected NotLeaderException to be raised");
+        } catch (ExecutionException e) {
+            // the herder's failure arrives wrapped in the ExecutionException
+            assertTrue(e.getCause() instanceof NotLeaderException);
+        }
+
+        PowerMock.verifyAll();
+    }
+
+    /**
+     * With the member id stubbed as "leader" and the worker reporting that it
+     * does not own TASK0, a task restart must complete the callback with a
+     * NotAssignedException carrying the owner URL returned by the group member.
+     */
+    @Test
+    public void testRestartTaskRedirectToOwner() throws Exception {
+        // get the initial assignment
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        expectRebalance(1, Collections.<String>emptyList(),
+                Collections.<ConnectorTaskId>emptyList());
+        expectPostRebalanceCatchup(SNAPSHOT);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // now handle the task restart
+        String ownerUrl = "ownerUrl";
+        EasyMock.expect(worker.ownsTask(TASK0)).andReturn(false);
+        EasyMock.expect(member.ownerUrl(TASK0)).andReturn(ownerUrl);
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.tick();
+        FutureCallback<Void> callback = new FutureCallback<>();
+        herder.restartTask(TASK0, callback);
+        herder.tick();
+
+        try {
+            callback.get(1000L, TimeUnit.MILLISECONDS);
+            // the failure message names the exception this test actually asserts on
+            fail("Expected NotAssignedException to be raised");
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof NotAssignedException);
+            NotAssignedException notAssignedException = (NotAssignedException) e.getCause();
+            assertEquals(ownerUrl, notAssignedException.forwardUrl());
+        }
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
     public void testConnectorConfigAdded() {
         // If a connector was added, we need to rebalance
         EasyMock.expect(member.memberId()).andStubReturn("member");

http://git-wip-us.apache.org/repos/asf/kafka/blob/89c67727/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
index 970f56c..fa7d997 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.NotFoundException;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.distributed.NotAssignedException;
 import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
 import org.apache.kafka.connect.runtime.rest.RestServer;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
@@ -64,6 +65,7 @@ public class ConnectorsResourceTest {
     private static final String LEADER_URL = "http://leader:8083/";
     private static final String CONNECTOR_NAME = "test";
     private static final String CONNECTOR2_NAME = "test2";
+    private static final Boolean FORWARD = true;
     private static final Map<String, String> CONNECTOR_CONFIG = new 
HashMap<>();
     static {
         CONNECTOR_CONFIG.put("name", CONNECTOR_NAME);
@@ -103,7 +105,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        Collection<String> connectors = connectorsResource.listConnectors();
+        Collection<String> connectors = 
connectorsResource.listConnectors(FORWARD);
         // Ordering isn't guaranteed, compare sets
         assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_NAME, 
CONNECTOR2_NAME)), new HashSet<>(connectors));
 
@@ -116,13 +118,13 @@ public class ConnectorsResourceTest {
         herder.connectors(EasyMock.capture(cb));
         expectAndCallbackNotLeaderException(cb);
         // Should forward request
-        
EasyMock.expect(RestServer.httpRequest(EasyMock.eq("http://leader:8083/connectors"),
 EasyMock.eq("GET"),
+        
EasyMock.expect(RestServer.httpRequest(EasyMock.eq("http://leader:8083/connectors?forward=false"),
 EasyMock.eq("GET"),
                 EasyMock.isNull(), EasyMock.anyObject(TypeReference.class)))
                 .andReturn(new RestServer.HttpResponse<>(200, new 
HashMap<String, List<String>>(), Arrays.asList(CONNECTOR2_NAME, 
CONNECTOR_NAME)));
 
         PowerMock.replayAll();
 
-        Collection<String> connectors = connectorsResource.listConnectors();
+        Collection<String> connectors = 
connectorsResource.listConnectors(FORWARD);
         // Ordering isn't guaranteed, compare sets
         assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_NAME, 
CONNECTOR2_NAME)), new HashSet<>(connectors));
 
@@ -138,7 +140,7 @@ public class ConnectorsResourceTest {
         PowerMock.replayAll();
 
         // throws
-        connectorsResource.listConnectors();
+        connectorsResource.listConnectors(FORWARD);
     }
 
     @Test
@@ -151,7 +153,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        connectorsResource.createConnector(body);
+        connectorsResource.createConnector(FORWARD, body);
 
         PowerMock.verifyAll();
     }
@@ -164,12 +166,12 @@ public class ConnectorsResourceTest {
         herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), 
EasyMock.eq(body.config()), EasyMock.eq(false), EasyMock.capture(cb));
         expectAndCallbackNotLeaderException(cb);
         // Should forward request
-        
EasyMock.expect(RestServer.httpRequest(EasyMock.eq("http://leader:8083/connectors"),
 EasyMock.eq("POST"), EasyMock.eq(body), EasyMock.<TypeReference>anyObject()))
+        
EasyMock.expect(RestServer.httpRequest(EasyMock.eq("http://leader:8083/connectors?forward=false"),
 EasyMock.eq("POST"), EasyMock.eq(body), EasyMock.<TypeReference>anyObject()))
                 .andReturn(new RestServer.HttpResponse<>(201, new 
HashMap<String, List<String>>(), new ConnectorInfo(CONNECTOR_NAME, 
CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES)));
 
         PowerMock.replayAll();
 
-        connectorsResource.createConnector(body);
+        connectorsResource.createConnector(FORWARD, body);
 
         PowerMock.verifyAll();
 
@@ -186,7 +188,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        connectorsResource.createConnector(body);
+        connectorsResource.createConnector(FORWARD, body);
 
         PowerMock.verifyAll();
     }
@@ -199,7 +201,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        connectorsResource.destroyConnector(CONNECTOR_NAME);
+        connectorsResource.destroyConnector(CONNECTOR_NAME, FORWARD);
 
         PowerMock.verifyAll();
     }
@@ -210,12 +212,12 @@ public class ConnectorsResourceTest {
         herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), 
EasyMock.<Map<String, String>>isNull(), EasyMock.eq(true), 
EasyMock.capture(cb));
         expectAndCallbackNotLeaderException(cb);
         // Should forward request
-        
EasyMock.expect(RestServer.httpRequest("http://leader:8083/connectors/" + 
CONNECTOR_NAME, "DELETE", null, null))
+        
EasyMock.expect(RestServer.httpRequest("http://leader:8083/connectors/" + 
CONNECTOR_NAME + "?forward=false", "DELETE", null, null))
                 .andReturn(new RestServer.HttpResponse<>(204, new 
HashMap<String, List<String>>(), null));
 
         PowerMock.replayAll();
 
-        connectorsResource.destroyConnector(CONNECTOR_NAME);
+        connectorsResource.destroyConnector(CONNECTOR_NAME, FORWARD);
 
         PowerMock.verifyAll();
     }
@@ -229,7 +231,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        connectorsResource.destroyConnector(CONNECTOR_NAME);
+        connectorsResource.destroyConnector(CONNECTOR_NAME, FORWARD);
 
         PowerMock.verifyAll();
     }
@@ -242,7 +244,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        ConnectorInfo connInfo = 
connectorsResource.getConnector(CONNECTOR_NAME);
+        ConnectorInfo connInfo = 
connectorsResource.getConnector(CONNECTOR_NAME, FORWARD);
         assertEquals(new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, 
CONNECTOR_TASK_NAMES), connInfo);
 
         PowerMock.verifyAll();
@@ -256,7 +258,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        Map<String, String> connConfig = 
connectorsResource.getConnectorConfig(CONNECTOR_NAME);
+        Map<String, String> connConfig = 
connectorsResource.getConnectorConfig(CONNECTOR_NAME, FORWARD);
         assertEquals(CONNECTOR_CONFIG, connConfig);
 
         PowerMock.verifyAll();
@@ -270,7 +272,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        connectorsResource.getConnectorConfig(CONNECTOR_NAME);
+        connectorsResource.getConnectorConfig(CONNECTOR_NAME, FORWARD);
 
         PowerMock.verifyAll();
     }
@@ -283,7 +285,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        connectorsResource.putConnectorConfig(CONNECTOR_NAME, 
CONNECTOR_CONFIG);
+        connectorsResource.putConnectorConfig(CONNECTOR_NAME, FORWARD, 
CONNECTOR_CONFIG);
 
         PowerMock.verifyAll();
     }
@@ -296,7 +298,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        List<TaskInfo> taskInfos = 
connectorsResource.getTaskConfigs(CONNECTOR_NAME);
+        List<TaskInfo> taskInfos = 
connectorsResource.getTaskConfigs(CONNECTOR_NAME, FORWARD);
         assertEquals(TASK_INFOS, taskInfos);
 
         PowerMock.verifyAll();
@@ -310,7 +312,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        connectorsResource.getTaskConfigs(CONNECTOR_NAME);
+        connectorsResource.getTaskConfigs(CONNECTOR_NAME, FORWARD);
 
         PowerMock.verifyAll();
     }
@@ -323,7 +325,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        connectorsResource.putTaskConfigs(CONNECTOR_NAME, TASK_CONFIGS);
+        connectorsResource.putTaskConfigs(CONNECTOR_NAME, FORWARD, 
TASK_CONFIGS);
 
         PowerMock.verifyAll();
     }
@@ -336,7 +338,108 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        connectorsResource.putTaskConfigs(CONNECTOR_NAME, TASK_CONFIGS);
+        connectorsResource.putTaskConfigs(CONNECTOR_NAME, FORWARD, 
TASK_CONFIGS);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test(expected = NotFoundException.class)
+    public void testRestartConnectorNotFound() throws Throwable {
+        final Capture<Callback<Void>> cb = Capture.newInstance();
+        herder.restartConnector(EasyMock.eq(CONNECTOR_NAME), 
EasyMock.capture(cb));
+        expectAndCallbackException(cb, new NotFoundException("not found"));
+
+        PowerMock.replayAll();
+
+        connectorsResource.restartConnector(CONNECTOR_NAME, FORWARD);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testRestartConnectorLeaderRedirect() throws Throwable {
+        final Capture<Callback<Void>> cb = Capture.newInstance();
+        herder.restartConnector(EasyMock.eq(CONNECTOR_NAME), 
EasyMock.capture(cb));
+        expectAndCallbackNotLeaderException(cb);
+
+        
EasyMock.expect(RestServer.httpRequest(EasyMock.eq("http://leader:8083/connectors/"
 + CONNECTOR_NAME + "/restart?forward=true"),
+                EasyMock.eq("POST"), EasyMock.isNull(), 
EasyMock.<TypeReference>anyObject()))
+                .andReturn(new RestServer.HttpResponse<>(202, new 
HashMap<String, List<String>>(), null));
+
+        PowerMock.replayAll();
+
+        connectorsResource.restartConnector(CONNECTOR_NAME, null);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testRestartConnectorOwnerRedirect() throws Throwable {
+        final Capture<Callback<Void>> cb = Capture.newInstance();
+        herder.restartConnector(EasyMock.eq(CONNECTOR_NAME), 
EasyMock.capture(cb));
+        String ownerUrl = "http://owner:8083";
+        expectAndCallbackException(cb, new NotAssignedException("not owner 
test", ownerUrl));
+
+        
EasyMock.expect(RestServer.httpRequest(EasyMock.eq("http://owner:8083/connectors/"
 + CONNECTOR_NAME + "/restart?forward=false"),
+                EasyMock.eq("POST"), EasyMock.isNull(), 
EasyMock.<TypeReference>anyObject()))
+                .andReturn(new RestServer.HttpResponse<>(202, new 
HashMap<String, List<String>>(), null));
+
+        PowerMock.replayAll();
+
+        connectorsResource.restartConnector(CONNECTOR_NAME, true);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test(expected = NotFoundException.class)
+    public void testRestartTaskNotFound() throws Throwable {
+        ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0);
+        final Capture<Callback<Void>> cb = Capture.newInstance();
+        herder.restartTask(EasyMock.eq(taskId), EasyMock.capture(cb));
+        expectAndCallbackException(cb, new NotFoundException("not found"));
+
+        PowerMock.replayAll();
+
+        connectorsResource.restartTask(CONNECTOR_NAME, 0, FORWARD);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testRestartTaskLeaderRedirect() throws Throwable {
+        ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0);
+
+        final Capture<Callback<Void>> cb = Capture.newInstance();
+        herder.restartTask(EasyMock.eq(taskId), EasyMock.capture(cb));
+        expectAndCallbackNotLeaderException(cb);
+
+        
EasyMock.expect(RestServer.httpRequest(EasyMock.eq("http://leader:8083/connectors/"
 + CONNECTOR_NAME + "/tasks/0/restart?forward=true"),
+                EasyMock.eq("POST"), EasyMock.isNull(), 
EasyMock.<TypeReference>anyObject()))
+                .andReturn(new RestServer.HttpResponse<>(202, new 
HashMap<String, List<String>>(), null));
+
+        PowerMock.replayAll();
+
+        connectorsResource.restartTask(CONNECTOR_NAME, 0, null);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testRestartTaskOwnerRedirect() throws Throwable {
+        ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0);
+
+        final Capture<Callback<Void>> cb = Capture.newInstance();
+        herder.restartTask(EasyMock.eq(taskId), EasyMock.capture(cb));
+        String ownerUrl = "http://owner:8083";
+        expectAndCallbackException(cb, new NotAssignedException("not owner 
test", ownerUrl));
+
+        
EasyMock.expect(RestServer.httpRequest(EasyMock.eq("http://owner:8083/connectors/"
 + CONNECTOR_NAME + "/tasks/0/restart?forward=false"),
+                EasyMock.eq("POST"), EasyMock.isNull(), 
EasyMock.<TypeReference>anyObject()))
+                .andReturn(new RestServer.HttpResponse<>(202, new 
HashMap<String, List<String>>(), null));
+
+        PowerMock.replayAll();
+
+        connectorsResource.restartTask(CONNECTOR_NAME, 0, true);
 
         PowerMock.verifyAll();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/89c67727/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index 3959ff8..05a64a1 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -156,6 +156,160 @@ public class StandaloneHerderTest {
     }
 
     @Test
+    public void testRestartConnector() throws Exception {
+        expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, 
BogusSourceTask.class, false);
+
+        worker.stopConnector(CONNECTOR_NAME);
+        EasyMock.expectLastCall();
+
+        worker.startConnector(EasyMock.eq(new 
ConnectorConfig(connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class))),
+                EasyMock.anyObject(HerderConnectorContext.class), 
EasyMock.eq(herder));
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.putConnectorConfig(CONNECTOR_NAME, 
connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, 
createCallback);
+
+        FutureCallback<Void> cb = new FutureCallback<>();
+        herder.restartConnector(CONNECTOR_NAME, cb);
+        cb.get(1000L, TimeUnit.MILLISECONDS);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testRestartConnectorFailureOnStop() throws Exception {
+        expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, 
BogusSourceTask.class, false);
+
+        RuntimeException e = new RuntimeException();
+        worker.stopConnector(CONNECTOR_NAME);
+        EasyMock.expectLastCall().andThrow(e);
+
+        // the connector will not be started after the failure in stop
+
+        PowerMock.replayAll();
+
+        herder.putConnectorConfig(CONNECTOR_NAME, 
connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, 
createCallback);
+
+        FutureCallback<Void> cb = new FutureCallback<>();
+        herder.restartConnector(CONNECTOR_NAME, cb);
+        try {
+            cb.get(1000L, TimeUnit.MILLISECONDS);
+            fail();
+        } catch (ExecutionException exception) {
+            assertEquals(e, exception.getCause());
+        }
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testRestartConnectorFailureOnStart() throws Exception {
+        expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, 
BogusSourceTask.class, false);
+
+        worker.stopConnector(CONNECTOR_NAME);
+        EasyMock.expectLastCall();
+
+        RuntimeException e = new RuntimeException();
+        worker.startConnector(EasyMock.eq(new 
ConnectorConfig(connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class))),
+                EasyMock.anyObject(HerderConnectorContext.class), 
EasyMock.eq(herder));
+        EasyMock.expectLastCall().andThrow(e);
+
+        PowerMock.replayAll();
+
+        herder.putConnectorConfig(CONNECTOR_NAME, 
connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, 
createCallback);
+
+        FutureCallback<Void> cb = new FutureCallback<>();
+        herder.restartConnector(CONNECTOR_NAME, cb);
+        try {
+            cb.get(1000L, TimeUnit.MILLISECONDS);
+            fail();
+        } catch (ExecutionException exception) {
+            assertEquals(e, exception.getCause());
+        }
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testRestartTask() throws Exception {
+        ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0);
+        expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, 
BogusSourceTask.class, false);
+
+        worker.stopAndAwaitTask(taskId);
+        EasyMock.expectLastCall();
+
+        Map<String, String> generatedTaskProps = 
taskConfig(BogusSourceTask.class, false);
+        worker.startTask(taskId, new TaskConfig(generatedTaskProps), herder);
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.putConnectorConfig(CONNECTOR_NAME, 
connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, 
createCallback);
+
+        FutureCallback<Void> cb = new FutureCallback<>();
+        herder.restartTask(taskId, cb);
+        cb.get(1000L, TimeUnit.MILLISECONDS);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testRestartTaskFailureOnStop() throws Exception {
+        ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0);
+        expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, 
BogusSourceTask.class, false);
+
+        RuntimeException e = new RuntimeException();
+        worker.stopAndAwaitTask(taskId);
+        EasyMock.expectLastCall().andThrow(e);
+
+        // task will not be started after the failure in stop
+
+        PowerMock.replayAll();
+
+        herder.putConnectorConfig(CONNECTOR_NAME, 
connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, 
createCallback);
+
+        FutureCallback<Void> cb = new FutureCallback<>();
+        herder.restartTask(taskId, cb);
+        try {
+            cb.get(1000L, TimeUnit.MILLISECONDS);
+            fail("Expected restart callback to raise an exception");
+        } catch (ExecutionException exception) {
+            assertEquals(e, exception.getCause());
+        }
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testRestartTaskFailureOnStart() throws Exception {
+        ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0);
+        expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, 
BogusSourceTask.class, false);
+
+        worker.stopAndAwaitTask(taskId);
+        EasyMock.expectLastCall();
+
+        RuntimeException e = new RuntimeException();
+        Map<String, String> generatedTaskProps = 
taskConfig(BogusSourceTask.class, false);
+        worker.startTask(taskId, new TaskConfig(generatedTaskProps), herder);
+        EasyMock.expectLastCall().andThrow(e);
+
+        PowerMock.replayAll();
+
+        herder.putConnectorConfig(CONNECTOR_NAME, 
connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, 
createCallback);
+
+        FutureCallback<Void> cb = new FutureCallback<>();
+        herder.restartTask(taskId, cb);
+        try {
+            cb.get(1000L, TimeUnit.MILLISECONDS);
+            fail("Expected restart callback to raise an exception");
+        } catch (ExecutionException exception) {
+            assertEquals(e, exception.getCause());
+        }
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
     public void testCreateAndStop() throws Exception {
         connector = PowerMock.createMock(BogusSourceConnector.class);
         expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, 
BogusSourceTask.class, false);

Reply via email to