http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
new file mode 100644
index 0000000..85db168
--- /dev/null
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -0,0 +1,920 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime.distributed;
+
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.connector.ConnectorContext;
+import org.apache.kafka.connect.errors.AlreadyExistsException;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.errors.NotFoundException;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.HerderConnectorContext;
+import org.apache.kafka.connect.runtime.TaskConfig;
+import org.apache.kafka.connect.runtime.Worker;
+import org.apache.kafka.connect.runtime.rest.RestServer;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.storage.KafkaConfigStorage;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * <p>
+ *     Distributed "herder" that coordinates with other workers to spread work 
across multiple processes.
+ * </p>
+ * <p>
+ *     Under the hood, this is implemented as a group managed by Kafka's group 
membership facilities (i.e. the generalized
+ *     group/consumer coordinator). Each instance of DistributedHerder joins 
the group and indicates what it's current
+ *     configuration state is (where it is in the configuration log). The 
group coordinator selects one member to take
+ *     this information and assign each instance a subset of the active 
connectors & tasks to execute. This assignment
+ *     is currently performed in a simple round-robin fashion, but this is not 
guaranteed -- the herder may also choose
+ *     to, e.g., use a sticky assignment to avoid the usual start/stop costs 
associated with connectors and tasks. Once
+ *     an assignment is received, the DistributedHerder simply runs its 
assigned connectors and tasks in a Worker.
+ * </p>
+ * <p>
+ *     In addition to distributing work, the DistributedHerder uses the leader 
determined during the work assignment
+ *     to select a leader for this generation of the group who is responsible 
for other tasks that can only be performed
+ *     by a single node at a time. Most importantly, this includes writing 
updated configurations for connectors and tasks,
+ *     (and therefore, also for creating, destroy, and scaling up/down 
connectors).
+ * </p>
+ */
+public class DistributedHerder implements Herder, Runnable {
+    private static final Logger log = 
LoggerFactory.getLogger(DistributedHerder.class);
+
+    private static final long RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MS = 250;
+
+    private final Worker worker;
+    private final KafkaConfigStorage configStorage;
+    private ClusterConfigState configState;
+    private final Time time;
+
+    private final int workerSyncTimeoutMs;
+    private final int workerUnsyncBackoffMs;
+
+    private final WorkerGroupMember member;
+    private final AtomicBoolean stopping;
+    private final CountDownLatch stopLatch = new CountDownLatch(1);
+
+    // Track enough information about the current membership state to be able 
to determine which requests via the API
+    // and the from other nodes are safe to process
+    private boolean rebalanceResolved;
+    private ConnectProtocol.Assignment assignment;
+
+    // To handle most external requests, like creating or destroying a 
connector, we can use a generic request where
+    // the caller specifies all the code that should be executed.
+    private final Queue<HerderRequest> requests = new PriorityQueue<>();
+    // Config updates can be collected and applied together when possible. 
Also, we need to take care to rebalance when
+    // needed (e.g. task reconfiguration, which requires everyone to 
coordinate offset commits).
+    private Set<String> connectorConfigUpdates = new HashSet<>();
+    private boolean needsReconfigRebalance;
+
+    private final ExecutorService forwardRequestExecutor;
+
+    public DistributedHerder(DistributedConfig config, Worker worker, String 
restUrl) {
+        this(config, worker, null, null, restUrl, new SystemTime());
+    }
+
+    // public for testing
+    public DistributedHerder(DistributedConfig config, Worker worker, 
KafkaConfigStorage configStorage, WorkerGroupMember member, String restUrl, 
Time time) {
+        this.worker = worker;
+        if (configStorage != null) {
+            // For testing. Assume configuration has already been performed
+            this.configStorage = configStorage;
+        } else {
+            this.configStorage = new 
KafkaConfigStorage(worker.getInternalValueConverter(), 
connectorConfigCallback(), taskConfigCallback());
+            this.configStorage.configure(config.originals());
+        }
+        configState = ClusterConfigState.EMPTY;
+        this.time = time;
+
+        this.workerSyncTimeoutMs = 
config.getInt(DistributedConfig.WORKER_SYNC_TIMEOUT_MS_CONFIG);
+        this.workerUnsyncBackoffMs = 
config.getInt(DistributedConfig.WORKER_UNSYNC_BACKOFF_MS_CONFIG);
+
+        this.member = member != null ? member : new WorkerGroupMember(config, 
restUrl, this.configStorage, rebalanceListener());
+        stopping = new AtomicBoolean(false);
+
+        rebalanceResolved = true; // If we still need to follow up after a 
rebalance occurred, starting up tasks
+        needsReconfigRebalance = false;
+
+        forwardRequestExecutor = Executors.newSingleThreadExecutor();
+    }
+
+    @Override
+    public void start() {
+        Thread thread = new Thread(this, "DistributedHerder");
+        thread.start();
+    }
+
+    public void run() {
+        try {
+            log.info("Herder starting");
+
+            configStorage.start();
+
+            log.info("Herder started");
+
+            while (!stopping.get()) {
+                tick();
+            }
+
+            halt();
+
+            log.info("Herder stopped");
+        } catch (Throwable t) {
+            log.error("Uncaught exception in herder work thread, exiting: ", 
t);
+            stopLatch.countDown();
+            System.exit(1);
+        } finally {
+            stopLatch.countDown();
+        }
+    }
+
+    // public for testing
+    public void tick() {
+        // The main loop does two primary things: 1) drive the group 
membership protocol, responding to rebalance events
+        // as they occur, and 2) handle external requests targeted at the 
leader. All the "real" work of the herder is
+        // performed in this thread, which keeps synchronization 
straightforward at the cost of some operations possibly
+        // blocking up this thread (especially those in callbacks due to 
rebalance events).
+
+        try {
+            member.ensureActive();
+            // Ensure we're in a good state in our group. If not restart and 
everything should be setup to rejoin
+            if (!handleRebalanceCompleted()) return;
+        } catch (WakeupException e) {
+            // May be due to a request from another thread, or might be 
stopping. If the latter, we need to check the
+            // flag immediately. If the former, we need to re-run the 
ensureActive call since we can't handle requests
+            // unless we're in the group.
+            return;
+        }
+
+        // Process any external requests
+        final long now = time.milliseconds();
+        long nextRequestTimeoutMs = Long.MAX_VALUE;
+        while (true) {
+            final HerderRequest next;
+            synchronized (this) {
+                next = requests.peek();
+                if (next == null) {
+                    break;
+                } else if (now >= next.at) {
+                    requests.poll();
+                } else {
+                    nextRequestTimeoutMs = next.at - now;
+                    break;
+                }
+            }
+
+            try {
+                next.action().call();
+                next.callback().onCompletion(null, null);
+            } catch (Throwable t) {
+                next.callback().onCompletion(t, null);
+            }
+        }
+
+        // Process any configuration updates
+        Set<String> connectorConfigUpdatesCopy = null;
+        synchronized (this) {
+            if (needsReconfigRebalance || !connectorConfigUpdates.isEmpty()) {
+                // Connector reconfigs only need local updates since there is 
no coordination between workers required.
+                // However, if connectors were added or removed, work needs to 
be rebalanced since we have more work
+                // items to distribute among workers.
+                ClusterConfigState newConfigState = configStorage.snapshot();
+                if 
(!newConfigState.connectors().equals(configState.connectors()))
+                    needsReconfigRebalance = true;
+                configState = newConfigState;
+                if (needsReconfigRebalance) {
+                    // Task reconfigs require a rebalance. Request the 
rebalance, clean out state, and then restart
+                    // this loop, which will then ensure the rebalance occurs 
without any other requests being
+                    // processed until it completes.
+                    member.requestRejoin();
+                    // Any connector config updates will be addressed during 
the rebalance too
+                    connectorConfigUpdates.clear();
+                    needsReconfigRebalance = false;
+                    return;
+                } else if (!connectorConfigUpdates.isEmpty()) {
+                    // We can't start/stop while locked since starting 
connectors can cause task updates that will
+                    // require writing configs, which in turn make callbacks 
into this class from another thread that
+                    // require acquiring a lock. This leads to deadlock. 
Instead, just copy the info we need and process
+                    // the updates after unlocking.
+                    connectorConfigUpdatesCopy = connectorConfigUpdates;
+                    connectorConfigUpdates = new HashSet<>();
+                }
+            }
+        }
+        if (connectorConfigUpdatesCopy != null) {
+            // If we only have connector config updates, we can just bounce 
the updated connectors that are
+            // currently assigned to this worker.
+            Set<String> localConnectors = assignment == null ? 
Collections.<String>emptySet() : new HashSet<>(assignment.connectors());
+            for (String connectorName : connectorConfigUpdatesCopy) {
+                if (!localConnectors.contains(connectorName))
+                    continue;
+                boolean remains = 
configState.connectors().contains(connectorName);
+                log.info("Handling connector-only config update by {} 
connector {}",
+                        remains ? "restarting" : "stopping", connectorName);
+                worker.stopConnector(connectorName);
+                // The update may be a deletion, so verify we actually need to 
restart the connector
+                if (remains)
+                    startConnector(connectorName);
+            }
+        }
+
+        // Let the group take any actions it needs to
+        try {
+            member.poll(nextRequestTimeoutMs);
+            // Ensure we're in a good state in our group. If not restart and 
everything should be setup to rejoin
+            if (!handleRebalanceCompleted()) return;
+        } catch (WakeupException e) { // FIXME should not be WakeupException
+            // Ignore. Just indicates we need to check the exit flag, for 
requested actions, etc.
+        }
+    }
+
+    // public for testing
+    public void halt() {
+        synchronized (this) {
+            // Clean up any connectors and tasks that are still running.
+            log.info("Stopping connectors and tasks that are still assigned to 
this worker.");
+            for (String connName : new HashSet<>(worker.connectorNames())) {
+                try {
+                    worker.stopConnector(connName);
+                } catch (Throwable t) {
+                    log.error("Failed to shut down connector " + connName, t);
+                }
+            }
+            for (ConnectorTaskId taskId : new HashSet<>(worker.taskIds())) {
+                try {
+                    worker.stopTask(taskId);
+                } catch (Throwable t) {
+                    log.error("Failed to shut down task " + taskId, t);
+                }
+            }
+
+            member.stop();
+
+            // Explicitly fail any outstanding requests so they actually get a 
response and get an understandable reason
+            // for their failure
+            while (!requests.isEmpty()) {
+                HerderRequest request = requests.poll();
+                request.callback().onCompletion(new ConnectException("Worker 
is shutting down"), null);
+            }
+
+            if (configStorage != null)
+                configStorage.stop();
+        }
+    }
+
+    @Override
+    public void stop() {
+        log.info("Herder stopping");
+
+        stopping.set(true);
+        member.wakeup();
+        while (stopLatch.getCount() > 0) {
+            try {
+                stopLatch.await();
+            } catch (InterruptedException e) {
+                // ignore, should not happen
+            }
+        }
+
+
+        forwardRequestExecutor.shutdown();
+        try {
+            if (!forwardRequestExecutor.awaitTermination(10000, 
TimeUnit.MILLISECONDS))
+                forwardRequestExecutor.shutdownNow();
+        } catch (InterruptedException e) {
+            // ignore
+        }
+
+        log.info("Herder stopped");
+    }
+
+    @Override
+    public synchronized void connectors(final Callback<Collection<String>> 
callback) {
+        log.trace("Submitting connector listing request");
+
+        addRequest(
+                new Callable<Void>() {
+                    @Override
+                    public Void call() throws Exception {
+                        if (!checkConfigSynced(callback))
+                            return null;
+
+                        callback.onCompletion(null, configState.connectors());
+                        return null;
+                    }
+                },
+                forwardErrorCallback(callback)
+        );
+    }
+
+    @Override
+    public synchronized void connectorInfo(final String connName, final 
Callback<ConnectorInfo> callback) {
+        log.trace("Submitting connector info request {}", connName);
+
+        addRequest(
+                new Callable<Void>() {
+                    @Override
+                    public Void call() throws Exception {
+                        if (!checkConfigSynced(callback))
+                            return null;
+
+                        if (!configState.connectors().contains(connName)) {
+                            callback.onCompletion(new 
NotFoundException("Connector " + connName + " not found"), null);
+                        } else {
+                            callback.onCompletion(null, new 
ConnectorInfo(connName, configState.connectorConfig(connName), 
configState.tasks(connName)));
+                        }
+                        return null;
+                    }
+                },
+                forwardErrorCallback(callback)
+        );
+    }
+
+    @Override
+    public void connectorConfig(String connName, final Callback<Map<String, 
String>> callback) {
+        // Subset of connectorInfo, so piggy back on that implementation
+        log.trace("Submitting connector config read request {}", connName);
+        connectorInfo(connName, new Callback<ConnectorInfo>() {
+            @Override
+            public void onCompletion(Throwable error, ConnectorInfo result) {
+                if (error != null)
+                    callback.onCompletion(error, null);
+                else
+                    callback.onCompletion(null, result.config());
+            }
+        });
+    }
+
+    @Override
+    public void putConnectorConfig(final String connName, Map<String, String> 
config, final boolean allowReplace,
+                                   final Callback<Created<ConnectorInfo>> 
callback) {
+        final Map<String, String> connConfig;
+        if (config == null) {
+            connConfig = null;
+        } else if (!config.containsKey(ConnectorConfig.NAME_CONFIG)) {
+            connConfig = new HashMap<>(config);
+            connConfig.put(ConnectorConfig.NAME_CONFIG, connName);
+        } else {
+            connConfig = config;
+        }
+
+        log.trace("Submitting connector config write request {}", connName);
+
+        addRequest(
+                new Callable<Void>() {
+                    @Override
+                    public Void call() throws Exception {
+                        log.trace("Handling connector config request {}", 
connName);
+                        if (!isLeader()) {
+                            callback.onCompletion(new NotLeaderException("Only 
the leader can set connector configs.", leaderUrl()), null);
+                            return null;
+                        }
+
+                        boolean exists = 
configState.connectors().contains(connName);
+                        if (!allowReplace && exists) {
+                            callback.onCompletion(new 
AlreadyExistsException("Connector " + connName + " already exists"), null);
+                            return null;
+                        }
+
+                        if (connConfig == null && !exists) {
+                            callback.onCompletion(new 
NotFoundException("Connector " + connName + " not found"), null);
+                            return null;
+                        }
+
+                        log.trace("Submitting connector config {} {} {}", 
connName, allowReplace, configState.connectors());
+                        configStorage.putConnectorConfig(connName, connConfig);
+
+                        boolean created = !exists && connConfig != null;
+                        // Note that we use the updated connector config 
despite the fact that we don't have an updated
+                        // snapshot yet. The existing task info should still 
be accurate.
+                        ConnectorInfo info = connConfig == null ? null :
+                                new ConnectorInfo(connName, connConfig, 
configState.tasks(connName));
+                        callback.onCompletion(null, new Created<>(created, 
info));
+
+                        return null;
+                    }
+                },
+                forwardErrorCallback(callback)
+        );
+    }
+
+    @Override
+    public synchronized void requestTaskReconfiguration(final String connName) 
{
+        log.trace("Submitting connector task reconfiguration request {}", 
connName);
+
+        addRequest(
+                new Callable<Void>() {
+                    @Override
+                    public Void call() throws Exception {
+                        reconfigureConnectorTasksWithRetry(connName);
+                        return null;
+                    }
+                },
+                new Callback<Void>() {
+                    @Override
+                    public void onCompletion(Throwable error, Void result) {
+                        log.error("Unexpected error during task 
reconfiguration: ", error);
+                        log.error("Task reconfiguration for {} failed 
unexpectedly, this connector will not be properly reconfigured unless manually 
triggered.", connName);
+                    }
+                }
+        );
+    }
+
+    @Override
+    public synchronized void taskConfigs(final String connName, final 
Callback<List<TaskInfo>> callback) {
+        log.trace("Submitting get task configuration request {}", connName);
+
+        addRequest(
+                new Callable<Void>() {
+                    @Override
+                    public Void call() throws Exception {
+                        if (!checkConfigSynced(callback))
+                            return null;
+
+                        if (!configState.connectors().contains(connName)) {
+                            callback.onCompletion(new 
NotFoundException("Connector " + connName + " not found"), null);
+                        } else {
+                            List<TaskInfo> result = new ArrayList<>();
+                            for (int i = 0; i < 
configState.taskCount(connName); i++) {
+                                ConnectorTaskId id = new 
ConnectorTaskId(connName, i);
+                                result.add(new TaskInfo(id, 
configState.taskConfig(id)));
+                            }
+                            callback.onCompletion(null, result);
+                        }
+                        return null;
+                    }
+                },
+                forwardErrorCallback(callback)
+        );
+    }
+
+    @Override
+    public synchronized void putTaskConfigs(final String connName, final 
List<Map<String, String>> configs, final Callback<Void> callback) {
+        log.trace("Submitting put task configuration request {}", connName);
+
+        addRequest(
+                new Callable<Void>() {
+                    @Override
+                    public Void call() throws Exception {
+                        if (!isLeader())
+                            callback.onCompletion(new NotLeaderException("Only 
the leader may write task configurations.", leaderUrl()), null);
+                        else if (!configState.connectors().contains(connName))
+                            callback.onCompletion(new 
NotFoundException("Connector " + connName + " not found"), null);
+                        else {
+                            
configStorage.putTaskConfigs(taskConfigListAsMap(connName, configs));
+                            callback.onCompletion(null, null);
+                        }
+                        return null;
+                    }
+                },
+                forwardErrorCallback(callback)
+        );
+    }
+
+
+    // Should only be called from work thread, so synchronization should not 
be needed
+    private boolean isLeader() {
+        return assignment != null && 
member.memberId().equals(assignment.leader());
+    }
+
+    /**
+     * Get the URL for the leader's REST interface, or null if we do not have 
the leader's URL yet.
+     */
+    private String leaderUrl() {
+        if (assignment == null)
+            return null;
+        return assignment.leaderUrl();
+    }
+
+    /**
+     * Handle post-assignment operations, either trying to resolve issues that 
kept assignment from completing, getting
+     * this node into sync and its work started. Since
+     *
+     * @return false if we couldn't finish
+     */
+    private boolean handleRebalanceCompleted() {
+        if (this.rebalanceResolved)
+            return true;
+
+        // We need to handle a variety of cases after a rebalance:
+        // 1. Assignment failed
+        //  1a. We are the leader for the round. We will be leader again if we 
rejoin now, so we need to catch up before
+        //      even attempting to. If we can't we should drop out of the 
group because we will block everyone from making
+        //      progress. We can backoff and try rejoining later.
+        //  1b. We are not the leader. We might need to catch up. If we're 
already caught up we can rejoin immediately,
+        //      otherwise, we just want to wait indefinitely to catch up and 
rejoin whenver we're finally ready.
+        // 2. Assignment succeeded.
+        //  2a. We are caught up on configs. Awesome! We can proceed to run 
our assigned work.
+        //  2b. We need to try to catch up. We can do this potentially 
indefinitely because if it takes to long, we'll
+        //      be kicked out of the group anyway due to lack of heartbeats.
+
+        boolean needsReadToEnd = false;
+        long syncConfigsTimeoutMs = Long.MAX_VALUE;
+        boolean needsRejoin = false;
+        if (assignment.failed()) {
+            needsRejoin = true;
+            if (isLeader()) {
+                log.warn("Join group completed, but assignment failed and we 
are the leader. Reading to end of config and retrying.");
+                needsReadToEnd = true;
+                syncConfigsTimeoutMs = workerSyncTimeoutMs;
+            } else if (configState.offset() < assignment.offset()) {
+                log.warn("Join group completed, but assignment failed and we 
lagging. Reading to end of config and retrying.");
+                needsReadToEnd = true;
+            } else {
+                log.warn("Join group completed, but assignment failed. We were 
up to date, so just retrying.");
+            }
+        } else {
+            if (configState.offset() < assignment.offset()) {
+                log.warn("Catching up to assignment's config offset.");
+                needsReadToEnd = true;
+            }
+        }
+
+        if (needsReadToEnd) {
+            // Force exiting this method to avoid creating any 
connectors/tasks and require immediate rejoining if
+            // we timed out. This should only happen if we were the leader and 
didn't finish quickly enough, in which
+            // case we've waited a long time and should have already left the 
group OR the timeout should have been
+            // very long and not having finished also indicates we've waited 
longer than the session timeout.
+            if (!readConfigToEnd(syncConfigsTimeoutMs))
+                needsRejoin = true;
+        }
+
+        if (needsRejoin) {
+            member.requestRejoin();
+            return false;
+        }
+
+        // Should still validate that they match since we may have gone *past* 
the required offset, in which case we
+        // should *not* start any tasks and rejoin
+        if (configState.offset() != assignment.offset()) {
+            log.info("Current config state offset {} does not match group 
assignment {}. Forcing rebalance.", configState.offset(), assignment.offset());
+            member.requestRejoin();
+            return false;
+        }
+
+        startWork();
+
+        // We only mark this as resolved once we've actually started work, 
which allows us to correctly track whether
+        // what work is currently active and running. If we bail early, the 
main tick loop + having requested rejoin
+        // guarantees we'll attempt to rejoin before executing this method 
again.
+        rebalanceResolved = true;
+        return true;
+    }
+
+    /**
+     * Try to read to the end of the config log within the given timeout
+     * @param timeoutMs maximum time to wait to sync to the end of the log
+     * @return true if successful, false if timed out
+     */
+    private boolean readConfigToEnd(long timeoutMs) {
+        log.info("Current config state offset {} is behind group assignment 
{}, reading to end of config log", configState.offset(), assignment.offset());
+        try {
+            configStorage.readToEnd().get(timeoutMs, TimeUnit.MILLISECONDS);
+            configState = configStorage.snapshot();
+            log.info("Finished reading to end of log and updated config 
snapshot, new config log offset: {}", configState.offset());
+            return true;
+        } catch (TimeoutException e) {
+            log.warn("Didn't reach end of config log quickly enough", e);
+            // TODO: With explicit leave group support, it would be good to 
explicitly leave the group *before* this
+            // backoff since it'll be longer than the session timeout
+            if (isLeader())
+                backoff(workerUnsyncBackoffMs);
+            return false;
+        } catch (InterruptedException | ExecutionException e) {
+            throw new ConnectException("Error trying to catch up after 
assignment", e);
+        }
+    }
+
+    private void backoff(long ms) {
+        Utils.sleep(ms);
+    }
+
+    private void startWork() {
+        // Start assigned connectors and tasks
+        log.info("Starting connectors and tasks using config offset {}", 
assignment.offset());
+        for (String connectorName : assignment.connectors()) {
+            try {
+                startConnector(connectorName);
+            } catch (ConfigException e) {
+                log.error("Couldn't instantiate connector " + connectorName + 
" because it has an invalid connector " +
+                        "configuration. This connector will not execute until 
reconfigured.", e);
+            }
+        }
+        for (ConnectorTaskId taskId : assignment.tasks()) {
+            try {
+                log.info("Starting task {}", taskId);
+                Map<String, String> configs = configState.taskConfig(taskId);
+                TaskConfig taskConfig = new TaskConfig(configs);
+                worker.addTask(taskId, taskConfig);
+            } catch (ConfigException e) {
+                log.error("Couldn't instantiate task " + taskId + " because it 
has an invalid task " +
+                        "configuration. This task will not execute until 
reconfigured.", e);
+            }
+        }
+        log.info("Finished starting connectors and tasks");
+    }
+
+    // Helper for starting a connector with the given name, which will extract 
& parse the config, generate connector
+    // context and add to the worker. This needs to be called from within the 
main worker thread for this herder.
+    private void startConnector(String connectorName) {
+        log.info("Starting connector {}", connectorName);
+        Map<String, String> configs = 
configState.connectorConfig(connectorName);
+        ConnectorConfig connConfig = new ConnectorConfig(configs);
+        String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
+        ConnectorContext ctx = new 
HerderConnectorContext(DistributedHerder.this, connName);
+        worker.addConnector(connConfig, ctx);
+
+        // Immediately request configuration since this could be a brand new 
connector. However, also only update those
+        // task configs if they are actually different from the existing ones 
to avoid unnecessary updates when this is
+        // just restoring an existing connector.
+        reconfigureConnectorTasksWithRetry(connName);
+    }
+
+    private void reconfigureConnectorTasksWithRetry(final String connName) {
+        reconfigureConnector(connName, new Callback<Void>() {
+            @Override
+            public void onCompletion(Throwable error, Void result) {
+                // If we encountered an error, we don't have much choice but 
to just retry. If we don't, we could get
+                // stuck with a connector that thinks it has generated tasks, 
but wasn't actually successful and therefore
+                // never makes progress. The retry has to run through a 
HerderRequest since this callback could be happening
+                // from the HTTP request forwarding thread.
+                if (error != null) {
+                    log.error("Failed to reconfigure connector's tasks, 
retrying after backoff:", error);
+                    addRequest(RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MS,
+                            new Callable<Void>() {
+                                @Override
+                                public Void call() throws Exception {
+                                    
reconfigureConnectorTasksWithRetry(connName);
+                                    return null;
+                                }
+                            }, new Callback<Void>() {
+                                @Override
+                                public void onCompletion(Throwable error, Void 
result) {
+                                    log.error("Unexpected error during 
connector task reconfiguration: ", error);
+                                    log.error("Task reconfiguration for {} 
failed unexpectedly, this connector will not be properly reconfigured unless 
manually triggered.", connName);
+                                }
+                            }
+                    );
+                }
+            }
+        });
+    }
+
+    // Updates configurations for a connector by requesting them from the 
connector, filling in parameters provided
+    // by the system, then checks whether any configs have actually changed 
before submitting the new configs to storage
+    private void reconfigureConnector(final String connName, final 
Callback<Void> cb) {
+        try {
+            Map<String, String> configs = 
configState.connectorConfig(connName);
+            ConnectorConfig connConfig = new ConnectorConfig(configs);
+
+            List<String> sinkTopics = null;
+            if 
(SinkConnector.class.isAssignableFrom(connConfig.getClass(ConnectorConfig.CONNECTOR_CLASS_CONFIG)))
+                sinkTopics = connConfig.getList(ConnectorConfig.TOPICS_CONFIG);
+
+            final List<Map<String, String>> taskProps
+                    = worker.connectorTaskConfigs(connName, 
connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG), sinkTopics);
+            boolean changed = false;
+            int currentNumTasks = configState.taskCount(connName);
+            if (taskProps.size() != currentNumTasks) {
+                log.debug("Change in connector task count from {} to {}, 
writing updated task configurations", currentNumTasks, taskProps.size());
+                changed = true;
+            } else {
+                int index = 0;
+                for (Map<String, String> taskConfig : taskProps) {
+                    if (!taskConfig.equals(configState.taskConfig(new 
ConnectorTaskId(connName, index)))) {
+                        log.debug("Change in task configurations, writing 
updated task configurations");
+                        changed = true;
+                        break;
+                    }
+                    index++;
+                }
+            }
+            if (changed) {
+                if (isLeader()) {
+                    configStorage.putTaskConfigs(taskConfigListAsMap(connName, 
taskProps));
+                    cb.onCompletion(null, null);
+                } else {
+                    // We cannot forward the request on the same thread 
because this reconfiguration can happen in as a
+                    // result of . If we blocked
+                    forwardRequestExecutor.submit(new Runnable() {
+                        @Override
+                        public void run() {
+                            try {
+                                String reconfigUrl = 
RestServer.urlJoin(leaderUrl(), "/connectors/" + connName + "/tasks");
+                                RestServer.httpRequest(reconfigUrl, "POST", 
taskProps, null);
+                                cb.onCompletion(null, null);
+                            } catch (ConnectException e) {
+                                log.error("Request to leader to reconfigure 
connector tasks failed", e);
+                                cb.onCompletion(e, null);
+                            }
+                        }
+                    });
+                }
+            }
+        } catch (Throwable t) {
+            cb.onCompletion(t, null);
+        }
+    }
+
+    // Common handling for requests that get config data. Checks if we are in 
sync with the current config, which allows
+    // us to answer requests directly. If we are not, handles invoking the 
callback with the appropriate error.
+    private boolean checkConfigSynced(Callback<?> callback) {
+        if (assignment == null || configState.offset() != assignment.offset()) 
{
+            if (!isLeader())
+                callback.onCompletion(new NotLeaderException("Cannot get 
config data because config is not in sync and this is not the leader", 
leaderUrl()), null);
+            else
+                callback.onCompletion(new ConnectException("Cannot get config 
data because this is the leader node, but it does not have the most up to date 
configs"), null);
+            return false;
+        }
+        return true;
+    }
+
+    private void addRequest(Callable<Void> action, Callback<Void> callback) {
+        addRequest(0, action, callback);
+    }
+
+    private void addRequest(long delayMs, Callable<Void> action, 
Callback<Void> callback) {
+        HerderRequest req = new HerderRequest(time.milliseconds() + delayMs, 
action, callback);
+        requests.add(req);
+        if (requests.peek() == req)
+            member.wakeup();
+    }
+
+    private class HerderRequest implements Comparable<HerderRequest> {
+        private final long at;
+        private final Callable<Void> action;
+        private final Callback<Void> callback;
+
+        public HerderRequest(long at, Callable<Void> action, Callback<Void> 
callback) {
+            this.at = at;
+            this.action = action;
+            this.callback = callback;
+        }
+
+        public Callable<Void> action() {
+            return action;
+        }
+
+        public Callback<Void> callback() {
+            return callback;
+        }
+
+        @Override
+        public int compareTo(HerderRequest o) {
+            return Long.compare(at, o.at);
+        }
+    }
+
+    private static final Callback<Void> forwardErrorCallback(final Callback<?> 
callback) {
+        return new Callback<Void>() {
+            @Override
+            public void onCompletion(Throwable error, Void result) {
+                if (error != null)
+                    callback.onCompletion(error, null);
+            }
+        };
+    };
+
+
+    // Config callbacks are triggered from the KafkaConfigStorage thread
+    private Callback<String> connectorConfigCallback() {
+        return new Callback<String>() {
+            @Override
+            public void onCompletion(Throwable error, String connector) {
+                log.info("Connector {} config updated", connector);
+                // Stage the update and wake up the work thread. Connector 
config *changes* only need the one connector
+                // to be bounced. However, this callback may also indicate a 
connector *addition*, which does require
+                // a rebalance, so we need to be careful about what operation 
we request.
+                synchronized (DistributedHerder.this) {
+                    connectorConfigUpdates.add(connector);
+                }
+                member.wakeup();
+            }
+        };
+    }
+
+    private Callback<List<ConnectorTaskId>> taskConfigCallback() {
+        return new Callback<List<ConnectorTaskId>>() {
+            @Override
+            public void onCompletion(Throwable error, List<ConnectorTaskId> 
tasks) {
+                log.info("Tasks {} configs updated", tasks);
+                // Stage the update and wake up the work thread. No need to 
record the set of tasks here because task reconfigs
+                // always need a rebalance to ensure offsets get committed.
+                // TODO: As an optimization, some task config updates could 
avoid a rebalance. In particular, single-task
+                // connectors clearly don't need any coordination.
+                synchronized (DistributedHerder.this) {
+                    needsReconfigRebalance = true;
+                }
+                member.wakeup();
+            }
+        };
+    }
+
+    // Rebalances are triggered internally from the group member, so these are 
always executed in the work thread.
+    private WorkerRebalanceListener rebalanceListener() {
+        return new WorkerRebalanceListener() {
+            @Override
+            public void onAssigned(ConnectProtocol.Assignment assignment) {
+                // This callback just logs the info and saves it. The actual 
response is handled in the main loop, which
+                // ensures the group member's logic for rebalancing can 
complete, potentially long-running steps to
+                // catch up (or backoff if we fail) not executed in a 
callback, and so we'll be able to invoke other
+                // group membership actions (e.g., we may need to explicitly 
leave the group if we cannot handle the
+                // assigned tasks).
+                log.info("Joined group and got assignment: {}", assignment);
+                synchronized (DistributedHerder.this) {
+                    DistributedHerder.this.assignment = assignment;
+                    rebalanceResolved = false;
+                }
+                // We *must* interrupt any poll() call since this could occur 
when the poll starts, and we might then
+                // sleep in the poll() for a long time. Forcing a wakeup 
ensures we'll get to process this event in the
+                // main thread.
+                member.wakeup();
+            }
+
+            @Override
+            public void onRevoked(String leader, Collection<String> 
connectors, Collection<ConnectorTaskId> tasks) {
+                log.info("Rebalance started");
+
+                // Note that since we don't reset the assignment, we we don't 
revoke leadership here. During a rebalance,
+                // it is still important to have a leader that can write 
configs, offsets, etc.
+
+                if (rebalanceResolved) {
+                    // TODO: Parallelize this. We should be able to request 
all connectors and tasks to stop, then wait on all of
+                    // them to finish
+                    // TODO: Technically we don't have to stop connectors at 
all until we know they've really been removed from
+                    // this worker. Instead, we can let them continue to run 
but buffer any update requests (which should be
+                    // rare anyway). This would avoid a steady stream of 
start/stop, which probably also includes lots of
+                    // unnecessary repeated connections to the source/sink 
system.
+                    for (String connectorName : connectors)
+                        worker.stopConnector(connectorName);
+                    // TODO: We need to at least commit task offsets, but if 
we could commit offsets & pause them instead of
+                    // stopping them then state could continue to be reused 
when the task remains on this worker. For example,
+                    // this would avoid having to close a connection and then 
reopen it when the task is assigned back to this
+                    // worker again.
+                    for (ConnectorTaskId taskId : tasks)
+                        worker.stopTask(taskId);
+
+                    log.info("Finished stopping tasks in preparation for 
rebalance");
+                } else {
+                    log.info("Wasn't unable to resume work after last 
rebalance, can skip stopping connectors and tasks");
+                }
+
+            }
+        };
+    }
+
+
+    private static Map<ConnectorTaskId, Map<String, String>> 
taskConfigListAsMap(String connName, List<Map<String, String>> configs) {
+        int index = 0;
+        Map<ConnectorTaskId, Map<String, String>> result = new HashMap<>();
+        for (Map<String, String> taskConfigMap : configs) {
+            ConnectorTaskId taskId = new ConnectorTaskId(connName, index);
+            result.put(taskId, taskConfigMap);
+            index++;
+        }
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/NotLeaderException.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/NotLeaderException.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/NotLeaderException.java
new file mode 100644
index 0000000..5f94b53
--- /dev/null
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/NotLeaderException.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime.distributed;
+
+import org.apache.kafka.connect.errors.ConnectException;
+
+/**
+ * Indicates an operation was not permitted because it can only be performed 
on the leader and this worker is not currently
+ * the leader.
+ */
+public class NotLeaderException extends ConnectException {
+    private final String leaderUrl;
+
+    public NotLeaderException(String msg, String leaderUrl) {
+        super(msg);
+        this.leaderUrl = leaderUrl;
+    }
+
+    public NotLeaderException(String msg, String leaderUrl, Throwable 
throwable) {
+        super(msg, throwable);
+        this.leaderUrl = leaderUrl;
+    }
+
+    public NotLeaderException(String leaderUrl, Throwable throwable) {
+        super(throwable);
+        this.leaderUrl = leaderUrl;
+    }
+
+    public String leaderUrl() {
+        return leaderUrl;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
new file mode 100644
index 0000000..082e235
--- /dev/null
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -0,0 +1,294 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.runtime.distributed;
+
+import org.apache.kafka.clients.consumer.internals.AbstractCoordinator;
+import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.CircularIterator;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.storage.KafkaConfigStorage;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class manages the coordination process with the Kafka group 
coordinator on the broker for managing assignments
+ * to workers.
+ */
+public final class WorkerCoordinator extends AbstractCoordinator implements 
Closeable {
+    private static final Logger log = 
LoggerFactory.getLogger(WorkerCoordinator.class);
+
+    // Currently doesn't support multiple task assignment strategies, so we 
just fill in a default value
+    public static final String DEFAULT_SUBPROTOCOL = "default";
+
+    private final String restUrl;
+    private final KafkaConfigStorage configStorage;
+    private ConnectProtocol.Assignment assignmentSnapshot;
+    private final WorkerCoordinatorMetrics sensors;
+    private ClusterConfigState configSnapshot;
+    private final WorkerRebalanceListener listener;
+
+    private boolean rejoinRequested;
+
+    /**
+     * Initialize the coordination manager.
+     */
+    public WorkerCoordinator(ConsumerNetworkClient client,
+                             String groupId,
+                             int sessionTimeoutMs,
+                             int heartbeatIntervalMs,
+                             Metrics metrics,
+                             String metricGrpPrefix,
+                             Map<String, String> metricTags,
+                             Time time,
+                             long requestTimeoutMs,
+                             long retryBackoffMs,
+                             String restUrl,
+                             KafkaConfigStorage configStorage,
+                             WorkerRebalanceListener listener) {
+        super(client,
+                groupId,
+                sessionTimeoutMs,
+                heartbeatIntervalMs,
+                metrics,
+                metricGrpPrefix,
+                metricTags,
+                time,
+                retryBackoffMs);
+        this.restUrl = restUrl;
+        this.configStorage = configStorage;
+        this.assignmentSnapshot = null;
+        this.sensors = new WorkerCoordinatorMetrics(metrics, metricGrpPrefix, 
metricTags);
+        this.listener = listener;
+        this.rejoinRequested = false;
+    }
+
+    public void requestRejoin() {
+        rejoinRequested = true;
+    }
+
+    @Override
+    public String protocolType() {
+        return "connect";
+    }
+
+    @Override
+    public LinkedHashMap<String, ByteBuffer> metadata() {
+        LinkedHashMap<String, ByteBuffer> metadata = new LinkedHashMap<>();
+        configSnapshot = configStorage.snapshot();
+        ConnectProtocol.WorkerState workerState = new 
ConnectProtocol.WorkerState(restUrl, configSnapshot.offset());
+        metadata.put(DEFAULT_SUBPROTOCOL, 
ConnectProtocol.serializeMetadata(workerState));
+        return metadata;
+    }
+
+    @Override
+    protected void onJoinComplete(int generation, String memberId, String 
protocol, ByteBuffer memberAssignment) {
+        assignmentSnapshot = 
ConnectProtocol.deserializeAssignment(memberAssignment);
+        // At this point we always consider ourselves to be a member of the 
cluster, even if there was an assignment
+        // error (the leader couldn't make the assignment) or we are behind 
the config and cannot yet work on our assigned
+        // tasks. It's the responsibility of the code driving this process to 
decide how to react (e.g. trying to get
+        // up to date, try to rejoin again, leaving the group and backing off, 
etc.).
+        rejoinRequested = false;
+        listener.onAssigned(assignmentSnapshot);
+    }
+
+    @Override
+    protected Map<String, ByteBuffer> performAssignment(String leaderId, 
String protocol, Map<String, ByteBuffer> allMemberMetadata) {
+        log.debug("Performing task assignment");
+
+        Map<String, ConnectProtocol.WorkerState> allConfigs = new HashMap<>();
+        for (Map.Entry<String, ByteBuffer> entry : 
allMemberMetadata.entrySet())
+            allConfigs.put(entry.getKey(), 
ConnectProtocol.deserializeMetadata(entry.getValue()));
+
+        long maxOffset = findMaxMemberConfigOffset(allConfigs);
+        Long leaderOffset = ensureLeaderConfig(maxOffset);
+        if (leaderOffset == null)
+            return fillAssignmentsAndSerialize(allConfigs.keySet(), 
ConnectProtocol.Assignment.CONFIG_MISMATCH,
+                    leaderId, allConfigs.get(leaderId).url(), maxOffset,
+                    new HashMap<String, List<String>>(), new HashMap<String, 
List<ConnectorTaskId>>());
+        return performTaskAssignment(leaderId, leaderOffset, allConfigs);
+    }
+
+    private long findMaxMemberConfigOffset(Map<String, 
ConnectProtocol.WorkerState> allConfigs) {
+        // The new config offset is the maximum seen by any member. We always 
perform assignment using this offset,
+        // even if some members have fallen behind. The config offset used to 
generate the assignment is included in
+        // the response so members that have fallen behind will not use the 
assignment until they have caught up.
+        Long maxOffset = null;
+        for (Map.Entry<String, ConnectProtocol.WorkerState> stateEntry : 
allConfigs.entrySet()) {
+            long memberRootOffset = stateEntry.getValue().offset();
+            if (maxOffset == null)
+                maxOffset = memberRootOffset;
+            else
+                maxOffset = Math.max(maxOffset, memberRootOffset);
+        }
+
+        log.debug("Max config offset root: {}, local snapshot config offsets 
root: {}",
+                maxOffset, configSnapshot.offset());
+        return maxOffset;
+    }
+
+    private Long ensureLeaderConfig(long maxOffset) {
+        // If this leader is behind some other members, we can't do assignment
+        if (configSnapshot.offset() < maxOffset) {
+            // We might be able to take a new snapshot to catch up immediately 
and avoid another round of syncing here.
+            // Alternatively, if this node has already passed the maximum 
reported by any other member of the group, it
+            // is also safe to use this newer state.
+            ClusterConfigState updatedSnapshot = configStorage.snapshot();
+            if (updatedSnapshot.offset() < maxOffset) {
+                log.info("Was selected to perform assignments, but do not have 
latest config found in sync request. " +
+                        "Returning an empty configuration to trigger 
re-sync.");
+                return null;
+            } else {
+                configSnapshot = updatedSnapshot;
+                return configSnapshot.offset();
+            }
+        }
+
+        return maxOffset;
+    }
+
+    private Map<String, ByteBuffer> performTaskAssignment(String leaderId, 
long maxOffset, Map<String, ConnectProtocol.WorkerState> allConfigs) {
+        Map<String, List<String>> connectorAssignments = new HashMap<>();
+        Map<String, List<ConnectorTaskId>> taskAssignments = new HashMap<>();
+
+        // Perform round-robin task assignment
+        CircularIterator<String> memberIt = new 
CircularIterator<>(sorted(allConfigs.keySet()));
+        for (String connectorId : sorted(configSnapshot.connectors())) {
+            String connectorAssignedTo = memberIt.next();
+            log.trace("Assigning connector {} to {}", connectorId, 
connectorAssignedTo);
+            List<String> memberConnectors = 
connectorAssignments.get(connectorAssignedTo);
+            if (memberConnectors == null) {
+                memberConnectors = new ArrayList<>();
+                connectorAssignments.put(connectorAssignedTo, 
memberConnectors);
+            }
+            memberConnectors.add(connectorId);
+
+            for (ConnectorTaskId taskId : 
sorted(configSnapshot.tasks(connectorId))) {
+                String taskAssignedTo = memberIt.next();
+                log.trace("Assigning task {} to {}", taskId, taskAssignedTo);
+                List<ConnectorTaskId> memberTasks = 
taskAssignments.get(taskAssignedTo);
+                if (memberTasks == null) {
+                    memberTasks = new ArrayList<>();
+                    taskAssignments.put(taskAssignedTo, memberTasks);
+                }
+                memberTasks.add(taskId);
+            }
+        }
+
+        return fillAssignmentsAndSerialize(allConfigs.keySet(), 
ConnectProtocol.Assignment.NO_ERROR,
+                leaderId, allConfigs.get(leaderId).url(), maxOffset, 
connectorAssignments, taskAssignments);
+    }
+
+    private Map<String, ByteBuffer> 
fillAssignmentsAndSerialize(Collection<String> members,
+                                                                short error,
+                                                                String 
leaderId,
+                                                                String 
leaderUrl,
+                                                                long maxOffset,
+                                                                Map<String, 
List<String>> connectorAssignments,
+                                                                Map<String, 
List<ConnectorTaskId>> taskAssignments) {
+
+        Map<String, ByteBuffer> groupAssignment = new HashMap<>();
+        for (String member : members) {
+            List<String> connectors = connectorAssignments.get(member);
+            if (connectors == null)
+                connectors = Collections.emptyList();
+            List<ConnectorTaskId> tasks = taskAssignments.get(member);
+            if (tasks == null)
+                tasks = Collections.emptyList();
+            ConnectProtocol.Assignment assignment = new 
ConnectProtocol.Assignment(error, leaderId, leaderUrl, maxOffset, connectors, 
tasks);
+            log.debug("Assignment: {} -> {}", member, assignment);
+            groupAssignment.put(member, 
ConnectProtocol.serializeAssignment(assignment));
+        }
+        log.debug("Finished assignment");
+        return groupAssignment;
+    }
+
+    @Override
+    protected void onJoinPrepare(int generation, String memberId) {
+        log.debug("Revoking previous assignment {}", assignmentSnapshot);
+        if (assignmentSnapshot != null && !assignmentSnapshot.failed())
+            listener.onRevoked(assignmentSnapshot.leader(), 
assignmentSnapshot.connectors(), assignmentSnapshot.tasks());
+    }
+
+    @Override
+    public boolean needRejoin() {
+        return super.needRejoin() || (assignmentSnapshot == null || 
assignmentSnapshot.failed()) || rejoinRequested;
+    }
+
+    public String memberId() {
+        return this.memberId;
+    }
+
+    @Override
+    public void close() {
+        super.close();
+    }
+
+    private class WorkerCoordinatorMetrics {
+        public final Metrics metrics;
+        public final String metricGrpName;
+
+        public WorkerCoordinatorMetrics(Metrics metrics, String 
metricGrpPrefix, Map<String, String> tags) {
+            this.metrics = metrics;
+            this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";
+
+            Measurable numConnectors = new Measurable() {
+                public double measure(MetricConfig config, long now) {
+                    return assignmentSnapshot.connectors().size();
+                }
+            };
+
+            Measurable numTasks = new Measurable() {
+                public double measure(MetricConfig config, long now) {
+                    return assignmentSnapshot.tasks().size();
+                }
+            };
+
+            metrics.addMetric(new MetricName("assigned-connectors",
+                            this.metricGrpName,
+                            "The number of connector instances currently 
assigned to this consumer",
+                            tags),
+                    numConnectors);
+            metrics.addMetric(new MetricName("assigned-tasks",
+                            this.metricGrpName,
+                            "The number of tasks currently assigned to this 
consumer",
+                            tags),
+                    numTasks);
+        }
+    }
+
+    private static <T extends Comparable<T>> List<T> sorted(Collection<T> 
members) {
+        List<T> res = new ArrayList<>(members);
+        Collections.sort(res);
+        return res;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
new file mode 100644
index 0000000..c72e3ef
--- /dev/null
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime.distributed;
+
+import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.network.ChannelBuilder;
+import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.storage.KafkaConfigStorage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This class manages the coordination process with brokers for the Connect 
cluster group membership. It ties together
+ * the Coordinator, which implements the group member protocol, with all the 
other pieces needed to drive the connection
+ * to the group coordinator broker. This isolates all the networking to a 
single thread managed by this class, with
+ * higher level operations in response to group membership events being 
handled by the herder.
+ */
+public class WorkerGroupMember {
+    private static final Logger log = 
LoggerFactory.getLogger(WorkerGroupMember.class);
+
+    private static final AtomicInteger CONNECT_CLIENT_ID_SEQUENCE = new 
AtomicInteger(1);
+    private static final String JMX_PREFIX = "kafka.connect";
+
+    private final Time time;
+    private final String clientId;
+    private final ConsumerNetworkClient client;
+    private final Metrics metrics;
+    private final Metadata metadata;
+    private final long retryBackoffMs;
+    private final WorkerCoordinator coordinator;
+
+    private boolean stopped = false;
+
+    public WorkerGroupMember(DistributedConfig config, String restUrl, 
KafkaConfigStorage configStorage, WorkerRebalanceListener listener) {
+        try {
+            this.time = new SystemTime();
+
+            MetricConfig metricConfig = new 
MetricConfig().samples(config.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG))
+                    
.timeWindow(config.getLong(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG),
 TimeUnit.MILLISECONDS);
+            String clientIdConfig = 
config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
+            clientId = clientIdConfig.length() <= 0 ? "connect-" + 
CONNECT_CLIENT_ID_SEQUENCE.getAndIncrement() : clientIdConfig;
+            List<MetricsReporter> reporters = 
config.getConfiguredInstances(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
 MetricsReporter.class);
+            reporters.add(new JmxReporter(JMX_PREFIX));
+            this.metrics = new Metrics(metricConfig, reporters, time);
+            this.retryBackoffMs = 
config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG);
+            this.metadata = new Metadata(retryBackoffMs, 
config.getLong(CommonClientConfigs.METADATA_MAX_AGE_CONFIG));
+            List<InetSocketAddress> addresses = 
ClientUtils.parseAndValidateAddresses(config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
+            this.metadata.update(Cluster.bootstrap(addresses), 0);
+            String metricGrpPrefix = "connect";
+            Map<String, String> metricsTags = new LinkedHashMap<>();
+            metricsTags.put("client-id", clientId);
+            ChannelBuilder channelBuilder = 
ClientUtils.createChannelBuilder(config.values());
+            NetworkClient netClient = new NetworkClient(
+                    new 
Selector(config.getLong(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG), 
metrics, time, metricGrpPrefix, metricsTags, channelBuilder),
+                    this.metadata,
+                    clientId,
+                    100, // a fixed large enough value will suffice
+                    
config.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG),
+                    config.getInt(CommonClientConfigs.SEND_BUFFER_CONFIG),
+                    config.getInt(CommonClientConfigs.RECEIVE_BUFFER_CONFIG),
+                    
config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG), time);
+            this.client = new ConsumerNetworkClient(netClient, metadata, time, 
retryBackoffMs);
+            this.coordinator = new WorkerCoordinator(this.client,
+                    config.getString(DistributedConfig.GROUP_ID_CONFIG),
+                    config.getInt(DistributedConfig.SESSION_TIMEOUT_MS_CONFIG),
+                    
config.getInt(DistributedConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
+                    metrics,
+                    metricGrpPrefix,
+                    metricsTags,
+                    this.time,
+                    config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
+                    retryBackoffMs,
+                    restUrl,
+                    configStorage,
+                    listener);
+
+            AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
+            log.debug("Connect group member created");
+        } catch (Throwable t) {
+            // call close methods if internal objects are already constructed
+            // this is to prevent resource leak. see KAFKA-2121
+            stop(true);
+            // now propagate the exception
+            throw new KafkaException("Failed to construct kafka consumer", t);
+        }
+    }
+
+    public void stop() {
+        if (stopped) return;
+        stop(false);
+    }
+
+    public void ensureActive() {
+        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureActiveGroup();
+    }
+
+    public void poll(long timeout) {
+        if (timeout < 0)
+            throw new IllegalArgumentException("Timeout must not be negative");
+
+        // poll for new data until the timeout expires
+        long remaining = timeout;
+        while (remaining >= 0) {
+            long start = time.milliseconds();
+            coordinator.ensureCoordinatorKnown();
+            coordinator.ensureActiveGroup();
+            client.poll(remaining);
+            remaining -= time.milliseconds() - start;
+        }
+    }
+
+    /**
+     * Interrupt any running poll() calls, causing a WakeupException to be 
thrown in the thread invoking that method.
+     */
+    public void wakeup() {
+        this.client.wakeup();
+    }
+
+    /**
+     * Get the member ID of this worker in the group of workers.
+     *
+     * This ID is the unique member ID automatically generated.
+     *
+     * @return the member ID
+     */
+    public String memberId() {
+        return coordinator.memberId();
+    }
+
+    public void requestRejoin() {
+        coordinator.requestRejoin();
+    }
+
+    private void stop(boolean swallowException) {
+        log.trace("Stopping the Connect group member.");
+        AtomicReference<Throwable> firstException = new 
AtomicReference<Throwable>();
+        this.stopped = true;
+        ClientUtils.closeQuietly(coordinator, "coordinator", firstException);
+        ClientUtils.closeQuietly(metrics, "consumer metrics", firstException);
+        ClientUtils.closeQuietly(client, "consumer network client", 
firstException);
+        AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId);
+        if (firstException.get() != null && !swallowException)
+            throw new KafkaException("Failed to stop the Connect group 
member", firstException.get());
+        else
+            log.debug("The Connect group member has stopped.");
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerRebalanceListener.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerRebalanceListener.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerRebalanceListener.java
new file mode 100644
index 0000000..40f55d2
--- /dev/null
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerRebalanceListener.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime.distributed;
+
+import org.apache.kafka.connect.util.ConnectorTaskId;
+
+import java.util.Collection;
+
+/**
+ * Listener for rebalance events in the worker group.
+ */
+public interface WorkerRebalanceListener {
+    /**
+     * Invoked when a new assignment is created by joining the Connect worker 
group. This is invoked for both successful
+     * and unsuccessful assignments.
+     */
+    void onAssigned(ConnectProtocol.Assignment assignment);
+
+    /**
+     * Invoked when a rebalance operation starts, revoking ownership for the 
set of connectors and tasks.
+     */
+    void onRevoked(String leader, Collection<String> connectors, 
Collection<ConnectorTaskId> tasks);
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
new file mode 100644
index 0000000..96346ad
--- /dev/null
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime.rest;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.rest.entities.ErrorMessage;
+import org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper;
+import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
+import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource;
+import org.apache.kafka.connect.runtime.rest.resources.RootResource;
+import org.eclipse.jetty.server.Connector;
+import org.eclipse.jetty.server.Handler;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.Slf4jRequestLog;
+import org.eclipse.jetty.server.handler.DefaultHandler;
+import org.eclipse.jetty.server.handler.HandlerCollection;
+import org.eclipse.jetty.server.handler.RequestLogHandler;
+import org.eclipse.jetty.server.handler.StatisticsHandler;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.glassfish.jersey.server.ResourceConfig;
+import org.glassfish.jersey.servlet.ServletContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Embedded server for the REST API that provides the control plane for Kafka 
Connect workers.
+ */
+public class RestServer {
+    private static final Logger log = 
LoggerFactory.getLogger(RestServer.class);
+
+    private static final long GRACEFUL_SHUTDOWN_TIMEOUT_MS = 60 * 1000;
+
+    private static final ObjectMapper JSON_SERDE = new ObjectMapper();
+
+    private final WorkerConfig config;
+    private Herder herder;
+    private Server jettyServer;
+
+    /**
+     * Create a REST server for this herder using the specified configs.
+     */
+    public RestServer(WorkerConfig config) {
+        this.config = config;
+
+        // To make the advertised port available immediately, we need to do 
some configuration here
+        String hostname = config.getString(WorkerConfig.REST_HOST_NAME_CONFIG);
+        Integer port = config.getInt(WorkerConfig.REST_PORT_CONFIG);
+
+        jettyServer = new Server();
+
+        ServerConnector connector = new ServerConnector(jettyServer);
+        if (hostname != null && !hostname.isEmpty())
+            connector.setHost(hostname);
+        connector.setPort(port);
+        jettyServer.setConnectors(new Connector[]{connector});
+    }
+
+    public void start(Herder herder) {
+        log.info("Starting REST server");
+
+        this.herder = herder;
+
+        ResourceConfig resourceConfig = new ResourceConfig();
+        resourceConfig.register(new JacksonJsonProvider());
+
+        resourceConfig.register(RootResource.class);
+        resourceConfig.register(new ConnectorsResource(herder));
+
+        resourceConfig.register(ConnectExceptionMapper.class);
+
+        ServletContainer servletContainer = new 
ServletContainer(resourceConfig);
+        ServletHolder servletHolder = new ServletHolder(servletContainer);
+
+        ServletContextHandler context = new 
ServletContextHandler(ServletContextHandler.SESSIONS);
+        context.setContextPath("/");
+        context.addServlet(servletHolder, "/*");
+
+        RequestLogHandler requestLogHandler = new RequestLogHandler();
+        Slf4jRequestLog requestLog = new Slf4jRequestLog();
+        requestLog.setLoggerName(RestServer.class.getCanonicalName());
+        requestLog.setLogLatency(true);
+        requestLogHandler.setRequestLog(requestLog);
+
+        HandlerCollection handlers = new HandlerCollection();
+        handlers.setHandlers(new Handler[]{context, new DefaultHandler(), 
requestLogHandler});
+
+        /* Needed for graceful shutdown as per `setStopTimeout` documentation 
*/
+        StatisticsHandler statsHandler = new StatisticsHandler();
+        statsHandler.setHandler(handlers);
+        jettyServer.setHandler(statsHandler);
+        jettyServer.setStopTimeout(GRACEFUL_SHUTDOWN_TIMEOUT_MS);
+        jettyServer.setStopAtShutdown(true);
+
+        try {
+            jettyServer.start();
+        } catch (Exception e) {
+            throw new ConnectException("Unable to start REST server", e);
+        }
+
+        log.info("REST server listening at " + jettyServer.getURI() + ", 
advertising URL " + advertisedUrl());
+    }
+
+    public void stop() {
+        try {
+            jettyServer.stop();
+            jettyServer.join();
+        } catch (Exception e) {
+            throw new ConnectException("Unable to stop REST server", e);
+        } finally {
+            jettyServer.destroy();
+        }
+    }
+
+    /**
+     * Get the URL to advertise to other workers and clients. This uses the 
default connector from the embedded Jetty
+     * server, unless overrides for advertised hostname and/or port are 
provided via configs.
+     */
+    public String advertisedUrl() {
+        UriBuilder builder = UriBuilder.fromUri(jettyServer.getURI());
+        String advertisedHostname = 
config.getString(WorkerConfig.REST_ADVERTISED_HOST_NAME_CONFIG);
+        if (advertisedHostname != null && !advertisedHostname.isEmpty())
+            builder.host(advertisedHostname);
+        Integer advertisedPort = 
config.getInt(WorkerConfig.REST_ADVERTISED_PORT_CONFIG);
+        if (advertisedPort != null)
+            builder.port(advertisedPort);
+        else
+            builder.port(config.getInt(WorkerConfig.REST_PORT_CONFIG));
+        return builder.build().toString();
+    }
+
+
+    /**
+     * @param url               HTTP connection will be established with this 
url.
+     * @param method            HTTP method ("GET", "POST", "PUT", etc.)
+     * @param requestBodyData   Object to serialize as JSON and send in the 
request body.
+     * @param responseFormat    Expected format of the response to the HTTP 
request.
+     * @param <T>               The type of the deserialized response to the 
HTTP request.
+     * @return The deserialized response to the HTTP request, or null if no 
data is expected.
+     */
+    public static <T> HttpResponse<T> httpRequest(String url, String method, 
Object requestBodyData,
+                                    TypeReference<T> responseFormat) {
+        HttpURLConnection connection = null;
+        try {
+            String serializedBody = requestBodyData == null ? null : 
JSON_SERDE.writeValueAsString(requestBodyData);
+            log.debug("Sending {} with input {} to {}", method, 
serializedBody, url);
+
+            connection = (HttpURLConnection) new URL(url).openConnection();
+            connection.setRequestMethod(method);
+
+            connection.setRequestProperty("User-Agent", "kafka-connect");
+            connection.setRequestProperty("Accept", "application/json");
+
+            // connection.getResponseCode() implicitly calls getInputStream, 
so always set to true.
+            // On the other hand, leaving this out breaks nothing.
+            connection.setDoInput(true);
+
+            connection.setUseCaches(false);
+
+            if (requestBodyData != null) {
+                connection.setRequestProperty("Content-Type", 
"application/json");
+                connection.setDoOutput(true);
+
+                OutputStream os = connection.getOutputStream();
+                os.write(serializedBody.getBytes());
+                os.flush();
+                os.close();
+            }
+
+            int responseCode = connection.getResponseCode();
+            if (responseCode == HttpURLConnection.HTTP_NO_CONTENT) {
+                return new HttpResponse<>(responseCode, 
connection.getHeaderFields(), null);
+            } else if (responseCode >= 400) {
+                InputStream es = connection.getErrorStream();
+                ErrorMessage errorMessage = JSON_SERDE.readValue(es, 
ErrorMessage.class);
+                es.close();
+                throw new ConnectRestException(responseCode, 
errorMessage.errorCode(), errorMessage.message());
+            } else if (responseCode >= 200 && responseCode < 300) {
+                InputStream is = connection.getInputStream();
+                T result = JSON_SERDE.readValue(is, responseFormat);
+                is.close();
+                return new HttpResponse<>(responseCode, 
connection.getHeaderFields(), result);
+            } else {
+                throw new 
ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR,
+                        Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
+                        "Unexpected status code when handling forwarded 
request: " + responseCode);
+            }
+        } catch (IOException e) {
+            log.error("IO error forwarding REST request: ", e);
+            throw new 
ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR, "IO Error trying to 
forward REST request: " + e.getMessage(), e);
+        } finally {
+            if (connection != null)
+                connection.disconnect();
+        }
+    }
+
+    public static class HttpResponse<T> {
+        private int status;
+        private Map<String, List<String>> headers;
+        private T body;
+
+        public HttpResponse(int status, Map<String, List<String>> headers, T 
body) {
+            this.status = status;
+            this.headers = headers;
+            this.body = body;
+        }
+
+        public int status() {
+            return status;
+        }
+
+        public Map<String, List<String>> headers() {
+            return headers;
+        }
+
+        public T body() {
+            return body;
+        }
+    }
+
+    public static String urlJoin(String base, String path) {
+        if (base.endsWith("/") && path.startsWith("/"))
+            return base + path.substring(1);
+        else
+            return base + path;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java
new file mode 100644
index 0000000..8daae05
--- /dev/null
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime.rest.entities;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+public class ConnectorInfo {
+
+    private final String name;
+    private final Map<String, String> config;
+    private final List<ConnectorTaskId> tasks;
+
+    @JsonCreator
+    public ConnectorInfo(@JsonProperty("name") String name, 
@JsonProperty("config") Map<String, String> config,
+                         @JsonProperty("tasks") List<ConnectorTaskId> tasks) {
+        this.name = name;
+        this.config = config;
+        this.tasks = tasks;
+    }
+
+    @JsonProperty
+    public String name() {
+        return name;
+    }
+
+    @JsonProperty
+    public Map<String, String> config() {
+        return config;
+    }
+
+    @JsonProperty
+    public List<ConnectorTaskId> tasks() {
+        return tasks;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        ConnectorInfo that = (ConnectorInfo) o;
+        return Objects.equals(name, that.name) &&
+                Objects.equals(config, that.config) &&
+                Objects.equals(tasks, that.tasks);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(name, config, tasks);
+    }
+
+
+    private static List<ConnectorTaskId> 
jsonTasks(Collection<org.apache.kafka.connect.util.ConnectorTaskId> tasks) {
+        List<ConnectorTaskId> jsonTasks = new ArrayList<>();
+        for (ConnectorTaskId task : tasks)
+            jsonTasks.add(task);
+        return jsonTasks;
+    }
+}

Reply via email to