http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/main/java/org/apache/kafka/copycat/errors/AlreadyExistsException.java
----------------------------------------------------------------------
diff --git 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/errors/AlreadyExistsException.java
 
b/copycat/runtime/src/main/java/org/apache/kafka/copycat/errors/AlreadyExistsException.java
deleted file mode 100644
index b09cb53..0000000
--- 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/errors/AlreadyExistsException.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.errors;
-
-/**
- * Signals that an operation tried to create an entity that already exists.
- */
-public class AlreadyExistsException extends CopycatException {
-    /**
-     * @param message detail message handed to the {@code CopycatException} constructor
-     */
-    public AlreadyExistsException(String message) {
-        super(message);
-    }
-
-    /**
-     * @param message detail message handed to the {@code CopycatException} constructor
-     * @param cause   underlying throwable handed to the {@code CopycatException} constructor
-     */
-    public AlreadyExistsException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    /**
-     * @param cause underlying throwable handed to the {@code CopycatException} constructor
-     */
-    public AlreadyExistsException(Throwable cause) {
-        super(cause);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/main/java/org/apache/kafka/copycat/errors/NotFoundException.java
----------------------------------------------------------------------
diff --git 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/errors/NotFoundException.java
 
b/copycat/runtime/src/main/java/org/apache/kafka/copycat/errors/NotFoundException.java
deleted file mode 100644
index a8e13a9..0000000
--- 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/errors/NotFoundException.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.errors;
-
-/**
- * Signals that an operation attempted to modify or delete a connector or task
- * that is not present on the worker.
- */
-public class NotFoundException extends CopycatException {
-    /**
-     * @param message detail message handed to the {@code CopycatException} constructor
-     */
-    public NotFoundException(String message) {
-        super(message);
-    }
-
-    /**
-     * @param message detail message handed to the {@code CopycatException} constructor
-     * @param cause   underlying throwable handed to the {@code CopycatException} constructor
-     */
-    public NotFoundException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    /**
-     * @param cause underlying throwable handed to the {@code CopycatException} constructor
-     */
-    public NotFoundException(Throwable cause) {
-        super(cause);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/main/java/org/apache/kafka/copycat/errors/RetriableException.java
----------------------------------------------------------------------
diff --git 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/errors/RetriableException.java
 
b/copycat/runtime/src/main/java/org/apache/kafka/copycat/errors/RetriableException.java
deleted file mode 100644
index 75821aa..0000000
--- 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/errors/RetriableException.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.errors;
-
-/**
- * Signals a failure after which the operation can be reattempted.
- */
-public class RetriableException extends CopycatException {
-    /**
-     * @param message detail message handed to the {@code CopycatException} constructor
-     */
-    public RetriableException(String message) {
-        super(message);
-    }
-
-    /**
-     * @param message detail message handed to the {@code CopycatException} constructor
-     * @param cause   underlying throwable handed to the {@code CopycatException} constructor
-     */
-    public RetriableException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    /**
-     * @param cause underlying throwable handed to the {@code CopycatException} constructor
-     */
-    public RetriableException(Throwable cause) {
-        super(cause);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java
----------------------------------------------------------------------
diff --git 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java
 
b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java
deleted file mode 100644
index 2242299..0000000
--- 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.runtime;
-
-import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ConfigDef;
-import org.apache.kafka.common.config.ConfigDef.Importance;
-import org.apache.kafka.common.config.ConfigDef.Type;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * <p>
- * Configuration options for Connectors. Only Copycat system-level options are defined here
- * (e.g. the Connector class name and the maximum number of tasks); Connector-specific options
- * (e.g. database connection settings) are not included.
- * </p>
- * <p>
- * Not every option is required by every connector. For example, TOPICS_CONFIG is sink-specific.
- * </p>
- */
-public class ConnectorConfig extends AbstractConfig {
-
-    public static final String NAME_CONFIG = "name";
-    private static final String NAME_DOC = "Globally unique name to use for this connector.";
-
-    public static final String CONNECTOR_CLASS_CONFIG = "connector.class";
-    private static final String CONNECTOR_CLASS_DOC =
-            "Name of the class for this connector. Must be a subclass of org.apache.kafka.copycat.connector"
-                    + ".Connector";
-
-    public static final String TASKS_MAX_CONFIG = "tasks.max";
-    private static final String TASKS_MAX_DOC = "Maximum number of tasks to use for this connector.";
-    public static final int TASKS_MAX_DEFAULT = 1;
-
-    public static final String TOPICS_CONFIG = "topics";
-    private static final String TOPICS_DOC = "";
-    public static final String TOPICS_DEFAULT = "";
-
-    // Built once at class initialization; the definitions are registered in the order listed below.
-    private static final ConfigDef CONFIG = buildConfigDef();
-
-    /** Assembles the definition of every option declared by this class. */
-    private static ConfigDef buildConfigDef() {
-        ConfigDef definition = new ConfigDef();
-        definition = definition.define(NAME_CONFIG, Type.STRING, Importance.HIGH, NAME_DOC);
-        definition = definition.define(CONNECTOR_CLASS_CONFIG, Type.CLASS, Importance.HIGH, CONNECTOR_CLASS_DOC);
-        definition = definition.define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, Importance.HIGH, TASKS_MAX_DOC);
-        definition = definition.define(TOPICS_CONFIG, Type.LIST, TOPICS_DEFAULT, Importance.HIGH, TOPICS_DOC);
-        return definition;
-    }
-
-    /** Creates a configuration from an empty property map. */
-    public ConnectorConfig() {
-        this(new HashMap<String, String>());
-    }
-
-    /**
-     * @param props raw connector properties, passed to {@code AbstractConfig} together with this
-     *              class's option definitions
-     */
-    public ConnectorConfig(Map<String, String> props) {
-        super(CONFIG, props);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Copycat.java
----------------------------------------------------------------------
diff --git 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Copycat.java 
b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Copycat.java
deleted file mode 100644
index 81f0b16..0000000
--- 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Copycat.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.runtime;
-
-import org.apache.kafka.common.annotation.InterfaceStability;
-import org.apache.kafka.copycat.runtime.rest.RestServer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * This class ties together all the components of a Copycat process (herder, worker,
- * storage, command interface), managing their lifecycle.
- * <p>
- * {@link #start()} registers a JVM shutdown hook and then starts the worker, the herder and the REST
- * server in that order; {@link #stop()} stops them in the reverse order and is a no-op for the
- * components after the first call. {@link #awaitStop()} blocks until {@link #stop()} has run.
- * </p>
- */
-@InterfaceStability.Unstable
-public class Copycat {
-    private static final Logger log = LoggerFactory.getLogger(Copycat.class);
-
-    private final Worker worker;
-    private final Herder herder;
-    private final RestServer rest;
-    // Released once start() has finished (successfully or not); the shutdown hook waits on it.
-    private final CountDownLatch startLatch = new CountDownLatch(1);
-    // Released at the end of stop(); awaitStop() waits on it.
-    private final CountDownLatch stopLatch = new CountDownLatch(1);
-    // Ensures the components are stopped at most once even if stop() is called repeatedly.
-    private final AtomicBoolean shutdown = new AtomicBoolean(false);
-    private final ShutdownHook shutdownHook;
-
-    /**
-     * @param worker worker to start and stop with this process
-     * @param herder herder to start and stop with this process; also handed to the REST server on start
-     * @param rest   REST server to start and stop with this process
-     */
-    public Copycat(Worker worker, Herder herder, RestServer rest) {
-        log.debug("Copycat created");
-        this.worker = worker;
-        this.herder = herder;
-        this.rest = rest;
-        shutdownHook = new ShutdownHook();
-    }
-
-    /**
-     * Registers the shutdown hook and starts the worker, the herder and the REST server, in that order.
-     * Any exception thrown during startup propagates to the caller.
-     */
-    public void start() {
-        try {
-            log.info("Copycat starting");
-            Runtime.getRuntime().addShutdownHook(shutdownHook);
-
-            worker.start();
-            herder.start();
-            rest.start(herder);
-
-            log.info("Copycat started");
-        } finally {
-            // Always release the latch: the shutdown hook is already registered, and if startup
-            // failed without counting down it would block forever and keep the JVM from exiting.
-            startLatch.countDown();
-        }
-    }
-
-    /**
-     * Stops the REST server, the herder and the worker, in that order. Only the first invocation
-     * stops the components; every invocation releases threads blocked in {@link #awaitStop()}.
-     */
-    public void stop() {
-        boolean wasShuttingDown = shutdown.getAndSet(true);
-        if (!wasShuttingDown) {
-            log.info("Copycat stopping");
-
-            rest.stop();
-            herder.stop();
-            worker.stop();
-
-            log.info("Copycat stopped");
-        }
-
-        stopLatch.countDown();
-    }
-
-    /**
-     * Blocks the calling thread until {@link #stop()} has completed. If the thread is interrupted
-     * while waiting, the method logs an error and returns with the interrupt status set.
-     */
-    public void awaitStop() {
-        try {
-            stopLatch.await();
-        } catch (InterruptedException e) {
-            log.error("Interrupted waiting for Copycat to shutdown");
-            // Preserve the interrupt so the caller can observe it.
-            Thread.currentThread().interrupt();
-        }
-    }
-
-    /**
-     * JVM shutdown hook: waits for {@link Copycat#start()} to finish, then stops this Copycat instance.
-     */
-    private class ShutdownHook extends Thread {
-        @Override
-        public void run() {
-            try {
-                startLatch.await();
-                Copycat.this.stop();
-            } catch (InterruptedException e) {
-                log.error("Interrupted in shutdown hook while waiting for copycat startup to finish");
-                Thread.currentThread().interrupt();
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Herder.java
----------------------------------------------------------------------
diff --git 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Herder.java 
b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Herder.java
deleted file mode 100644
index 0b03c9a..0000000
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Herder.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.runtime;
-
-import org.apache.kafka.copycat.runtime.rest.entities.ConnectorInfo;
-import org.apache.kafka.copycat.runtime.rest.entities.TaskInfo;
-import org.apache.kafka.copycat.util.Callback;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
-/**
- * <p>
- * The herder interface tracks and manages workers and connectors. It is the main interface for external components
- * to make changes to the state of the cluster. For example, in distributed mode, an implementation of this interface
- * knows how to accept a connector configuration, may need to route it to the current leader worker for the cluster so
- * the config can be written to persistent storage, and then ensures the new connector is correctly instantiated on one
- * of the workers.
- * </p>
- * <p>
- * Implementations must provide all the actions that can be taken on the cluster (add/remove connectors, pause/resume
- * tasks, get state of connectors and tasks, etc). The non-Java interfaces to the cluster (REST API and CLI) are very
- * simple wrappers of the functionality provided by this interface.
- * </p>
- * <p>
- * In standalone mode, the implementation of this interface will be trivial because no coordination is needed. In that
- * case, the implementation will mainly be delegating tasks directly to other components. For example, when creating a
- * new connector in standalone mode, there is no need to persist the config and the connector and its tasks must run in
- * the same process, so the standalone herder implementation can immediately instantiate and start the connector and
- * its tasks.
- * </p>
- */
-public interface Herder {
-
-    /**
-     * Start the herder.
-     */
-    void start();
-
-    /**
-     * Stop the herder.
-     */
-    void stop();
-
-    /**
-     * Get a list of connectors currently running in this cluster. This is a full list of connectors in the cluster
-     * gathered from the current configuration.
-     *
-     * @param callback callback to invoke with the collection of connector names
-     * @throws org.apache.kafka.copycat.runtime.distributed.NotLeaderException if this node can not resolve the request
-     *         (e.g., because it has not joined the cluster or does not have configs in sync with the group) and it is
-     *         also not the leader
-     * @throws org.apache.kafka.copycat.errors.CopycatException if this node is the leader, but still cannot resolve the
-     *         request (e.g., it is not in sync with other worker's config state)
-     */
-    void connectors(Callback<Collection<String>> callback);
-
-    /**
-     * Get the definition and status of a connector.
-     * @param connName name of the connector
-     * @param callback callback to invoke with the connector info
-     */
-    void connectorInfo(String connName, Callback<ConnectorInfo> callback);
-
-    /**
-     * Get the configuration for a connector.
-     * @param connName name of the connector
-     * @param callback callback to invoke with the configuration
-     */
-    void connectorConfig(String connName, Callback<Map<String, String>> callback);
-
-    /**
-     * Set the configuration for a connector. This supports creation, update, and deletion.
-     * @param connName name of the connector
-     * @param config the connectors configuration, or null if deleting the connector
-     * @param allowReplace if true, allow overwriting previous configs; if false, throw AlreadyExistsException if a
-     *                     connector with the same name already exists
-     * @param callback callback to invoke when the configuration has been written
-     */
-    void putConnectorConfig(String connName, Map<String, String> config, boolean allowReplace, Callback<Created<ConnectorInfo>> callback);
-
-    /**
-     * Requests reconfiguration of the task. This should only be triggered by
-     * {@link HerderConnectorContext}.
-     *
-     * @param connName name of the connector that should be reconfigured
-     */
-    void requestTaskReconfiguration(String connName);
-
-    /**
-     * Get the configurations for the current set of tasks of a connector.
-     * @param connName connector whose task configurations are requested
-     * @param callback callback to invoke upon completion
-     */
-    void taskConfigs(String connName, Callback<List<TaskInfo>> callback);
-
-    /**
-     * Set the configurations for the tasks of a connector. This should always include all tasks in the connector; if
-     * there are existing configurations and fewer are provided, this will reduce the number of tasks, and if more are
-     * provided it will increase the number of tasks.
-     * @param connName connector to update
-     * @param configs list of configurations
-     * @param callback callback to invoke upon completion
-     */
-    void putTaskConfigs(String connName, List<Map<String, String>> configs, Callback<Void> callback);
-
-
-    /**
-     * Immutable pair of a result value and a boolean {@code created} flag, used as the callback payload of
-     * {@link #putConnectorConfig}. NOTE(review): the flag presumably distinguishes a newly created entity from an
-     * update of an existing one; confirm against the implementations.
-     *
-     * @param <T> type of the wrapped result
-     */
-    class Created<T> {
-        private final boolean created;
-        private final T result;
-
-        /**
-         * @param created value to report from {@link #created()}
-         * @param result  value to report from {@link #result()}; compared null-safely in {@link #equals(Object)}
-         */
-        public Created(boolean created, T result) {
-            this.created = created;
-            this.result = result;
-        }
-
-        /**
-         * @return the flag passed to the constructor
-         */
-        public boolean created() {
-            return created;
-        }
-
-        /**
-         * @return the result passed to the constructor
-         */
-        public T result() {
-            return result;
-        }
-
-        /**
-         * Two instances are equal when they have the same runtime class, the same flag and equal results.
-         */
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) return true;
-            if (o == null || getClass() != o.getClass()) return false;
-            Created<?> other = (Created<?>) o;
-            // Compare the primitive flag directly instead of boxing it through Objects.equals.
-            return created == other.created &&
-                    Objects.equals(result, other.result);
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(created, result);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/HerderConnectorContext.java
----------------------------------------------------------------------
diff --git 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/HerderConnectorContext.java
 
b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/HerderConnectorContext.java
deleted file mode 100644
index 7a64bd5..0000000
--- 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/HerderConnectorContext.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.runtime;
-
-import org.apache.kafka.copycat.connector.ConnectorContext;
-
-/**
- * ConnectorContext for use with the StandaloneHerder, which maintains all connectors and tasks
- * in a single process. Reconfiguration requests are forwarded to the {@link Herder} supplied at
- * construction, on behalf of the connector name supplied at construction.
- */
-public class HerderConnectorContext implements ConnectorContext {
-
-    // Both references are fixed at construction; final makes that explicit.
-    private final Herder herder;
-    private final String connectorName;
-
-    /**
-     * @param herder        herder that receives the reconfiguration requests
-     * @param connectorName name of the connector this context belongs to
-     */
-    public HerderConnectorContext(Herder herder, String connectorName) {
-        this.herder = herder;
-        this.connectorName = connectorName;
-    }
-
-    /**
-     * Asks the herder to reconfigure the tasks of this context's connector.
-     */
-    @Override
-    public void requestTaskReconfiguration() {
-        // This is trivial to forward since there is only one herder and it's in memory in this
-        // process
-        herder.requestTaskReconfiguration(connectorName);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SourceTaskOffsetCommitter.java
----------------------------------------------------------------------
diff --git 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SourceTaskOffsetCommitter.java
 
b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SourceTaskOffsetCommitter.java
deleted file mode 100644
index 6bb51b9..0000000
--- 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SourceTaskOffsetCommitter.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.runtime;
-
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.copycat.errors.CopycatException;
-import org.apache.kafka.copycat.util.ConnectorTaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-/**
- * <p>
- * Manages offset commit scheduling and execution for SourceTasks.
- * </p>
- * <p>
- * Unlike sink tasks which directly manage their offset commits in the main poll() thread since
- * they drive the event loop and control (for all intents and purposes) the timeouts, source
- * tasks are at the whim of the connector and cannot be guaranteed to wake up on the necessary
- * schedule. Instead, this class tracks all the active tasks, their schedule for commits, and
- * ensures they are invoked in a timely fashion.
- * </p>
- * <p>
- * Commits run on a single-threaded scheduled executor. Each commit is scheduled as a one-shot
- * task that re-schedules itself when it finishes, unless the task has been removed in the
- * meantime. The map of scheduled commits is guarded by synchronizing on the map itself.
- * </p>
- */
-class SourceTaskOffsetCommitter {
-    private static final Logger log = LoggerFactory.getLogger(SourceTaskOffsetCommitter.class);
-
-    private final Time time;
-    private final WorkerConfig config;
-    private final ScheduledExecutorService commitExecutorService;
-    // Guarded by synchronized (committers)
-    private final HashMap<ConnectorTaskId, ScheduledCommitTask> committers = new HashMap<>();
-
-    /**
-     * @param time   time source (stored, not otherwise used by this class)
-     * @param config worker configuration; supplies the offset commit interval
-     */
-    SourceTaskOffsetCommitter(Time time, WorkerConfig config) {
-        this.time = time;
-        this.config = config;
-        commitExecutorService = Executors.newSingleThreadScheduledExecutor();
-    }
-
-    /**
-     * Shuts down the commit executor and waits for it to terminate. A timeout is logged as an
-     * error. If the calling thread is interrupted, the wait ends immediately and the interrupt
-     * status is left set.
-     *
-     * @param timeoutMs maximum time to wait for termination, in milliseconds
-     */
-    public void close(long timeoutMs) {
-        commitExecutorService.shutdown();
-        try {
-            if (!commitExecutorService.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
-                log.error("Graceful shutdown of offset commitOffsets thread timed out.");
-            }
-        } catch (InterruptedException e) {
-            // Stop waiting and exit immediately, but keep the interrupt visible to the caller
-            Thread.currentThread().interrupt();
-        }
-    }
-
-    /**
-     * Schedules a single offset commit for the task after the configured commit interval and
-     * records it under the task id, replacing any previous entry for that id.
-     *
-     * @param id         id of the task
-     * @param workerTask task whose offsets should be committed
-     */
-    public void schedule(final ConnectorTaskId id, final WorkerSourceTask workerTask) {
-        synchronized (committers) {
-            long commitIntervalMs = config.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
-            ScheduledFuture<?> commitFuture = commitExecutorService.schedule(new Runnable() {
-                @Override
-                public void run() {
-                    commit(id, workerTask);
-                }
-            }, commitIntervalMs, TimeUnit.MILLISECONDS);
-            committers.put(id, new ScheduledCommitTask(commitFuture));
-        }
-    }
-
-    /**
-     * Stops committing offsets for the task: cancels the pending commit (without interrupting a
-     * commit that is already running) and, if a commit is in progress, waits for it to finish.
-     * Does nothing if no commit is scheduled for the id.
-     *
-     * @param id id of the task to remove
-     * @throws CopycatException if the thread is interrupted while waiting for an in-progress
-     *         commit; the interrupt status is restored before throwing
-     */
-    public void remove(ConnectorTaskId id) {
-        final ScheduledCommitTask task;
-        synchronized (committers) {
-            task = committers.remove(id);
-            // Nothing was scheduled for this id (never scheduled, or already removed)
-            if (task == null)
-                return;
-            task.cancelled = true;
-            task.commitFuture.cancel(false);
-        }
-        // A non-null latch means a commit has started for this entry; wait until it completes
-        if (task.finishedLatch != null) {
-            try {
-                task.finishedLatch.await();
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw new CopycatException("Unexpected interruption in SourceTaskOffsetCommitter.", e);
-            }
-        }
-    }
-
-    /**
-     * Commits the offsets of the task and, unless the task was removed meanwhile, schedules the
-     * next commit. Returns without doing anything if the id is not registered or was cancelled.
-     *
-     * @param id         id of the task
-     * @param workerTask task whose offsets are committed
-     */
-    public void commit(ConnectorTaskId id, WorkerSourceTask workerTask) {
-        final ScheduledCommitTask task;
-        synchronized (committers) {
-            task = committers.get(id);
-            if (task == null || task.cancelled)
-                return;
-            // Published under the lock so that remove() can wait for this commit to finish
-            task.finishedLatch = new CountDownLatch(1);
-        }
-
-        try {
-            log.debug("Committing offsets for {}", workerTask);
-            boolean success = workerTask.commitOffsets();
-            if (!success) {
-                log.error("Failed to commit offsets for {}", workerTask);
-            }
-        } catch (Throwable t) {
-            // We're very careful about exceptions here since any uncaught exceptions in the commit
-            // thread would skip the re-scheduling below and stop further commits for that task
-            log.error("Unhandled exception when committing {}: ", workerTask, t);
-        } finally {
-            synchronized (committers) {
-                task.finishedLatch.countDown();
-                if (!task.cancelled)
-                    schedule(id, workerTask);
-            }
-        }
-    }
-
-    /**
-     * Bookkeeping for one scheduled commit: its future, whether it has been cancelled through
-     * remove(), and a latch that is set once the commit starts and released when it finishes.
-     */
-    private static class ScheduledCommitTask {
-        ScheduledFuture<?> commitFuture;
-        boolean cancelled;
-        CountDownLatch finishedLatch;
-
-        ScheduledCommitTask(ScheduledFuture<?> commitFuture) {
-            this.commitFuture = commitFuture;
-            this.cancelled = false;
-            this.finishedLatch = null;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/TaskConfig.java
----------------------------------------------------------------------
diff --git 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/TaskConfig.java
 
b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/TaskConfig.java
deleted file mode 100644
index be97879..0000000
--- 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/TaskConfig.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.runtime;
-
-import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ConfigDef;
-import org.apache.kafka.common.config.ConfigDef.Importance;
-import org.apache.kafka.common.config.ConfigDef.Type;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * <p>
- * Configuration options for Tasks. Only Copycat system-level configuration options are
- * defined here.
- * </p>
- */
-public class TaskConfig extends AbstractConfig {
-
-    public static final String TASK_CLASS_CONFIG = "task.class";
-    private static final String TASK_CLASS_DOC =
-            "Name of the class for this task. Must be a subclass of org.apache.kafka.copycat.connector.Task";
-
-    // Built once at class initialization.
-    private static final ConfigDef CONFIG = buildConfigDef();
-
-    /** Assembles the definition of every option declared by this class. */
-    private static ConfigDef buildConfigDef() {
-        ConfigDef definition = new ConfigDef();
-        return definition.define(TASK_CLASS_CONFIG, Type.CLASS, Importance.HIGH, TASK_CLASS_DOC);
-    }
-
-    /** Creates a configuration from an empty property map. */
-    public TaskConfig() {
-        this(new HashMap<String, String>());
-    }
-
-    /**
-     * @param props raw task properties, passed to {@code AbstractConfig} together with this
-     *              class's option definitions
-     */
-    public TaskConfig(Map<String, ?> props) {
-        super(CONFIG, props);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
----------------------------------------------------------------------
diff --git 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java 
b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
deleted file mode 100644
index 91fa175..0000000
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
+++ /dev/null
@@ -1,331 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.runtime;
-
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.copycat.connector.Connector;
-import org.apache.kafka.copycat.connector.ConnectorContext;
-import org.apache.kafka.copycat.connector.Task;
-import org.apache.kafka.copycat.errors.CopycatException;
-import org.apache.kafka.copycat.sink.SinkTask;
-import org.apache.kafka.copycat.source.SourceTask;
-import org.apache.kafka.copycat.storage.*;
-import org.apache.kafka.copycat.util.ConnectorTaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * <p>
- * Worker runs a (dynamic) set of tasks in a set of threads, doing the work of actually moving
- * data to/from Kafka.
- * </p>
- * <p>
- * Since each task has a dedicated thread, this is mainly just a container for them.
- * </p>
- */
-public class Worker {
-    private static final Logger log = LoggerFactory.getLogger(Worker.class);
-
-    private Time time;
-    private WorkerConfig config;
-    private Converter keyConverter;
-    private Converter valueConverter;
-    private Converter internalKeyConverter;
-    private Converter internalValueConverter;
-    private OffsetBackingStore offsetBackingStore;
-    private HashMap<String, Connector> connectors = new HashMap<>();
-    private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<>();
-    private KafkaProducer<byte[], byte[]> producer;
-    private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
-
-    public Worker(WorkerConfig config, OffsetBackingStore offsetBackingStore) {
-        this(new SystemTime(), config, offsetBackingStore);
-    }
-
-    @SuppressWarnings("unchecked")
-    public Worker(Time time, WorkerConfig config, OffsetBackingStore 
offsetBackingStore) {
-        this.time = time;
-        this.config = config;
-        this.keyConverter = 
config.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, 
Converter.class);
-        
this.keyConverter.configure(config.originalsWithPrefix("key.converter."), true);
-        this.valueConverter = 
config.getConfiguredInstance(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, 
Converter.class);
-        
this.valueConverter.configure(config.originalsWithPrefix("value.converter."), 
false);
-        this.internalKeyConverter = 
config.getConfiguredInstance(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, 
Converter.class);
-        
this.internalKeyConverter.configure(config.originalsWithPrefix("internal.key.converter."),
 true);
-        this.internalValueConverter = 
config.getConfiguredInstance(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG,
 Converter.class);
-        
this.internalValueConverter.configure(config.originalsWithPrefix("internal.value.converter."),
 false);
-
-        this.offsetBackingStore = offsetBackingStore;
-        this.offsetBackingStore.configure(config.originals());
-    }
-
-    public void start() {
-        log.info("Worker starting");
-
-        Map<String, Object> producerProps = new HashMap<>();
-        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
-        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArraySerializer");
-        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArraySerializer");
-        producerProps.putAll(config.unusedConfigs());
-
-        producer = new KafkaProducer<>(producerProps);
-
-        offsetBackingStore.start();
-        sourceTaskOffsetCommitter = new SourceTaskOffsetCommitter(time, 
config);
-
-        log.info("Worker started");
-    }
-
-    public void stop() {
-        log.info("Worker stopping");
-
-        long started = time.milliseconds();
-        long limit = started + 
config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG);
-
-        for (Map.Entry<String, Connector> entry : connectors.entrySet()) {
-            Connector conn = entry.getValue();
-            log.warn("Shutting down connector {} uncleanly; herder should have 
shut down connectors before the" +
-                    "Worker is stopped.", conn);
-            try {
-                conn.stop();
-            } catch (CopycatException e) {
-                log.error("Error while shutting down connector " + conn, e);
-            }
-        }
-
-        for (Map.Entry<ConnectorTaskId, WorkerTask> entry : tasks.entrySet()) {
-            WorkerTask task = entry.getValue();
-            log.warn("Shutting down task {} uncleanly; herder should have shut 
down "
-                    + "tasks before the Worker is stopped.", task);
-            try {
-                task.stop();
-            } catch (CopycatException e) {
-                log.error("Error while shutting down task " + task, e);
-            }
-        }
-
-        for (Map.Entry<ConnectorTaskId, WorkerTask> entry : tasks.entrySet()) {
-            WorkerTask task = entry.getValue();
-            log.debug("Waiting for task {} to finish shutting down", task);
-            if (!task.awaitStop(Math.max(limit - time.milliseconds(), 0)))
-                log.error("Graceful shutdown of task {} failed.", task);
-            task.close();
-        }
-
-        long timeoutMs = limit - time.milliseconds();
-        sourceTaskOffsetCommitter.close(timeoutMs);
-
-        offsetBackingStore.stop();
-
-        log.info("Worker stopped");
-    }
-
-    public WorkerConfig config() {
-        return config;
-    }
-
-    /**
-     * Add a new connector.
-     * @param connConfig connector configuration
-     * @param ctx context for the connector
-     */
-    public void addConnector(ConnectorConfig connConfig, ConnectorContext ctx) 
{
-        String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
-        Class<?> maybeConnClass = 
connConfig.getClass(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
-        log.info("Creating connector {} of type {}", connName, 
maybeConnClass.getName());
-
-        Class<? extends Connector> connClass;
-        try {
-            connClass = maybeConnClass.asSubclass(Connector.class);
-        } catch (ClassCastException e) {
-            throw new CopycatException("Specified class is not a subclass of 
Connector: " + maybeConnClass.getName());
-        }
-
-        if (connectors.containsKey(connName))
-            throw new CopycatException("Connector with name " + connName + " 
already exists");
-
-        final Connector connector = instantiateConnector(connClass);
-        log.info("Instantiated connector {} with version {} of type {}", 
connName, connector.version(), connClass.getName());
-        connector.initialize(ctx);
-        try {
-            connector.start(connConfig.originalsStrings());
-        } catch (CopycatException e) {
-            throw new CopycatException("Connector threw an exception while 
starting", e);
-        }
-
-        connectors.put(connName, connector);
-
-        log.info("Finished creating connector {}", connName);
-    }
-
-    private static Connector instantiateConnector(Class<? extends Connector> 
connClass) {
-        try {
-            return Utils.newInstance(connClass);
-        } catch (Throwable t) {
-            // Catches normal exceptions due to instantiation errors as well as any runtime errors that
-            // may be caused by user code
-            throw new CopycatException("Failed to create connector instance", 
t);
-        }
-    }
-
-    public List<Map<String, String>> connectorTaskConfigs(String connName, int 
maxTasks, List<String> sinkTopics) {
-        log.trace("Reconfiguring connector tasks for {}", connName);
-
-        Connector connector = connectors.get(connName);
-        if (connector == null)
-            throw new CopycatException("Connector " + connName + " not found 
in this worker.");
-
-        List<Map<String, String>> result = new ArrayList<>();
-        String taskClassName = connector.taskClass().getName();
-        for (Map<String, String> taskProps : connector.taskConfigs(maxTasks)) {
-            Map<String, String> taskConfig = new HashMap<>(taskProps); // 
Ensure we don't modify the connector's copy of the config
-            taskConfig.put(TaskConfig.TASK_CLASS_CONFIG, taskClassName);
-            if (sinkTopics != null)
-                taskConfig.put(SinkTask.TOPICS_CONFIG, Utils.join(sinkTopics, 
","));
-            result.add(taskConfig);
-        }
-        return result;
-    }
-
-    public void stopConnector(String connName) {
-        log.info("Stopping connector {}", connName);
-
-        Connector connector = connectors.get(connName);
-        if (connector == null)
-            throw new CopycatException("Connector " + connName + " not found 
in this worker.");
-
-        try {
-            connector.stop();
-        } catch (CopycatException e) {
-            log.error("Error shutting down connector {}: ", connector, e);
-        }
-
-        connectors.remove(connName);
-
-        log.info("Stopped connector {}", connName);
-    }
-
-    /**
-     * Get the IDs of the connectors currently running in this worker.
-     */
-    public Set<String> connectorNames() {
-        return connectors.keySet();
-    }
-
-    /**
-     * Add a new task.
-     * @param id Globally unique ID for this task.
-     * @param taskConfig the parsed task configuration
-     */
-    public void addTask(ConnectorTaskId id, TaskConfig taskConfig) {
-        log.info("Creating task {}", id);
-
-        if (tasks.containsKey(id)) {
-            String msg = "Task already exists in this worker; the herder 
should not have requested "
-                    + "that this : " + id;
-            log.error(msg);
-            throw new CopycatException(msg);
-        }
-
-        Class<? extends Task> taskClass = 
taskConfig.getClass(TaskConfig.TASK_CLASS_CONFIG).asSubclass(Task.class);
-        final Task task = instantiateTask(taskClass);
-        log.info("Instantiated task {} with version {} of type {}", id, 
task.version(), taskClass.getName());
-
-        // Decide which type of worker task we need based on the type of task.
-        final WorkerTask workerTask;
-        if (task instanceof SourceTask) {
-            SourceTask sourceTask = (SourceTask) task;
-            OffsetStorageReader offsetReader = new 
OffsetStorageReaderImpl(offsetBackingStore, id.connector(),
-                    internalKeyConverter, internalValueConverter);
-            OffsetStorageWriter offsetWriter = new 
OffsetStorageWriter(offsetBackingStore, id.connector(),
-                    internalKeyConverter, internalValueConverter);
-            workerTask = new WorkerSourceTask(id, sourceTask, keyConverter, 
valueConverter, producer,
-                    offsetReader, offsetWriter, config, time);
-        } else if (task instanceof SinkTask) {
-            workerTask = new WorkerSinkTask(id, (SinkTask) task, config, 
keyConverter, valueConverter, time);
-        } else {
-            log.error("Tasks must be a subclass of either SourceTask or 
SinkTask", task);
-            throw new CopycatException("Tasks must be a subclass of either 
SourceTask or SinkTask");
-        }
-
-        // Start the task before adding modifying any state, any exceptions are caught higher up the
-        // call chain and there's no cleanup to do here
-        workerTask.start(taskConfig.originalsStrings());
-        if (task instanceof SourceTask) {
-            WorkerSourceTask workerSourceTask = (WorkerSourceTask) workerTask;
-            sourceTaskOffsetCommitter.schedule(id, workerSourceTask);
-        }
-        tasks.put(id, workerTask);
-    }
-
-    private static Task instantiateTask(Class<? extends Task> taskClass) {
-        try {
-            return Utils.newInstance(taskClass);
-        } catch (KafkaException e) {
-            throw new CopycatException("Task class not found", e);
-        }
-    }
-
-    public void stopTask(ConnectorTaskId id) {
-        log.info("Stopping task {}", id);
-
-        WorkerTask task = getTask(id);
-        if (task instanceof WorkerSourceTask)
-            sourceTaskOffsetCommitter.remove(id);
-        task.stop();
-        if 
(!task.awaitStop(config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG)))
-            log.error("Graceful stop of task {} failed.", task);
-        task.close();
-        tasks.remove(id);
-    }
-
-    /**
-     * Get the IDs of the tasks currently running in this worker.
-     */
-    public Set<ConnectorTaskId> taskIds() {
-        return tasks.keySet();
-    }
-
-    private WorkerTask getTask(ConnectorTaskId id) {
-        WorkerTask task = tasks.get(id);
-        if (task == null) {
-            log.error("Task not found: " + id);
-            throw new CopycatException("Task not found: " + id);
-        }
-        return task;
-    }
-
-    public Converter getInternalKeyConverter() {
-        return internalKeyConverter;
-    }
-
-    public Converter getInternalValueConverter() {
-        return internalValueConverter;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerConfig.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerConfig.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerConfig.java
deleted file mode 100644
index b962d54..0000000
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerConfig.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.runtime;
-
-import org.apache.kafka.common.annotation.InterfaceStability;
-import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ConfigDef;
-import org.apache.kafka.common.config.ConfigDef.Importance;
-import org.apache.kafka.common.config.ConfigDef.Type;
-
-import java.util.Map;
-
-/**
- * Common base class providing configuration for Copycat workers, whether standalone or distributed.
- */
-@InterfaceStability.Unstable
-public class WorkerConfig extends AbstractConfig {
-
-    public static final String CLUSTER_CONFIG = "cluster";
-    private static final String CLUSTER_CONFIG_DOC =
-            "ID for this cluster, which is used to provide a namespace so 
multiple Copycat clusters "
-                    + "or instances may co-exist while sharing a single Kafka 
cluster.";
-    public static final String CLUSTER_DEFAULT = "copycat";
-
-    public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
-    public static final String BOOTSTRAP_SERVERS_DOC
-            = "A list of host/port pairs to use for establishing the initial 
connection to the Kafka "
-            + "cluster. The client will make use of all servers irrespective 
of which servers are "
-            + "specified here for bootstrapping&mdash;this list only impacts 
the initial hosts used "
-            + "to discover the full set of servers. This list should be in the 
form "
-            + "<code>host1:port1,host2:port2,...</code>. Since these servers 
are just used for the "
-            + "initial connection to discover the full cluster membership 
(which may change "
-            + "dynamically), this list need not contain the full set of 
servers (you may want more "
-            + "than one, though, in case a server is down).";
-    public static final String BOOTSTRAP_SERVERS_DEFAULT = "localhost:9092";
-
-    public static final String KEY_CONVERTER_CLASS_CONFIG = "key.converter";
-    public static final String KEY_CONVERTER_CLASS_DOC =
-            "Converter class for key Copycat data that implements the 
<code>Converter</code> interface.";
-
-    public static final String VALUE_CONVERTER_CLASS_CONFIG = 
"value.converter";
-    public static final String VALUE_CONVERTER_CLASS_DOC =
-            "Converter class for value Copycat data that implements the 
<code>Converter</code> interface.";
-
-    public static final String INTERNAL_KEY_CONVERTER_CLASS_CONFIG = 
"internal.key.converter";
-    public static final String INTERNAL_KEY_CONVERTER_CLASS_DOC =
-            "Converter class for internal key Copycat data that implements the 
<code>Converter</code> interface. Used for converting data like offsets and 
configs.";
-
-    public static final String INTERNAL_VALUE_CONVERTER_CLASS_CONFIG = 
"internal.value.converter";
-    public static final String INTERNAL_VALUE_CONVERTER_CLASS_DOC =
-            "Converter class for offset value Copycat data that implements the 
<code>Converter</code> interface. Used for converting data like offsets and 
configs.";
-
-    public static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG
-            = "task.shutdown.graceful.timeout.ms";
-    private static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DOC =
-            "Amount of time to wait for tasks to shutdown gracefully. This is 
the total amount of time,"
-                    + " not per task. All task have shutdown triggered, then 
they are waited on sequentially.";
-    private static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DEFAULT = 
"5000";
-
-    public static final String OFFSET_COMMIT_INTERVAL_MS_CONFIG = 
"offset.flush.interval.ms";
-    private static final String OFFSET_COMMIT_INTERVAL_MS_DOC
-            = "Interval at which to try committing offsets for tasks.";
-    public static final long OFFSET_COMMIT_INTERVAL_MS_DEFAULT = 60000L;
-
-    public static final String OFFSET_COMMIT_TIMEOUT_MS_CONFIG = 
"offset.flush.timeout.ms";
-    private static final String OFFSET_COMMIT_TIMEOUT_MS_DOC
-            = "Maximum number of milliseconds to wait for records to flush and 
partition offset data to be"
-            + " committed to offset storage before cancelling the process and 
restoring the offset "
-            + "data to be committed in a future attempt.";
-    public static final long OFFSET_COMMIT_TIMEOUT_MS_DEFAULT = 5000L;
-
-    public static final String REST_HOST_NAME_CONFIG = "rest.host.name";
-    private static final String REST_HOST_NAME_DOC
-            = "Hostname for the REST API. If this is set, it will only bind to 
this interface.";
-
-    public static final String REST_PORT_CONFIG = "rest.port";
-    private static final String REST_PORT_DOC
-            = "Port for the REST API to listen on.";
-    public static final int REST_PORT_DEFAULT = 8083;
-
-    public static final String REST_ADVERTISED_HOST_NAME_CONFIG = 
"rest.advertised.host.name";
-    private static final String REST_ADVERTISED_HOST_NAME_DOC
-            = "If this is set, this is the hostname that will be given out to 
other workers to connect to.";
-
-    public static final String REST_ADVERTISED_PORT_CONFIG = 
"rest.advertised.port";
-    private static final String REST_ADVERTISED_PORT_DOC
-            = "If this is set, this is the port that will be given out to 
other workers to connect to.";
-
-    /**
-     * Get a basic ConfigDef for a WorkerConfig. This includes all the common settings. Subclasses can use this to
-     * bootstrap their own ConfigDef.
-     * @return a ConfigDef with all the common options specified
-     */
-    protected static ConfigDef baseConfigDef() {
-        return new ConfigDef()
-                .define(CLUSTER_CONFIG, Type.STRING, CLUSTER_DEFAULT, 
Importance.HIGH, CLUSTER_CONFIG_DOC)
-                .define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, 
BOOTSTRAP_SERVERS_DEFAULT,
-                        Importance.HIGH, BOOTSTRAP_SERVERS_DOC)
-                .define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS,
-                        Importance.HIGH, KEY_CONVERTER_CLASS_DOC)
-                .define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS,
-                        Importance.HIGH, VALUE_CONVERTER_CLASS_DOC)
-                .define(INTERNAL_KEY_CONVERTER_CLASS_CONFIG, Type.CLASS,
-                        Importance.HIGH, INTERNAL_KEY_CONVERTER_CLASS_DOC)
-                .define(INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS,
-                        Importance.HIGH, INTERNAL_VALUE_CONVERTER_CLASS_DOC)
-                .define(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG, Type.LONG,
-                        TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DEFAULT, 
Importance.LOW,
-                        TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DOC)
-                .define(OFFSET_COMMIT_INTERVAL_MS_CONFIG, Type.LONG, 
OFFSET_COMMIT_INTERVAL_MS_DEFAULT,
-                        Importance.LOW, OFFSET_COMMIT_INTERVAL_MS_DOC)
-                .define(OFFSET_COMMIT_TIMEOUT_MS_CONFIG, Type.LONG, 
OFFSET_COMMIT_TIMEOUT_MS_DEFAULT,
-                        Importance.LOW, OFFSET_COMMIT_TIMEOUT_MS_DOC)
-                .define(REST_HOST_NAME_CONFIG, Type.STRING, null, 
Importance.LOW, REST_HOST_NAME_DOC)
-                .define(REST_PORT_CONFIG, Type.INT, REST_PORT_DEFAULT, 
Importance.LOW, REST_PORT_DOC)
-                .define(REST_ADVERTISED_HOST_NAME_CONFIG, Type.STRING,  null, 
Importance.LOW, REST_ADVERTISED_HOST_NAME_DOC)
-                .define(REST_ADVERTISED_PORT_CONFIG, Type.INT,  null, 
Importance.LOW, REST_ADVERTISED_PORT_DOC);
-    }
-
-    public WorkerConfig(ConfigDef definition, Map<String, String> props) {
-        super(definition, props);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
deleted file mode 100644
index ad6d872..0000000
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
+++ /dev/null
@@ -1,370 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.runtime;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.consumer.OffsetCommitCallback;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.copycat.data.SchemaAndValue;
-import org.apache.kafka.copycat.errors.CopycatException;
-import org.apache.kafka.copycat.errors.RetriableException;
-import org.apache.kafka.copycat.sink.SinkRecord;
-import org.apache.kafka.copycat.sink.SinkTask;
-import org.apache.kafka.copycat.storage.Converter;
-import org.apache.kafka.copycat.util.ConnectorTaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-/**
- * WorkerTask that uses a SinkTask to export data from Kafka.
- */
-class WorkerSinkTask implements WorkerTask {
-    private static final Logger log = 
LoggerFactory.getLogger(WorkerSinkTask.class);
-
-    private final ConnectorTaskId id;
-    private final SinkTask task;
-    private final WorkerConfig workerConfig;
-    private final Time time;
-    private final Converter keyConverter;
-    private final Converter valueConverter;
-    private WorkerSinkTaskThread workThread;
-    private Map<String, String> taskProps;
-    private KafkaConsumer<byte[], byte[]> consumer;
-    private WorkerSinkTaskContext context;
-    private boolean started;
-    private final List<SinkRecord> messageBatch;
-    private Map<TopicPartition, OffsetAndMetadata> lastCommittedOffsets;
-    private Map<TopicPartition, OffsetAndMetadata> currentOffsets;
-    private boolean pausedForRedelivery;
-
-    public WorkerSinkTask(ConnectorTaskId id, SinkTask task, WorkerConfig 
workerConfig,
-                          Converter keyConverter, Converter valueConverter, 
Time time) {
-        this.id = id;
-        this.task = task;
-        this.workerConfig = workerConfig;
-        this.keyConverter = keyConverter;
-        this.valueConverter = valueConverter;
-        this.time = time;
-        this.started = false;
-        this.messageBatch = new ArrayList<>();
-        this.currentOffsets = new HashMap<>();
-        this.pausedForRedelivery = false;
-    }
-
-    @Override
-    public void start(Map<String, String> props) {
-        taskProps = props;
-        consumer = createConsumer();
-        context = new WorkerSinkTaskContext(consumer);
-
-        workThread = createWorkerThread();
-        workThread.start();
-    }
-
-    @Override
-    public void stop() {
-        // Offset commit is handled upon exit in work thread
-        if (workThread != null)
-            workThread.startGracefulShutdown();
-        consumer.wakeup();
-    }
-
-    @Override
-    public boolean awaitStop(long timeoutMs) {
-        boolean success = true;
-        if (workThread != null) {
-            try {
-                success = workThread.awaitShutdown(timeoutMs, 
TimeUnit.MILLISECONDS);
-                if (!success)
-                    workThread.forceShutdown();
-            } catch (InterruptedException e) {
-                success = false;
-            }
-        }
-        task.stop();
-        return success;
-    }
-
-    @Override
-    public void close() {
-        // FIXME Kafka needs to add a timeout parameter here for us to properly obey the timeout
-        // passed in
-        if (consumer != null)
-            consumer.close();
-    }
-
-    /**
-     * Preforms initial join process for consumer group, ensures we have an assignment, and initializes + starts the
-     * SinkTask.
-     *
-     * @returns true if successful, false if joining the consumer group was interrupted
-     */
-    public boolean joinConsumerGroupAndStart() {
-        String topicsStr = taskProps.get(SinkTask.TOPICS_CONFIG);
-        if (topicsStr == null || topicsStr.isEmpty())
-            throw new CopycatException("Sink tasks require a list of topics.");
-        String[] topics = topicsStr.split(",");
-        log.debug("Task {} subscribing to topics {}", id, topics);
-        consumer.subscribe(Arrays.asList(topics), new HandleRebalance());
-
-        // Ensure we're in the group so that if start() wants to rewind offsets, it will have an assignment of partitions
-        // to work with. Any rewinding will be handled immediately when polling starts.
-        try {
-            consumer.poll(0);
-        } catch (WakeupException e) {
-            log.error("Sink task {} was stopped before completing join group. 
Task initialization and start is being skipped", this);
-            return false;
-        }
-        task.initialize(context);
-        task.start(taskProps);
-        log.info("Sink task {} finished initialization and start", this);
-        started = true;
-        return true;
-    }
-
-    /** Poll for new messages with the given timeout. Should only be invoked by the worker thread. */
-    public void poll(long timeoutMs) {
-        try {
-            rewind();
-            long retryTimeout = context.timeout();
-            if (retryTimeout > 0) {
-                timeoutMs = Math.min(timeoutMs, retryTimeout);
-                context.timeout(-1L);
-            }
-
-            log.trace("{} polling consumer with timeout {} ms", id, timeoutMs);
-            ConsumerRecords<byte[], byte[]> msgs = consumer.poll(timeoutMs);
-            assert messageBatch.isEmpty() || msgs.isEmpty();
-            log.trace("{} polling returned {} messages", id, msgs.count());
-
-            convertMessages(msgs);
-            deliverMessages();
-        } catch (WakeupException we) {
-            log.trace("{} consumer woken up", id);
-        }
-    }
-
-    /**
-     * Starts an offset commit by flushing outstanding messages from the task and then starting
-     * the write commit. This should only be invoked by the WorkerSinkTaskThread.
-     **/
-    public void commitOffsets(boolean sync, final int seqno) {
-        log.info("{} Committing offsets", this);
-
-        final Map<TopicPartition, OffsetAndMetadata> offsets = new 
HashMap<>(currentOffsets);
-
-        try {
-            task.flush(offsets);
-        } catch (Throwable t) {
-            log.error("Commit of {} offsets failed due to exception while 
flushing: {}", this, t);
-            log.error("Rewinding offsets to last committed offsets");
-            for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : 
lastCommittedOffsets.entrySet()) {
-                log.debug("{} Rewinding topic partition {} to offset {}", id, 
entry.getKey(), entry.getValue().offset());
-                consumer.seek(entry.getKey(), entry.getValue().offset());
-            }
-            currentOffsets = new HashMap<>(lastCommittedOffsets);
-            workThread.onCommitCompleted(t, seqno);
-            return;
-        }
-
-        if (sync) {
-            try {
-                consumer.commitSync(offsets);
-                lastCommittedOffsets = offsets;
-                workThread.onCommitCompleted(null, seqno);
-            } catch (KafkaException e) {
-                workThread.onCommitCompleted(e, seqno);
-            }
-        } else {
-            OffsetCommitCallback cb = new OffsetCommitCallback() {
-                @Override
-                public void onComplete(Map<TopicPartition, OffsetAndMetadata> 
offsets, Exception error) {
-                    lastCommittedOffsets = offsets;
-                    workThread.onCommitCompleted(error, seqno);
-                }
-            };
-            consumer.commitAsync(offsets, cb);
-        }
-    }
-
-    public Time time() {
-        return time;
-    }
-
-    public WorkerConfig workerConfig() {
-        return workerConfig;
-    }
-
-    private KafkaConsumer<byte[], byte[]> createConsumer() {
-        // Include any unknown worker configs so consumer configs can be set 
globally on the worker
-        // and through to the task
-        Map<String, Object> props = workerConfig.unusedConfigs();
-        props.put(ConsumerConfig.GROUP_ID_CONFIG, "copycat-" + id.connector());
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
-                
Utils.join(workerConfig.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
-        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
-
-        KafkaConsumer<byte[], byte[]> newConsumer;
-        try {
-            newConsumer = new KafkaConsumer<>(props);
-        } catch (Throwable t) {
-            throw new CopycatException("Failed to create consumer", t);
-        }
-
-        return newConsumer;
-    }
-
-    private WorkerSinkTaskThread createWorkerThread() {
-        return new WorkerSinkTaskThread(this, "WorkerSinkTask-" + id, time, 
workerConfig);
-    }
-
-    private void convertMessages(ConsumerRecords<byte[], byte[]> msgs) {
-        for (ConsumerRecord<byte[], byte[]> msg : msgs) {
-            log.trace("Consuming message with key {}, value {}", msg.key(), 
msg.value());
-            SchemaAndValue keyAndSchema = 
keyConverter.toCopycatData(msg.topic(), msg.key());
-            SchemaAndValue valueAndSchema = 
valueConverter.toCopycatData(msg.topic(), msg.value());
-            messageBatch.add(
-                    new SinkRecord(msg.topic(), msg.partition(),
-                            keyAndSchema.schema(), keyAndSchema.value(),
-                            valueAndSchema.schema(), valueAndSchema.value(),
-                            msg.offset())
-            );
-        }
-    }
-
-    private void deliverMessages() {
-        // Finally, deliver this batch to the sink
-        try {
-            // Since we reuse the messageBatch buffer, ensure we give the task 
its own copy
-            task.put(new ArrayList<>(messageBatch));
-            for (SinkRecord record : messageBatch)
-                currentOffsets.put(new TopicPartition(record.topic(), 
record.kafkaPartition()),
-                        new OffsetAndMetadata(record.kafkaOffset() + 1));
-            messageBatch.clear();
-            // If we had paused all consumer topic partitions to try to 
redeliver data, then we should resume any that
-            // the task had not explicitly paused
-            if (pausedForRedelivery) {
-                for (TopicPartition tp : consumer.assignment())
-                    if (!context.pausedPartitions().contains(tp))
-                        consumer.resume(tp);
-                pausedForRedelivery = false;
-            }
-        } catch (RetriableException e) {
-            log.error("RetriableException from SinkTask {}: {}", id, e);
-            // If we're retrying a previous batch, make sure we've paused all 
topic partitions so we don't get new data,
-            // but will still be able to poll in order to handle 
user-requested timeouts, keep group membership, etc.
-            pausedForRedelivery = true;
-            for (TopicPartition tp : consumer.assignment())
-                consumer.pause(tp);
-            // Let this exit normally, the batch will be reprocessed on the 
next loop.
-        } catch (Throwable t) {
-            log.error("Task {} threw an uncaught and unrecoverable exception", 
id);
-            log.error("Task is being killed and will not recover until 
manually restarted:", t);
-            throw new CopycatException("Exiting WorkerSinkTask due to 
unrecoverable exception.");
-        }
-    }
-
-    private void rewind() {
-        Map<TopicPartition, Long> offsets = context.offsets();
-        if (offsets.isEmpty()) {
-            return;
-        }
-        for (TopicPartition tp: offsets.keySet()) {
-            Long offset = offsets.get(tp);
-            if (offset != null) {
-                log.trace("Rewind {} to offset {}.", tp, offset);
-                consumer.seek(tp, offset);
-                lastCommittedOffsets.put(tp, new OffsetAndMetadata(offset));
-                currentOffsets.put(tp, new OffsetAndMetadata(offset));
-            }
-        }
-        context.clearOffsets();
-    }
-
-    private class HandleRebalance implements ConsumerRebalanceListener {
-        @Override
-        public void onPartitionsAssigned(Collection<TopicPartition> 
partitions) {
-            lastCommittedOffsets = new HashMap<>();
-            currentOffsets = new HashMap<>();
-            for (TopicPartition tp : partitions) {
-                long pos = consumer.position(tp);
-                lastCommittedOffsets.put(tp, new OffsetAndMetadata(pos));
-                currentOffsets.put(tp, new OffsetAndMetadata(pos));
-                log.debug("{} assigned topic partition {} with offset {}", id, 
tp, pos);
-            }
-
-            // If we paused everything for redelivery (which is no longer 
relevant since we discarded the data), make
-            // sure anything we paused that the task didn't request to be 
paused *and* which we still own is resumed.
-            // Also make sure our tracking of paused partitions is updated to 
remove any partitions we no longer own.
-            if (pausedForRedelivery) {
-                pausedForRedelivery = false;
-                Set<TopicPartition> assigned = new HashSet<>(partitions);
-                Set<TopicPartition> taskPaused = context.pausedPartitions();
-
-                for (TopicPartition tp : partitions) {
-                    if (!taskPaused.contains(tp))
-                        consumer.resume(tp);
-                }
-
-                Iterator<TopicPartition> tpIter = taskPaused.iterator();
-                while (tpIter.hasNext()) {
-                    TopicPartition tp = tpIter.next();
-                    if (assigned.contains(tp))
-                        tpIter.remove();
-                }
-            }
-
-            // Instead of invoking the assignment callback on initialization, 
we guarantee the consumer is ready upon
-            // task start. Since this callback gets invoked during that 
initial setup before we've started the task, we
-            // need to guard against invoking the user's callback method 
during that period.
-            if (started)
-                task.onPartitionsAssigned(partitions);
-        }
-
-        @Override
-        public void onPartitionsRevoked(Collection<TopicPartition> partitions) 
{
-            task.onPartitionsRevoked(partitions);
-            commitOffsets(true, -1);
-            // Make sure we don't have any leftover data since offsets will be 
reset to committed positions
-            messageBatch.clear();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskContext.java
----------------------------------------------------------------------
diff --git 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskContext.java
 
b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskContext.java
deleted file mode 100644
index 5257ee4..0000000
--- 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskContext.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
- * agreements.  See the NOTICE file distributed with this work for additional 
information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the 
License.  You may obtain a
- * copy of the License at <p/> http://www.apache.org/licenses/LICENSE-2.0 <p/> 
Unless required by
- * applicable law or agreed to in writing, software distributed under the 
License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See
- * the License for the specific language governing permissions and limitations 
under the License.
- **/
-
-package org.apache.kafka.copycat.runtime;
-
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.copycat.errors.IllegalWorkerStateException;
-import org.apache.kafka.copycat.sink.SinkTaskContext;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-public class WorkerSinkTaskContext implements SinkTaskContext {
-    private Map<TopicPartition, Long> offsets;
-    private long timeoutMs;
-    private KafkaConsumer<byte[], byte[]> consumer;
-    private final Set<TopicPartition> pausedPartitions;
-
-    public WorkerSinkTaskContext(KafkaConsumer<byte[], byte[]> consumer) {
-        this.offsets = new HashMap<>();
-        this.timeoutMs = -1L;
-        this.consumer = consumer;
-        this.pausedPartitions = new HashSet<>();
-    }
-
-    @Override
-    public void offset(Map<TopicPartition, Long> offsets) {
-        this.offsets.putAll(offsets);
-    }
-
-    @Override
-    public void offset(TopicPartition tp, long offset) {
-        offsets.put(tp, offset);
-    }
-
-    public void clearOffsets() {
-        offsets.clear();
-    }
-
-    /**
-     * Get offsets that the SinkTask has submitted to be reset. Used by the 
Copycat framework.
-     * @return the map of offsets
-     */
-    public Map<TopicPartition, Long> offsets() {
-        return offsets;
-    }
-
-    @Override
-    public void timeout(long timeoutMs) {
-        this.timeoutMs = timeoutMs;
-    }
-
-    /**
-     * Get the timeout in milliseconds set by SinkTasks. Used by the Copycat 
framework.
-     * @return the backoff timeout in milliseconds.
-     */
-    public long timeout() {
-        return timeoutMs;
-    }
-
-    @Override
-    public Set<TopicPartition> assignment() {
-        if (consumer == null) {
-            throw new IllegalWorkerStateException("SinkTaskContext may not be 
used to look up partition assignment until the task is initialized");
-        }
-        return consumer.assignment();
-    }
-
-    @Override
-    public void pause(TopicPartition... partitions) {
-        if (consumer == null) {
-            throw new IllegalWorkerStateException("SinkTaskContext may not be 
used to pause consumption until the task is initialized");
-        }
-        try {
-            for (TopicPartition partition : partitions)
-                pausedPartitions.add(partition);
-            consumer.pause(partitions);
-        } catch (IllegalStateException e) {
-            throw new IllegalWorkerStateException("SinkTasks may not pause 
partitions that are not currently assigned to them.", e);
-        }
-    }
-
-    @Override
-    public void resume(TopicPartition... partitions) {
-        if (consumer == null) {
-            throw new IllegalWorkerStateException("SinkTaskContext may not be 
used to resume consumption until the task is initialized");
-        }
-        try {
-            for (TopicPartition partition : partitions)
-                pausedPartitions.remove(partition);
-            consumer.resume(partitions);
-        } catch (IllegalStateException e) {
-            throw new IllegalWorkerStateException("SinkTasks may not resume 
partitions that are not currently assigned to them.", e);
-        }
-    }
-
-    public Set<TopicPartition> pausedPartitions() {
-        return pausedPartitions;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java
----------------------------------------------------------------------
diff --git 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java
 
b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java
deleted file mode 100644
index ab3f1fe..0000000
--- 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.runtime;
-
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.copycat.util.ShutdownableThread;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Worker thread for a WorkerSinkTask. These classes are very tightly coupled, 
but separated to
- * simplify testing.
- */
-class WorkerSinkTaskThread extends ShutdownableThread {
-    private static final Logger log = 
LoggerFactory.getLogger(WorkerSinkTask.class);
-
-    private final WorkerSinkTask task;
-    private long nextCommit;
-    private boolean committing;
-    private int commitSeqno;
-    private long commitStarted;
-    private int commitFailures;
-
-    public WorkerSinkTaskThread(WorkerSinkTask task, String name, Time time,
-                                WorkerConfig workerConfig) {
-        super(name);
-        this.task = task;
-        this.nextCommit = time.milliseconds() +
-                
workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
-        this.committing = false;
-        this.commitSeqno = 0;
-        this.commitStarted = -1;
-        this.commitFailures = 0;
-    }
-
-    @Override
-    public void execute() {
-        // Try to join and start. If we're interrupted before this completes, 
bail.
-        if (!task.joinConsumerGroupAndStart())
-            return;
-
-        while (getRunning()) {
-            iteration();
-        }
-
-        // Make sure any uncommitted data has committed
-        task.commitOffsets(true, -1);
-    }
-
-    public void iteration() {
-        long now = task.time().milliseconds();
-
-        // Maybe commit
-        if (!committing && now >= nextCommit) {
-            synchronized (this) {
-                committing = true;
-                commitSeqno += 1;
-                commitStarted = now;
-            }
-            task.commitOffsets(false, commitSeqno);
-            nextCommit += 
task.workerConfig().getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
-        }
-
-        // Check for timed out commits
-        long commitTimeout = commitStarted + task.workerConfig().getLong(
-                WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
-        if (committing && now >= commitTimeout) {
-            log.warn("Commit of {} offsets timed out", this);
-            commitFailures++;
-            committing = false;
-        }
-
-        // And process messages
-        long timeoutMs = Math.max(nextCommit - now, 0);
-        task.poll(timeoutMs);
-    }
-
-    public void onCommitCompleted(Throwable error, long seqno) {
-        synchronized (this) {
-            if (commitSeqno != seqno) {
-                log.debug("Got callback for timed out commit {}: {}, but most 
recent commit is {}",
-                        this,
-                        seqno, commitSeqno);
-            } else {
-                if (error != null) {
-                    log.error("Commit of {} offsets threw an unexpected 
exception: ", this, error);
-                    commitFailures++;
-                } else {
-                    log.debug("Finished {} offset commit successfully in {} 
ms",
-                            this, task.time().milliseconds() - commitStarted);
-                    commitFailures = 0;
-                }
-                committing = false;
-            }
-        }
-    }
-
-    public int commitFailures() {
-        return commitFailures;
-    }
-}

Reply via email to