http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java
new file mode 100644
index 0000000..c6e829c
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.storage;
+
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.copycat.errors.CopycatException;
+import org.apache.kafka.copycat.util.Callback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+/**
+ * <p>
+ * OffsetStorageWriter is a buffered writer that wraps the simple OffsetBackingStore interface.
+ * It maintains a copy of the key-value data in memory and buffers writes. It allows you to take
+ * a snapshot, which can then be asynchronously flushed to the backing store while new writes
+ * continue to be processed. This allows Copycat to process offset commits in the background
+ * while continuing to process messages.
+ * </p>
+ * <p>
+ * Copycat uses an OffsetStorage implementation to save state about the current progress of
+ * source (import to Kafka) jobs, which may have many input partitions, and "offsets" may not be as
+ * simple as they are for Kafka partitions or files. Offset storage is not required for sink jobs
+ * because they can use Kafka's native offset storage (or the sink data store can handle offset
+ * storage to achieve exactly once semantics).
+ * </p>
+ * <p>
+ * Both partitions and offsets are generic data objects. This allows different connectors to use
+ * whatever representation they need, even arbitrarily complex records. These are translated
+ * internally into the serialized form the OffsetBackingStore uses.
+ * </p>
+ * <p>
+ * Note that this only provides write functionality. This is intentional to ensure stale data is
+ * never read. Offset data should only be read during startup or reconfiguration of a task. By
+ * always serving those requests by reading the values from the backing store, we ensure we never
+ * accidentally use stale data. (One example of how this can occur: a task is processing input
+ * partition A, writing offsets; reconfiguration causes partition A to be reassigned elsewhere;
+ * reconfiguration causes partition A to be reassigned to this node, but now the offset data is out
+ * of date.) Since these offsets are created and managed by the connector itself, there's no way
+ * for the offset management layer to know which keys are "owned" by which tasks at any given
+ * time.
+ * </p>
+ * <p>
+ * This class is not thread-safe. It should only be accessed from a Task's processing thread.
+ * </p>
+ */
+public class OffsetStorageWriter<K, V> {
+    private static final Logger log = LoggerFactory.getLogger(OffsetStorageWriter.class);
+
+    private final OffsetBackingStore backingStore;
+    private final Converter<K> keyConverter;
+    private final Converter<V> valueConverter;
+    private final Serializer<K> keySerializer;
+    private final Serializer<V> valueSerializer;
+    private final String namespace;
+    // Offset data in Copycat format
+    private Map<Object, Object> data = new HashMap<>();
+
+    // Not synchronized, should only be accessed by flush thread
+    private Map<Object, Object> toFlush = null;
+    // Unique ID for each flush request to handle callbacks after timeouts
+    private long currentFlushId = 0;
+
+    public OffsetStorageWriter(OffsetBackingStore backingStore,
+                               String namespace, Converter<K> keyConverter, Converter<V> valueConverter,
+                               Serializer<K> keySerializer, Serializer<V> valueSerializer) {
+        this.backingStore = backingStore;
+        this.namespace = namespace;
+        this.keyConverter = keyConverter;
+        this.valueConverter = valueConverter;
+        this.keySerializer = keySerializer;
+        this.valueSerializer = valueSerializer;
+    }
+
+    /**
+     * Set an offset for a partition using Copycat data values
+     * @param partition the partition to store an offset for
+     * @param offset the offset
+     */
+    public synchronized void setOffset(Object partition, Object offset) {
+        data.put(partition, offset);
+    }
+
+    private boolean flushing() {
+        return toFlush != null;
+    }
+
+    /**
+     * Performs the first step of a flush operation, snapshotting the current state. This does not
+     * actually initiate the flush with the underlying storage.
+     *
+     * @return true if a flush was begun, false if no data was available
+     */
+    public synchronized boolean beginFlush() {
+        if (flushing()) {
+            log.error("Invalid call to OffsetStorageWriter beginFlush() while already flushing, the "
+                    + "framework should not allow this");
+            throw new CopycatException("OffsetStorageWriter is already flushing");
+        }
+
+        if (data.isEmpty())
+            return false;
+
+        assert !flushing();
+        toFlush = data;
+        data = new HashMap<>();
+        return true;
+    }
+
+    /**
+     * Flush the current offsets and clear them from this writer. This is non-blocking: it
+     * moves the current set of offsets out of the way, serializes the data, and asynchronously
+     * writes the data to the backing store. If no offsets need to be written, the callback is
+     * still invoked, but no Future is returned.
+     *
+     * @return a Future, or null if there are no offsets to commit
+     */
+    public Future<Void> doFlush(final Callback<Void> callback) {
+        final long flushId = currentFlushId;
+
+        // Serialize
+        Map<ByteBuffer, ByteBuffer> offsetsSerialized;
+        try {
+            offsetsSerialized = new HashMap<>();
+            for (Map.Entry<Object, Object> entry : toFlush.entrySet()) {
+                byte[] key = keySerializer.serialize(namespace, keyConverter.fromCopycatData(entry.getKey()));
+                ByteBuffer keyBuffer = (key != null) ? ByteBuffer.wrap(key) : null;
+                byte[] value = valueSerializer.serialize(namespace, valueConverter.fromCopycatData(entry.getValue()));
+                ByteBuffer valueBuffer = (value != null) ? ByteBuffer.wrap(value) : null;
+                offsetsSerialized.put(keyBuffer, valueBuffer);
+            }
+        } catch (Throwable t) {
+            // Must handle errors properly here or the writer will be left mid-flush forever and be
+            // unable to make progress.
+            log.error("CRITICAL: Failed to serialize offset data, making it impossible to commit "
+                    + "offsets under namespace {}. This likely won't recover unless the "
+                    + "unserializable partition or offset information is overwritten.", namespace);
+            callback.onCompletion(t, null);
+            return null;
+        }
+
+        // And submit the data
+        log.debug("Submitting {} entries to backing store", offsetsSerialized.size());
+        return backingStore.set(namespace, offsetsSerialized, new Callback<Void>() {
+            @Override
+            public void onCompletion(Throwable error, Void result) {
+                boolean isCurrent = handleFinishWrite(flushId, error, result);
+                if (isCurrent && callback != null)
+                    callback.onCompletion(error, result);
+            }
+        });
+    }
+
+    /**
+     * Cancel a flush that has been initiated by {@link #beginFlush}. This should not be called if
+     * {@link #doFlush} has already been invoked. It should be used if an operation performed
+     * between beginFlush and doFlush failed.
+     */
+    public synchronized void cancelFlush() {
+        // Verify we're still flushing data to handle a race between cancelFlush() calls from up the
+        // call stack and callbacks from the write request to underlying storage
+        if (flushing()) {
+            // Just recombine the data and place it back in the primary storage
+            toFlush.putAll(data);
+            data = toFlush;
+            currentFlushId++;
+            toFlush = null;
+        }
+    }
+
+    /**
+     * Handle completion of a write. Returns true if this callback is for the current flush
+     * operation, false if it's for an old operation that should now be ignored.
+     */
+    private synchronized boolean handleFinishWrite(long flushId, Throwable error, Void result) {
+        // Callbacks need to be handled carefully since the flush operation may have already timed
+        // out and been cancelled.
+        if (flushId != currentFlushId)
+            return false;
+
+        if (error != null) {
+            cancelFlush();
+        } else {
+            currentFlushId++;
+            toFlush = null;
+        }
+        return true;
+    }
+}
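
For reference, a minimal sketch of the flush lifecycle this class expects. This is not part of
the patch: the writer construction is elided and the 5-second timeout is an illustrative
assumption; only the setOffset()/beginFlush()/doFlush()/cancelFlush() sequence comes from the
class above.

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    void flushOnce(OffsetStorageWriter<String, Long> writer)
            throws InterruptedException, ExecutionException {
        writer.setOffset("input-partition-1", 42L); // buffered in memory, cheap

        if (!writer.beginFlush())  // snapshot the current data; false means nothing to write
            return;

        Future<Void> flush = writer.doFlush(new Callback<Void>() {
            @Override
            public void onCompletion(Throwable error, Void result) {
                // on error, handleFinishWrite() has already restored the data via cancelFlush()
            }
        });
        try {
            flush.get(5000, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            writer.cancelFlush(); // recombine the snapshot with any newer writes
        }
    }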

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/Callback.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/Callback.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/Callback.java
new file mode 100644
index 0000000..5cf1423
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/Callback.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.util;
+
+/**
+ * Generic interface for callbacks
+ */
+public interface Callback<V> {
+    /**
+     * Invoked upon completion of the operation.
+     *
+     * @param error the error that caused the operation to fail, or null if no error occurred
+     * @param result the return value, or null if the operation failed
+     */
+    void onCompletion(Throwable error, V result);
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java
new file mode 100644
index 0000000..44a9e41
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.util;
+
+import java.io.Serializable;
+
+/**
+ * Unique ID for a single task. It includes a unique connector ID and a task ID that is unique within
+ * the connector.
+ */
+public class ConnectorTaskId implements Serializable {
+    private final String connector;
+    private final int task;
+
+    public ConnectorTaskId(String connector, int task) {
+        this.connector = connector;
+        this.task = task;
+    }
+
+    public String getConnector() {
+        return connector;
+    }
+
+    public int getTask() {
+        return task;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        ConnectorTaskId that = (ConnectorTaskId) o;
+
+        if (task != that.task)
+            return false;
+        if (connector != null ? !connector.equals(that.connector) : that.connector != null)
+            return false;
+
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = connector != null ? connector.hashCode() : 0;
+        result = 31 * result + task;
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return connector + '-' + task;
+    }
+}
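
As a quick illustration of the ID format (the connector name here is hypothetical):

    ConnectorTaskId id = new ConnectorTaskId("file-source", 0);
    System.out.println(id); // prints "file-source-0"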

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/FutureCallback.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/FutureCallback.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/FutureCallback.java
new file mode 100644
index 0000000..278fdd3
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/FutureCallback.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.util;
+
+import java.util.concurrent.*;
+
+public class FutureCallback<T> implements Callback<T>, Future<T> {
+
+    private Callback<T> underlying;
+    private CountDownLatch finishedLatch;
+    private T result = null;
+    private Throwable exception = null;
+
+    public FutureCallback(Callback<T> underlying) {
+        this.underlying = underlying;
+        this.finishedLatch = new CountDownLatch(1);
+    }
+
+    @Override
+    public void onCompletion(Throwable error, T result) {
+        underlying.onCompletion(error, result);
+        this.exception = error;
+        this.result = result;
+        finishedLatch.countDown();
+    }
+
+    @Override
+    public boolean cancel(boolean b) {
+        return false;
+    }
+
+    @Override
+    public boolean isCancelled() {
+        return false;
+    }
+
+    @Override
+    public boolean isDone() {
+        return finishedLatch.getCount() == 0;
+    }
+
+    @Override
+    public T get() throws InterruptedException, ExecutionException {
+        finishedLatch.await();
+        return getResult();
+    }
+
+    @Override
+    public T get(long timeout, TimeUnit timeUnit)
+            throws InterruptedException, ExecutionException, TimeoutException {
+        if (!finishedLatch.await(timeout, timeUnit)) throw new TimeoutException();
+        return getResult();
+    }
+
+    private T getResult() throws ExecutionException {
+        if (exception != null) {
+            throw new ExecutionException(exception);
+        }
+        return result;
+    }
+}
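
A brief usage sketch: FutureCallback adapts the asynchronous Callback style used throughout
this patch to blocking callers. The set() call below is the OffsetBackingStore method used by
OffsetStorageWriter above; the backingStore instance, namespace, and values map are
placeholders for this sketch.

    import java.nio.ByteBuffer;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeUnit;

    Map<ByteBuffer, ByteBuffer> values = new HashMap<>(); // placeholder data
    FutureCallback<Void> cb = new FutureCallback<>(new Callback<Void>() {
        @Override
        public void onCompletion(Throwable error, Void result) {
            // invoked first, before any blocked get() call is released
        }
    });
    backingStore.set("my-namespace", values, cb); // assumed OffsetBackingStore instance
    cb.get(5, TimeUnit.SECONDS); // blocks; a non-null error surfaces as ExecutionException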

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ShutdownableThread.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ShutdownableThread.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ShutdownableThread.java
new file mode 100644
index 0000000..3e23f29
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ShutdownableThread.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * <p>
+ * Thread class with support for triggering graceful and forcible shutdown. In graceful shutdown,
+ * a flag is set, which the thread should detect and try to exit gracefully from. In forcible
+ * shutdown, the thread is interrupted. These can be combined to give a thread a chance to exit
+ * gracefully, but then force it to exit if it takes too long.
+ * </p>
+ * <p>
+ * Implementations should override the {@link #execute} method and check {@link #getRunning} to
+ * determine whether they should try to gracefully exit.
+ * </p>
+ */
+public abstract class ShutdownableThread extends Thread {
+    private static final Logger log = LoggerFactory.getLogger(ShutdownableThread.class);
+
+    private AtomicBoolean isRunning = new AtomicBoolean(true);
+    private CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+    /**
+     * An UncaughtExceptionHandler to register on every instance of this class. This is useful for
+     * testing, where AssertionErrors in the thread may not cause the test to fail. Since one
+     * instance is used for all threads, it must be thread-safe.
+     */
+    public static volatile UncaughtExceptionHandler uncaughtExceptionHandler = null;
+
+    public ShutdownableThread(String name) {
+        // The default is daemon=true so that these threads will not prevent shutdown. We use this
+        // default because these threads may run user code that does not clean up properly, even
+        // when we attempt to forcibly shut them down.
+        this(name, true);
+    }
+
+    public ShutdownableThread(String name, boolean daemon) {
+        super(name);
+        this.setDaemon(daemon);
+        if (uncaughtExceptionHandler != null)
+            this.setUncaughtExceptionHandler(uncaughtExceptionHandler);
+    }
+
+    /**
+     * Implementations should override this method with the main body for the thread.
+     */
+    public abstract void execute();
+
+    /**
+     * Returns true if the thread hasn't exited yet and none of the shutdown methods have been
+     * invoked.
+     */
+    public boolean getRunning() {
+        return isRunning.get();
+    }
+
+    @Override
+    public void run() {
+        try {
+            execute();
+        } catch (Error | RuntimeException e) {
+            log.error("Thread {} exiting with uncaught exception: ", getName(), e);
+            throw e;
+        } finally {
+            shutdownLatch.countDown();
+        }
+    }
+
+    /**
+     * Shut down the thread, first trying to shut down gracefully using the specified timeout, then
+     * forcibly interrupting the thread.
+     * @param gracefulTimeout the maximum time to wait for a graceful exit
+     * @param unit the time unit of the timeout argument
+     */
+    public void shutdown(long gracefulTimeout, TimeUnit unit)
+            throws InterruptedException {
+        boolean success = gracefulShutdown(gracefulTimeout, unit);
+        if (!success)
+            forceShutdown();
+    }
+
+    /**
+     * Attempt graceful shutdown
+     * @param timeout the maximum time to wait
+     * @param unit the time unit of the timeout argument
+     * @return true if successful, false if the timeout elapsed
+     */
+    public boolean gracefulShutdown(long timeout, TimeUnit unit) throws InterruptedException {
+        startGracefulShutdown();
+        return awaitShutdown(timeout, unit);
+    }
+
+    /**
+     * Start shutting down this thread gracefully, but do not block waiting for it to exit.
+     */
+    public void startGracefulShutdown() {
+        log.info("Starting graceful shutdown of thread {}", getName());
+        isRunning.set(false);
+    }
+
+    /**
+     * Awaits shutdown of this thread, waiting up to the timeout.
+     * @param timeout the maximum time to wait
+     * @param unit the time unit of the timeout argument
+     * @return true if successful, false if the timeout elapsed
+     * @throws InterruptedException if interrupted while waiting for the thread to exit
+     */
+    public boolean awaitShutdown(long timeout, TimeUnit unit) throws InterruptedException {
+        return shutdownLatch.await(timeout, unit);
+    }
+
+    /**
+     * Immediately tries to force the thread to shut down by interrupting it. This does not try to
+     * wait for the thread to truly exit because forcible shutdown is not always possible. By
+     * default, threads are marked as daemon threads so they will not prevent the process from
+     * exiting.
+     */
+    public void forceShutdown() throws InterruptedException {
+        log.info("Forcing shutdown of thread {}", getName());
+        isRunning.set(false);
+        interrupt();
+    }
+}
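
A minimal sketch of an implementation following the contract described in the class javadoc
(the work queue and poll interval are illustrative assumptions):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;

    public class PollingThread extends ShutdownableThread {
        private final BlockingQueue<Runnable> workQueue;

        public PollingThread(BlockingQueue<Runnable> workQueue) {
            super("polling-thread");
            this.workQueue = workQueue;
        }

        @Override
        public void execute() {
            while (getRunning()) { // graceful shutdown clears this flag
                try {
                    Runnable task = workQueue.poll(100, TimeUnit.MILLISECONDS);
                    if (task != null)
                        task.run();
                } catch (InterruptedException e) {
                    // forceShutdown() interrupts the thread; getRunning() is now false, so we exit
                }
            }
        }
    }

Calling thread.shutdown(5000, TimeUnit.MILLISECONDS) first requests a graceful exit, then
interrupts the thread if it is still running after the timeout.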

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
new file mode 100644
index 0000000..0c6f950
--- /dev/null
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
@@ -0,0 +1,367 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.copycat.cli.WorkerConfig;
+import org.apache.kafka.copycat.sink.SinkRecord;
+import org.apache.kafka.copycat.sink.SinkTask;
+import org.apache.kafka.copycat.sink.SinkTaskContext;
+import org.apache.kafka.copycat.storage.Converter;
+import org.apache.kafka.copycat.util.ConnectorTaskId;
+import org.apache.kafka.copycat.util.MockTime;
+import org.apache.kafka.copycat.util.ThreadedTest;
+import org.easymock.*;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
+
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(WorkerSinkTask.class)
+@PowerMockIgnore("javax.management.*")
+public class WorkerSinkTaskTest extends ThreadedTest {
+
+    // These are fixed to keep this code simpler. In this example we assume byte[] raw values
+    // with a mix of integer/string in Copycat
+    private static final String TOPIC = "test";
+    private static final int PARTITION = 12;
+    private static final long FIRST_OFFSET = 45;
+    private static final int KEY = 12;
+    private static final String VALUE = "VALUE";
+    private static final byte[] RAW_KEY = "key".getBytes();
+    private static final byte[] RAW_VALUE = "value".getBytes();
+
+    private static final TopicPartition TOPIC_PARTITION = new TopicPartition(TOPIC, PARTITION);
+
+    private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
+    private Time time;
+    @Mock private SinkTask sinkTask;
+    private WorkerConfig workerConfig;
+    @Mock private Converter<byte[]> keyConverter;
+    @Mock private Converter<byte[]> valueConverter;
+    private WorkerSinkTask<Integer, String> workerTask;
+    @Mock private KafkaConsumer<byte[], byte[]> consumer;
+    private WorkerSinkTaskThread workerThread;
+
+    private long recordsReturned;
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void setup() {
+        super.setup();
+        time = new MockTime();
+        Properties workerProps = new Properties();
+        workerProps.setProperty("key.converter", 
"org.apache.kafka.copycat.json.JsonConverter");
+        workerProps.setProperty("value.converter", 
"org.apache.kafka.copycat.json.JsonConverter");
+        workerProps.setProperty("key.serializer", 
"org.apache.kafka.copycat.json.JsonSerializer");
+        workerProps.setProperty("value.serializer", 
"org.apache.kafka.copycat.json.JsonSerializer");
+        workerProps.setProperty("key.deserializer", 
"org.apache.kafka.copycat.json.JsonDeserializer");
+        workerProps.setProperty("value.deserializer", 
"org.apache.kafka.copycat.json.JsonDeserializer");
+        workerConfig = new WorkerConfig(workerProps);
+        workerTask = PowerMock.createPartialMock(
+                WorkerSinkTask.class, new String[]{"createConsumer", "createWorkerThread"},
+                taskId, sinkTask, workerConfig, keyConverter, valueConverter, time);
+
+        recordsReturned = 0;
+    }
+
+    @Test
+    public void testPollsInBackground() throws Exception {
+        Properties taskProps = new Properties();
+
+        expectInitializeTask(taskProps);
+        Capture<Collection<SinkRecord>> capturedRecords = expectPolls(1L);
+        expectStopTask(10L);
+
+        PowerMock.replayAll();
+
+        workerTask.start(taskProps);
+        for (int i = 0; i < 10; i++) {
+            workerThread.iteration();
+        }
+        workerTask.stop();
+        // No need for awaitStop since the thread is mocked
+        workerTask.close();
+
+        // Verify contents match expected values, i.e. that they were translated properly. With max
+        // batch size 1 and poll returning 1 message at a time, we should have a matching # of batches
+        assertEquals(10, capturedRecords.getValues().size());
+        int offset = 0;
+        for (Collection<SinkRecord> recs : capturedRecords.getValues()) {
+            assertEquals(1, recs.size());
+            for (SinkRecord rec : recs) {
+                SinkRecord referenceSinkRecord
+                        = new SinkRecord(TOPIC, PARTITION, KEY, VALUE, FIRST_OFFSET + offset);
+                assertEquals(referenceSinkRecord, rec);
+                offset++;
+            }
+        }
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testDeliverConvertsData() throws Exception {
+        // Validate conversion is performed when data is delivered
+        Integer record = 12;
+
+        ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>(
+                Collections.singletonMap(
+                        new TopicPartition("topic", 0),
+                        Collections.singletonList(new ConsumerRecord<>("topic", 0, 0, RAW_KEY, RAW_VALUE))));
+
+        // Exact data doesn't matter, but should be passed directly to sink task
+        EasyMock.expect(keyConverter.toCopycatData(RAW_KEY)).andReturn(record);
+        EasyMock.expect(valueConverter.toCopycatData(RAW_VALUE)).andReturn(record);
+        Capture<Collection<SinkRecord>> capturedRecords
+                = EasyMock.newCapture(CaptureType.ALL);
+        sinkTask.put(EasyMock.capture(capturedRecords));
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        Whitebox.invokeMethod(workerTask, "deliverMessages", records);
+        assertEquals(record, capturedRecords.getValue().iterator().next().getKey());
+        assertEquals(record, capturedRecords.getValue().iterator().next().getValue());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testCommit() throws Exception {
+        Properties taskProps = new Properties();
+
+        expectInitializeTask(taskProps);
+        // Make each poll() take the offset commit interval
+        Capture<Collection<SinkRecord>> capturedRecords
+                = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
+        expectOffsetFlush(1L, null, null, 0, true);
+        expectStopTask(2);
+
+        PowerMock.replayAll();
+
+        workerTask.start(taskProps);
+        // First iteration gets one record
+        workerThread.iteration();
+        // Second triggers commit, gets a second offset
+        workerThread.iteration();
+        // Commit finishes synchronously for testing so we can check this immediately
+        assertEquals(0, workerThread.getCommitFailures());
+        workerTask.stop();
+        workerTask.close();
+
+        assertEquals(2, capturedRecords.getValues().size());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testCommitTaskFlushFailure() throws Exception {
+        Properties taskProps = new Properties();
+
+        expectInitializeTask(taskProps);
+        Capture<Collection<SinkRecord>> capturedRecords
+                = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
+        expectOffsetFlush(1L, new RuntimeException(), null, 0, true);
+        expectStopTask(2);
+
+        PowerMock.replayAll();
+
+        workerTask.start(taskProps);
+        // Second iteration triggers commit
+        workerThread.iteration();
+        workerThread.iteration();
+        assertEquals(1, workerThread.getCommitFailures());
+        assertEquals(false, Whitebox.getInternalState(workerThread, "committing"));
+        workerTask.stop();
+        workerTask.close();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testCommitConsumerFailure() throws Exception {
+        Properties taskProps = new Properties();
+
+        expectInitializeTask(taskProps);
+        Capture<Collection<SinkRecord>> capturedRecords
+                = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
+        expectOffsetFlush(1L, null, new Exception(), 0, true);
+        expectStopTask(2);
+
+        PowerMock.replayAll();
+
+        workerTask.start(taskProps);
+        // Second iteration triggers commit
+        workerThread.iteration();
+        workerThread.iteration();
+        // TODO Response to consistent failures?
+        assertEquals(1, workerThread.getCommitFailures());
+        assertEquals(false, Whitebox.getInternalState(workerThread, "committing"));
+        workerTask.stop();
+        workerTask.close();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testCommitTimeout() throws Exception {
+        Properties taskProps = new Properties();
+
+        expectInitializeTask(taskProps);
+        // Cut down amount of time to pass in each poll so we trigger exactly 1 offset commit
+        Capture<Collection<SinkRecord>> capturedRecords
+                = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT / 2);
+        expectOffsetFlush(2L, null, null, WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_DEFAULT, false);
+        expectStopTask(4);
+
+        PowerMock.replayAll();
+
+        workerTask.start(taskProps);
+        // Third iteration triggers commit, fourth gives a chance to trigger the timeout but doesn't
+        // trigger another commit
+        workerThread.iteration();
+        workerThread.iteration();
+        workerThread.iteration();
+        workerThread.iteration();
+        // TODO Response to consistent failures?
+        assertEquals(1, workerThread.getCommitFailures());
+        assertEquals(false, Whitebox.getInternalState(workerThread, "committing"));
+        workerTask.stop();
+        workerTask.close();
+
+        PowerMock.verifyAll();
+    }
+
+    private KafkaConsumer<byte[], byte[]> expectInitializeTask(Properties taskProps)
+            throws Exception {
+        sinkTask.initialize(EasyMock.anyObject(SinkTaskContext.class));
+        PowerMock.expectLastCall();
+        sinkTask.start(taskProps);
+        PowerMock.expectLastCall();
+
+        PowerMock.expectPrivate(workerTask, "createConsumer", taskProps)
+                .andReturn(consumer);
+        workerThread = PowerMock.createPartialMock(WorkerSinkTaskThread.class, new String[]{"start"},
+                workerTask, "mock-worker-thread", time,
+                workerConfig);
+        PowerMock.expectPrivate(workerTask, "createWorkerThread")
+                .andReturn(workerThread);
+        workerThread.start();
+        PowerMock.expectLastCall();
+        return consumer;
+    }
+
+    private void expectStopTask(final long expectedMessages) throws Exception {
+        final long finalOffset = FIRST_OFFSET + expectedMessages - 1;
+
+        sinkTask.stop();
+        PowerMock.expectLastCall();
+
+        // No offset commit since it happens in the mocked worker thread, but the main thread does need to wake up the
+        // consumer so it exits quickly
+        consumer.wakeup();
+        PowerMock.expectLastCall();
+
+        consumer.close();
+        PowerMock.expectLastCall();
+    }
+
+    // Note that this can only be called once per test currently
+    private Capture<Collection<SinkRecord>> expectPolls(final long pollDelayMs) throws Exception {
+        // Stub out all the consumer stream/iterator responses, which we just want to verify occur,
+        // but don't care about the exact details here.
+        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andStubAnswer(
+                new IAnswer<ConsumerRecords<byte[], byte[]>>() {
+                    @Override
+                    public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
+                        // "Sleep" so time will progress
+                        time.sleep(pollDelayMs);
+                        ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>(
+                                Collections.singletonMap(
+                                        new TopicPartition(TOPIC, PARTITION),
+                                        Arrays.asList(
+                                                new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, RAW_KEY, RAW_VALUE)
+                                        )));
+                        recordsReturned++;
+                        return records;
+                    }
+                });
+        EasyMock.expect(keyConverter.toCopycatData(RAW_KEY)).andReturn(KEY).anyTimes();
+        EasyMock.expect(valueConverter.toCopycatData(RAW_VALUE)).andReturn(VALUE).anyTimes();
+        Capture<Collection<SinkRecord>> capturedRecords = EasyMock.newCapture(CaptureType.ALL);
+        sinkTask.put(EasyMock.capture(capturedRecords));
+        EasyMock.expectLastCall().anyTimes();
+        return capturedRecords;
+    }
+
+    private Capture<ConsumerCommitCallback> expectOffsetFlush(final long expectedMessages,
+                                                              final RuntimeException flushError,
+                                                              final Exception consumerCommitError,
+                                                              final long consumerCommitDelayMs,
+                                                              final boolean invokeCallback)
+            throws Exception {
+        final long finalOffset = FIRST_OFFSET + expectedMessages - 1;
+
+        EasyMock.expect(consumer.subscriptions()).andReturn(Collections.singleton(TOPIC_PARTITION));
+        EasyMock.expect(consumer.position(TOPIC_PARTITION)).andAnswer(
+                new IAnswer<Long>() {
+                    @Override
+                    public Long answer() throws Throwable {
+                        return FIRST_OFFSET + recordsReturned - 1;
+                    }
+                }
+        );
+
+        sinkTask.flush(Collections.singletonMap(TOPIC_PARTITION, finalOffset));
+        IExpectationSetters<Object> flushExpectation = PowerMock.expectLastCall();
+        if (flushError != null) {
+            flushExpectation.andThrow(flushError).once();
+            return null;
+        }
+
+        final Capture<ConsumerCommitCallback> capturedCallback = EasyMock.newCapture();
+        final Map<TopicPartition, Long> offsets = Collections.singletonMap(TOPIC_PARTITION, finalOffset);
+        consumer.commit(EasyMock.eq(offsets),
+                EasyMock.eq(CommitType.ASYNC),
+                EasyMock.capture(capturedCallback));
+        PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+            @Override
+            public Object answer() throws Throwable {
+                time.sleep(consumerCommitDelayMs);
+                if (invokeCallback)
+                    capturedCallback.getValue().onComplete(offsets, consumerCommitError);
+                return null;
+            }
+        });
+        return capturedCallback;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
new file mode 100644
index 0000000..60e1462
--- /dev/null
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
@@ -0,0 +1,279 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime;
+
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.copycat.cli.WorkerConfig;
+import org.apache.kafka.copycat.source.SourceRecord;
+import org.apache.kafka.copycat.source.SourceTask;
+import org.apache.kafka.copycat.source.SourceTaskContext;
+import org.apache.kafka.copycat.storage.Converter;
+import org.apache.kafka.copycat.storage.OffsetStorageReader;
+import org.apache.kafka.copycat.storage.OffsetStorageWriter;
+import org.apache.kafka.copycat.util.Callback;
+import org.apache.kafka.copycat.util.ConnectorTaskId;
+import org.apache.kafka.copycat.util.ThreadedTest;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.easymock.IExpectationSetters;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.*;
+
+@RunWith(PowerMockRunner.class)
+public class WorkerSourceTaskTest extends ThreadedTest {
+    private static final byte[] PARTITION_BYTES = "partition".getBytes();
+    private static final byte[] OFFSET_BYTES = "offset-1".getBytes();
+
+    // Copycat-format data
+    private static final Integer KEY = -1;
+    private static final Long RECORD = 12L;
+    // Native-formatted data. The actual format of this data doesn't matter -- we just want to see that the right version
+    // is used in the right place.
+    private static final ByteBuffer CONVERTED_KEY = ByteBuffer.wrap("converted-key".getBytes());
+    private static final String CONVERTED_RECORD = "converted-record";
+
+    private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
+    private WorkerConfig config;
+    @Mock private SourceTask sourceTask;
+    @Mock private Converter<ByteBuffer> keyConverter;
+    @Mock private Converter<String> valueConverter;
+    @Mock private KafkaProducer<ByteBuffer, String> producer;
+    @Mock private OffsetStorageReader offsetReader;
+    @Mock private OffsetStorageWriter<ByteBuffer, String> offsetWriter;
+    private WorkerSourceTask<ByteBuffer, String> workerTask;
+    @Mock private Future<RecordMetadata> sendFuture;
+
+    private Capture<org.apache.kafka.clients.producer.Callback> producerCallbacks;
+
+    private static final Properties EMPTY_TASK_PROPS = new Properties();
+    private static final List<SourceRecord> RECORDS = Arrays.asList(
+            new SourceRecord(PARTITION_BYTES, OFFSET_BYTES, "topic", null, KEY, RECORD)
+    );
+
+    @Override
+    public void setup() {
+        super.setup();
+        Properties workerProps = new Properties();
+        workerProps.setProperty("key.converter", 
"org.apache.kafka.copycat.json.JsonConverter");
+        workerProps.setProperty("value.converter", 
"org.apache.kafka.copycat.json.JsonConverter");
+        workerProps.setProperty("key.serializer", 
"org.apache.kafka.copycat.json.JsonSerializer");
+        workerProps.setProperty("value.serializer", 
"org.apache.kafka.copycat.json.JsonSerializer");
+        workerProps.setProperty("key.deserializer", 
"org.apache.kafka.copycat.json.JsonDeserializer");
+        workerProps.setProperty("value.deserializer", 
"org.apache.kafka.copycat.json.JsonDeserializer");
+        config = new WorkerConfig(workerProps);
+        producerCallbacks = EasyMock.newCapture();
+    }
+
+    private void createWorkerTask() {
+        workerTask = new WorkerSourceTask<>(taskId, sourceTask, keyConverter, valueConverter, producer,
+                offsetReader, offsetWriter, config, new SystemTime());
+    }
+
+    @Test
+    public void testPollsInBackground() throws Exception {
+        createWorkerTask();
+
+        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
+        EasyMock.expectLastCall();
+        sourceTask.start(EMPTY_TASK_PROPS);
+        EasyMock.expectLastCall();
+
+        final CountDownLatch pollLatch = expectPolls(10);
+        // In this test, we don't flush, so nothing goes any further than the offset writer
+
+        sourceTask.stop();
+        EasyMock.expectLastCall();
+        expectOffsetFlush(true);
+
+        PowerMock.replayAll();
+
+        workerTask.start(EMPTY_TASK_PROPS);
+        awaitPolls(pollLatch);
+        workerTask.stop();
+        assertEquals(true, workerTask.awaitStop(1000));
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testCommit() throws Exception {
+        // Test that the task commits properly when prompted
+        createWorkerTask();
+
+        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
+        EasyMock.expectLastCall();
+        sourceTask.start(EMPTY_TASK_PROPS);
+        EasyMock.expectLastCall();
+
+        // We'll wait for some data, then trigger a flush
+        final CountDownLatch pollLatch = expectPolls(1);
+        expectOffsetFlush(true);
+
+        sourceTask.stop();
+        EasyMock.expectLastCall();
+        expectOffsetFlush(true);
+
+        PowerMock.replayAll();
+
+        workerTask.start(EMPTY_TASK_PROPS);
+        awaitPolls(pollLatch);
+        assertTrue(workerTask.commitOffsets());
+        workerTask.stop();
+        assertEquals(true, workerTask.awaitStop(1000));
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testCommitFailure() throws Exception {
+        // Test that the task commits properly when prompted
+        createWorkerTask();
+
+        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
+        EasyMock.expectLastCall();
+        sourceTask.start(EMPTY_TASK_PROPS);
+        EasyMock.expectLastCall();
+
+        // We'll wait for some data, then trigger a flush
+        final CountDownLatch pollLatch = expectPolls(1);
+        expectOffsetFlush(false);
+
+        sourceTask.stop();
+        EasyMock.expectLastCall();
+        expectOffsetFlush(true);
+
+        PowerMock.replayAll();
+
+        workerTask.start(EMPTY_TASK_PROPS);
+        awaitPolls(pollLatch);
+        assertFalse(workerTask.commitOffsets());
+        workerTask.stop();
+        assertEquals(true, workerTask.awaitStop(1000));
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testSendRecordsConvertsData() throws Exception {
+        createWorkerTask();
+
+        List<SourceRecord> records = new ArrayList<>();
+        // Can just use the same record for key and value
+        records.add(new SourceRecord(PARTITION_BYTES, OFFSET_BYTES, "topic", null, KEY, RECORD));
+
+        Capture<ProducerRecord<ByteBuffer, String>> sent = expectSendRecord();
+
+        PowerMock.replayAll();
+
+        Whitebox.invokeMethod(workerTask, "sendRecords", records);
+        assertEquals(CONVERTED_KEY, sent.getValue().key());
+        assertEquals(CONVERTED_RECORD, sent.getValue().value());
+
+        PowerMock.verifyAll();
+    }
+
+
+    private CountDownLatch expectPolls(int count) throws InterruptedException {
+        final CountDownLatch latch = new CountDownLatch(count);
+        // Note that we stub these to allow any number of calls because the thread will continue to
+        // run. The count passed in + latch returned just makes sure we get *at least* that number of
+        // calls
+        EasyMock.expect(sourceTask.poll())
+                .andStubAnswer(new IAnswer<List<SourceRecord>>() {
+                    @Override
+                    public List<SourceRecord> answer() throws Throwable {
+                        latch.countDown();
+                        return RECORDS;
+                    }
+                });
+        // Fallout of the poll() call
+        expectSendRecord();
+        return latch;
+    }
+
+    private Capture<ProducerRecord<ByteBuffer, String>> expectSendRecord() throws InterruptedException {
+        EasyMock.expect(keyConverter.fromCopycatData(KEY)).andStubReturn(CONVERTED_KEY);
+        EasyMock.expect(valueConverter.fromCopycatData(RECORD)).andStubReturn(CONVERTED_RECORD);
+
+        Capture<ProducerRecord<ByteBuffer, String>> sent = EasyMock.newCapture();
+        // 1. Converted data passed to the producer, which will need callbacks invoked for flush to work
+        EasyMock.expect(
+                producer.send(EasyMock.capture(sent),
+                        EasyMock.capture(producerCallbacks)))
+                .andStubAnswer(new IAnswer<Future<RecordMetadata>>() {
+                    @Override
+                    public Future<RecordMetadata> answer() throws Throwable {
+                        synchronized (producerCallbacks) {
+                            for (org.apache.kafka.clients.producer.Callback cb : producerCallbacks.getValues()) {
+                                cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0), null);
+                            }
+                            producerCallbacks.reset();
+                        }
+                        return sendFuture;
+                    }
+                });
+        // 2. Offset data is passed to the offset storage.
+        offsetWriter.setOffset(PARTITION_BYTES, OFFSET_BYTES);
+        PowerMock.expectLastCall().anyTimes();
+
+        return sent;
+    }
+
+    private void awaitPolls(CountDownLatch latch) throws InterruptedException {
+        latch.await(1000, TimeUnit.MILLISECONDS);
+    }
+
+    @SuppressWarnings("unchecked")
+    private void expectOffsetFlush(boolean succeed) throws Exception {
+        EasyMock.expect(offsetWriter.beginFlush()).andReturn(true);
+        Future<Void> flushFuture = PowerMock.createMock(Future.class);
+        EasyMock.expect(offsetWriter.doFlush(EasyMock.anyObject(Callback.class))).andReturn(flushFuture);
+        // Should throw for failure
+        IExpectationSetters<Void> futureGetExpect = EasyMock.expect(
+                flushFuture.get(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class)));
+        if (succeed) {
+            futureGetExpect.andReturn(null);
+        } else {
+            futureGetExpect.andThrow(new TimeoutException());
+            offsetWriter.cancelFlush();
+            PowerMock.expectLastCall();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
new file mode 100644
index 0000000..32e7ff9
--- /dev/null
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.copycat.cli.WorkerConfig;
+import org.apache.kafka.copycat.errors.CopycatException;
+import org.apache.kafka.copycat.source.SourceRecord;
+import org.apache.kafka.copycat.source.SourceTask;
+import org.apache.kafka.copycat.storage.*;
+import org.apache.kafka.copycat.util.ConnectorTaskId;
+import org.apache.kafka.copycat.util.MockTime;
+import org.apache.kafka.copycat.util.ThreadedTest;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.List;
+import java.util.Properties;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(Worker.class)
+@PowerMockIgnore("javax.management.*")
+public class WorkerTest extends ThreadedTest {
+
+    private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
+    private Worker worker;
+    private OffsetBackingStore offsetBackingStore = PowerMock.createMock(OffsetBackingStore.class);
+    private Serializer offsetKeySerializer = PowerMock.createMock(Serializer.class);
+    private Serializer offsetValueSerializer = PowerMock.createMock(Serializer.class);
+    private Deserializer offsetKeyDeserializer = PowerMock.createMock(Deserializer.class);
+    private Deserializer offsetValueDeserializer = PowerMock.createMock(Deserializer.class);
+
+    @Before
+    public void setup() {
+        super.setup();
+
+        Properties workerProps = new Properties();
+        workerProps.setProperty("key.converter", 
"org.apache.kafka.copycat.json.JsonConverter");
+        workerProps.setProperty("value.converter", 
"org.apache.kafka.copycat.json.JsonConverter");
+        workerProps.setProperty("key.serializer", 
"org.apache.kafka.copycat.json.JsonSerializer");
+        workerProps.setProperty("value.serializer", 
"org.apache.kafka.copycat.json.JsonSerializer");
+        workerProps.setProperty("key.deserializer", 
"org.apache.kafka.copycat.json.JsonDeserializer");
+        workerProps.setProperty("value.deserializer", 
"org.apache.kafka.copycat.json.JsonDeserializer");
+        WorkerConfig config = new WorkerConfig(workerProps);
+        worker = new Worker(new MockTime(), config, offsetBackingStore,
+                offsetKeySerializer, offsetValueSerializer,
+                offsetKeyDeserializer, offsetValueDeserializer);
+        worker.start();
+    }
+
+    @Test
+    public void testAddRemoveTask() throws Exception {
+        ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
+
+        // Create
+        TestSourceTask task = PowerMock.createMock(TestSourceTask.class);
+        WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class);
+
+        PowerMock.mockStatic(Worker.class);
+        PowerMock.expectPrivate(Worker.class, "instantiateTask", TestSourceTask.class.getName()).andReturn(task);
+
+        PowerMock.expectNew(
+                WorkerSourceTask.class, EasyMock.eq(taskId), EasyMock.eq(task),
+                EasyMock.anyObject(Converter.class),
+                EasyMock.anyObject(Converter.class),
+                EasyMock.anyObject(KafkaProducer.class),
+                EasyMock.anyObject(OffsetStorageReader.class),
+                EasyMock.anyObject(OffsetStorageWriter.class),
+                EasyMock.anyObject(WorkerConfig.class),
+                EasyMock.anyObject(Time.class))
+                .andReturn(workerTask);
+        Properties origProps = new Properties();
+        workerTask.start(origProps);
+        EasyMock.expectLastCall();
+
+        // Remove
+        workerTask.stop();
+        EasyMock.expectLastCall();
+        EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andStubReturn(true);
+        workerTask.close();
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        worker.addTask(taskId, TestSourceTask.class.getName(), origProps);
+        worker.stopTask(taskId);
+        // Nothing should be left, so this should effectively be a nop
+        worker.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test(expected = CopycatException.class)
+    public void testStopInvalidTask() {
+        worker.stopTask(taskId);
+    }
+
+    @Test
+    public void testCleanupTasksOnStop() throws Exception {
+        // Create
+        TestSourceTask task = PowerMock.createMock(TestSourceTask.class);
+        WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class);
+
+        PowerMock.mockStatic(Worker.class);
+        PowerMock.expectPrivate(Worker.class, "instantiateTask", TestSourceTask.class.getName()).andReturn(task);
+
+        PowerMock.expectNew(
+                WorkerSourceTask.class, EasyMock.eq(taskId), EasyMock.eq(task),
+                EasyMock.anyObject(Converter.class),
+                EasyMock.anyObject(Converter.class),
+                EasyMock.anyObject(KafkaProducer.class),
+                EasyMock.anyObject(OffsetStorageReader.class),
+                EasyMock.anyObject(OffsetStorageWriter.class),
+                EasyMock.anyObject(WorkerConfig.class),
+                EasyMock.anyObject(Time.class))
+                .andReturn(workerTask);
+        Properties origProps = new Properties();
+        workerTask.start(origProps);
+        EasyMock.expectLastCall();
+
+        // Remove on Worker.stop()
+        workerTask.stop();
+        EasyMock.expectLastCall();
+        EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andReturn(true);
+        // Note that in this case we *do not* commit offsets since it's an unclean shutdown
+        workerTask.close();
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        worker.addTask(taskId, TestSourceTask.class.getName(), origProps);
+        worker.stop();
+
+        PowerMock.verifyAll();
+    }
+
+
+    private static class TestSourceTask extends SourceTask {
+        public TestSourceTask() {
+        }
+
+        @Override
+        public void start(Properties props) {
+        }
+
+        @Override
+        public List<SourceRecord> poll() throws InterruptedException {
+            return null;
+        }
+
+        @Override
+        public void stop() {
+        }
+    }
+}
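
A reader can reconstruct the lifecycle this test pins down without the mock
plumbing. The following is a minimal sketch using only the calls visible
above; the config and the mock collaborators are the ones built in setup(),
and nothing else is assumed:

    // Sketch of the add/stop lifecycle verified by testAddRemoveTask
    Worker worker = new Worker(new MockTime(), config, offsetBackingStore,
            offsetKeySerializer, offsetValueSerializer,
            offsetKeyDeserializer, offsetValueDeserializer);
    worker.start();
    ConnectorTaskId id = new ConnectorTaskId("job", 0);
    worker.addTask(id, TestSourceTask.class.getName(), new Properties());
    worker.stopTask(id);   // stops and closes the underlying WorkerSourceTask
    worker.stop();         // effectively a no-op once all tasks are stopped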

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java
new file mode 100644
index 0000000..5ac7e38
--- /dev/null
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime.standalone;
+
+import org.apache.kafka.copycat.connector.Connector;
+import org.apache.kafka.copycat.connector.Task;
+import org.apache.kafka.copycat.runtime.ConnectorConfig;
+import org.apache.kafka.copycat.runtime.Worker;
+import org.apache.kafka.copycat.sink.SinkConnector;
+import org.apache.kafka.copycat.sink.SinkTask;
+import org.apache.kafka.copycat.source.SourceConnector;
+import org.apache.kafka.copycat.source.SourceTask;
+import org.apache.kafka.copycat.util.Callback;
+import org.apache.kafka.copycat.util.ConnectorTaskId;
+import org.apache.kafka.copycat.util.FutureCallback;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({StandaloneHerder.class})
+@PowerMockIgnore("javax.management.*")
+public class StandaloneHerderTest {
+    private static final String CONNECTOR_NAME = "test";
+    private static final String TOPICS_LIST_STR = "topic1,topic2";
+
+    private StandaloneHerder herder;
+    @Mock protected Worker worker;
+    private Connector connector;
+    @Mock protected Callback<String> createCallback;
+
+    private Properties connectorProps;
+    private Properties taskProps;
+
+    @Before
+    public void setup() {
+        worker = PowerMock.createMock(Worker.class);
+        herder = new StandaloneHerder(worker);
+
+        connectorProps = new Properties();
+        connectorProps.setProperty(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME);
+        connectorProps.setProperty(SinkConnector.TOPICS_CONFIG, TOPICS_LIST_STR);
+        PowerMock.mockStatic(StandaloneHerder.class);
+
+        // These can be anything since connectors can pass along whatever they want.
+        taskProps = new Properties();
+        taskProps.setProperty("foo", "bar");
+    }
+
+    @Test
+    public void testCreateSourceConnector() throws Exception {
+        connector = PowerMock.createMock(BogusSourceClass.class);
+        expectAdd(BogusSourceClass.class, BogusSourceTask.class, false);
+        PowerMock.replayAll();
+
+        herder.addConnector(connectorProps, createCallback);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testCreateSinkConnector() throws Exception {
+        connector = PowerMock.createMock(BogusSinkClass.class);
+        expectAdd(BogusSinkClass.class, BogusSinkTask.class, true);
+
+        PowerMock.replayAll();
+
+        herder.addConnector(connectorProps, createCallback);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testDestroyConnector() throws Exception {
+        connector = PowerMock.createMock(BogusSourceClass.class);
+        expectAdd(BogusSourceClass.class, BogusSourceTask.class, false);
+        expectDestroy();
+        PowerMock.replayAll();
+
+        herder.addConnector(connectorProps, createCallback);
+        FutureCallback<Void> futureCb = new FutureCallback<>(new 
Callback<Void>() {
+            @Override
+            public void onCompletion(Throwable error, Void result) {
+
+            }
+        });
+        herder.deleteConnector(CONNECTOR_NAME, futureCb);
+        futureCb.get(1000L, TimeUnit.MILLISECONDS);
+        PowerMock.verifyAll();
+    }
+
+
+    private void expectAdd(Class<? extends Connector> connClass,
+                             Class<? extends Task> taskClass,
+                             boolean sink) throws Exception {
+        expectCreate(connClass, taskClass, sink, true);
+    }
+
+    private void expectRestore(Class<? extends Connector> connClass,
+                                 Class<? extends Task> taskClass) throws Exception {
+        // Restore never uses a callback. These tests always use sources
+        expectCreate(connClass, taskClass, false, false);
+    }
+
+    private void expectCreate(Class<? extends Connector> connClass,
+                                Class<? extends Task> taskClass,
+                                boolean sink, boolean expectCallback) throws Exception {
+        connectorProps.setProperty(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connClass.getName());
+
+        PowerMock.expectPrivate(StandaloneHerder.class, "instantiateConnector", connClass.getName())
+                .andReturn(connector);
+        if (expectCallback) {
+            createCallback.onCompletion(null, CONNECTOR_NAME);
+            PowerMock.expectLastCall();
+        }
+
+        connector.initialize(EasyMock.anyObject(StandaloneConnectorContext.class));
+        PowerMock.expectLastCall();
+        connector.start(new Properties());
+        PowerMock.expectLastCall();
+
+        // Just return the connector properties for the individual task we generate by default
+        EasyMock.<Class<? extends 
Task>>expect(connector.getTaskClass()).andReturn(taskClass);
+
+        EasyMock.expect(connector.getTaskConfigs(ConnectorConfig.TASKS_MAX_DEFAULT))
+                .andReturn(Arrays.asList(taskProps));
+        // And we should instantiate the tasks. For a sink task, we should see added properties for
+        // the input topic partitions
+        Properties generatedTaskProps = new Properties();
+        generatedTaskProps.putAll(taskProps);
+        if (sink)
+            generatedTaskProps.setProperty(SinkTask.TOPICS_CONFIG, TOPICS_LIST_STR);
+        worker.addTask(new ConnectorTaskId(CONNECTOR_NAME, 0), taskClass.getName(), generatedTaskProps);
+        PowerMock.expectLastCall();
+    }
+
+    private void expectStop() {
+        worker.stopTask(new ConnectorTaskId(CONNECTOR_NAME, 0));
+        EasyMock.expectLastCall();
+        connector.stop();
+        EasyMock.expectLastCall();
+    }
+
+    private void expectDestroy() {
+        expectStop();
+    }
+
+    // We need to use a real class here due to some issue with mocking java.lang.Class
+    private abstract class BogusSourceClass extends SourceConnector {
+    }
+
+    private abstract class BogusSourceTask extends SourceTask {
+    }
+
+    private abstract class BogusSinkClass extends SinkConnector {
+    }
+
+    private abstract class BogusSinkTask extends SinkTask {
+    }
+
+}
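
Stripped of the EasyMock plumbing, the herder API these tests exercise
reduces to a create call and an asynchronous delete. A minimal sketch,
assuming only the methods invoked above; the connector properties are the
NAME_CONFIG, TOPICS_CONFIG and CONNECTOR_CLASS_CONFIG entries the tests
populate:

    // Sketch of the create/destroy path verified above
    StandaloneHerder herder = new StandaloneHerder(worker);
    herder.addConnector(connectorProps, createCallback); // async; name reported via callback
    FutureCallback<Void> deleted = new FutureCallback<>(new Callback<Void>() {
        @Override
        public void onCompletion(Throwable error, Void result) { }
    });
    herder.deleteConnector("test", deleted);
    deleted.get(1000L, TimeUnit.MILLISECONDS); // block until the connector is torn down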

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/FileOffsetBackingStoreTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/FileOffsetBackingStoreTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/FileOffsetBackingStoreTest.java
new file mode 100644
index 0000000..bbcbdc9
--- /dev/null
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/FileOffsetBackingStoreTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.storage;
+
+import org.apache.kafka.copycat.util.Callback;
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.powermock.api.easymock.PowerMock;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class FileOffsetBackingStoreTest {
+
+    FileOffsetBackingStore store;
+    Map<String, Object> props;
+    File tempFile;
+
+    private static Map<ByteBuffer, ByteBuffer> firstSet = new HashMap<>();
+
+    static {
+        firstSet.put(buffer("key"), buffer("value"));
+        firstSet.put(null, null);
+    }
+
+    @Before
+    public void setup() throws IOException {
+        store = new FileOffsetBackingStore();
+        tempFile = File.createTempFile("fileoffsetbackingstore", null);
+        props = new HashMap<>();
+        props.put(FileOffsetBackingStore.OFFSET_STORAGE_FILE_FILENAME_CONFIG, tempFile.getAbsolutePath());
+        store.configure(props);
+        store.start();
+    }
+
+    @After
+    public void teardown() {
+        tempFile.delete();
+    }
+
+    @Test
+    public void testGetSet() throws Exception {
+        Callback<Void> setCallback = expectSuccessfulSetCallback();
+        Callback<Map<ByteBuffer, ByteBuffer>> getCallback = 
expectSuccessfulGetCallback();
+        PowerMock.replayAll();
+
+        store.set("namespace", firstSet, setCallback).get();
+
+        Map<ByteBuffer, ByteBuffer> values = store.get("namespace", 
Arrays.asList(buffer("key"), buffer("bad")), getCallback).get();
+        assertEquals(buffer("value"), values.get(buffer("key")));
+        assertEquals(null, values.get(buffer("bad")));
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testSaveRestore() throws Exception {
+        Callback<Void> setCallback = expectSuccessfulSetCallback();
+        Callback<Map<ByteBuffer, ByteBuffer>> getCallback = 
expectSuccessfulGetCallback();
+        PowerMock.replayAll();
+
+        store.set("namespace", firstSet, setCallback).get();
+        store.stop();
+
+        // Restore into a new store to ensure correct reload from scratch
+        FileOffsetBackingStore restore = new FileOffsetBackingStore();
+        restore.configure(props);
+        restore.start();
+        Map<ByteBuffer, ByteBuffer> values = restore.get("namespace", 
Arrays.asList(buffer("key")), getCallback).get();
+        assertEquals(buffer("value"), values.get(buffer("key")));
+
+        PowerMock.verifyAll();
+    }
+
+    private static ByteBuffer buffer(String v) {
+        return ByteBuffer.wrap(v.getBytes());
+    }
+
+    private Callback<Void> expectSuccessfulSetCallback() {
+        @SuppressWarnings("unchecked")
+        Callback<Void> setCallback = PowerMock.createMock(Callback.class);
+        setCallback.onCompletion(EasyMock.isNull(Throwable.class), EasyMock.isNull(Void.class));
+        PowerMock.expectLastCall();
+        return setCallback;
+    }
+
+    @SuppressWarnings("unchecked")
+    private Callback<Map<ByteBuffer, ByteBuffer>> 
expectSuccessfulGetCallback() {
+        Callback<Map<ByteBuffer, ByteBuffer>> getCallback = 
PowerMock.createMock(Callback.class);
+        getCallback.onCompletion(EasyMock.isNull(Throwable.class), 
EasyMock.anyObject(Map.class));
+        PowerMock.expectLastCall();
+        return getCallback;
+    }
+}
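
Without the callback mocks, the store's round trip is just configure/start, a
blocking set(), a blocking get(), and stop(). A minimal sketch under the same
assumptions as the tests above; the file path is a placeholder, the buffer()
helper is the one defined in the test, and passing null callbacks is an
assumption:

    // Sketch of a FileOffsetBackingStore round trip
    Map<String, Object> props = new HashMap<>();
    props.put(FileOffsetBackingStore.OFFSET_STORAGE_FILE_FILENAME_CONFIG, "/tmp/copycat-offsets");
    FileOffsetBackingStore store = new FileOffsetBackingStore();
    store.configure(props);
    store.start();
    Map<ByteBuffer, ByteBuffer> data = new HashMap<>();
    data.put(buffer("key"), buffer("value"));
    store.set("namespace", data, null).get();   // block until persisted
    Map<ByteBuffer, ByteBuffer> values =
            store.get("namespace", Arrays.asList(buffer("key")), null).get();
    store.stop();   // per testSaveRestore, the data survives a restart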

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java
new file mode 100644
index 0000000..3d49f05
--- /dev/null
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.storage;
+
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.copycat.errors.CopycatException;
+import org.apache.kafka.copycat.util.Callback;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.*;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(PowerMockRunner.class)
+public class OffsetStorageWriterTest {
+    private static final String NAMESPACE = "namespace";
+    // Copycat format - any types should be accepted here
+    private static final List<String> OFFSET_KEY = Arrays.asList("key", "key");
+    private static final String OFFSET_VALUE = "value";
+    // Native objects - must match serializer types
+    private static final int OFFSET_KEY_CONVERTED = 12;
+    private static final String OFFSET_VALUE_CONVERTED = "value-converted";
+    // Serialized
+    private static final byte[] OFFSET_KEY_SERIALIZED = "key-serialized".getBytes();
+    private static final byte[] OFFSET_VALUE_SERIALIZED = "value-serialized".getBytes();
+    private static final Map<ByteBuffer, ByteBuffer> OFFSETS_SERIALIZED
+            = Collections.singletonMap(ByteBuffer.wrap(OFFSET_KEY_SERIALIZED),
+            ByteBuffer.wrap(OFFSET_VALUE_SERIALIZED));
+
+    @Mock private OffsetBackingStore store;
+    @Mock private Converter<Integer> keyConverter;
+    @Mock private Converter<String> valueConverter;
+    @Mock private Serializer<Integer> keySerializer;
+    @Mock private Serializer<String> valueSerializer;
+    private OffsetStorageWriter<Integer, String> writer;
+
+    private static Exception exception = new RuntimeException("error");
+
+    private ExecutorService service;
+
+    @Before
+    public void setup() {
+        writer = new OffsetStorageWriter<>(store, NAMESPACE, keyConverter, 
valueConverter, keySerializer, valueSerializer);
+        service = Executors.newFixedThreadPool(1);
+    }
+
+    @After
+    public void teardown() {
+        service.shutdownNow();
+    }
+
+    @Test
+    public void testWriteFlush() throws Exception {
+        @SuppressWarnings("unchecked")
+        Callback<Void> callback = PowerMock.createMock(Callback.class);
+        expectStore(callback, false);
+
+        PowerMock.replayAll();
+
+        writer.setOffset(OFFSET_KEY, OFFSET_VALUE);
+
+        assertTrue(writer.beginFlush());
+        writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testNoOffsetsToFlush() {
+        // If no offsets are flushed, we should finish immediately and not have made any calls to the
+        // underlying storage layer
+
+        PowerMock.replayAll();
+
+        // Should not return a future
+        assertFalse(writer.beginFlush());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testFlushFailureReplacesOffsets() throws Exception {
+        // When a flush fails, we shouldn't just lose the offsets. Instead, they should be restored
+        // such that a subsequent flush will write them.
+
+        @SuppressWarnings("unchecked")
+        final Callback<Void> callback = PowerMock.createMock(Callback.class);
+        // First time the write fails
+        expectStore(callback, true);
+        // Second time it succeeds
+        expectStore(callback, false);
+        // Third time it has no data to flush so we won't get past beginFlush()
+
+        PowerMock.replayAll();
+
+        writer.setOffset(OFFSET_KEY, OFFSET_VALUE);
+        assertTrue(writer.beginFlush());
+        writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
+        assertTrue(writer.beginFlush());
+        writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
+        assertFalse(writer.beginFlush());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test(expected = CopycatException.class)
+    public void testAlreadyFlushing() throws Exception {
+        @SuppressWarnings("unchecked")
+        final Callback<Void> callback = PowerMock.createMock(Callback.class);
+        // Trigger the send, but don't invoke the callback so we'll still be mid-flush
+        CountDownLatch allowStoreCompleteCountdown = new CountDownLatch(1);
+        expectStore(null, false, allowStoreCompleteCountdown);
+
+        PowerMock.replayAll();
+
+        writer.setOffset(OFFSET_KEY, OFFSET_VALUE);
+        assertTrue(writer.beginFlush());
+        writer.doFlush(callback);
+        assertTrue(writer.beginFlush()); // should throw
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testCancelBeforeAwaitFlush() {
+        PowerMock.replayAll();
+
+        writer.setOffset(OFFSET_KEY, OFFSET_VALUE);
+        assertTrue(writer.beginFlush());
+        writer.cancelFlush();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testCancelAfterAwaitFlush() throws Exception {
+        @SuppressWarnings("unchecked")
+        Callback<Void> callback = PowerMock.createMock(Callback.class);
+        CountDownLatch allowStoreCompleteCountdown = new CountDownLatch(1);
+        // In this test, the write should be cancelled so the callback will not be invoked and is not
+        // passed to the expectStore call
+        expectStore(null, false, allowStoreCompleteCountdown);
+
+        PowerMock.replayAll();
+
+        writer.setOffset(OFFSET_KEY, OFFSET_VALUE);
+        assertTrue(writer.beginFlush());
+        // Start the flush, then immediately cancel before allowing the mocked store request to finish
+        Future<Void> flushFuture = writer.doFlush(callback);
+        writer.cancelFlush();
+        allowStoreCompleteCountdown.countDown();
+        flushFuture.get(1000, TimeUnit.MILLISECONDS);
+
+        PowerMock.verifyAll();
+    }
+
+    private void expectStore(final Callback<Void> callback, final boolean 
fail) {
+        expectStore(callback, fail, null);
+    }
+
+    /**
+     * Expect a request to store data to the underlying OffsetBackingStore.
+     *
+     * @param callback the callback to invoke when completed, or null if the callback isn't
+     *                 expected to be invoked
+     * @param fail if true, the mocked store request fails and the callback is invoked with an error
+     * @param waitForCompletion if non-null, a CountDownLatch that should be awaited on before
+     *                          invoking the callback. A (generous) timeout is still imposed to
+     *                          ensure tests complete.
+     */
+    private void expectStore(final Callback<Void> callback,
+                             final boolean fail,
+                             final CountDownLatch waitForCompletion) {
+        EasyMock.expect(keyConverter.fromCopycatData(OFFSET_KEY)).andReturn(OFFSET_KEY_CONVERTED);
+        EasyMock.expect(keySerializer.serialize(NAMESPACE, OFFSET_KEY_CONVERTED)).andReturn(OFFSET_KEY_SERIALIZED);
+        EasyMock.expect(valueConverter.fromCopycatData(OFFSET_VALUE)).andReturn(OFFSET_VALUE_CONVERTED);
+        EasyMock.expect(valueSerializer.serialize(NAMESPACE, OFFSET_VALUE_CONVERTED)).andReturn(OFFSET_VALUE_SERIALIZED);
+
+        final Capture<Callback<Void>> storeCallback = Capture.newInstance();
+        EasyMock.expect(store.set(EasyMock.eq(NAMESPACE), EasyMock.eq(OFFSETS_SERIALIZED),
+                EasyMock.capture(storeCallback)))
+                .andAnswer(new IAnswer<Future<Void>>() {
+                    @Override
+                    public Future<Void> answer() throws Throwable {
+                        return service.submit(new Callable<Void>() {
+                            @Override
+                            public Void call() throws Exception {
+                                if (waitForCompletion != null)
+                                    assertTrue(waitForCompletion.await(10000, TimeUnit.MILLISECONDS));
+
+                                if (fail) {
+                                    storeCallback.getValue().onCompletion(exception, null);
+                                } else {
+                                    storeCallback.getValue().onCompletion(null, null);
+                                }
+                                return null;
+                            }
+                        });
+                    }
+                });
+        if (callback != null) {
+            if (fail) {
+                callback.onCompletion(EasyMock.eq(exception), EasyMock.eq((Void) null));
+            } else {
+                callback.onCompletion(null, null);
+            }
+        }
+        PowerMock.expectLastCall();
+    }
+
+}
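
The flush protocol these tests nail down is: buffer with setOffset(), take a
snapshot with beginFlush(), then either complete the snapshot with doFlush()
or discard it with cancelFlush(). A sketch of a commit loop under those
assumptions; partition, offset and callback are hypothetical placeholders,
and the retry-after-failure behavior is inferred from
testFlushFailureReplacesOffsets rather than stated by the writer's API:

    // Sketch of an offset commit using the writer API exercised above
    writer.setOffset(partition, offset);   // buffered in memory, cheap
    if (writer.beginFlush()) {             // false when there is nothing to flush
        try {
            writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Abandon this attempt; the buffered data is assumed to be
            // restored so a later flush can retry it
            writer.cancelFlush();
        }
    }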

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/MockTime.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/MockTime.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/MockTime.java
new file mode 100644
index 0000000..53149db
--- /dev/null
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/MockTime.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.copycat.util;
+
+import org.apache.kafka.common.utils.Time;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A clock that you can manually advance by calling sleep
+ */
+public class MockTime implements Time {
+
+    private long nanos = 0;
+
+    public MockTime() {
+        this.nanos = System.nanoTime();
+    }
+
+    @Override
+    public long milliseconds() {
+        return TimeUnit.MILLISECONDS.convert(this.nanos, TimeUnit.NANOSECONDS);
+    }
+
+    @Override
+    public long nanoseconds() {
+        return nanos;
+    }
+
+    @Override
+    public void sleep(long ms) {
+        this.nanos += TimeUnit.NANOSECONDS.convert(ms, TimeUnit.MILLISECONDS);
+    }
+
+}
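
Usage is straightforward: sleep() never blocks, it only advances the readable
clock. A minimal illustration:

    // MockTime returns immediately from sleep() and just moves time forward
    MockTime time = new MockTime();
    long before = time.milliseconds();
    time.sleep(5000);
    assert time.milliseconds() == before + 5000;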

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/ShutdownableThreadTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/ShutdownableThreadTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/ShutdownableThreadTest.java
new file mode 100644
index 0000000..4880ca1
--- /dev/null
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/ShutdownableThreadTest.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.util;
+
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ShutdownableThreadTest {
+
+    @Test
+    public void testGracefulShutdown() throws InterruptedException {
+        ShutdownableThread thread = new ShutdownableThread("graceful") {
+            @Override
+            public void execute() {
+                while (getRunning()) {
+                    try {
+                        Thread.sleep(1);
+                    } catch (InterruptedException e) {
+                        // Ignore
+                    }
+                }
+            }
+        };
+        thread.start();
+        Thread.sleep(10);
+        assertTrue(thread.gracefulShutdown(1000, TimeUnit.MILLISECONDS));
+    }
+
+    @Test
+    public void testForcibleShutdown() throws InterruptedException {
+        final CountDownLatch startedLatch = new CountDownLatch(1);
+        ShutdownableThread thread = new ShutdownableThread("forcible") {
+            @Override
+            public void execute() {
+                try {
+                    startedLatch.countDown();
+                    Thread.sleep(100000);
+                } catch (InterruptedException e) {
+                    // Ignore
+                }
+            }
+        };
+        thread.start();
+        startedLatch.await();
+        thread.forceShutdown();
+        // Not all threads can be forcibly stopped since interrupt() doesn't work on threads in
+        // certain conditions, but in this case we know the thread is interruptible so we should be
+        // able to join() it
+        thread.join(1000);
+        assertFalse(thread.isAlive());
+    }
+}
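
Taken together, the two tests document the intended shutdown sequence for
callers: attempt a graceful stop first, and only force an interrupt if that
times out. A sketch under those assumptions; the loop body and timeout are
placeholders:

    // Hypothetical caller-side shutdown sequence
    ShutdownableThread t = new ShutdownableThread("worker") {
        @Override
        public void execute() {
            while (getRunning()) {
                // one unit of interruptible work
            }
        }
    };
    t.start();
    // ... later, on shutdown:
    if (!t.gracefulShutdown(5000, TimeUnit.MILLISECONDS))
        t.forceShutdown();   // interrupt-based; may not stop non-interruptible work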
