http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderConfig.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderConfig.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderConfig.java
new file mode 100644
index 0000000..bd2ba56
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderConfig.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime.distributed;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.SSLConfigs;
+import org.apache.kafka.common.config.SaslConfigs;
+
+import java.util.Map;
+
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+
+public class DistributedHerderConfig extends AbstractConfig {
+    private static final ConfigDef CONFIG;
+
+    /*
+     * NOTE: DO NOT CHANGE EITHER CONFIG STRINGS OR THEIR JAVA VARIABLE NAMES AS
+     * THESE ARE PART OF THE PUBLIC API AND CHANGE WILL BREAK USER CODE.
+     */
+
+    /**
+     * <code>group.id</code>
+     */
+    public static final String GROUP_ID_CONFIG = "group.id";
+    private static final String GROUP_ID_DOC = "A unique string that identifies the Copycat cluster group this worker belongs to.";
+
+    /**
+     * <code>session.timeout.ms</code>
+     */
+    public static final String SESSION_TIMEOUT_MS_CONFIG = "session.timeout.ms";
+    private static final String SESSION_TIMEOUT_MS_DOC = "The timeout used to detect failures when using Kafka's group management facilities.";
+
+    /**
+     * <code>heartbeat.interval.ms</code>
+     */
+    public static final String HEARTBEAT_INTERVAL_MS_CONFIG = "heartbeat.interval.ms";
+    private static final String HEARTBEAT_INTERVAL_MS_DOC = "The expected time between heartbeats to the group coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the worker's session stays active and to facilitate rebalancing when new members join or leave the group. The value must be set lower than <code>session.timeout.ms</code>, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.";
+
+    /**
+     * <code>worker.sync.timeout.ms</code>
+     */
+    public static final String WORKER_SYNC_TIMEOUT_MS_CONFIG = "worker.sync.timeout.ms";
+    private static final String WORKER_SYNC_TIMEOUT_MS_DOC = "When the worker is out of sync with other workers and needs" +
+            " to resynchronize configurations, wait up to this amount of time before giving up, leaving the group, and" +
+            " waiting a backoff period before rejoining.";
+
+    /**
+     * <code>worker.unsync.backoff.ms</code>
+     */
+    public static final String WORKER_UNSYNC_BACKOFF_MS_CONFIG = "worker.unsync.backoff.ms";
+    private static final String WORKER_UNSYNC_BACKOFF_MS_DOC = "When the worker is out of sync with other workers and" +
+            " fails to catch up within worker.sync.timeout.ms, leave the Copycat cluster for this long before rejoining.";
+    public static final int WORKER_UNSYNC_BACKOFF_MS_DEFAULT = 5 * 60 * 1000;
+
+    static {
+        CONFIG = new ConfigDef()
+                .define(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
+                        ConfigDef.Type.LIST,
+                        ConfigDef.Importance.HIGH,
+                        CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
+                .define(GROUP_ID_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, GROUP_ID_DOC)
+                .define(SESSION_TIMEOUT_MS_CONFIG,
+                        ConfigDef.Type.INT,
+                        30000,
+                        ConfigDef.Importance.HIGH,
+                        SESSION_TIMEOUT_MS_DOC)
+                .define(HEARTBEAT_INTERVAL_MS_CONFIG,
+                        ConfigDef.Type.INT,
+                        3000,
+                        ConfigDef.Importance.HIGH,
+                        HEARTBEAT_INTERVAL_MS_DOC)
+                .define(CommonClientConfigs.METADATA_MAX_AGE_CONFIG,
+                        ConfigDef.Type.LONG,
+                        5 * 60 * 1000,
+                        atLeast(0),
+                        ConfigDef.Importance.LOW,
+                        CommonClientConfigs.METADATA_MAX_AGE_DOC)
+                .define(CommonClientConfigs.CLIENT_ID_CONFIG,
+                        ConfigDef.Type.STRING,
+                        "",
+                        ConfigDef.Importance.LOW,
+                        CommonClientConfigs.CLIENT_ID_DOC)
+                .define(CommonClientConfigs.SEND_BUFFER_CONFIG,
+                        ConfigDef.Type.INT,
+                        128 * 1024,
+                        atLeast(0),
+                        ConfigDef.Importance.MEDIUM,
+                        CommonClientConfigs.SEND_BUFFER_DOC)
+                .define(CommonClientConfigs.RECEIVE_BUFFER_CONFIG,
+                        ConfigDef.Type.INT,
+                        32 * 1024,
+                        atLeast(0),
+                        ConfigDef.Importance.MEDIUM,
+                        CommonClientConfigs.RECEIVE_BUFFER_DOC)
+                .define(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG,
+                        ConfigDef.Type.LONG,
+                        50L,
+                        atLeast(0L),
+                        ConfigDef.Importance.LOW,
+                        CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC)
+                .define(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG,
+                        ConfigDef.Type.LONG,
+                        100L,
+                        atLeast(0L),
+                        ConfigDef.Importance.LOW,
+                        CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
+                .define(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG,
+                        ConfigDef.Type.LONG,
+                        30000,
+                        atLeast(0),
+                        ConfigDef.Importance.LOW,
+                        CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
+                .define(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG,
+                        ConfigDef.Type.INT,
+                        2,
+                        atLeast(1),
+                        ConfigDef.Importance.LOW,
+                        CommonClientConfigs.METRICS_NUM_SAMPLES_DOC)
+                .define(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
+                        ConfigDef.Type.LIST,
+                        "",
+                        ConfigDef.Importance.LOW,
+                        CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
+                .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, ConfigDef.Type.STRING, CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, ConfigDef.Importance.MEDIUM, CommonClientConfigs.SECURITY_PROTOCOL_DOC)
+                .define(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, ConfigDef.Type.CLASS, SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, ConfigDef.Importance.LOW, SSLConfigs.PRINCIPAL_BUILDER_CLASS_DOC)
+                .define(SSLConfigs.SSL_PROTOCOL_CONFIG, ConfigDef.Type.STRING, SSLConfigs.DEFAULT_SSL_PROTOCOL, ConfigDef.Importance.MEDIUM, SSLConfigs.SSL_PROTOCOL_DOC)
+                .define(SSLConfigs.SSL_PROVIDER_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.MEDIUM, SSLConfigs.SSL_PROVIDER_DOC, false)
+                .define(SSLConfigs.SSL_CIPHER_SUITES_CONFIG, ConfigDef.Type.LIST, ConfigDef.Importance.LOW, SSLConfigs.SSL_CIPHER_SUITES_DOC, false)
+                .define(SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, ConfigDef.Type.LIST, SSLConfigs.DEFAULT_ENABLED_PROTOCOLS, ConfigDef.Importance.MEDIUM, SSLConfigs.SSL_ENABLED_PROTOCOLS_DOC)
+                .define(SSLConfigs.SSL_KEYSTORE_TYPE_CONFIG, ConfigDef.Type.STRING, SSLConfigs.DEFAULT_SSL_KEYSTORE_TYPE, ConfigDef.Importance.MEDIUM, SSLConfigs.SSL_KEYSTORE_TYPE_DOC)
+                .define(SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SSLConfigs.SSL_KEYSTORE_LOCATION_DOC, false)
+                .define(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SSLConfigs.SSL_KEYSTORE_PASSWORD_DOC, false)
+                .define(SSLConfigs.SSL_KEY_PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SSLConfigs.SSL_KEY_PASSWORD_DOC, false)
+                .define(SSLConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, ConfigDef.Type.STRING, SSLConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, ConfigDef.Importance.MEDIUM, SSLConfigs.SSL_TRUSTSTORE_TYPE_DOC)
+                .define(SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SSLConfigs.SSL_TRUSTSTORE_LOCATION_DOC, false)
+                .define(SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SSLConfigs.SSL_TRUSTSTORE_PASSWORD_DOC, false)
+                .define(SSLConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, ConfigDef.Type.STRING, SSLConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, ConfigDef.Importance.LOW, SSLConfigs.SSL_KEYMANAGER_ALGORITHM_DOC)
+                .define(SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, ConfigDef.Type.STRING, SSLConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, ConfigDef.Importance.LOW, SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
+                .define(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.LOW, SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false)
+                .define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, ConfigDef.Type.STRING, ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC, false)
+                .define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, ConfigDef.Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC)
+                .define(SaslConfigs.SASL_KAFKA_SERVER_REALM, ConfigDef.Type.STRING, ConfigDef.Importance.LOW, SaslConfigs.SASL_KAFKA_SERVER_DOC, false)
+                .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, ConfigDef.Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC)
+                .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, ConfigDef.Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER_DOC)
+                .define(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, ConfigDef.Type.LONG, SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC)
+                .define(SaslConfigs.AUTH_TO_LOCAL, ConfigDef.Type.LIST, SaslConfigs.DEFAULT_AUTH_TO_LOCAL, ConfigDef.Importance.MEDIUM, SaslConfigs.AUTH_TO_LOCAL_DOC)
+                .define(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG,
+                        ConfigDef.Type.INT,
+                        40 * 1000,
+                        atLeast(0),
+                        ConfigDef.Importance.MEDIUM,
+                        CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC)
+                        /* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */
+                .define(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG,
+                        ConfigDef.Type.LONG,
+                        9 * 60 * 1000,
+                        ConfigDef.Importance.MEDIUM,
+                        CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
+                .define(WORKER_SYNC_TIMEOUT_MS_CONFIG,
+                        ConfigDef.Type.INT,
+                        3000,
+                        ConfigDef.Importance.MEDIUM,
+                        WORKER_SYNC_TIMEOUT_MS_DOC)
+                .define(WORKER_UNSYNC_BACKOFF_MS_CONFIG,
+                        ConfigDef.Type.INT,
+                        WORKER_UNSYNC_BACKOFF_MS_DEFAULT,
+                        ConfigDef.Importance.MEDIUM,
+                        WORKER_UNSYNC_BACKOFF_MS_DOC);
+    }
+
+    DistributedHerderConfig(Map<?, ?> props) {
+        super(CONFIG, props);
+    }
+
+}
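
For reference, a minimal sketch of how a worker might build this config (not part of this patch; property values are illustrative, and since the constructor is package-private the caller would live in the same package):

    Map<String, String> props = new HashMap<>();               // java.util imports assumed
    props.put("bootstrap.servers", "broker1:9092");            // illustrative broker address
    props.put("group.id", "copycat-cluster");                  // identifies the Copycat cluster group
    DistributedHerderConfig config = new DistributedHerderConfig(props);
    // Unset keys fall back to the defaults defined above, e.g. session.timeout.ms = 30000
    int sessionTimeoutMs = config.getInt(DistributedHerderConfig.SESSION_TIMEOUT_MS_CONFIG);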

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/NotLeaderException.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/NotLeaderException.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/NotLeaderException.java
new file mode 100644
index 0000000..ce8fba5
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/NotLeaderException.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime.distributed;
+
+import org.apache.kafka.copycat.errors.CopycatException;
+
+/**
+ * Indicates an operation was not permitted because it can only be performed on the leader and this worker is not currently
+ * the leader.
+ */
+public class NotLeaderException extends CopycatException {
+    public NotLeaderException(String s) {
+        super(s);
+    }
+
+    public NotLeaderException(String s, Throwable throwable) {
+        super(s, throwable);
+    }
+
+    public NotLeaderException(Throwable throwable) {
+        super(throwable);
+    }
+}
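
The intended usage is for leader-only operations to guard themselves; a sketch, where the isLeader() check is hypothetical:

    if (!isLeader())
        throw new NotLeaderException("Connector configs may only be written by the leader");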

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java
new file mode 100644
index 0000000..c70ed4f
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java
@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.copycat.runtime.distributed;
+
+import org.apache.kafka.clients.consumer.internals.AbstractCoordinator;
+import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.CircularIterator;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.copycat.storage.KafkaConfigStorage;
+import org.apache.kafka.copycat.util.ConnectorTaskId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class manages the coordination process with the Kafka group coordinator on the broker for managing Copycat assignments to workers.
+ */
+public final class WorkerCoordinator extends AbstractCoordinator implements Closeable {
+    private static final Logger log = LoggerFactory.getLogger(WorkerCoordinator.class);
+
+    // Copycat doesn't currently support multiple task assignment strategies, so we just fill in a default value
+    public static final String DEFAULT_SUBPROTOCOL = "default";
+
+    private final KafkaConfigStorage configStorage;
+    private CopycatProtocol.Assignment assignmentSnapshot;
+    private final CopycatWorkerCoordinatorMetrics sensors;
+    private ClusterConfigState configSnapshot;
+    private final WorkerRebalanceListener listener;
+
+    private boolean rejoinRequested;
+
+    /**
+     * Initialize the coordination manager.
+     */
+    public WorkerCoordinator(ConsumerNetworkClient client,
+                             String groupId,
+                             int sessionTimeoutMs,
+                             int heartbeatIntervalMs,
+                             Metrics metrics,
+                             String metricGrpPrefix,
+                             Map<String, String> metricTags,
+                             Time time,
+                             long requestTimeoutMs,
+                             long retryBackoffMs,
+                             KafkaConfigStorage configStorage,
+                             WorkerRebalanceListener listener) {
+        super(client,
+                groupId,
+                sessionTimeoutMs,
+                heartbeatIntervalMs,
+                metrics,
+                metricGrpPrefix,
+                metricTags,
+                time,
+                requestTimeoutMs,
+                retryBackoffMs);
+        this.configStorage = configStorage;
+        this.assignmentSnapshot = null;
+        this.sensors = new CopycatWorkerCoordinatorMetrics(metrics, metricGrpPrefix, metricTags);
+        this.listener = listener;
+        this.rejoinRequested = false;
+    }
+
+    public void requestRejoin() {
+        rejoinRequested = true;
+    }
+
+    @Override
+    public String protocolType() {
+        return "copycat";
+    }
+
+    @Override
+    public LinkedHashMap<String, ByteBuffer> metadata() {
+        LinkedHashMap<String, ByteBuffer> metadata = new LinkedHashMap<>();
+        configSnapshot = configStorage.snapshot();
+        CopycatProtocol.ConfigState configState = new CopycatProtocol.ConfigState(configSnapshot.offset());
+        metadata.put(DEFAULT_SUBPROTOCOL, CopycatProtocol.serializeMetadata(configState));
+        return metadata;
+    }
+
+    @Override
+    protected void onJoin(int generation, String memberId, String protocol, ByteBuffer memberAssignment) {
+        assignmentSnapshot = CopycatProtocol.deserializeAssignment(memberAssignment);
+        // At this point we always consider ourselves to be a member of the cluster, even if there was an assignment
+        // error (the leader couldn't make the assignment) or we are behind the config and cannot yet work on our assigned
+        // tasks. It's the responsibility of the code driving this process to decide how to react (e.g. trying to get
+        // up to date, trying to rejoin again, leaving the group and backing off, etc.).
+        rejoinRequested = false;
+        listener.onAssigned(assignmentSnapshot);
+    }
+
+    @Override
+    protected Map<String, ByteBuffer> doSync(String leaderId, String protocol, Map<String, ByteBuffer> allMemberMetadata) {
+        log.debug("Performing task assignment");
+
+        Map<String, CopycatProtocol.ConfigState> allConfigs = new HashMap<>();
+        for (Map.Entry<String, ByteBuffer> entry : allMemberMetadata.entrySet())
+            allConfigs.put(entry.getKey(), CopycatProtocol.deserializeMetadata(entry.getValue()));
+
+        long maxOffset = findMaxMemberConfigOffset(allConfigs);
+        Long leaderOffset = ensureLeaderConfig(maxOffset);
+        if (leaderOffset == null)
+            return fillAssignmentsAndSerialize(allConfigs.keySet(), CopycatProtocol.Assignment.CONFIG_MISMATCH,
+                    leaderId, maxOffset, new HashMap<String, List<String>>(), new HashMap<String, List<ConnectorTaskId>>());
+        return performTaskAssignment(leaderId, leaderOffset, allConfigs);
+    }
+
+    private long findMaxMemberConfigOffset(Map<String, CopycatProtocol.ConfigState> allConfigs) {
+        // The new config offset is the maximum seen by any member. We always perform assignment using this offset,
+        // even if some members have fallen behind. The config offset used to generate the assignment is included in
+        // the response so members that have fallen behind will not use the assignment until they have caught up.
+        Long maxOffset = null;
+        for (Map.Entry<String, CopycatProtocol.ConfigState> stateEntry : allConfigs.entrySet()) {
+            long memberRootOffset = stateEntry.getValue().offset();
+            if (maxOffset == null)
+                maxOffset = memberRootOffset;
+            else
+                maxOffset = Math.max(maxOffset, memberRootOffset);
+        }
+
+        log.debug("Max config offset root: {}, local snapshot config offsets root: {}",
+                maxOffset, configSnapshot.offset());
+        return maxOffset;
+    }
+
+    private Long ensureLeaderConfig(long maxOffset) {
+        // If this leader is behind some other members, we can't do assignment
+        if (configSnapshot.offset() < maxOffset) {
+            // We might be able to take a new snapshot to catch up immediately and avoid another round of syncing here.
+            // Alternatively, if this node has already passed the maximum reported by any other member of the group, it
+            // is also safe to use this newer state.
+            ClusterConfigState updatedSnapshot = configStorage.snapshot();
+            if (updatedSnapshot.offset() < maxOffset) {
+                log.info("Was selected to perform assignments, but do not have latest config found in sync request. " +
+                        "Returning an empty configuration to trigger re-sync.");
+                return null;
+            } else {
+                configSnapshot = updatedSnapshot;
+                return configSnapshot.offset();
+            }
+        }
+
+        return maxOffset;
+    }
+
+    private Map<String, ByteBuffer> performTaskAssignment(String leaderId, long maxOffset, Map<String, CopycatProtocol.ConfigState> allConfigs) {
+        Map<String, List<String>> connectorAssignments = new HashMap<>();
+        Map<String, List<ConnectorTaskId>> taskAssignments = new HashMap<>();
+
+        // Perform round-robin task assignment
+        CircularIterator<String> memberIt = new CircularIterator<>(sorted(allConfigs.keySet()));
+        for (String connectorId : sorted(configSnapshot.connectors())) {
+            String connectorAssignedTo = memberIt.next();
+            log.trace("Assigning connector {} to {}", connectorId, connectorAssignedTo);
+            List<String> memberConnectors = connectorAssignments.get(connectorAssignedTo);
+            if (memberConnectors == null) {
+                memberConnectors = new ArrayList<>();
+                connectorAssignments.put(connectorAssignedTo, memberConnectors);
+            }
+            memberConnectors.add(connectorId);
+
+            for (ConnectorTaskId taskId : sorted(configSnapshot.tasks(connectorId))) {
+                String taskAssignedTo = memberIt.next();
+                log.trace("Assigning task {} to {}", taskId, taskAssignedTo);
+                List<ConnectorTaskId> memberTasks = taskAssignments.get(taskAssignedTo);
+                if (memberTasks == null) {
+                    memberTasks = new ArrayList<>();
+                    taskAssignments.put(taskAssignedTo, memberTasks);
+                }
+                memberTasks.add(taskId);
+            }
+        }
+
+        return fillAssignmentsAndSerialize(allConfigs.keySet(), CopycatProtocol.Assignment.NO_ERROR,
+                leaderId, maxOffset, connectorAssignments, taskAssignments);
+    }
+
+    private Map<String, ByteBuffer> fillAssignmentsAndSerialize(Collection<String> members,
+                                                                short error,
+                                                                String leaderId,
+                                                                long maxOffset,
+                                                                Map<String, List<String>> connectorAssignments,
+                                                                Map<String, List<ConnectorTaskId>> taskAssignments) {
+
+        Map<String, ByteBuffer> groupAssignment = new HashMap<>();
+        for (String member : members) {
+            List<String> connectors = connectorAssignments.get(member);
+            if (connectors == null)
+                connectors = Collections.emptyList();
+            List<ConnectorTaskId> tasks = taskAssignments.get(member);
+            if (tasks == null)
+                tasks = Collections.emptyList();
+            CopycatProtocol.Assignment assignment = new CopycatProtocol.Assignment(error, leaderId, maxOffset, connectors, tasks);
+            log.debug("Assignment: {} -> {}", member, assignment);
+            groupAssignment.put(member, CopycatProtocol.serializeAssignment(assignment));
+        }
+        log.debug("Finished assignment");
+        return groupAssignment;
+    }
+
+    @Override
+    protected void onLeave(int generation, String memberId) {
+        log.debug("Revoking previous assignment {}", assignmentSnapshot);
+        if (assignmentSnapshot != null && !assignmentSnapshot.failed())
+            listener.onRevoked(assignmentSnapshot.leader(), assignmentSnapshot.connectors(), assignmentSnapshot.tasks());
+    }
+
+    @Override
+    public boolean needRejoin() {
+        return super.needRejoin() || (assignmentSnapshot == null || assignmentSnapshot.failed()) || rejoinRequested;
+    }
+
+    @Override
+    public void close() {
+    }
+
+    public String memberId() {
+        return this.memberId;
+    }
+
+    private class CopycatWorkerCoordinatorMetrics {
+        public final Metrics metrics;
+        public final String metricGrpName;
+
+        public CopycatWorkerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> tags) {
+            this.metrics = metrics;
+            this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";
+
+            Measurable numConnectors = new Measurable() {
+                public double measure(MetricConfig config, long now) {
+                    return assignmentSnapshot.connectors().size();
+                }
+            };
+
+            Measurable numTasks = new Measurable() {
+                public double measure(MetricConfig config, long now) {
+                    return assignmentSnapshot.tasks().size();
+                }
+            };
+
+            metrics.addMetric(new MetricName("assigned-connectors",
+                            this.metricGrpName,
+                            "The number of connector instances currently 
assigned to this consumer",
+                            tags),
+                    numConnectors);
+            metrics.addMetric(new MetricName("assigned-tasks",
+                            this.metricGrpName,
+                            "The number of tasks currently assigned to this 
consumer",
+                            tags),
+                    numTasks);
+        }
+    }
+
+    private static <T extends Comparable<T>> List<T> sorted(Collection<T> members) {
+        List<T> res = new ArrayList<>(members);
+        Collections.sort(res);
+        return res;
+    }
+
+}
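
The round-robin logic in performTaskAssignment() walks a circular iterator over the sorted member IDs, handing out each connector and then each of its tasks in turn. A simplified, self-contained sketch of the same idea (hypothetical member and work-unit names; plain modular indexing stands in for Kafka's internal CircularIterator):

    List<String> members = Arrays.asList("worker-1", "worker-2");   // sorted member IDs; java.util imports assumed
    List<String> work = Arrays.asList("conn-A", "conn-A/task-0", "conn-A/task-1");
    Map<String, List<String>> assignments = new HashMap<>();
    int i = 0;
    for (String unit : work) {
        String member = members.get(i++ % members.size());          // wrap around the member list
        List<String> assigned = assignments.get(member);
        if (assigned == null) {
            assigned = new ArrayList<>();
            assignments.put(member, assigned);
        }
        assigned.add(unit);
    }
    // Result: worker-1 -> [conn-A, conn-A/task-1], worker-2 -> [conn-A/task-0]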

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerGroupMember.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerGroupMember.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerGroupMember.java
new file mode 100644
index 0000000..f8cabaa
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerGroupMember.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime.distributed;
+
+import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.network.ChannelBuilder;
+import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.copycat.storage.KafkaConfigStorage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This class manages the coordination process with brokers for the Copycat cluster group membership. It ties together
+ * the Coordinator, which implements the group member protocol, with all the other pieces needed to drive the connection
+ * to the group coordinator broker. This isolates all the networking to a single thread managed by this class, with
+ * higher level operations in response to group membership events being handled by the herder.
+ */
+public class WorkerGroupMember {
+    private static final Logger log = LoggerFactory.getLogger(WorkerGroupMember.class);
+
+    private static final AtomicInteger COPYCAT_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
+    private static final String JMX_PREFIX = "kafka.copycat";
+
+    private final Time time;
+    private final String clientId;
+    private final ConsumerNetworkClient client;
+    private final Metrics metrics;
+    private final Metadata metadata;
+    private final long retryBackoffMs;
+    private final WorkerCoordinator coordinator;
+
+    private boolean stopped = false;
+
+    public WorkerGroupMember(DistributedHerderConfig config, KafkaConfigStorage configStorage, WorkerRebalanceListener listener) {
+        try {
+            this.time = new SystemTime();
+
+            MetricConfig metricConfig = new MetricConfig().samples(config.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG))
+                    .timeWindow(config.getLong(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS);
+            String clientIdConfig = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
+            clientId = clientIdConfig.length() <= 0 ? "copycat-" + COPYCAT_CLIENT_ID_SEQUENCE.getAndIncrement() : clientIdConfig;
+            List<MetricsReporter> reporters = config.getConfiguredInstances(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
+            reporters.add(new JmxReporter(JMX_PREFIX));
+            this.metrics = new Metrics(metricConfig, reporters, time);
+            this.retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG);
+            this.metadata = new Metadata(retryBackoffMs, config.getLong(CommonClientConfigs.METADATA_MAX_AGE_CONFIG));
+            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
+            this.metadata.update(Cluster.bootstrap(addresses), 0);
+            String metricGrpPrefix = "copycat";
+            Map<String, String> metricsTags = new LinkedHashMap<>();
+            metricsTags.put("client-id", clientId);
+            ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
+            NetworkClient netClient = new NetworkClient(
+                    new Selector(config.getLong(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, metricsTags, channelBuilder),
+                    this.metadata,
+                    clientId,
+                    100, // a fixed large enough value will suffice
+                    config.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG),
+                    config.getInt(CommonClientConfigs.SEND_BUFFER_CONFIG),
+                    config.getInt(CommonClientConfigs.RECEIVE_BUFFER_CONFIG),
+                    config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG), time);
+            this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs);
+            this.coordinator = new WorkerCoordinator(this.client,
+                    config.getString(DistributedHerderConfig.GROUP_ID_CONFIG),
+                    config.getInt(DistributedHerderConfig.SESSION_TIMEOUT_MS_CONFIG),
+                    config.getInt(DistributedHerderConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
+                    metrics,
+                    metricGrpPrefix,
+                    metricsTags,
+                    this.time,
+                    config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
+                    retryBackoffMs,
+                    configStorage,
+                    listener);
+
+            AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
+            log.debug("Copycat group member created");
+        } catch (Throwable t) {
+            // call close methods if internal objects are already constructed
+            // this is to prevent resource leak. see KAFKA-2121
+            stop(true);
+            // now propagate the exception
+            throw new KafkaException("Failed to construct kafka consumer", t);
+        }
+    }
+
+    public void stop() {
+        if (stopped) return;
+        stop(false);
+    }
+
+    public void ensureActive() {
+        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureActiveGroup();
+    }
+
+    public void poll(long timeout) {
+        if (timeout < 0)
+            throw new IllegalArgumentException("Timeout must not be negative");
+
+        // poll for new data until the timeout expires
+        long remaining = timeout;
+        while (remaining >= 0) {
+            long start = time.milliseconds();
+            coordinator.ensureCoordinatorKnown();
+            coordinator.ensureActiveGroup();
+            client.poll(remaining);
+            remaining -= time.milliseconds() - start;
+        }
+    }
+
+    /**
+     * Interrupt any running poll() calls, causing a ConsumerWakeupException to be thrown in the thread invoking that method.
+     */
+    public void wakeup() {
+        this.client.wakeup();
+    }
+
+    /**
+     * Get the member ID of this worker in the group of workers.
+     *
+     * This ID is the unique member ID automatically generated.
+     *
+     * @return the member ID
+     */
+    public String memberId() {
+        return coordinator.memberId();
+    }
+
+    public void requestRejoin() {
+        coordinator.requestRejoin();
+    }
+
+    private void stop(boolean swallowException) {
+        log.trace("Stopping the Copycat group member.");
+        AtomicReference<Throwable> firstException = new AtomicReference<Throwable>();
+        this.stopped = true;
+        ClientUtils.closeQuietly(coordinator, "coordinator", firstException);
+        ClientUtils.closeQuietly(metrics, "consumer metrics", firstException);
+        ClientUtils.closeQuietly(client, "consumer network client", firstException);
+        AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId);
+        if (firstException.get() != null && !swallowException)
+            throw new KafkaException("Failed to stop the Copycat group member", firstException.get());
+        else
+            log.debug("The Copycat group member has stopped.");
+    }
+}
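
A rough sketch of the driver loop over this class (in practice the distributed herder owns this loop; the running flag and the constructor arguments here are assumed):

    WorkerGroupMember member = new WorkerGroupMember(config, configStorage, listener);
    try {
        while (running) {
            member.ensureActive();   // (re)join the group if needed
            member.poll(1000);       // drive network I/O; rebalance callbacks fire from inside poll
        }
    } finally {
        member.stop();
    }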

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerRebalanceListener.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerRebalanceListener.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerRebalanceListener.java
new file mode 100644
index 0000000..c9d2ed2
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerRebalanceListener.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime.distributed;
+
+import org.apache.kafka.copycat.util.ConnectorTaskId;
+
+import java.util.Collection;
+
+/**
+ * Listener for rebalance events in the worker group.
+ */
+public interface WorkerRebalanceListener {
+    /**
+     * Invoked when a new assignment is created by joining the Copycat worker group. This is invoked for both successful
+     * and unsuccessful assignments.
+     */
+    void onAssigned(CopycatProtocol.Assignment assignment);
+
+    /**
+     * Invoked when a rebalance operation starts, revoking ownership for the set of connectors and tasks.
+     */
+    void onRevoked(String leader, Collection<String> connectors, Collection<ConnectorTaskId> tasks);
+}
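
A minimal implementation might just log the transitions; an illustrative sketch (a real herder would start and stop connectors and tasks here):

    WorkerRebalanceListener listener = new WorkerRebalanceListener() {
        @Override
        public void onAssigned(CopycatProtocol.Assignment assignment) {
            // Invoked after every join, including failed assignments; check assignment.failed() before acting
            log.info("Assigned: {}", assignment);
        }

        @Override
        public void onRevoked(String leader, Collection<String> connectors, Collection<ConnectorTaskId> tasks) {
            // Invoked as a rebalance starts; ownership of these connectors and tasks is being given up
            log.info("Revoked {} connectors and {} tasks", connectors.size(), tasks.size());
        }
    };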

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java
index d5670fd..167ee60 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java
@@ -17,20 +17,23 @@
 
 package org.apache.kafka.copycat.runtime.standalone;
 
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.copycat.connector.Connector;
 import org.apache.kafka.copycat.errors.CopycatException;
 import org.apache.kafka.copycat.runtime.ConnectorConfig;
 import org.apache.kafka.copycat.runtime.Herder;
 import org.apache.kafka.copycat.runtime.HerderConnectorContext;
+import org.apache.kafka.copycat.runtime.TaskConfig;
 import org.apache.kafka.copycat.runtime.Worker;
-import org.apache.kafka.copycat.sink.SinkConnector;
-import org.apache.kafka.copycat.sink.SinkTask;
-import org.apache.kafka.copycat.util.*;
+import org.apache.kafka.copycat.util.Callback;
+import org.apache.kafka.copycat.util.ConnectorTaskId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
 
 /**
  * Single process, in-memory "herder". Useful for a standalone copycat process.
@@ -56,11 +59,8 @@ public class StandaloneHerder implements Herder {
         // There's no coordination/hand-off to do here since this is all standalone. Instead, we
         // should just clean up the stuff we normally would, i.e. cleanly checkpoint and shutdown all
         // the tasks.
-        for (Map.Entry<String, ConnectorState> entry : connectors.entrySet()) {
-            ConnectorState state = entry.getValue();
-            stopConnector(state);
-        }
-        connectors.clear();
+        for (String connName : new HashSet<>(connectors.keySet()))
+            stopConnector(connName);
 
         log.info("Herder stopped");
     }
@@ -69,11 +69,14 @@ public class StandaloneHerder implements Herder {
     public synchronized void addConnector(Map<String, String> connectorProps,
                                           Callback<String> callback) {
         try {
-            ConnectorState connState = createConnector(connectorProps);
+            ConnectorConfig connConfig = new ConnectorConfig(connectorProps);
+            String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
+            worker.addConnector(connConfig, new HerderConnectorContext(this, connName));
+            connectors.put(connName, new ConnectorState(connConfig));
             if (callback != null)
-                callback.onCompletion(null, connState.name);
+                callback.onCompletion(null, connName);
             // This should always be a new job, create jobs from scratch
-            createConnectorTasks(connState);
+            createConnectorTasks(connName);
         } catch (CopycatException e) {
             if (callback != null)
                 callback.onCompletion(e, null);
@@ -81,9 +84,9 @@ public class StandaloneHerder implements Herder {
     }
 
     @Override
-    public synchronized void deleteConnector(String name, Callback<Void> callback) {
+    public synchronized void deleteConnector(String connName, Callback<Void> callback) {
         try {
-            destroyConnector(name);
+            stopConnector(connName);
             if (callback != null)
                 callback.onCompletion(null, null);
         } catch (CopycatException e) {
@@ -94,114 +97,35 @@ public class StandaloneHerder implements Herder {
 
     @Override
     public synchronized void requestTaskReconfiguration(String connName) {
-        ConnectorState state = connectors.get(connName);
-        if (state == null) {
+        if (!worker.connectorNames().contains(connName)) {
             log.error("Task that requested reconfiguration does not exist: 
{}", connName);
             return;
         }
-        updateConnectorTasks(state);
-    }
-
-    // Creates and configures the connector. Does not setup any tasks
-    private ConnectorState createConnector(Map<String, String> connectorProps) {
-        ConnectorConfig connConfig = new ConnectorConfig(connectorProps);
-        String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
-        String className = connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
-        log.info("Creating connector {} of type {}", connName, className);
-        int maxTasks = connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG);
-        List<String> topics = connConfig.getList(ConnectorConfig.TOPICS_CONFIG); // Sinks only
-        Properties configs = connConfig.unusedProperties();
-
-        if (connectors.containsKey(connName)) {
-            log.error("Ignoring request to create connector due to conflicting 
connector name");
-            throw new CopycatException("Connector with name " + connName + " 
already exists");
-        }
-
-        final Connector connector;
-        try {
-            connector = instantiateConnector(className);
-        } catch (Throwable t) {
-            // Catches normal exceptions due to instantiation errors as well as any runtime errors that
-            // may be caused by user code
-            throw new CopycatException("Failed to create connector instance", t);
-        }
-        connector.initialize(new HerderConnectorContext(this, connName));
-        try {
-            connector.start(configs);
-        } catch (CopycatException e) {
-            throw new CopycatException("Connector threw an exception while 
starting", e);
-        }
-        ConnectorState state = new ConnectorState(connName, connector, 
maxTasks, topics);
-        connectors.put(connName, state);
-
-        log.info("Finished creating connector {}", connName);
-
-        return state;
-    }
-
-    private static Connector instantiateConnector(String className) {
-        try {
-            return Utils.newInstance(className, Connector.class);
-        } catch (ClassNotFoundException e) {
-            throw new CopycatException("Couldn't instantiate connector class", 
e);
-        }
-    }
-
-    private void destroyConnector(String connName) {
-        log.info("Destroying connector {}", connName);
-        ConnectorState state = connectors.get(connName);
-        if (state == null) {
-            log.error("Failed to destroy connector {} because it does not 
exist", connName);
-            throw new CopycatException("Connector does not exist");
-        }
-
-        stopConnector(state);
-        connectors.remove(state.name);
-
-        log.info("Finished destroying connector {}", connName);
+        updateConnectorTasks(connName);
     }
 
     // Stops a connector's tasks, then the connector
-    private void stopConnector(ConnectorState state) {
-        removeConnectorTasks(state);
+    private void stopConnector(String connName) {
+        removeConnectorTasks(connName);
         try {
-            state.connector.stop();
+            worker.stopConnector(connName);
+            connectors.remove(connName);
         } catch (CopycatException e) {
-            log.error("Error shutting down connector {}: ", state.connector, 
e);
+            log.error("Error shutting down connector {}: ", connName, e);
         }
     }
 
-    private void createConnectorTasks(ConnectorState state) {
-        String taskClassName = state.connector.taskClass().getName();
-
-        log.info("Creating tasks for connector {} of type {}", state.name, 
taskClassName);
-
-        List<Properties> taskConfigs = 
state.connector.taskConfigs(state.maxTasks);
-
-        // Generate the final configs, including framework provided settings
-        Map<ConnectorTaskId, Properties> taskProps = new HashMap<>();
-        for (int i = 0; i < taskConfigs.size(); i++) {
-            ConnectorTaskId taskId = new ConnectorTaskId(state.name, i);
-            Properties config = taskConfigs.get(i);
-            // TODO: This probably shouldn't be in the Herder. It's nice to have Copycat ensure the list of topics
-            // is automatically provided to tasks since it is required by the framework, but this
-            String subscriptionTopics = Utils.join(state.inputTopics, ",");
-            if (state.connector instanceof SinkConnector) {
-                // Make sure we don't modify the original since the connector may reuse it internally
-                Properties configForSink = new Properties();
-                configForSink.putAll(config);
-                configForSink.setProperty(SinkTask.TOPICS_CONFIG, subscriptionTopics);
-                config = configForSink;
-            }
-            taskProps.put(taskId, config);
-        }
+    private void createConnectorTasks(String connName) {
+        ConnectorState state = connectors.get(connName);
+        Map<ConnectorTaskId, Map<String, String>> taskConfigs = worker.reconfigureConnectorTasks(connName,
+                state.config.getInt(ConnectorConfig.TASKS_MAX_CONFIG),
+                state.config.getList(ConnectorConfig.TOPICS_CONFIG));
 
-        // And initiate the tasks
-        for (int i = 0; i < taskConfigs.size(); i++) {
-            ConnectorTaskId taskId = new ConnectorTaskId(state.name, i);
-            Properties config = taskProps.get(taskId);
+        for (Map.Entry<ConnectorTaskId, Map<String, String>> taskEntry : taskConfigs.entrySet()) {
+            ConnectorTaskId taskId = taskEntry.getKey();
+            TaskConfig config = new TaskConfig(taskEntry.getValue());
             try {
-                worker.addTask(taskId, taskClassName, config);
+                worker.addTask(taskId, config);
                 // We only need to store the task IDs so we can clean up.
                 state.tasks.add(taskId);
             } catch (Throwable e) {
@@ -213,7 +137,8 @@ public class StandaloneHerder implements Herder {
         }
     }
 
-    private void removeConnectorTasks(ConnectorState state) {
+    private void removeConnectorTasks(String connName) {
+        ConnectorState state = connectors.get(connName);
         Iterator<ConnectorTaskId> taskIter = state.tasks.iterator();
         while (taskIter.hasNext()) {
             ConnectorTaskId taskId = taskIter.next();
@@ -228,25 +153,18 @@ public class StandaloneHerder implements Herder {
         }
     }
 
-    private void updateConnectorTasks(ConnectorState state) {
-        removeConnectorTasks(state);
-        createConnectorTasks(state);
+    private void updateConnectorTasks(String connName) {
+        removeConnectorTasks(connName);
+        createConnectorTasks(connName);
     }
 
 
     private static class ConnectorState {
-        public String name;
-        public Connector connector;
-        public int maxTasks;
-        public List<String> inputTopics;
+        public ConnectorConfig config;
         Set<ConnectorTaskId> tasks;
 
-        public ConnectorState(String name, Connector connector, int maxTasks,
-                              List<String> inputTopics) {
-            this.name = name;
-            this.connector = connector;
-            this.maxTasks = maxTasks;
-            this.inputTopics = inputTopics;
+        public ConnectorState(ConnectorConfig config) {
+            this.config = config;
             this.tasks = new HashSet<>();
         }
     }
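
After this refactor the standalone herder drives the connector lifecycle entirely through the Worker and reports results via callbacks. A usage sketch (connector name and class are illustrative):

    Map<String, String> props = new HashMap<>();
    props.put(ConnectorConfig.NAME_CONFIG, "local-file-source");
    props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "org.example.MySourceConnector");
    herder.addConnector(props, new Callback<String>() {
        @Override
        public void onCompletion(Throwable error, String connName) {
            if (error != null)
                log.error("Failed to create connector", error);
            else
                log.info("Created connector {}", connName);
        }
    });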

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaConfigStorage.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaConfigStorage.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaConfigStorage.java
index 366bf13..fb4f70d 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaConfigStorage.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaConfigStorage.java
@@ -44,6 +44,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -226,6 +227,7 @@ public class KafkaConfigStorage {
         consumerProps.putAll(configs);
         consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
 
         configLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback);
     }
@@ -271,9 +273,14 @@ public class KafkaConfigStorage {
      * @param properties the configuration to write
      */
     public void putConnectorConfig(String connector, Map<String, String> properties) {
-        Struct copycatConfig = new Struct(CONNECTOR_CONFIGURATION_V0);
-        copycatConfig.put("properties", properties);
-        byte[] serializedConfig = converter.fromCopycatData(topic, 
CONNECTOR_CONFIGURATION_V0, copycatConfig);
+        byte[] serializedConfig;
+        if (properties == null) {
+            serializedConfig = null;
+        } else {
+            Struct copycatConfig = new Struct(CONNECTOR_CONFIGURATION_V0);
+            copycatConfig.put("properties", properties);
+            serializedConfig = converter.fromCopycatData(topic, CONNECTOR_CONFIGURATION_V0, copycatConfig);
+        }
 
         try {
             configLog.send(CONNECTOR_KEY(connector), serializedConfig);
@@ -349,6 +356,14 @@ public class KafkaConfigStorage {
         }
     }
 
+    public Future<Void> readToEnd() {
+        return configLog.readToEnd();
+    }
+
+    public void readToEnd(Callback<Void> cb) {
+        configLog.readToEnd(cb);
+    }
+
     private KafkaBasedLog<String, byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps,
                                                               Map<String, Object> consumerProps, Callback<ConsumerRecord<String, byte[]>> consumedCallback) {
         return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, new SystemTime());
@@ -369,22 +384,29 @@ public class KafkaConfigStorage {
                 log.error("Failed to convert config data to Copycat format: ", 
e);
                 return;
             }
-            offset = record.offset();
+            // Make the recorded offset match the API used for positions in the consumer -- return the offset of the
+            // *next record*, not the last one consumed.
+            offset = record.offset() + 1;
 
             if (record.key().startsWith(CONNECTOR_PREFIX)) {
                 String connectorName = record.key().substring(CONNECTOR_PREFIX.length());
                 synchronized (lock) {
-                    // Connector configs can be applied and callbacks invoked immediately
-                    if (!(value.value() instanceof Map)) {
-                        log.error("Found connector configuration (" + record.key() + ") in wrong format: " + value.value().getClass());
-                        return;
-                    }
-                    Object newConnectorConfig = ((Map<String, Object>) value.value()).get("properties");
-                    if (!(newConnectorConfig instanceof Map)) {
-                        log.error("Invalid data for connector config: properties filed should be a Map but is " + newConnectorConfig.getClass());
-                        return;
+                    if (value.value() == null) {
+                        // Connector deletion will be written as a null value
+                        connectorConfigs.remove(connectorName);
+                    } else {
+                        // Connector configs can be applied and callbacks invoked immediately
+                        if (!(value.value() instanceof Map)) {
+                            log.error("Found connector configuration (" + record.key() + ") in wrong format: " + value.value().getClass());
+                            return;
+                        }
+                        Object newConnectorConfig = ((Map<String, Object>) value.value()).get("properties");
+                        if (!(newConnectorConfig instanceof Map)) {
+                            log.error("Invalid data for connector config: properties field should be a Map but is " + newConnectorConfig.getClass());
+                            return;
+                        }
+                        connectorConfigs.put(connectorName, (Map<String, String>) newConnectorConfig);
                     }
-                    connectorConfigs.put(connectorName, (Map<String, String>) newConnectorConfig);
                 }
                 if (!starting)
                     connectorConfigCallback.onCompletion(null, connectorName);
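The `record.offset() + 1` convention above is worth a second look: the consumer's position API reports the offset of the next record to be fetched, so storing last-consumed-plus-one means the saved value can be compared against positions or handed back to seek() with no off-by-one adjustment. Roughly, as a hypothetical helper for illustration:

    // Position semantics: after consuming a record at offset N, the next record is at N + 1,
    // which is exactly what consumer.position(tp) would report at that point.
    private static long positionAfter(ConsumerRecord<String, byte[]> record) {
        return record.offset() + 1;
    }
    // Resuming later: consumer.seek(tp, savedPosition) starts at the first unread record.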
@@ -445,8 +467,7 @@ public class KafkaConfigStorage {
 
                     Map<ConnectorTaskId, Map<String, String>> deferred = deferredTaskUpdates.get(connectorName);
 
-                    Object newTaskCountObj = ((Map<String, Object>) value.value()).get("tasks");
-                    Integer newTaskCount = (Integer) newTaskCountObj;
+                    int newTaskCount = intValue(((Map<String, Object>) value.value()).get("tasks"));
 
                     // Validate the configs we're supposed to update to ensure we're getting a complete configuration
                     // update of all tasks that are expected based on the number of tasks in the commit message.
@@ -542,5 +563,16 @@ public class KafkaConfigStorage {
                 return false;
         return true;
     }
+
+    // Convert an integer value extracted from a schemaless struct to an int. This handles potentially different
+    // encodings by different Converters.
+    private static int intValue(Object value) {
+        if (value instanceof Integer)
+            return (int) value;
+        else if (value instanceof Long)
+            return (int) (long) value;
+        else
+            throw new CopycatException("Expected integer value to be either Integer or Long");
+    }
+    }
 }
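The intValue helper exists because schemaless converters are free to choose the boxed type for integral values; a JSON converter, for example, may deserialize every number as a Long while another converter hands back an Integer. A hypothetical illustration of the contract (fragment, not a standalone program):

    int a = intValue(42);   // Integer input  -> 42
    int b = intValue(42L);  // Long input     -> 42
    intValue("42");         // anything else  -> throws CopycatException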
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStore.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStore.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStore.java
index b5af1fe..b270368 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStore.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStore.java
@@ -68,11 +68,13 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
         producerProps.putAll(configs);
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
 
         Map<String, Object> consumerProps = new HashMap<>();
         consumerProps.putAll(configs);
         consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
 
         offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback);
     }
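Two reliability tweaks in one hunk: the offset log's producer now requires acknowledgement from the full in-sync replica set (acks=all), so a flushed offset survives the loss of a single broker, and the consumer stops committing offsets it never uses. A sketch of an equivalent producer setup, with hypothetical bootstrap settings:

    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
    // "all": the leader acknowledges only once every in-sync replica has the record.
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);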

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java
index 683c634..e3e498c 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java
@@ -23,7 +23,7 @@ import java.io.Serializable;
  * Unique ID for a single task. It includes a unique connector ID and a task ID that is unique within
  * the connector.
  */
-public class ConnectorTaskId implements Serializable {
+public class ConnectorTaskId implements Serializable, Comparable<ConnectorTaskId> {
     private final String connector;
     private final int task;
 
@@ -68,4 +68,12 @@ public class ConnectorTaskId implements Serializable {
     public String toString() {
         return connector + '-' + task;
     }
+
+    @Override
+    public int compareTo(ConnectorTaskId o) {
+        int connectorCmp = connector.compareTo(o.connector);
+        if (connectorCmp != 0)
+            return connectorCmp;
+        return ((Integer) task).compareTo(o.task);
+    }
 }
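Making ConnectorTaskId comparable (connector name first, then task number) lets callers keep task IDs in sorted collections and produce deterministic orderings, e.g. for task assignment. A small illustration (fragment):

    SortedSet<ConnectorTaskId> ids = new TreeSet<>();
    ids.add(new ConnectorTaskId("sink", 1));
    ids.add(new ConnectorTaskId("source", 0));
    ids.add(new ConnectorTaskId("sink", 0));
    // Iteration order: sink-0, sink-1, source-0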

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
index 4a30992..19e1462 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
@@ -19,11 +19,19 @@ package org.apache.kafka.copycat.runtime;
 
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.copycat.cli.WorkerConfig;
+import org.apache.kafka.copycat.connector.Connector;
+import org.apache.kafka.copycat.connector.ConnectorContext;
+import org.apache.kafka.copycat.connector.Task;
 import org.apache.kafka.copycat.errors.CopycatException;
+import org.apache.kafka.copycat.sink.SinkTask;
 import org.apache.kafka.copycat.source.SourceRecord;
 import org.apache.kafka.copycat.source.SourceTask;
-import org.apache.kafka.copycat.storage.*;
+import org.apache.kafka.copycat.storage.Converter;
+import org.apache.kafka.copycat.storage.OffsetBackingStore;
+import org.apache.kafka.copycat.storage.OffsetStorageReader;
+import org.apache.kafka.copycat.storage.OffsetStorageWriter;
 import org.apache.kafka.copycat.util.ConnectorTaskId;
 import org.apache.kafka.copycat.util.MockTime;
 import org.apache.kafka.copycat.util.ThreadedTest;
@@ -36,16 +44,24 @@ import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(Worker.class)
 @PowerMockIgnore("javax.management.*")
 public class WorkerTest extends ThreadedTest {
 
-    private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
+    private static final String CONNECTOR_ID = "test-connector";
+    private static final ConnectorTaskId TASK_ID = new ConnectorTaskId("job", 0);
+
     private WorkerConfig config;
     private Worker worker;
     private OffsetBackingStore offsetBackingStore = PowerMock.createMock(OffsetBackingStore.class);
@@ -65,6 +81,146 @@ public class WorkerTest extends ThreadedTest {
     }
 
     @Test
+    public void testAddRemoveConnector() throws Exception {
+        offsetBackingStore.configure(EasyMock.anyObject(Map.class));
+        EasyMock.expectLastCall();
+        offsetBackingStore.start();
+        EasyMock.expectLastCall();
+
+        // Create
+        Connector connector = PowerMock.createMock(Connector.class);
+        ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
+
+        PowerMock.mockStatic(Worker.class);
+        PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{TestConnector.class}).andReturn(connector);
+
+        Properties props = new Properties();
+        props.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
+        props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
+        props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
+        props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestConnector.class.getName());
+
+        connector.initialize(ctx);
+        EasyMock.expectLastCall();
+        connector.start(props);
+        EasyMock.expectLastCall();
+
+        // Remove
+        connector.stop();
+        EasyMock.expectLastCall();
+
+        offsetBackingStore.stop();
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        worker = new Worker(new MockTime(), config, offsetBackingStore);
+        worker.start();
+
+        ConnectorConfig config = new ConnectorConfig(Utils.propsToStringMap(props));
+        assertEquals(Collections.emptySet(), worker.connectorNames());
+        worker.addConnector(config, ctx);
+        assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames());
+        try {
+            worker.addConnector(config, ctx);
+            fail("Should have thrown exception when trying to add connector 
with same name.");
+        } catch (CopycatException e) {
+            // expected
+        }
+        worker.stopConnector(CONNECTOR_ID);
+        assertEquals(Collections.emptySet(), worker.connectorNames());
+        // Nothing should be left, so this should effectively be a nop
+        worker.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test(expected = CopycatException.class)
+    public void testStopInvalidConnector() {
+        offsetBackingStore.configure(EasyMock.anyObject(Map.class));
+        EasyMock.expectLastCall();
+        offsetBackingStore.start();
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        worker = new Worker(new MockTime(), config, offsetBackingStore);
+        worker.start();
+
+        worker.stopConnector(CONNECTOR_ID);
+    }
+
+    @Test
+    public void testReconfigureConnectorTasks() throws Exception {
+        offsetBackingStore.configure(EasyMock.anyObject(Map.class));
+        EasyMock.expectLastCall();
+        offsetBackingStore.start();
+        EasyMock.expectLastCall();
+
+        // Create
+        Connector connector = PowerMock.createMock(Connector.class);
+        ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
+
+        PowerMock.mockStatic(Worker.class);
+        PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{TestConnector.class}).andReturn(connector);
+
+        Properties props = new Properties();
+        props.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
+        props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
+        props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
+        props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestConnector.class.getName());
+
+        connector.initialize(ctx);
+        EasyMock.expectLastCall();
+        connector.start(props);
+        EasyMock.expectLastCall();
+
+        // Reconfigure
+        EasyMock.<Class<? extends Task>>expect(connector.taskClass()).andReturn(TestSourceTask.class);
+        Properties taskProps = new Properties();
+        taskProps.setProperty("foo", "bar");
+        EasyMock.expect(connector.taskConfigs(2)).andReturn(Arrays.asList(taskProps, taskProps));
+
+        // Remove
+        connector.stop();
+        EasyMock.expectLastCall();
+
+        offsetBackingStore.stop();
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        worker = new Worker(new MockTime(), config, offsetBackingStore);
+        worker.start();
+
+        ConnectorConfig config = new ConnectorConfig(Utils.propsToStringMap(props));
+        assertEquals(Collections.emptySet(), worker.connectorNames());
+        worker.addConnector(config, ctx);
+        assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames());
+        try {
+            worker.addConnector(config, ctx);
+            fail("Should have thrown exception when trying to add connector 
with same name.");
+        } catch (CopycatException e) {
+            // expected
+        }
+        Map<ConnectorTaskId, Map<String, String>> taskConfigs = worker.reconfigureConnectorTasks(CONNECTOR_ID, 2, Arrays.asList("foo", "bar"));
+        Properties expectedTaskProps = new Properties();
+        expectedTaskProps.setProperty("foo", "bar");
+        expectedTaskProps.setProperty(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
+        expectedTaskProps.setProperty(SinkTask.TOPICS_CONFIG, "foo,bar");
+        assertEquals(2, taskConfigs.size());
+        assertEquals(expectedTaskProps, taskConfigs.get(new ConnectorTaskId(CONNECTOR_ID, 0)));
+        assertEquals(expectedTaskProps, taskConfigs.get(new ConnectorTaskId(CONNECTOR_ID, 1)));
+        worker.stopConnector(CONNECTOR_ID);
+        assertEquals(Collections.emptySet(), worker.connectorNames());
+        // Nothing should be left, so this should effectively be a nop
+        worker.stop();
+
+        PowerMock.verifyAll();
+    }
+
+
+    @Test
     public void testAddRemoveTask() throws Exception {
         offsetBackingStore.configure(EasyMock.anyObject(Map.class));
         EasyMock.expectLastCall();
@@ -78,7 +234,7 @@ public class WorkerTest extends ThreadedTest {
         WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class);
 
         PowerMock.mockStatic(Worker.class);
-        PowerMock.expectPrivate(Worker.class, "instantiateTask", TestSourceTask.class.getName()).andReturn(task);
+        PowerMock.expectPrivate(Worker.class, "instantiateTask", new Object[]{TestSourceTask.class}).andReturn(task);
 
         PowerMock.expectNew(
                 WorkerSourceTask.class, EasyMock.eq(taskId), EasyMock.eq(task),
@@ -91,6 +247,7 @@ public class WorkerTest extends ThreadedTest {
                 EasyMock.anyObject(Time.class))
                 .andReturn(workerTask);
         Properties origProps = new Properties();
+        origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
         workerTask.start(origProps);
         EasyMock.expectLastCall();
 
@@ -108,8 +265,11 @@ public class WorkerTest extends ThreadedTest {
 
         worker = new Worker(new MockTime(), config, offsetBackingStore);
         worker.start();
-        worker.addTask(taskId, TestSourceTask.class.getName(), origProps);
+        assertEquals(Collections.emptySet(), worker.taskIds());
+        worker.addTask(taskId, new TaskConfig(Utils.propsToStringMap(origProps)));
+        assertEquals(new HashSet<>(Arrays.asList(taskId)), worker.taskIds());
         worker.stopTask(taskId);
+        assertEquals(Collections.emptySet(), worker.taskIds());
         // Nothing should be left, so this should effectively be a nop
         worker.stop();
 
@@ -128,7 +288,7 @@ public class WorkerTest extends ThreadedTest {
         worker = new Worker(new MockTime(), config, offsetBackingStore);
         worker.start();
 
-        worker.stopTask(taskId);
+        worker.stopTask(TASK_ID);
     }
 
     @Test
@@ -143,10 +303,10 @@ public class WorkerTest extends ThreadedTest {
         WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class);
 
         PowerMock.mockStatic(Worker.class);
-        PowerMock.expectPrivate(Worker.class, "instantiateTask", TestSourceTask.class.getName()).andReturn(task);
+        PowerMock.expectPrivate(Worker.class, "instantiateTask", new Object[]{TestSourceTask.class}).andReturn(task);
 
         PowerMock.expectNew(
-                WorkerSourceTask.class, EasyMock.eq(taskId), EasyMock.eq(task),
+                WorkerSourceTask.class, EasyMock.eq(TASK_ID), EasyMock.eq(task),
                 EasyMock.anyObject(Converter.class),
                 EasyMock.anyObject(Converter.class),
                 EasyMock.anyObject(KafkaProducer.class),
@@ -156,6 +316,7 @@ public class WorkerTest extends ThreadedTest {
                 EasyMock.anyObject(Time.class))
                 .andReturn(workerTask);
         Properties origProps = new Properties();
+        origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
         workerTask.start(origProps);
         EasyMock.expectLastCall();
 
@@ -174,13 +335,35 @@ public class WorkerTest extends ThreadedTest {
 
         worker = new Worker(new MockTime(), config, offsetBackingStore);
         worker.start();
-        worker.addTask(taskId, TestSourceTask.class.getName(), origProps);
+        worker.addTask(TASK_ID, new TaskConfig(Utils.propsToStringMap(origProps)));
         worker.stop();
 
         PowerMock.verifyAll();
     }
 
 
+    private static class TestConnector extends Connector {
+        @Override
+        public void start(Properties props) {
+
+        }
+
+        @Override
+        public Class<? extends Task> taskClass() {
+            return null;
+        }
+
+        @Override
+        public List<Properties> taskConfigs(int maxTasks) {
+            return null;
+        }
+
+        @Override
+        public void stop() {
+
+        }
+    }
+
     private static class TestSourceTask extends SourceTask {
         public TestSourceTask() {
         }
