http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Herder.java
----------------------------------------------------------------------
diff --git 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Herder.java 
b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Herder.java
new file mode 100644
index 0000000..7f8b7c2
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Herder.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime;
+
+import org.apache.kafka.copycat.util.Callback;
+
+import java.util.Properties;
+
+/**
+ * <p>
+ * The herder interface tracks and manages workers and connectors. It is the main interface for
+ * external components to make changes to the state of the cluster. For example, in distributed
+ * mode, an implementation of this class accepts a connector configuration, routes it to the
+ * current leader worker for the cluster if necessary so the config can be written to persistent
+ * storage, and then ensures the new connector is correctly instantiated on one of the workers.
+ * </p>
+ * <p>
+ * Implementations of this interface must handle all the actions that can be taken on the cluster
+ * (add/remove connectors, pause/resume tasks, get state of connectors and tasks, etc.). The
+ * non-Java interfaces to the cluster (REST API and CLI) are thin wrappers over the functionality
+ * provided by this interface.
+ * </p>
+ * <p>
+ * In standalone mode, the implementation of this class will be trivial because no coordination is
+ * needed. In that case, the implementation mainly delegates directly to other components. For
+ * example, when creating a new connector in standalone mode, there is no need to persist the
+ * config, and the connector and its tasks must run in the same process, so the standalone herder
+ * implementation can immediately instantiate and start the connector and its tasks.
+ * </p>
+ */
+public interface Herder {
+
+    void start();
+
+    void stop();
+
+    /**
+     * Submit a connector job to the cluster. This works from any node by 
forwarding the request to
+     * the leader herder if necessary.
+     *
+     * @param connectorProps user-specified properties for this job
+     * @param callback callback to invoke when the request completes
+     */
+    void addConnector(Properties connectorProps, Callback<String> callback);
+
+    /**
+     * Delete a connector job by name.
+     *
+     * @param name name of the connector job to shut down and delete
+     * @param callback callback to invoke when the request completes
+     */
+    void deleteConnector(String name, Callback<Void> callback);
+}
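
A note on usage: the interface is asynchronous, so callers receive the result through the Callback rather than a return value. The sketch below shows how a caller might submit a connector to a Herder; it assumes the standalone herder later in this patch and uses the ConnectorConfig keys referenced there, while the connector name and class are purely illustrative.

    import java.util.Properties;
    import org.apache.kafka.copycat.runtime.ConnectorConfig;
    import org.apache.kafka.copycat.runtime.Herder;
    import org.apache.kafka.copycat.util.Callback;

    public class HerderUsageSketch {
        // Illustrative only: submits a hypothetical connector and logs the outcome.
        public static void submit(Herder herder, String connectorClassName) {
            Properties connectorProps = new Properties();
            connectorProps.setProperty(ConnectorConfig.NAME_CONFIG, "example-connector");
            connectorProps.setProperty(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connectorClassName);
            connectorProps.setProperty(ConnectorConfig.TASKS_MAX_CONFIG, "1");

            herder.addConnector(connectorProps, new Callback<String>() {
                @Override
                public void onCompletion(Throwable error, String connectorName) {
                    if (error != null)
                        System.err.println("Failed to add connector: " + error);
                    else
                        System.out.println("Added connector " + connectorName);
                }
            });
        }
    }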

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SinkTaskContextImpl.java
----------------------------------------------------------------------
diff --git 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SinkTaskContextImpl.java
 
b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SinkTaskContextImpl.java
new file mode 100644
index 0000000..f47c984
--- /dev/null
+++ 
b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SinkTaskContextImpl.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime;
+
+import org.apache.kafka.copycat.sink.SinkTaskContext;
+
+class SinkTaskContextImpl extends SinkTaskContext {
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SourceTaskOffsetCommitter.java
----------------------------------------------------------------------
diff --git 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SourceTaskOffsetCommitter.java
 
b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SourceTaskOffsetCommitter.java
new file mode 100644
index 0000000..953cfa5
--- /dev/null
+++ 
b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SourceTaskOffsetCommitter.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.copycat.cli.WorkerConfig;
+import org.apache.kafka.copycat.util.ConnectorTaskId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * <p>
+ * Manages offset commit scheduling and execution for SourceTasks.
+ * </p>
+ * <p>
+ * Unlike sink tasks which directly manage their offset commits in the main 
poll() thread since
+ * they drive the event loop and control (for all intents and purposes) the 
timeouts, source
+ * tasks are at the whim of the connector and cannot be guaranteed to wake up 
on the necessary
+ * schedule. Instead, this class tracks all the active tasks and their commit schedules, and
+ * ensures commits are invoked in a timely fashion.
+ * </p>
+ * <p>
+ * The current implementation uses a single thread to process commits.
+ * </p>
+ */
+class SourceTaskOffsetCommitter {
+    private static final Logger log = 
LoggerFactory.getLogger(SourceTaskOffsetCommitter.class);
+
+    private Time time;
+    private WorkerConfig config;
+    private ScheduledExecutorService commitExecutorService = null;
+    private HashMap<ConnectorTaskId, ScheduledFuture<?>> committers = new 
HashMap<>();
+
+    SourceTaskOffsetCommitter(Time time, WorkerConfig config) {
+        this.time = time;
+        this.config = config;
+        commitExecutorService = Executors.newSingleThreadScheduledExecutor();
+    }
+
+    public void close(long timeoutMs) {
+        commitExecutorService.shutdown();
+        try {
+            if (!commitExecutorService.awaitTermination(timeoutMs, 
TimeUnit.MILLISECONDS)) {
+                log.error("Graceful shutdown of offset commitOffsets thread 
timed out.");
+            }
+        } catch (InterruptedException e) {
+            // ignore and allow to exit immediately
+        }
+    }
+
+    public void schedule(final ConnectorTaskId id, final WorkerSourceTask 
workerTask) {
+        long commitIntervalMs = 
config.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
+        ScheduledFuture<?> commitFuture = 
commitExecutorService.scheduleWithFixedDelay(new Runnable() {
+            @Override
+            public void run() {
+                commit(workerTask);
+            }
+        }, commitIntervalMs, commitIntervalMs, TimeUnit.MILLISECONDS);
+        committers.put(id, commitFuture);
+    }
+
+    public void remove(ConnectorTaskId id) {
+        ScheduledFuture<?> commitFuture = committers.remove(id);
+        commitFuture.cancel(false);
+    }
+
+    public void commit(WorkerSourceTask workerTask) {
+        try {
+            log.debug("Committing offsets for {}", workerTask);
+            boolean success = workerTask.commitOffsets();
+            if (!success) {
+                log.error("Failed to commit offsets for {}", workerTask);
+            }
+        } catch (Throwable t) {
+            // We're very careful about exceptions here since any uncaught 
exceptions in the commit
+            // thread would cause the fixed interval schedule on the 
ExecutorService to stop running
+            // for that task
+            log.error("Unhandled exception when committing {}: ", workerTask, 
t);
+        }
+    }
+
+}
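
The catch (Throwable t) in commit() above is essential rather than defensive: a periodic task scheduled with scheduleWithFixedDelay is silently cancelled by the executor the first time its run() throws, so a single failed commit would otherwise end all future commits for that task. The self-contained sketch below (not part of the patch) demonstrates that behavior:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class FixedDelaySuppressionDemo {
        public static void main(String[] args) throws InterruptedException {
            ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
            executor.scheduleWithFixedDelay(new Runnable() {
                private int runs = 0;
                @Override
                public void run() {
                    runs++;
                    System.out.println("run " + runs);
                    if (runs == 2)
                        throw new RuntimeException("boom"); // suppresses all subsequent executions
                }
            }, 0, 100, TimeUnit.MILLISECONDS);

            Thread.sleep(1000);  // only "run 1" and "run 2" are printed
            executor.shutdown();
        }
    }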

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
----------------------------------------------------------------------
diff --git 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java 
b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
new file mode 100644
index 0000000..55d0784
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.copycat.cli.WorkerConfig;
+import org.apache.kafka.copycat.connector.Task;
+import org.apache.kafka.copycat.errors.CopycatException;
+import org.apache.kafka.copycat.sink.SinkTask;
+import org.apache.kafka.copycat.source.SourceTask;
+import org.apache.kafka.copycat.storage.*;
+import org.apache.kafka.copycat.util.ConnectorTaskId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * <p>
+ * Worker runs a (dynamic) set of tasks in a set of threads, doing the work of 
actually moving
+ * data to/from Kafka.
+ * </p>
+ * <p>
+ * Since each task has a dedicated thread, this is mainly just a container for 
them.
+ * </p>
+ */
+public class Worker<K, V> {
+    private static final Logger log = LoggerFactory.getLogger(Worker.class);
+
+    private Time time;
+    private WorkerConfig config;
+    private Converter<K> keyConverter;
+    private Converter<V> valueConverter;
+    private OffsetBackingStore offsetBackingStore;
+    private Serializer<K> offsetKeySerializer;
+    private Serializer<V> offsetValueSerializer;
+    private Deserializer<K> offsetKeyDeserializer;
+    private Deserializer<V> offsetValueDeserializer;
+    private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<>();
+    private KafkaProducer<K, V> producer;
+    private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
+
+    public Worker(WorkerConfig config) {
+        this(new SystemTime(), config, null, null, null, null, null);
+    }
+
+    @SuppressWarnings("unchecked")
+    public Worker(Time time, WorkerConfig config, OffsetBackingStore 
offsetBackingStore,
+                  Serializer offsetKeySerializer, Serializer 
offsetValueSerializer,
+                  Deserializer offsetKeyDeserializer, Deserializer 
offsetValueDeserializer) {
+        this.time = time;
+        this.config = config;
+        this.keyConverter = 
config.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, 
Converter.class);
+        this.valueConverter = 
config.getConfiguredInstance(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, 
Converter.class);
+
+        if (offsetBackingStore != null) {
+            this.offsetBackingStore = offsetBackingStore;
+        } else {
+            this.offsetBackingStore = new FileOffsetBackingStore();
+            this.offsetBackingStore.configure(config.originals());
+        }
+
+        if (offsetKeySerializer != null) {
+            this.offsetKeySerializer = offsetKeySerializer;
+        } else {
+            this.offsetKeySerializer = 
config.getConfiguredInstance(WorkerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
Serializer.class);
+            this.offsetKeySerializer.configure(config.originals(), true);
+        }
+
+        if (offsetValueSerializer != null) {
+            this.offsetValueSerializer = offsetValueSerializer;
+        } else {
+            this.offsetValueSerializer = 
config.getConfiguredInstance(WorkerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
Serializer.class);
+            this.offsetValueSerializer.configure(config.originals(), false);
+        }
+
+        if (offsetKeyDeserializer != null) {
+            this.offsetKeyDeserializer = offsetKeyDeserializer;
+        } else {
+            this.offsetKeyDeserializer = 
config.getConfiguredInstance(WorkerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
Deserializer.class);
+            this.offsetKeyDeserializer.configure(config.originals(), true);
+        }
+
+        if (offsetValueDeserializer != null) {
+            this.offsetValueDeserializer = offsetValueDeserializer;
+        } else {
+            this.offsetValueDeserializer = 
config.getConfiguredInstance(WorkerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
Deserializer.class);
+            this.offsetValueDeserializer.configure(config.originals(), false);
+        }
+    }
+
+    public void start() {
+        log.info("Worker starting");
+
+        Properties unusedConfigs = config.getUnusedProperties();
+
+        Map<String, Object> producerProps = new HashMap<>();
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
config.getClass(WorkerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName());
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
config.getClass(WorkerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName());
+        for (String propName : unusedConfigs.stringPropertyNames()) {
+            producerProps.put(propName, unusedConfigs.getProperty(propName));
+        }
+        producer = new KafkaProducer<>(producerProps);
+
+        offsetBackingStore.start();
+        sourceTaskOffsetCommitter = new SourceTaskOffsetCommitter(time, 
config);
+
+        log.info("Worker started");
+    }
+
+    public void stop() {
+        log.info("Worker stopping");
+
+        long started = time.milliseconds();
+        long limit = started + 
config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG);
+
+        for (Map.Entry<ConnectorTaskId, WorkerTask> entry : tasks.entrySet()) {
+            WorkerTask task = entry.getValue();
+            log.warn("Shutting down task {} uncleanly; herder should have shut 
down "
+                    + "tasks before the Worker is stopped.", task);
+            try {
+                task.stop();
+            } catch (CopycatException e) {
+                log.error("Error while shutting down task " + task, e);
+            }
+        }
+
+        for (Map.Entry<ConnectorTaskId, WorkerTask> entry : tasks.entrySet()) {
+            WorkerTask task = entry.getValue();
+            log.debug("Waiting for task {} to finish shutting down", task);
+            if (!task.awaitStop(Math.max(limit - time.milliseconds(), 0)))
+                log.error("Graceful shutdown of task {} failed.", task);
+            task.close();
+        }
+
+        long timeoutMs = limit - time.milliseconds();
+        sourceTaskOffsetCommitter.close(timeoutMs);
+
+        offsetBackingStore.stop();
+
+        log.info("Worker stopped");
+    }
+
+    /**
+     * Add a new task.
+     * @param id Globally unique ID for this task.
+     * @param taskClassName name of the {@link 
org.apache.kafka.copycat.connector.Task}
+     *                      class to instantiate. Must be a subclass of either
+     *                      {@link org.apache.kafka.copycat.source.SourceTask} 
or
+     *                      {@link org.apache.kafka.copycat.sink.SinkTask}.
+     * @param props configuration options for the task
+     */
+    public void addTask(ConnectorTaskId id, String taskClassName, Properties 
props) {
+        if (tasks.containsKey(id)) {
+            String msg = "Task already exists in this worker; the herder 
should not have requested "
+                    + "that this : " + id;
+            log.error(msg);
+            throw new CopycatException(msg);
+        }
+
+        final Task task = instantiateTask(taskClassName);
+
+        // Decide which type of worker task we need based on the type of task.
+        final WorkerTask workerTask;
+        if (task instanceof SourceTask) {
+            SourceTask sourceTask = (SourceTask) task;
+            OffsetStorageReader offsetReader = new OffsetStorageReaderImpl<>(offsetBackingStore, id.getConnector(),
+                    keyConverter, valueConverter, offsetKeyDeserializer, offsetValueDeserializer);
+            OffsetStorageWriter<K, V> offsetWriter = new 
OffsetStorageWriter<>(offsetBackingStore, id.getConnector(),
+                    keyConverter, valueConverter, offsetKeySerializer, 
offsetValueSerializer);
+            workerTask = new WorkerSourceTask<>(id, sourceTask, keyConverter, 
valueConverter, producer,
+                    offsetReader, offsetWriter, config, time);
+        } else if (task instanceof SinkTask) {
+            workerTask = new WorkerSinkTask<>(id, (SinkTask) task, config, 
keyConverter, valueConverter, time);
+        } else {
+            log.error("Tasks must be a subclass of either SourceTask or 
SinkTask", task);
+            throw new CopycatException("Tasks must be a subclass of either 
SourceTask or SinkTask");
+        }
+
+        // Start the task before modifying any state; any exceptions are caught higher up the
+        // call chain and there's no cleanup to do here
+        workerTask.start(props);
+        tasks.put(id, workerTask);
+    }
+
+    private static Task instantiateTask(String taskClassName) {
+        try {
+            return 
Utils.newInstance(Class.forName(taskClassName).asSubclass(Task.class));
+        } catch (ClassNotFoundException e) {
+            throw new CopycatException("Task class not found", e);
+        }
+    }
+
+    public void stopTask(ConnectorTaskId id) {
+        WorkerTask task = getTask(id);
+        task.stop();
+        if 
(!task.awaitStop(config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG)))
+            log.error("Graceful stop of task {} failed.", task);
+        task.close();
+        tasks.remove(id);
+    }
+
+    private WorkerTask getTask(ConnectorTaskId id) {
+        WorkerTask task = tasks.get(id);
+        if (task == null) {
+            log.error("Task not found: " + id);
+            throw new CopycatException("Task not found: " + id);
+        }
+        return task;
+    }
+
+}
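
instantiateTask() combines Class.forName with asSubclass so that a misconfigured task class name fails immediately with a clear error instead of a later ClassCastException. A standalone sketch of the same pattern using only the JDK (Kafka's Utils.newInstance essentially wraps the no-arg constructor call shown here):

    public class ReflectiveFactorySketch {
        /**
         * Instantiate className as a subtype of baseClass via its no-arg constructor.
         * asSubclass() throws ClassCastException when the named class does not
         * implement or extend baseClass, so bad configuration fails fast.
         */
        public static <T> T newInstanceOf(String className, Class<T> baseClass) {
            try {
                Class<? extends T> clazz = Class.forName(className).asSubclass(baseClass);
                return clazz.newInstance();
            } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
                throw new RuntimeException("Could not create instance of " + className, e);
            }
        }
    }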

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
 
b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
new file mode 100644
index 0000000..4eaf756
--- /dev/null
+++ 
b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.copycat.cli.WorkerConfig;
+import org.apache.kafka.copycat.errors.CopycatException;
+import org.apache.kafka.copycat.sink.SinkRecord;
+import org.apache.kafka.copycat.sink.SinkTask;
+import org.apache.kafka.copycat.sink.SinkTaskContext;
+import org.apache.kafka.copycat.storage.Converter;
+import org.apache.kafka.copycat.util.ConnectorTaskId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * WorkerTask that uses a SinkTask to export data from Kafka.
+ */
+class WorkerSinkTask<K, V> implements WorkerTask {
+    private static final Logger log = 
LoggerFactory.getLogger(WorkerSinkTask.class);
+
+    private final ConnectorTaskId id;
+    private final SinkTask task;
+    private final WorkerConfig workerConfig;
+    private final Time time;
+    private final Converter<K> keyConverter;
+    private final Converter<V> valueConverter;
+    private WorkerSinkTaskThread workThread;
+    private KafkaConsumer<K, V> consumer;
+    private final SinkTaskContext context;
+
+    public WorkerSinkTask(ConnectorTaskId id, SinkTask task, WorkerConfig 
workerConfig,
+                          Converter<K> keyConverter, Converter<V> 
valueConverter, Time time) {
+        this.id = id;
+        this.task = task;
+        this.workerConfig = workerConfig;
+        this.keyConverter = keyConverter;
+        this.valueConverter = valueConverter;
+        context = new SinkTaskContextImpl();
+        this.time = time;
+    }
+
+    @Override
+    public void start(Properties props) {
+        task.initialize(context);
+        task.start(props);
+        consumer = createConsumer(props);
+        workThread = createWorkerThread();
+        workThread.start();
+    }
+
+    @Override
+    public void stop() {
+        // Offset commit is handled upon exit in work thread
+        task.stop();
+        if (workThread != null)
+            workThread.startGracefulShutdown();
+        consumer.wakeup();
+    }
+
+    @Override
+    public boolean awaitStop(long timeoutMs) {
+        if (workThread != null) {
+            try {
+                boolean success = workThread.awaitShutdown(timeoutMs, 
TimeUnit.MILLISECONDS);
+                if (!success)
+                    workThread.forceShutdown();
+                return success;
+            } catch (InterruptedException e) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public void close() {
+        // FIXME Kafka needs to add a timeout parameter here for us to 
properly obey the timeout
+        // passed in
+        if (consumer != null)
+            consumer.close();
+    }
+
+    /** Poll for new messages with the given timeout. Should only be invoked 
by the worker thread. */
+    public void poll(long timeoutMs) {
+        try {
+            log.trace("{} polling consumer with timeout {} ms", id, timeoutMs);
+            ConsumerRecords<K, V> msgs = consumer.poll(timeoutMs);
+            log.trace("{} polling returned {} messages", id, msgs.count());
+            deliverMessages(msgs);
+        } catch (ConsumerWakeupException we) {
+            log.trace("{} consumer woken up", id);
+        }
+    }
+
+    /**
+     * Starts an offset commit by flushing outstanding messages from the task 
and then starting
+     * the write commit. This should only be invoked by the 
WorkerSinkTaskThread.
+     **/
+    public void commitOffsets(long now, boolean sync, final int seqno, boolean 
flush) {
+        HashMap<TopicPartition, Long> offsets = new HashMap<>();
+        for (TopicPartition tp : consumer.subscriptions()) {
+            offsets.put(tp, consumer.position(tp));
+        }
+        // The only case in which we don't flush the task is during shutdown: the task has already
+        // been stopped and all data should already have been flushed
+        if (flush) {
+            try {
+                task.flush(offsets);
+            } catch (Throwable t) {
+                log.error("Commit of {} offsets failed due to exception while 
flushing: {}", this, t);
+                workThread.onCommitCompleted(t, seqno);
+                return;
+            }
+        }
+
+        ConsumerCommitCallback cb = new ConsumerCommitCallback() {
+            @Override
+            public void onComplete(Map<TopicPartition, Long> offsets, 
Exception error) {
+                workThread.onCommitCompleted(error, seqno);
+            }
+        };
+        consumer.commit(offsets, sync ? CommitType.SYNC : CommitType.ASYNC, 
cb);
+    }
+
+    public Time getTime() {
+        return time;
+    }
+
+    public WorkerConfig getWorkerConfig() {
+        return workerConfig;
+    }
+
+    private KafkaConsumer<K, V> createConsumer(Properties taskProps) {
+        String topicsStr = taskProps.getProperty(SinkTask.TOPICS_CONFIG);
+        if (topicsStr == null || topicsStr.isEmpty())
+            throw new CopycatException("Sink tasks require a list of topics.");
+        String[] topics = topicsStr.split(",");
+
+        // Include any unknown worker configs so consumer configs can be set globally on the worker
+        // and passed through to the task
+        Properties props = workerConfig.getUnusedProperties();
+        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "copycat-" + 
id.toString());
+        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+                
Utils.join(workerConfig.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
+        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                
workerConfig.getClass(WorkerConfig.KEY_DESERIALIZER_CLASS_CONFIG).getName());
+        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                
workerConfig.getClass(WorkerConfig.VALUE_DESERIALIZER_CLASS_CONFIG).getName());
+
+        KafkaConsumer<K, V> newConsumer;
+        try {
+            newConsumer = new KafkaConsumer<>(props);
+        } catch (Throwable t) {
+            throw new CopycatException("Failed to create consumer", t);
+        }
+
+        log.debug("Task {} subscribing to topics {}", id, topics);
+        newConsumer.subscribe(topics);
+
+        // Seek to any user-provided offsets. This is useful if offsets are 
tracked in the downstream system (e.g., to
+        // enable exactly once delivery to that system).
+        //
+        // To do this correctly, we need to first make sure we have been 
assigned partitions, which poll() will guarantee.
+        // We ask for offsets after this poll to make sure any offsets 
committed before the rebalance are picked up correctly.
+        newConsumer.poll(0);
+        Map<TopicPartition, Long> offsets = context.getOffsets();
+        for (TopicPartition tp : newConsumer.subscriptions()) {
+            Long offset = offsets.get(tp);
+            if (offset != null)
+                newConsumer.seek(tp, offset);
+        }
+        return newConsumer;
+    }
+
+    private WorkerSinkTaskThread createWorkerThread() {
+        return new WorkerSinkTaskThread(this, "WorkerSinkTask-" + id, time, 
workerConfig);
+    }
+
+    private void deliverMessages(ConsumerRecords<K, V> msgs) {
+        // Finally, deliver this batch to the sink
+        if (msgs.count() > 0) {
+            List<SinkRecord> records = new ArrayList<>();
+            for (ConsumerRecord<K, V> msg : msgs) {
+                log.trace("Consuming message with key {}, value {}", 
msg.key(), msg.value());
+                records.add(
+                        new SinkRecord(msg.topic(), msg.partition(),
+                                keyConverter.toCopycatData(msg.key()),
+                                valueConverter.toCopycatData(msg.value()),
+                                msg.offset())
+                );
+            }
+
+            try {
+                task.put(records);
+            } catch (CopycatException e) {
+                log.error("Exception from SinkTask {}: ", id, e);
+            } catch (Throwable t) {
+                log.error("Unexpected exception from SinkTask {}: ", id, t);
+            }
+        }
+    }
+}
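
The calls WorkerSinkTask makes (initialize, start, put, flush, stop) define the contract a sink connector's task must satisfy. Below is a minimal no-op sink written against those call sites; the exact abstract method set of SinkTask is not shown in this patch, so treat the signatures as an illustration rather than a verified drop-in implementation.

    import java.util.Collection;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.copycat.sink.SinkRecord;
    import org.apache.kafka.copycat.sink.SinkTask;

    public class LoggingSinkTask extends SinkTask {
        @Override
        public void start(Properties props) {
            // Open connections to the destination system here.
        }

        @Override
        public void put(Collection<SinkRecord> records) {
            // Deliver each record to the destination; here we just log it.
            for (SinkRecord record : records)
                System.out.println("Received " + record);
        }

        @Override
        public void flush(Map<TopicPartition, Long> offsets) {
            // Make everything passed to put() durable before these offsets are committed.
        }

        @Override
        public void stop() {
            // Release any resources opened in start().
        }
    }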

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java
----------------------------------------------------------------------
diff --git 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java
 
b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java
new file mode 100644
index 0000000..b946343
--- /dev/null
+++ 
b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.copycat.cli.WorkerConfig;
+import org.apache.kafka.copycat.util.ShutdownableThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Worker thread for a WorkerSinkTask. These classes are very tightly coupled, 
but separated to
+ * simplify testing.
+ */
+class WorkerSinkTaskThread extends ShutdownableThread {
+    private static final Logger log = 
LoggerFactory.getLogger(WorkerSinkTask.class);
+
+    private final WorkerSinkTask task;
+    private long nextCommit;
+    private boolean committing;
+    private int commitSeqno;
+    private long commitStarted;
+    private int commitFailures;
+
+    public WorkerSinkTaskThread(WorkerSinkTask task, String name, Time time,
+                                WorkerConfig workerConfig) {
+        super(name);
+        this.task = task;
+        this.nextCommit = time.milliseconds() +
+                
workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
+        this.committing = false;
+        this.commitSeqno = 0;
+        this.commitStarted = -1;
+        this.commitFailures = 0;
+    }
+
+    @Override
+    public void execute() {
+        while (getRunning()) {
+            iteration();
+        }
+        // Make sure any uncommitted data has been committed
+        task.commitOffsets(task.getTime().milliseconds(), true, -1, false);
+    }
+
+    public void iteration() {
+        long now = task.getTime().milliseconds();
+
+        // Maybe commit
+        if (!committing && now >= nextCommit) {
+            synchronized (this) {
+                committing = true;
+                commitSeqno += 1;
+                commitStarted = now;
+            }
+            task.commitOffsets(now, false, commitSeqno, true);
+            nextCommit += 
task.getWorkerConfig().getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
+        }
+
+        // Check for timed out commits
+        long commitTimeout = commitStarted + task.getWorkerConfig().getLong(
+                WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
+        if (committing && now >= commitTimeout) {
+            log.warn("Commit of {} offsets timed out", this);
+            commitFailures++;
+            committing = false;
+        }
+
+        // And process messages
+        long timeoutMs = Math.max(nextCommit - now, 0);
+        task.poll(timeoutMs);
+    }
+
+    public void onCommitCompleted(Throwable error, long seqno) {
+        synchronized (this) {
+            if (commitSeqno != seqno) {
+                log.debug("Got callback for timed out commit {}: {}, but most 
recent commit is {}",
+                        this,
+                        seqno, commitSeqno);
+            } else {
+                if (error != null) {
+                    log.error("Commit of {} offsets threw an unexpected 
exception: ", this, error);
+                    commitFailures++;
+                } else {
+                    log.debug("Finished {} offset commit successfully in {} 
ms",
+                            this, task.getTime().milliseconds() - 
commitStarted);
+                    commitFailures = 0;
+                }
+                committing = false;
+            }
+        }
+    }
+
+    public int getCommitFailures() {
+        return commitFailures;
+    }
+}
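
The commitSeqno/committing bookkeeping is what lets onCommitCompleted() tell a late callback apart from the commit it most recently started: every attempt gets a fresh sequence number, and completions carrying a stale number are logged and dropped. A stripped-down sketch of that guard, independent of the rest of the class (names are illustrative):

    public class SeqnoGuardSketch {
        private int currentSeqno = 0;
        private boolean inFlight = false;

        /** Start a new attempt and return its sequence number. */
        public synchronized int begin() {
            inFlight = true;
            return ++currentSeqno;
        }

        /** Only the completion matching the most recent attempt clears the in-flight flag. */
        public synchronized void complete(int seqno, Throwable error) {
            if (seqno != currentSeqno) {
                System.out.println("Ignoring stale completion " + seqno + "; current is " + currentSeqno);
                return;
            }
            if (error != null)
                System.err.println("Attempt " + seqno + " failed: " + error);
            inFlight = false;
        }

        public synchronized boolean inFlight() {
            return inFlight;
        }
    }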

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
----------------------------------------------------------------------
diff --git 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
 
b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
new file mode 100644
index 0000000..c80adb4
--- /dev/null
+++ 
b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
@@ -0,0 +1,310 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.copycat.cli.WorkerConfig;
+import org.apache.kafka.copycat.source.SourceRecord;
+import org.apache.kafka.copycat.source.SourceTask;
+import org.apache.kafka.copycat.source.SourceTaskContext;
+import org.apache.kafka.copycat.storage.Converter;
+import org.apache.kafka.copycat.storage.OffsetStorageReader;
+import org.apache.kafka.copycat.storage.OffsetStorageWriter;
+import org.apache.kafka.copycat.util.ConnectorTaskId;
+import org.apache.kafka.copycat.util.ShutdownableThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * WorkerTask that uses a SourceTask to ingest data into Kafka.
+ */
+class WorkerSourceTask<K, V> implements WorkerTask {
+    private static final Logger log = 
LoggerFactory.getLogger(WorkerSourceTask.class);
+
+    private ConnectorTaskId id;
+    private SourceTask task;
+    private final Converter<K> keyConverter;
+    private final Converter<V> valueConverter;
+    private KafkaProducer<K, V> producer;
+    private WorkerSourceTaskThread workThread;
+    private OffsetStorageReader offsetReader;
+    private OffsetStorageWriter<K, V> offsetWriter;
+    private final WorkerConfig workerConfig;
+    private final Time time;
+
+    // Use IdentityHashMap to ensure correctness with duplicate records. This is a map rather than
+    // a set because there is no IdentityHashSet.
+    private IdentityHashMap<ProducerRecord<K, V>, ProducerRecord<K, V>>
+            outstandingMessages;
+    // A second buffer is used while an offset flush is running
+    private IdentityHashMap<ProducerRecord<K, V>, ProducerRecord<K, V>>
+            outstandingMessagesBacklog;
+    private boolean flushing;
+
+    public WorkerSourceTask(ConnectorTaskId id, SourceTask task,
+                            Converter<K> keyConverter, Converter<V> 
valueConverter,
+                            KafkaProducer<K, V> producer,
+                            OffsetStorageReader offsetReader, 
OffsetStorageWriter<K, V> offsetWriter,
+                            WorkerConfig workerConfig, Time time) {
+        this.id = id;
+        this.task = task;
+        this.keyConverter = keyConverter;
+        this.valueConverter = valueConverter;
+        this.producer = producer;
+        this.offsetReader = offsetReader;
+        this.offsetWriter = offsetWriter;
+        this.workerConfig = workerConfig;
+        this.time = time;
+
+        this.outstandingMessages = new IdentityHashMap<>();
+        this.outstandingMessagesBacklog = new IdentityHashMap<>();
+        this.flushing = false;
+    }
+
+    @Override
+    public void start(Properties props) {
+        task.initialize(new SourceTaskContext(offsetReader));
+        task.start(props);
+        workThread = new WorkerSourceTaskThread("WorkerSourceTask-" + id);
+        workThread.start();
+    }
+
+    @Override
+    public void stop() {
+        task.stop();
+        commitOffsets();
+        if (workThread != null)
+            workThread.startGracefulShutdown();
+    }
+
+    @Override
+    public boolean awaitStop(long timeoutMs) {
+        if (workThread != null) {
+            try {
+                boolean success = workThread.awaitShutdown(timeoutMs, 
TimeUnit.MILLISECONDS);
+                if (!success)
+                    workThread.forceShutdown();
+                return success;
+            } catch (InterruptedException e) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public void close() {
+        // Nothing to do
+    }
+
+    /**
+     * Send a batch of records. This is atomic up to the point of getting the messages into the
+     * producer and recording them in our set of outstanding messages, so either all or none will
+     * be sent.
+     * @param records the batch of records to send
+     */
+    private synchronized void sendRecords(List<SourceRecord> records) {
+        for (SourceRecord record : records) {
+            final ProducerRecord<K, V> producerRecord
+                    = new ProducerRecord<>(record.getTopic(), 
record.getKafkaPartition(),
+                    keyConverter.fromCopycatData(record.getKey()),
+                    valueConverter.fromCopycatData(record.getValue()));
+            log.trace("Appending record with key {}, value {}", 
record.getKey(), record.getValue());
+            if (!flushing) {
+                outstandingMessages.put(producerRecord, producerRecord);
+            } else {
+                outstandingMessagesBacklog.put(producerRecord, producerRecord);
+            }
+            producer.send(
+                    producerRecord,
+                    new Callback() {
+                        @Override
+                        public void onCompletion(RecordMetadata 
recordMetadata, Exception e) {
+                            if (e != null) {
+                                log.error("Failed to send record: ", e);
+                            } else {
+                                log.trace("Wrote record successfully: topic {} 
partition {} offset {}",
+                                        recordMetadata.topic(), 
recordMetadata.partition(),
+                                        recordMetadata.offset());
+                            }
+                            recordSent(producerRecord);
+                        }
+                    });
+            // Offsets are converted & serialized in the OffsetWriter
+            offsetWriter.setOffset(record.getSourcePartition(), 
record.getSourceOffset());
+        }
+    }
+
+    private synchronized void recordSent(final ProducerRecord<K, V> record) {
+        ProducerRecord<K, V> removed = outstandingMessages.remove(record);
+        // While flushing, we may also see callbacks for items in the backlog
+        if (removed == null && flushing)
+            removed = outstandingMessagesBacklog.remove(record);
+        // But if neither one had it, something is very wrong
+        if (removed == null) {
+            log.error("Saw callback for record that was not present in the 
outstanding message set: "
+                    + "{}", record);
+        } else if (flushing && outstandingMessages.isEmpty()) {
+            // flush thread may be waiting on the outstanding messages to clear
+            this.notifyAll();
+        }
+    }
+
+    public boolean commitOffsets() {
+        long commitTimeoutMs = 
workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
+
+        long started = time.milliseconds();
+        long timeout = started + commitTimeoutMs;
+
+        synchronized (this) {
+            // First we need to make sure we snapshot everything in exactly 
the current state. This
+            // means both the current set of messages we're still waiting to 
finish, stored in this
+            // class, which setting flushing = true will handle by storing any 
new values into a new
+            // buffer; and the current set of user-specified offsets, stored 
in the
+            // OffsetStorageWriter, for which we can use beginFlush() to 
initiate the snapshot.
+            flushing = true;
+            boolean flushStarted = offsetWriter.beginFlush();
+            // Still wait for any producer records to flush, even if there 
aren't any offsets to write
+            // to persistent storage
+
+            // Next we need to wait for all outstanding messages to finish 
sending
+            while (!outstandingMessages.isEmpty()) {
+                try {
+                    long timeoutMs = timeout - time.milliseconds();
+                    if (timeoutMs <= 0) {
+                        log.error(
+                                "Failed to flush {}, timed out while waiting 
for producer to flush outstanding "
+                                        + "messages", this.toString());
+                        finishFailedFlush();
+                        return false;
+                    }
+                    this.wait(timeoutMs);
+                } catch (InterruptedException e) {
+                    // ignore
+                }
+            }
+
+            if (!flushStarted) {
+                // There was nothing in the offsets to process, but we still 
waited for the data in the
+                // buffer to flush. This is useful since this can feed into 
metrics to monitor, e.g.
+                // flush time, which can be used for monitoring even if the 
connector doesn't record any
+                // offsets.
+                finishSuccessfulFlush();
+                log.debug("Finished {} offset commitOffsets successfully in {} 
ms",
+                        this, time.milliseconds() - started);
+                return true;
+            }
+        }
+
+        // Now we can actually flush the offsets to user storage.
+        Future<Void> flushFuture = offsetWriter.doFlush(new 
org.apache.kafka.copycat.util.Callback<Void>() {
+            @Override
+            public void onCompletion(Throwable error, Void result) {
+                if (error != null) {
+                    log.error("Failed to flush {} offsets to storage: ", this, 
error);
+                } else {
+                    log.trace("Finished flushing {} offsets to storage", this);
+                }
+            }
+        });
+        // Very rare case: offsets were unserializable and we finished 
immediately, unable to store
+        // any data
+        if (flushFuture == null) {
+            finishFailedFlush();
+            return false;
+        }
+        try {
+            flushFuture.get(Math.max(timeout - time.milliseconds(), 0), 
TimeUnit.MILLISECONDS);
+            // There's a small race here where we can get the callback just as 
this times out (and log
+            // success), but then catch the exception below and cancel 
everything. This won't cause any
+            // errors, is only wasteful in this minor edge case, and the worst 
result is that the log
+            // could look a little confusing.
+        } catch (InterruptedException e) {
+            log.warn("Flush of {} offsets interrupted, cancelling", this);
+            finishFailedFlush();
+            return false;
+        } catch (ExecutionException e) {
+            log.error("Flush of {} offsets threw an unexpected exception: ", 
this, e);
+            finishFailedFlush();
+            return false;
+        } catch (TimeoutException e) {
+            log.error("Timed out waiting to flush {} offsets to storage", 
this);
+            finishFailedFlush();
+            return false;
+        }
+
+        finishSuccessfulFlush();
+        log.debug("Finished {} commitOffsets successfully in {} ms",
+                this, time.milliseconds() - started);
+        return true;
+    }
+
+    private synchronized void finishFailedFlush() {
+        offsetWriter.cancelFlush();
+        outstandingMessages.putAll(outstandingMessagesBacklog);
+        outstandingMessagesBacklog.clear();
+        flushing = false;
+    }
+
+    private void finishSuccessfulFlush() {
+        // If we were successful, we can just swap instead of replacing items 
back into the original map
+        IdentityHashMap<ProducerRecord<K, V>, ProducerRecord<K, V>> temp = 
outstandingMessages;
+        outstandingMessages = outstandingMessagesBacklog;
+        outstandingMessagesBacklog = temp;
+        flushing = false;
+    }
+
+
+    private class WorkerSourceTaskThread extends ShutdownableThread {
+        public WorkerSourceTaskThread(String name) {
+            super(name);
+        }
+
+        @Override
+        public void execute() {
+            try {
+                while (getRunning()) {
+                    List<SourceRecord> records = task.poll();
+                    if (records == null)
+                        continue;
+                    sendRecords(records);
+                }
+            } catch (InterruptedException e) {
+                // Ignore and allow to exit.
+            }
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "WorkerSourceTask{" +
+                "id=" + id +
+                '}';
+    }
+}
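
The commitOffsets() flush protocol rests on the two IdentityHashMaps: once flushing is set, newly sent records are parked in the backlog, the flusher waits for the original map to drain, and on success the two maps are simply swapped rather than copied. The reduced sketch below isolates that buffer-swap pattern (success path only) with a generic payload type; the names are illustrative and not part of the patch.

    import java.util.IdentityHashMap;

    public class SwapBufferSketch<T> {
        private IdentityHashMap<T, T> outstanding = new IdentityHashMap<>();
        private IdentityHashMap<T, T> backlog = new IdentityHashMap<>();
        private boolean flushing = false;

        /** Items sent while a flush is in progress are parked in the backlog. */
        public synchronized void sent(T item) {
            if (flushing)
                backlog.put(item, item);
            else
                outstanding.put(item, item);
        }

        /** Acknowledgements may arrive for either buffer while flushing. */
        public synchronized void acked(T item) {
            if (outstanding.remove(item) == null && flushing)
                backlog.remove(item);
            if (flushing && outstanding.isEmpty())
                notifyAll();  // wake the flusher waiting in awaitDrain()
        }

        /** Begin a flush and block until everything sent before it was acknowledged. */
        public synchronized void awaitDrain() throws InterruptedException {
            flushing = true;
            while (!outstanding.isEmpty())
                wait();
            // On success the drained buffer and the backlog are swapped rather than copied.
            IdentityHashMap<T, T> tmp = outstanding;
            outstanding = backlog;
            backlog = tmp;
            flushing = false;
        }
    }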

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerTask.java
----------------------------------------------------------------------
diff --git 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerTask.java
 
b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerTask.java
new file mode 100644
index 0000000..af225bb
--- /dev/null
+++ 
b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerTask.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime;
+
+import java.util.Properties;
+
+/**
+ * Handles processing for an individual task. This interface only provides the 
basic methods
+ * used by {@link Worker} to manage the tasks. Implementations combine a 
user-specified Task with
+ * Kafka to create a data flow.
+ */
+interface WorkerTask {
+    /**
+     * Start the Task
+     * @param props initial configuration
+     */
+    void start(Properties props);
+
+    /**
+     * Stop this task from processing messages. This method does not block; it only triggers
+     * shutdown. Use {@link #awaitStop} to block until shutdown has completed.
+     */
+    void stop();
+
+    /**
+     * Wait for this task to finish stopping.
+     *
+     * @param timeoutMs time to wait for the task to stop, in milliseconds
+     * @return true if successful, false if the timeout was reached
+     */
+    boolean awaitStop(long timeoutMs);
+
+    /**
+     * Close this task. This is different from {@link #stop} and {@link #awaitStop} in that the
+     * stop methods ensure processing has stopped but may leave resources allocated. This method
+     * should clean up all resources.
+     */
+    void close();
+}
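
The split between stop(), awaitStop() and close() mirrors how Worker.stopTask() drives a task: stop() only requests shutdown, awaitStop() bounds how long the caller waits for processing to end, and close() releases resources whether or not that wait succeeded. A short sketch of a caller honoring that order (the 5000 ms timeout is an arbitrary example, and the class sits in the runtime package because WorkerTask is package-private):

    package org.apache.kafka.copycat.runtime;

    class WorkerTaskShutdownSketch {
        static void shutdown(WorkerTask task) {
            task.stop();                   // request shutdown; returns immediately
            if (!task.awaitStop(5000))     // wait up to the example timeout for processing to end
                System.err.println("Task did not stop within the timeout");
            task.close();                  // always release resources, even after a failed wait
        }
    }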

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneConnectorContext.java
----------------------------------------------------------------------
diff --git 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneConnectorContext.java
 
b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneConnectorContext.java
new file mode 100644
index 0000000..0e14015
--- /dev/null
+++ 
b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneConnectorContext.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime.standalone;
+
+import org.apache.kafka.copycat.connector.ConnectorContext;
+
+/**
+ * ConnectorContext for use with the StandaloneHerder, which maintains all 
connectors and tasks
+ * in a single process.
+ */
+class StandaloneConnectorContext implements ConnectorContext {
+
+    private StandaloneHerder herder;
+    private String connectorName;
+
+    public StandaloneConnectorContext(StandaloneHerder herder, String 
connectorName) {
+        this.herder = herder;
+        this.connectorName = connectorName;
+    }
+
+    @Override
+    public void requestTaskReconfiguration() {
+        // This is trivial to forward since there is only one herder and it's 
in memory in this
+        // process
+        herder.requestTaskReconfiguration(connectorName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java
----------------------------------------------------------------------
diff --git 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java
 
b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java
new file mode 100644
index 0000000..2ed9183
--- /dev/null
+++ 
b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime.standalone;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.copycat.connector.Connector;
+import org.apache.kafka.copycat.errors.CopycatException;
+import org.apache.kafka.copycat.runtime.ConnectorConfig;
+import org.apache.kafka.copycat.runtime.Herder;
+import org.apache.kafka.copycat.runtime.Worker;
+import org.apache.kafka.copycat.sink.SinkConnector;
+import org.apache.kafka.copycat.sink.SinkTask;
+import org.apache.kafka.copycat.util.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+/**
+ * Single process, in-memory "herder". Useful for a standalone copycat process.
+ */
+public class StandaloneHerder implements Herder {
+    private static final Logger log = LoggerFactory.getLogger(StandaloneHerder.class);
+
+    private Worker worker;
+    private HashMap<String, ConnectorState> connectors = new HashMap<>();
+
+    public StandaloneHerder(Worker worker) {
+        this.worker = worker;
+    }
+
+    public synchronized void start() {
+        log.info("Herder starting");
+        log.info("Herder started");
+    }
+
+    public synchronized void stop() {
+        log.info("Herder stopping");
+
+        // There's no coordination/hand-off to do here since this is all standalone. Instead, we
+        // should just clean up the stuff we normally would, i.e. cleanly checkpoint and shut down all
+        // the tasks.
+        for (Map.Entry<String, ConnectorState> entry : connectors.entrySet()) {
+            ConnectorState state = entry.getValue();
+            stopConnector(state);
+        }
+        connectors.clear();
+
+        log.info("Herder stopped");
+    }
+
+    @Override
+    public synchronized void addConnector(Properties connectorProps,
+                                          Callback<String> callback) {
+        try {
+            ConnectorState connState = createConnector(connectorProps);
+            if (callback != null)
+                callback.onCompletion(null, connState.name);
+            // This should always be a new job, create jobs from scratch
+            createConnectorTasks(connState);
+        } catch (CopycatException e) {
+            if (callback != null)
+                callback.onCompletion(e, null);
+        }
+    }
+
+    @Override
+    public synchronized void deleteConnector(String name, Callback<Void> callback) {
+        try {
+            destroyConnector(name);
+            if (callback != null)
+                callback.onCompletion(null, null);
+        } catch (CopycatException e) {
+            if (callback != null)
+                callback.onCompletion(e, null);
+        }
+    }
+
+    // Creates and configures the connector. Does not set up any tasks
+    private ConnectorState createConnector(Properties connectorProps) {
+        ConnectorConfig connConfig = new ConnectorConfig(connectorProps);
+        String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
+        String className = connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        log.info("Creating connector {} of type {}", connName, className);
+        int maxTasks = connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG);
+        List<String> topics = connConfig.getList(ConnectorConfig.TOPICS_CONFIG); // Sinks only
+        Properties configs = connConfig.getUnusedProperties();
+
+        if (connectors.containsKey(connName)) {
+            log.error("Ignoring request to create connector due to conflicting 
connector name");
+            throw new CopycatException("Connector with name " + connName + " 
already exists");
+        }
+
+        final Connector connector;
+        try {
+            connector = instantiateConnector(className);
+        } catch (Throwable t) {
+            // Catches normal exceptions due to instantiation errors as well as any runtime errors that
+            // may be caused by user code
+            throw new CopycatException("Failed to create connector instance", t);
+        }
+        connector.initialize(new StandaloneConnectorContext(this, connName));
+        try {
+            connector.start(configs);
+        } catch (CopycatException e) {
+            throw new CopycatException("Connector threw an exception while 
starting", e);
+        }
+        ConnectorState state = new ConnectorState(connName, connector, maxTasks, topics);
+        connectors.put(connName, state);
+
+        log.info("Finished creating connector {}", connName);
+
+        return state;
+    }
+
+    private static Connector instantiateConnector(String className) {
+        try {
+            return Utils.newInstance(className, Connector.class);
+        } catch (ClassNotFoundException e) {
+            throw new CopycatException("Couldn't instantiate connector class", 
e);
+        }
+    }
+
+    private void destroyConnector(String connName) {
+        log.info("Destroying connector {}", connName);
+        ConnectorState state = connectors.get(connName);
+        if (state == null) {
+            log.error("Failed to destroy connector {} because it does not 
exist", connName);
+            throw new CopycatException("Connector does not exist");
+        }
+
+        stopConnector(state);
+        connectors.remove(state.name);
+
+        log.info("Finished destroying connector {}", connName);
+    }
+
+    // Stops a connector's tasks, then the connector
+    private void stopConnector(ConnectorState state) {
+        removeConnectorTasks(state);
+        try {
+            state.connector.stop();
+        } catch (CopycatException e) {
+            log.error("Error shutting down connector {}: ", state.connector, 
e);
+        }
+    }
+
+    private void createConnectorTasks(ConnectorState state) {
+        String taskClassName = state.connector.getTaskClass().getName();
+
+        log.info("Creating tasks for connector {} of type {}", state.name, 
taskClassName);
+
+        List<Properties> taskConfigs = state.connector.getTaskConfigs(state.maxTasks);
+
+        // Generate the final configs, including framework provided settings
+        Map<ConnectorTaskId, Properties> taskProps = new HashMap<>();
+        for (int i = 0; i < taskConfigs.size(); i++) {
+            ConnectorTaskId taskId = new ConnectorTaskId(state.name, i);
+            Properties config = taskConfigs.get(i);
+            // TODO: This probably shouldn't be in the Herder. It's nice to have Copycat ensure the list of topics
+            // is automatically provided to tasks since it is required by the framework, but this may belong elsewhere.
+            String subscriptionTopics = Utils.join(state.inputTopics, ",");
+            if (state.connector instanceof SinkConnector) {
+                // Make sure we don't modify the original since the connector may reuse it internally
+                Properties configForSink = new Properties();
+                configForSink.putAll(config);
+                configForSink.setProperty(SinkTask.TOPICS_CONFIG, subscriptionTopics);
+                config = configForSink;
+            }
+            taskProps.put(taskId, config);
+        }
+
+        // And initiate the tasks
+        for (int i = 0; i < taskConfigs.size(); i++) {
+            ConnectorTaskId taskId = new ConnectorTaskId(state.name, i);
+            Properties config = taskProps.get(taskId);
+            try {
+                worker.addTask(taskId, taskClassName, config);
+                // We only need to store the task IDs so we can clean up.
+                state.tasks.add(taskId);
+            } catch (Throwable e) {
+                log.error("Failed to add task {}: ", taskId, e);
+                // Swallow this so we can continue updating the rest of the tasks
+                // FIXME what's the proper response? Kill all the tasks? Consider this the same as a task
+                // that died after starting successfully.
+            }
+        }
+    }
+
+    private void removeConnectorTasks(ConnectorState state) {
+        Iterator<ConnectorTaskId> taskIter = state.tasks.iterator();
+        while (taskIter.hasNext()) {
+            ConnectorTaskId taskId = taskIter.next();
+            try {
+                worker.stopTask(taskId);
+                taskIter.remove();
+            } catch (CopycatException e) {
+                log.error("Failed to stop task {}: ", taskId, e);
+                // Swallow this so we can continue stopping the rest of the tasks
+                // FIXME: Forcibly kill the task?
+            }
+        }
+    }
+
+    private void updateConnectorTasks(ConnectorState state) {
+        removeConnectorTasks(state);
+        createConnectorTasks(state);
+    }
+
+    /**
+     * Requests reconfiguration of the task. This should only be triggered by
+     * {@link StandaloneConnectorContext}.
+     *
+     * @param connName name of the connector that should be reconfigured
+     */
+    public synchronized void requestTaskReconfiguration(String connName) {
+        ConnectorState state = connectors.get(connName);
+        if (state == null) {
+            log.error("Task that requested reconfiguration does not exist: 
{}", connName);
+            return;
+        }
+        updateConnectorTasks(state);
+    }
+
+
+    private static class ConnectorState {
+        public String name;
+        public Connector connector;
+        public int maxTasks;
+        public List<String> inputTopics;
+        Set<ConnectorTaskId> tasks;
+
+        public ConnectorState(String name, Connector connector, int maxTasks,
+                              List<String> inputTopics) {
+            this.name = name;
+            this.connector = connector;
+            this.maxTasks = maxTasks;
+            this.inputTopics = inputTopics;
+            this.tasks = new HashSet<>();
+        }
+    }
+}
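
A rough usage sketch of the standalone herder follows. The addConnector/deleteConnector calls and the onCompletion(error, result) callback shape are taken from how they appear above; the pre-built Worker instance, the exact Throwable parameter type, and the literal property keys (assumed to match the ConnectorConfig constants "name", "connector.class", "tasks.max") are illustrative assumptions.

    // Sketch: driving the standalone herder directly. `worker` is assumed to be
    // an already constructed and started Worker instance.
    Herder herder = new StandaloneHerder(worker);
    herder.start();

    Properties connectorProps = new Properties();
    connectorProps.setProperty("name", "local-file-source");                          // ConnectorConfig.NAME_CONFIG
    connectorProps.setProperty("connector.class", "org.example.FileSourceConnector"); // hypothetical connector class
    connectorProps.setProperty("tasks.max", "1");

    herder.addConnector(connectorProps, new Callback<String>() {
        @Override
        public void onCompletion(Throwable error, String connectorName) {
            if (error != null)
                System.err.println("Failed to create connector: " + error);
            else
                System.out.println("Created connector " + connectorName);
        }
    });

    // Later: stop and remove the connector, then shut down the herder.
    herder.deleteConnector("local-file-source", null);
    herder.stop();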

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/FileOffsetBackingStore.java
----------------------------------------------------------------------
diff --git 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/FileOffsetBackingStore.java
 
b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/FileOffsetBackingStore.java
new file mode 100644
index 0000000..dfa9e78
--- /dev/null
+++ 
b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/FileOffsetBackingStore.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.storage;
+
+import org.apache.kafka.copycat.errors.CopycatException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Implementation of OffsetBackingStore that saves data locally to a file. To ensure this behaves
+ * similarly to a real backing store, operations are executed asynchronously on a background thread.
+ */
+public class FileOffsetBackingStore extends MemoryOffsetBackingStore {
+    private static final Logger log = LoggerFactory.getLogger(FileOffsetBackingStore.class);
+
+    public final static String OFFSET_STORAGE_FILE_FILENAME_CONFIG = "offset.storage.file.filename";
+    private File file;
+
+    public FileOffsetBackingStore() {
+
+    }
+
+    @Override
+    public void configure(Map<String, ?> props) {
+        super.configure(props);
+        String filename = (String) props.get(OFFSET_STORAGE_FILE_FILENAME_CONFIG);
+        file = new File(filename);
+    }
+
+    @Override
+    public synchronized void start() {
+        super.start();
+        log.info("Starting FileOffsetBackingStore with file {}", file);
+        load();
+    }
+
+    @Override
+    public synchronized void stop() {
+        super.stop();
+        // Nothing to do since this doesn't maintain any outstanding connections/data
+        log.info("Stopped FileOffsetBackingStore");
+    }
+
+    @SuppressWarnings("unchecked")
+    private void load() {
+        try {
+            ObjectInputStream is = new ObjectInputStream(new FileInputStream(file));
+            Object obj = is.readObject();
+            if (!(obj instanceof HashMap))
+                throw new CopycatException("Expected HashMap but found " + 
obj.getClass());
+            HashMap<String, Map<byte[], byte[]>> raw = (HashMap<String, 
Map<byte[], byte[]>>) obj;
+            data = new HashMap<>();
+            for (Map.Entry<String, Map<byte[], byte[]>> entry : raw.entrySet()) {
+                HashMap<ByteBuffer, ByteBuffer> converted = new HashMap<>();
+                for (Map.Entry<byte[], byte[]> mapEntry : entry.getValue().entrySet()) {
+                    ByteBuffer key = (mapEntry.getKey() != null) ? ByteBuffer.wrap(mapEntry.getKey()) : null;
+                    ByteBuffer value = (mapEntry.getValue() != null) ? ByteBuffer.wrap(mapEntry.getValue()) :
+                            null;
+                    converted.put(key, value);
+                }
+                data.put(entry.getKey(), converted);
+            }
+            is.close();
+        } catch (FileNotFoundException | EOFException e) {
+            // FileNotFoundException: Ignore, may be new.
+            // EOFException: Ignore, this means the file was empty or corrupt
+        } catch (IOException | ClassNotFoundException e) {
+            throw new CopycatException(e);
+        }
+    }
+
+    protected void save() {
+        try {
+            ObjectOutputStream os = new ObjectOutputStream(new FileOutputStream(file));
+            HashMap<String, Map<byte[], byte[]>> raw = new HashMap<>();
+            for (Map.Entry<String, Map<ByteBuffer, ByteBuffer>> entry : data.entrySet()) {
+                HashMap<byte[], byte[]> converted = new HashMap<>();
+                for (Map.Entry<ByteBuffer, ByteBuffer> mapEntry : entry.getValue().entrySet()) {
+                    byte[] key = (mapEntry.getKey() != null) ? mapEntry.getKey().array() : null;
+                    byte[] value = (mapEntry.getValue() != null) ? mapEntry.getValue().array() : null;
+                    converted.put(key, value);
+                }
+                raw.put(entry.getKey(), converted);
+            }
+            os.writeObject(raw);
+            os.close();
+        } catch (IOException e) {
+            throw new CopycatException(e);
+        }
+    }
+}
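
A small configuration/usage sketch (the file path and namespace below are hypothetical; java.util imports and the checked exceptions thrown by Future.get() are omitted for brevity):

    // The only setting this store reads is offset.storage.file.filename.
    Map<String, Object> config = new HashMap<>();
    config.put(FileOffsetBackingStore.OFFSET_STORAGE_FILE_FILENAME_CONFIG, "/tmp/copycat.offsets");

    FileOffsetBackingStore store = new FileOffsetBackingStore();
    store.configure(config);
    store.start();   // load() deserializes any previously saved offsets from the file

    Map<ByteBuffer, ByteBuffer> offsets = new HashMap<>();
    offsets.put(ByteBuffer.wrap("partition-0".getBytes()), ByteBuffer.wrap("42".getBytes()));
    store.set("my-connector", offsets, null).get();   // save() rewrites the whole file

    store.stop();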

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/MemoryOffsetBackingStore.java
----------------------------------------------------------------------
diff --git 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/MemoryOffsetBackingStore.java
 
b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/MemoryOffsetBackingStore.java
new file mode 100644
index 0000000..6ffba58
--- /dev/null
+++ 
b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/MemoryOffsetBackingStore.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.storage;
+
+import org.apache.kafka.copycat.util.Callback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+/**
+ * Implementation of OffsetBackingStore that doesn't actually persist any data. To ensure this
+ * behaves similarly to a real backing store, operations are executed asynchronously on a
+ * background thread.
+ */
+public class MemoryOffsetBackingStore implements OffsetBackingStore {
+    private static final Logger log = LoggerFactory.getLogger(MemoryOffsetBackingStore.class);
+
+    protected HashMap<String, Map<ByteBuffer, ByteBuffer>> data = new HashMap<>();
+    protected ExecutorService executor = Executors.newSingleThreadExecutor();
+
+    public MemoryOffsetBackingStore() {
+
+    }
+
+    @Override
+    public void configure(Map<String, ?> props) {
+    }
+
+    @Override
+    public synchronized void start() {
+    }
+
+    @Override
+    public synchronized void stop() {
+        // Nothing to do since this doesn't maintain any outstanding connections/data
+    }
+
+    @Override
+    public Future<Map<ByteBuffer, ByteBuffer>> get(
+            final String namespace, final Collection<ByteBuffer> keys,
+            final Callback<Map<ByteBuffer, ByteBuffer>> callback) {
+        return executor.submit(new Callable<Map<ByteBuffer, ByteBuffer>>() {
+            @Override
+            public Map<ByteBuffer, ByteBuffer> call() throws Exception {
+                Map<ByteBuffer, ByteBuffer> result = new HashMap<>();
+                synchronized (MemoryOffsetBackingStore.this) {
+                    Map<ByteBuffer, ByteBuffer> namespaceData = data.get(namespace);
+                    if (namespaceData == null)
+                        return result;
+                    for (ByteBuffer key : keys) {
+                        result.put(key, namespaceData.get(key));
+                    }
+                }
+                if (callback != null)
+                    callback.onCompletion(null, result);
+                return result;
+            }
+        });
+
+    }
+
+    @Override
+    public Future<Void> set(final String namespace, final Map<ByteBuffer, ByteBuffer> values,
+                            final Callback<Void> callback) {
+        return executor.submit(new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                synchronized (MemoryOffsetBackingStore.this) {
+                    Map<ByteBuffer, ByteBuffer> namespaceData = data.get(namespace);
+                    if (namespaceData == null) {
+                        namespaceData = new HashMap<>();
+                        data.put(namespace, namespaceData);
+                    }
+                    for (Map.Entry<ByteBuffer, ByteBuffer> entry : values.entrySet()) {
+                        namespaceData.put(entry.getKey(), entry.getValue());
+                    }
+                    save();
+                }
+                if (callback != null)
+                    callback.onCompletion(null, null);
+                return null;
+            }
+        });
+    }
+
+    // Hook to allow subclasses to persist data
+    protected void save() {
+
+    }
+}
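
The protected save() method is the extension point that the file-backed subclass above relies on. As a sketch, a hypothetical subclass (not part of this patch) could hook it like this:

    // Hypothetical subclass that just logs every flush. save() is invoked from
    // set() while the store's monitor is held, so reading `data` here is safe.
    public class LoggingOffsetBackingStore extends MemoryOffsetBackingStore {
        private static final Logger log = LoggerFactory.getLogger(LoggingOffsetBackingStore.class);

        @Override
        protected void save() {
            log.info("Flushing offsets for {} namespace(s)", data.size());
        }
    }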

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetBackingStore.java
----------------------------------------------------------------------
diff --git 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetBackingStore.java
 
b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetBackingStore.java
new file mode 100644
index 0000000..e8cb2ae
--- /dev/null
+++ 
b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetBackingStore.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.storage;
+
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.copycat.util.Callback;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+/**
+ * <p>
+ * OffsetBackingStore is an interface for storage backends that store key-value data. The backing
+ * store doesn't need to handle serialization or deserialization. It only needs to support
+ * reading/writing bytes. Since these operations are expected to require network round trips, only
+ * bulk operations are supported.
+ * </p>
+ * <p>
+ * Since OffsetBackingStore is a shared resource that may be used by many OffsetStorage instances
+ * that are associated with individual tasks, all operations include a namespace which should be
+ * used to isolate different key spaces.
+ * </p>
+ */
+public interface OffsetBackingStore extends Configurable {
+
+    /**
+     * Start this offset store.
+     */
+    public void start();
+
+    /**
+     * Stop the backing store. Implementations should attempt to shut down gracefully, but not block
+     * indefinitely.
+     */
+    public void stop();
+
+    /**
+     * Get the values for the specified keys
+     * @param namespace prefix for the keys in this request
+     * @param keys list of keys to look up
+     * @param callback callback to invoke on completion
+     * @return future for the resulting map from key to value
+     */
+    public Future<Map<ByteBuffer, ByteBuffer>> get(
+            String namespace, Collection<ByteBuffer> keys,
+            Callback<Map<ByteBuffer, ByteBuffer>> callback);
+
+    /**
+     * Set the specified keys and values.
+     * @param namespace prefix for the keys in this request
+     * @param values map from key to value
+     * @param callback callback to invoke on completion
+     * @return void future for the operation
+     */
+    public Future<Void> set(String namespace, Map<ByteBuffer, ByteBuffer> values,
+                            Callback<Void> callback);
+}
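
A caller-side sketch of the contract described above, using the in-memory implementation from this patch (names and values are illustrative; java.util imports and the checked exceptions thrown by Future.get() are omitted):

    OffsetBackingStore store = new MemoryOffsetBackingStore();
    store.configure(new HashMap<String, Object>());
    store.start();

    ByteBuffer key = ByteBuffer.wrap("source-partition-1".getBytes());
    ByteBuffer value = ByteBuffer.wrap("offset-100".getBytes());

    // Writes are asynchronous; block on the returned future to know they have been applied.
    store.set("connector-a", Collections.singletonMap(key, value), null).get();

    // Namespaces isolate key spaces: the same key under another namespace is absent.
    Map<ByteBuffer, ByteBuffer> fromOtherNamespace =
            store.get("connector-b", Arrays.asList(key), null).get();
    // fromOtherNamespace.get(key) == null here

    store.stop();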

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java
----------------------------------------------------------------------
diff --git 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java
 
b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java
new file mode 100644
index 0000000..7a050dc
--- /dev/null
+++ 
b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.storage;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.copycat.errors.CopycatException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Implementation of OffsetStorageReader. Unlike OffsetStorageWriter which is implemented
+ * directly, the interface is only separate from this implementation because it needs to be
+ * included in the public API package.
+ */
+public class OffsetStorageReaderImpl<K, V> implements OffsetStorageReader {
+    private static final Logger log = LoggerFactory.getLogger(OffsetStorageReaderImpl.class);
+
+    private final OffsetBackingStore backingStore;
+    private final String namespace;
+    private final Converter<K> keyConverter;
+    private final Converter<V> valueConverter;
+    private final Serializer<K> keySerializer;
+    private final Deserializer<V> valueDeserializer;
+
+    public OffsetStorageReaderImpl(OffsetBackingStore backingStore, String namespace,
+                                   Converter<K> keyConverter, Converter<V> valueConverter,
+                                   Serializer<K> keySerializer, Deserializer<V> valueDeserializer) {
+        this.backingStore = backingStore;
+        this.namespace = namespace;
+        this.keyConverter = keyConverter;
+        this.valueConverter = valueConverter;
+        this.keySerializer = keySerializer;
+        this.valueDeserializer = valueDeserializer;
+    }
+
+    @Override
+    public Object getOffset(Object partition) {
+        return getOffsets(Arrays.asList(partition)).get(partition);
+    }
+
+    @Override
+    public Map<Object, Object> getOffsets(Collection<Object> partitions) {
+        // Serialize keys so backing store can work with them
+        Map<ByteBuffer, Object> serializedToOriginal = new HashMap<>(partitions.size());
+        for (Object key : partitions) {
+            try {
+                byte[] keySerialized = keySerializer.serialize(namespace, keyConverter.fromCopycatData(key));
+                ByteBuffer keyBuffer = (keySerialized != null) ? ByteBuffer.wrap(keySerialized) : null;
+                serializedToOriginal.put(keyBuffer, key);
+            } catch (Throwable t) {
+                log.error("CRITICAL: Failed to serialize partition key when 
getting offsets for task with "
+                        + "namespace {}. No value for this data will be 
returned, which may break the "
+                        + "task or cause it to skip some data.", namespace, t);
+            }
+        }
+
+        // Get serialized key -> serialized value from backing store
+        Map<ByteBuffer, ByteBuffer> raw;
+        try {
+            raw = backingStore.get(namespace, serializedToOriginal.keySet(), null).get();
+        } catch (Exception e) {
+            log.error("Failed to fetch offsets from namespace {}: ", 
namespace, e);
+            throw new CopycatException("Failed to fetch offsets.", e);
+        }
+
+        // Deserialize all the values and map back to the original keys
+        Map<Object, Object> result = new HashMap<>(partitions.size());
+        for (Map.Entry<ByteBuffer, ByteBuffer> rawEntry : raw.entrySet()) {
+            try {
+                // Since null could be a valid key, explicitly check whether map contains the key
+                if (!serializedToOriginal.containsKey(rawEntry.getKey())) {
+                    log.error("Should be able to map {} back to a requested 
partition-offset key, backing "
+                            + "store may have returned invalid data", 
rawEntry.getKey());
+                    continue;
+                }
+                Object origKey = serializedToOriginal.get(rawEntry.getKey());
+                Object deserializedValue = valueConverter.toCopycatData(
+                        valueDeserializer.deserialize(namespace, rawEntry.getValue().array())
+                );
+
+                result.put(origKey, deserializedValue);
+            } catch (Throwable t) {
+                log.error("CRITICAL: Failed to deserialize offset data when 
getting offsets for task with"
+                        + " namespace {}. No value for this data will be 
returned, which may break the "
+                        + "task or cause it to skip some data. This could 
either be due to an error in "
+                        + "the connector implementation or incompatible 
schema.", namespace, t);
+            }
+        }
+
+        return result;
+    }
+}
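
A wiring sketch for the reader (the backing store, converters and serializers referenced below are assumed to already exist; the partition map layout is purely illustrative):

    // Build a reader over an existing backing store and look up the last
    // committed offset for one source partition.
    OffsetStorageReader reader = new OffsetStorageReaderImpl<>(
            backingStore,                  // e.g. a started FileOffsetBackingStore
            "my-connector",                // namespace, typically the connector name
            keyConverter, valueConverter,  // Converter<K> / Converter<V> implementations
            keySerializer, valueDeserializer);

    Map<String, String> partition = Collections.singletonMap("filename", "/var/log/app.log");
    Object lastOffset = reader.getOffset(partition);
    if (lastOffset == null) {
        // nothing stored yet; the task should start from the beginning of the stream
    }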
