http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
new file mode 100644
index 0000000..4e54bf1
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
@@ -0,0 +1,357 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(KafkaOffsetBackingStore.class)
+@PowerMockIgnore("javax.management.*")
+public class KafkaOffsetBackingStoreTest {
+    private static final String TOPIC = "connect-offsets";
+    private static final Map<String, String> DEFAULT_PROPS = new HashMap<>();
+    static {
+        DEFAULT_PROPS.put(KafkaOffsetBackingStore.OFFSET_STORAGE_TOPIC_CONFIG, TOPIC);
+        DEFAULT_PROPS.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9093");
+    }
+    private static final Map<ByteBuffer, ByteBuffer> FIRST_SET = new HashMap<>();
+    static {
+        FIRST_SET.put(buffer("key"), buffer("value"));
+        FIRST_SET.put(null, null);
+    }
+
+    private static final ByteBuffer TP0_KEY = buffer("TP0KEY");
+    private static final ByteBuffer TP1_KEY = buffer("TP1KEY");
+    private static final ByteBuffer TP2_KEY = buffer("TP2KEY");
+    private static final ByteBuffer TP0_VALUE = buffer("VAL0");
+    private static final ByteBuffer TP1_VALUE = buffer("VAL1");
+    private static final ByteBuffer TP2_VALUE = buffer("VAL2");
+    private static final ByteBuffer TP0_VALUE_NEW = buffer("VAL0_NEW");
+    private static final ByteBuffer TP1_VALUE_NEW = buffer("VAL1_NEW");
+
+    @Mock
+    KafkaBasedLog<byte[], byte[]> storeLog;
+    private KafkaOffsetBackingStore store;
+
+    private Capture<String> capturedTopic = EasyMock.newCapture();
+    private Capture<Map<String, Object>> capturedProducerProps = EasyMock.newCapture();
+    private Capture<Map<String, Object>> capturedConsumerProps = EasyMock.newCapture();
+    private Capture<Callback<ConsumerRecord<byte[], byte[]>>> capturedConsumedCallback = EasyMock.newCapture();
+
+    @Before
+    public void setUp() throws Exception {
+        store = PowerMock.createPartialMockAndInvokeDefaultConstructor(KafkaOffsetBackingStore.class, new String[]{"createKafkaBasedLog"});
+    }
+
+    @Test(expected = ConnectException.class)
+    public void testMissingTopic() {
+        store = new KafkaOffsetBackingStore();
+        store.configure(Collections.<String, Object>emptyMap());
+    }
+
+    @Test
+    public void testStartStop() throws Exception {
+        expectConfigure();
+        expectStart(Collections.EMPTY_LIST);
+        expectStop();
+
+        PowerMock.replayAll();
+
+        store.configure(DEFAULT_PROPS);
+        assertEquals(TOPIC, capturedTopic.getValue());
+        assertEquals("org.apache.kafka.common.serialization.ByteArraySerializer", capturedProducerProps.getValue().get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
+        assertEquals("org.apache.kafka.common.serialization.ByteArraySerializer", capturedProducerProps.getValue().get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
+        assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", capturedConsumerProps.getValue().get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
+        assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", capturedConsumerProps.getValue().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
+
+        store.start();
+        store.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testReloadOnStart() throws Exception {
+        expectConfigure();
+        expectStart(Arrays.asList(
+                new ConsumerRecord<>(TOPIC, 0, 0, TP0_KEY.array(), TP0_VALUE.array()),
+                new ConsumerRecord<>(TOPIC, 1, 0, TP1_KEY.array(), TP1_VALUE.array()),
+                new ConsumerRecord<>(TOPIC, 0, 1, TP0_KEY.array(), TP0_VALUE_NEW.array()),
+                new ConsumerRecord<>(TOPIC, 1, 1, TP1_KEY.array(), TP1_VALUE_NEW.array())
+        ));
+        expectStop();
+
+        PowerMock.replayAll();
+
+        store.configure(DEFAULT_PROPS);
+        store.start();
+        HashMap<ByteBuffer, ByteBuffer> data = Whitebox.getInternalState(store, "data");
+        assertEquals(TP0_VALUE_NEW, data.get(TP0_KEY));
+        assertEquals(TP1_VALUE_NEW, data.get(TP1_KEY));
+
+        store.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testGetSet() throws Exception {
+        expectConfigure();
+        expectStart(Collections.EMPTY_LIST);
+        expectStop();
+
+        // First get() against an empty store
+        final Capture<Callback<Void>> firstGetReadToEndCallback = EasyMock.newCapture();
+        storeLog.readToEnd(EasyMock.capture(firstGetReadToEndCallback));
+        PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+            @Override
+            public Object answer() throws Throwable {
+                firstGetReadToEndCallback.getValue().onCompletion(null, null);
+                return null;
+            }
+        });
+
+        // Set offsets
+        Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture();
+        storeLog.send(EasyMock.aryEq(TP0_KEY.array()), EasyMock.aryEq(TP0_VALUE.array()), EasyMock.capture(callback0));
+        PowerMock.expectLastCall();
+        Capture<org.apache.kafka.clients.producer.Callback> callback1 = EasyMock.newCapture();
+        storeLog.send(EasyMock.aryEq(TP1_KEY.array()), EasyMock.aryEq(TP1_VALUE.array()), EasyMock.capture(callback1));
+        PowerMock.expectLastCall();
+
+        // Second get() should get the produced data and return the new values
+        final Capture<Callback<Void>> secondGetReadToEndCallback = EasyMock.newCapture();
+        storeLog.readToEnd(EasyMock.capture(secondGetReadToEndCallback));
+        PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+            @Override
+            public Object answer() throws Throwable {
+                capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 0, TP0_KEY.array(), TP0_VALUE.array()));
+                capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 0, TP1_KEY.array(), TP1_VALUE.array()));
+                secondGetReadToEndCallback.getValue().onCompletion(null, null);
+                return null;
+            }
+        });
+
+        // Third get() should pick up data produced by someone else and return those values
+        final Capture<Callback<Void>> thirdGetReadToEndCallback = EasyMock.newCapture();
+        storeLog.readToEnd(EasyMock.capture(thirdGetReadToEndCallback));
+        PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+            @Override
+            public Object answer() throws Throwable {
+                capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 1, TP0_KEY.array(), TP0_VALUE_NEW.array()));
+                capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 1, TP1_KEY.array(), TP1_VALUE_NEW.array()));
+                thirdGetReadToEndCallback.getValue().onCompletion(null, null);
+                return null;
+            }
+        });
+
+        PowerMock.replayAll();
+
+
+
+        store.configure(DEFAULT_PROPS);
+        store.start();
+
+        // Getting from empty store should return nulls
+        final AtomicBoolean getInvokedAndPassed = new AtomicBoolean(false);
+        store.get(Arrays.asList(TP0_KEY, TP1_KEY), new Callback<Map<ByteBuffer, ByteBuffer>>() {
+            @Override
+            public void onCompletion(Throwable error, Map<ByteBuffer, ByteBuffer> result) {
+                // Since we didn't read them yet, these will be null
+                assertEquals(null, result.get(TP0_KEY));
+                assertEquals(null, result.get(TP1_KEY));
+                getInvokedAndPassed.set(true);
+            }
+        }).get(10000, TimeUnit.MILLISECONDS);
+        assertTrue(getInvokedAndPassed.get());
+
+        // Set some offsets
+        Map<ByteBuffer, ByteBuffer> toSet = new HashMap<>();
+        toSet.put(TP0_KEY, TP0_VALUE);
+        toSet.put(TP1_KEY, TP1_VALUE);
+        final AtomicBoolean invoked = new AtomicBoolean(false);
+        Future<Void> setFuture = store.set(toSet, new Callback<Void>() {
+            @Override
+            public void onCompletion(Throwable error, Void result) {
+                invoked.set(true);
+            }
+        });
+        assertFalse(setFuture.isDone());
+        // Out of order callbacks shouldn't matter, should still require all to be invoked before invoking the callback
+        // for the store's set callback
+        callback1.getValue().onCompletion(null, null);
+        assertFalse(invoked.get());
+        callback0.getValue().onCompletion(null, null);
+        setFuture.get(10000, TimeUnit.MILLISECONDS);
+        assertTrue(invoked.get());
+
+        // Getting data should read to end of our published data and return it
+        final AtomicBoolean secondGetInvokedAndPassed = new AtomicBoolean(false);
+        store.get(Arrays.asList(TP0_KEY, TP1_KEY), new Callback<Map<ByteBuffer, ByteBuffer>>() {
+            @Override
+            public void onCompletion(Throwable error, Map<ByteBuffer, ByteBuffer> result) {
+                assertEquals(TP0_VALUE, result.get(TP0_KEY));
+                assertEquals(TP1_VALUE, result.get(TP1_KEY));
+                secondGetInvokedAndPassed.set(true);
+            }
+        }).get(10000, TimeUnit.MILLISECONDS);
+        assertTrue(secondGetInvokedAndPassed.get());
+
+        // Getting data should read to the end again, picking up data produced by someone else, and return the new values
+        final AtomicBoolean thirdGetInvokedAndPassed = new AtomicBoolean(false);
+        store.get(Arrays.asList(TP0_KEY, TP1_KEY), new Callback<Map<ByteBuffer, ByteBuffer>>() {
+            @Override
+            public void onCompletion(Throwable error, Map<ByteBuffer, ByteBuffer> result) {
+                assertEquals(TP0_VALUE_NEW, result.get(TP0_KEY));
+                assertEquals(TP1_VALUE_NEW, result.get(TP1_KEY));
+                thirdGetInvokedAndPassed.set(true);
+            }
+        }).get(10000, TimeUnit.MILLISECONDS);
+        assertTrue(thirdGetInvokedAndPassed.get());
+
+        store.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testSetFailure() throws Exception {
+        expectConfigure();
+        expectStart(Collections.EMPTY_LIST);
+        expectStop();
+
+        // Set offsets
+        Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture();
+        storeLog.send(EasyMock.aryEq(TP0_KEY.array()), EasyMock.aryEq(TP0_VALUE.array()), EasyMock.capture(callback0));
+        PowerMock.expectLastCall();
+        Capture<org.apache.kafka.clients.producer.Callback> callback1 = EasyMock.newCapture();
+        storeLog.send(EasyMock.aryEq(TP1_KEY.array()), EasyMock.aryEq(TP1_VALUE.array()), EasyMock.capture(callback1));
+        PowerMock.expectLastCall();
+        Capture<org.apache.kafka.clients.producer.Callback> callback2 = EasyMock.newCapture();
+        storeLog.send(EasyMock.aryEq(TP2_KEY.array()), EasyMock.aryEq(TP2_VALUE.array()), EasyMock.capture(callback2));
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+
+
+        store.configure(DEFAULT_PROPS);
+        store.start();
+
+        // Set some offsets
+        Map<ByteBuffer, ByteBuffer> toSet = new HashMap<>();
+        toSet.put(TP0_KEY, TP0_VALUE);
+        toSet.put(TP1_KEY, TP1_VALUE);
+        toSet.put(TP2_KEY, TP2_VALUE);
+        final AtomicBoolean invoked = new AtomicBoolean(false);
+        final AtomicBoolean invokedFailure = new AtomicBoolean(false);
+        Future<Void> setFuture = store.set(toSet, new Callback<Void>() {
+            @Override
+            public void onCompletion(Throwable error, Void result) {
+                invoked.set(true);
+                if (error != null)
+                    invokedFailure.set(true);
+            }
+        });
+        assertFalse(setFuture.isDone());
+        // Out of order callbacks shouldn't matter, should still require all to be invoked before invoking the callback
+        // for the store's set callback
+        callback1.getValue().onCompletion(null, null);
+        assertFalse(invoked.get());
+        callback2.getValue().onCompletion(null, new KafkaException("bogus error"));
+        assertTrue(invoked.get());
+        assertTrue(invokedFailure.get());
+        callback0.getValue().onCompletion(null, null);
+        try {
+            setFuture.get(10000, TimeUnit.MILLISECONDS);
+            fail("Should have seen KafkaException thrown when waiting on KafkaOffsetBackingStore.set() future");
+        } catch (ExecutionException e) {
+            // expected
+            assertNotNull(e.getCause());
+            assertTrue(e.getCause() instanceof KafkaException);
+        }
+
+        store.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    private void expectConfigure() throws Exception {
+        PowerMock.expectPrivate(store, "createKafkaBasedLog", EasyMock.capture(capturedTopic), EasyMock.capture(capturedProducerProps),
+                EasyMock.capture(capturedConsumerProps), EasyMock.capture(capturedConsumedCallback))
+                .andReturn(storeLog);
+    }
+
+    private void expectStart(final List<ConsumerRecord<byte[], byte[]>> preexistingRecords) throws Exception {
+        storeLog.start();
+        PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+            @Override
+            public Object answer() throws Throwable {
+                for (ConsumerRecord<byte[], byte[]> rec : preexistingRecords)
+                    capturedConsumedCallback.getValue().onCompletion(null, rec);
+                return null;
+            }
+        });
+    }
+
+    private void expectStop() {
+        storeLog.stop();
+        PowerMock.expectLastCall();
+    }
+
+    private static ByteBuffer buffer(String v) {
+        return ByteBuffer.wrap(v.getBytes());
+    }
+
+}
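For orientation, the asynchronous contract exercised above condenses to the sketch below; the key/value buffers and callbacks are illustrative placeholders, not values from this patch:

    Map<ByteBuffer, ByteBuffer> offsets = Collections.singletonMap(keyBuffer, valueBuffer);
    store.set(offsets, setCallback).get(10000, TimeUnit.MILLISECONDS);                   // future completes only once every record is acknowledged
    store.get(Arrays.asList(keyBuffer), getCallback).get(10000, TimeUnit.MILLISECONDS);  // reads to the end of the topic before returning values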

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java
new file mode 100644
index 0000000..3e0347c
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.util.Callback;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(PowerMockRunner.class)
+public class OffsetStorageWriterTest {
+    private static final String NAMESPACE = "namespace";
+    // Connect format - any types should be accepted here
+    private static final Map<String, String> OFFSET_KEY = Collections.singletonMap("key", "key");
+    private static final Map<String, Integer> OFFSET_VALUE = Collections.singletonMap("key", 12);
+
+    // Serialized
+    private static final byte[] OFFSET_KEY_SERIALIZED = "key-serialized".getBytes();
+    private static final byte[] OFFSET_VALUE_SERIALIZED = "value-serialized".getBytes();
+
+    @Mock private OffsetBackingStore store;
+    @Mock private Converter keyConverter;
+    @Mock private Converter valueConverter;
+    private OffsetStorageWriter writer;
+
+    private static Exception exception = new RuntimeException("error");
+
+    private ExecutorService service;
+
+    @Before
+    public void setup() {
+        writer = new OffsetStorageWriter(store, NAMESPACE, keyConverter, valueConverter);
+        service = Executors.newFixedThreadPool(1);
+    }
+
+    @After
+    public void teardown() {
+        service.shutdownNow();
+    }
+
+    @Test
+    public void testWriteFlush() throws Exception {
+        @SuppressWarnings("unchecked")
+        Callback<Void> callback = PowerMock.createMock(Callback.class);
+        expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, callback, false, null);
+
+        PowerMock.replayAll();
+
+        writer.offset(OFFSET_KEY, OFFSET_VALUE);
+
+        assertTrue(writer.beginFlush());
+        writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
+
+        PowerMock.verifyAll();
+    }
+
+    // It should be possible to set offset values to null
+    @Test
+    public void testWriteNullValueFlush() throws Exception {
+        @SuppressWarnings("unchecked")
+        Callback<Void> callback = PowerMock.createMock(Callback.class);
+        expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, null, null, callback, false, null);
+
+        PowerMock.replayAll();
+
+        writer.offset(OFFSET_KEY, null);
+
+        assertTrue(writer.beginFlush());
+        writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
+
+        PowerMock.verifyAll();
+    }
+
+    // It should be possible to use null keys. These aren't actually stored as null since the key is wrapped to include
+    // info about the namespace (connector)
+    @Test
+    public void testWriteNullKeyFlush() throws Exception {
+        @SuppressWarnings("unchecked")
+        Callback<Void> callback = PowerMock.createMock(Callback.class);
+        expectStore(null, null, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, callback, false, null);
+
+        PowerMock.replayAll();
+
+        writer.offset(null, OFFSET_VALUE);
+
+        assertTrue(writer.beginFlush());
+        writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testNoOffsetsToFlush() {
+        // If no offsets are flushed, we should finish immediately and not have made any calls to the
+        // underlying storage layer
+
+        PowerMock.replayAll();
+
+        // Should not return a future
+        assertFalse(writer.beginFlush());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testFlushFailureReplacesOffsets() throws Exception {
+        // When a flush fails, we shouldn't just lose the offsets. Instead, they should be restored
+        // such that a subsequent flush will write them.
+
+        @SuppressWarnings("unchecked")
+        final Callback<Void> callback = PowerMock.createMock(Callback.class);
+        // First time the write fails
+        expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, callback, true, null);
+        // Second time it succeeds
+        expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, callback, false, null);
+        // Third time it has no data to flush so we won't get past beginFlush()
+
+        PowerMock.replayAll();
+
+        writer.offset(OFFSET_KEY, OFFSET_VALUE);
+        assertTrue(writer.beginFlush());
+        writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
+        assertTrue(writer.beginFlush());
+        writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
+        assertFalse(writer.beginFlush());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test(expected = ConnectException.class)
+    public void testAlreadyFlushing() throws Exception {
+        @SuppressWarnings("unchecked")
+        final Callback<Void> callback = PowerMock.createMock(Callback.class);
+        // Trigger the send, but don't invoke the callback so we'll still be mid-flush
+        CountDownLatch allowStoreCompleteCountdown = new CountDownLatch(1);
+        expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, null, false, allowStoreCompleteCountdown);
+
+        PowerMock.replayAll();
+
+        writer.offset(OFFSET_KEY, OFFSET_VALUE);
+        assertTrue(writer.beginFlush());
+        writer.doFlush(callback);
+        assertTrue(writer.beginFlush()); // should throw
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testCancelBeforeAwaitFlush() {
+        PowerMock.replayAll();
+
+        writer.offset(OFFSET_KEY, OFFSET_VALUE);
+        assertTrue(writer.beginFlush());
+        writer.cancelFlush();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testCancelAfterAwaitFlush() throws Exception {
+        @SuppressWarnings("unchecked")
+        Callback<Void> callback = PowerMock.createMock(Callback.class);
+        CountDownLatch allowStoreCompleteCountdown = new CountDownLatch(1);
+        // In this test, the write should be cancelled so the callback will not be invoked and is not
+        // passed to the expectStore call
+        expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, null, false, allowStoreCompleteCountdown);
+
+        PowerMock.replayAll();
+
+        writer.offset(OFFSET_KEY, OFFSET_VALUE);
+        assertTrue(writer.beginFlush());
+        // Start the flush, then immediately cancel before allowing the mocked store request to finish
+        Future<Void> flushFuture = writer.doFlush(callback);
+        writer.cancelFlush();
+        allowStoreCompleteCountdown.countDown();
+        flushFuture.get(1000, TimeUnit.MILLISECONDS);
+
+        PowerMock.verifyAll();
+    }
+
+    /**
+     * Expect a request to store data to the underlying OffsetBackingStore.
+     *
+     * @param key the key for the offset
+     * @param keySerialized serialized version of the key
+     * @param value the value for the offset
+     * @param valueSerialized serialized version of the value
+     * @param callback the callback to invoke when completed, or null if the callback isn't
+     *                 expected to be invoked
+     * @param fail if true, the mocked store request fails and the callback is invoked with an exception
+     * @param waitForCompletion if non-null, a CountDownLatch that should be awaited on before
+     *                          invoking the callback. A (generous) timeout is still imposed to
+     *                          ensure tests complete.
+     */
+    private void expectStore(Map<String, String> key, byte[] keySerialized,
+                             Map<String, Integer> value, byte[] valueSerialized,
+                             final Callback<Void> callback,
+                             final boolean fail,
+                             final CountDownLatch waitForCompletion) {
+        List<Object> keyWrapped = Arrays.asList(NAMESPACE, key);
+        EasyMock.expect(keyConverter.fromConnectData(NAMESPACE, null, keyWrapped)).andReturn(keySerialized);
+        EasyMock.expect(valueConverter.fromConnectData(NAMESPACE, null, value)).andReturn(valueSerialized);
+
+        final Capture<Callback<Void>> storeCallback = Capture.newInstance();
+        final Map<ByteBuffer, ByteBuffer> offsetsSerialized = Collections.singletonMap(
+                keySerialized == null ? null : ByteBuffer.wrap(keySerialized),
+                valueSerialized == null ? null : ByteBuffer.wrap(valueSerialized));
+        EasyMock.expect(store.set(EasyMock.eq(offsetsSerialized), EasyMock.capture(storeCallback)))
+                .andAnswer(new IAnswer<Future<Void>>() {
+                    @Override
+                    public Future<Void> answer() throws Throwable {
+                        return service.submit(new Callable<Void>() {
+                            @Override
+                            public Void call() throws Exception {
+                                if (waitForCompletion != null)
+                                    assertTrue(waitForCompletion.await(10000, TimeUnit.MILLISECONDS));
+
+                                if (fail) {
+                                    storeCallback.getValue().onCompletion(exception, null);
+                                } else {
+                                    storeCallback.getValue().onCompletion(null, null);
+                                }
+                                return null;
+                            }
+                        });
+                    }
+                });
+        if (callback != null) {
+            if (fail) {
+                callback.onCompletion(EasyMock.eq(exception), EasyMock.eq((Void) null));
+            } else {
+                callback.onCompletion(null, null);
+            }
+        }
+        PowerMock.expectLastCall();
+    }
+
+}
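For readers skimming the tests above, the write/flush lifecycle they exercise boils down to the following sketch; the connector name, offset maps, and flushCallback are illustrative stand-ins, not values from this patch:

    OffsetStorageWriter writer = new OffsetStorageWriter(store, "my-connector", keyConverter, valueConverter);
    writer.offset(Collections.singletonMap("file", "input.txt"), Collections.singletonMap("position", 42));
    if (writer.beginFlush()) {                                  // snapshot pending offsets; false means there is nothing to flush
        Future<Void> flushed = writer.doFlush(flushCallback);   // serialize via the converters and hand off to the backing store
        flushed.get(5000, TimeUnit.MILLISECONDS);               // block until the store acknowledges (or fails) the write
    }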

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/test/java/org/apache/kafka/connect/util/ByteArrayProducerRecordEquals.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ByteArrayProducerRecordEquals.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ByteArrayProducerRecordEquals.java
new file mode 100644
index 0000000..4d17ac4
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ByteArrayProducerRecordEquals.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.easymock.EasyMock;
+import org.easymock.IArgumentMatcher;
+
+import java.util.Arrays;
+
+public class ByteArrayProducerRecordEquals implements IArgumentMatcher {
+    private ProducerRecord<byte[], byte[]> record;
+
+    public static ProducerRecord<byte[], byte[]> eqProducerRecord(ProducerRecord<byte[], byte[]> in) {
+        EasyMock.reportMatcher(new ByteArrayProducerRecordEquals(in));
+        return null;
+    }
+
+    public ByteArrayProducerRecordEquals(ProducerRecord<byte[], byte[]> record) {
+        this.record = record;
+    }
+
+    @Override
+    public boolean matches(Object argument) {
+        if (!(argument instanceof ProducerRecord))
+            return false;
+        ProducerRecord<byte[], byte[]> other = (ProducerRecord<byte[], byte[]>) argument;
+        // Parenthesize each ternary so the null checks are not regrouped by && precedence
+        return record.topic().equals(other.topic()) &&
+                (record.partition() != null ? record.partition().equals(other.partition()) : other.partition() == null) &&
+                (record.key() != null ? Arrays.equals(record.key(), other.key()) : other.key() == null) &&
+                (record.value() != null ? Arrays.equals(record.value(), other.value()) : other.value() == null);
+    }
+
+    @Override
+    public void appendTo(StringBuffer buffer) {
+        buffer.append(record.toString());
+    }
+}
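As a usage sketch (the producer mock and expected record here are hypothetical, not from this patch), the matcher is reported inline inside an EasyMock expectation so byte-array payloads are compared by content rather than by reference:

    ProducerRecord<byte[], byte[]> expected = new ProducerRecord<>("connect-offsets", "key".getBytes(), "value".getBytes());
    EasyMock.expect(producer.send(ByteArrayProducerRecordEquals.eqProducerRecord(expected)))
            .andReturn(null);   // a real test would return a Future<RecordMetadata>, e.g. a TestFuture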

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
new file mode 100644
index 0000000..1c3b842
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
@@ -0,0 +1,437 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.utils.Time;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(KafkaBasedLog.class)
+@PowerMockIgnore("javax.management.*")
+public class KafkaBasedLogTest {
+
+    private static final String TOPIC = "connect-log";
+    private static final TopicPartition TP0 = new TopicPartition(TOPIC, 0);
+    private static final TopicPartition TP1 = new TopicPartition(TOPIC, 1);
+    private static final Map<String, Object> PRODUCER_PROPS = new HashMap<>();
+    static {
+        PRODUCER_PROPS.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9093");
+        PRODUCER_PROPS.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+        PRODUCER_PROPS.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+    }
+    private static final Map<String, Object> CONSUMER_PROPS = new HashMap<>();
+    static {
+        CONSUMER_PROPS.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9093");
+        CONSUMER_PROPS.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+        CONSUMER_PROPS.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+    }
+
+    private static final Set<TopicPartition> CONSUMER_ASSIGNMENT = new HashSet<>(Arrays.asList(TP0, TP1));
+    private static final Map<String, String> FIRST_SET = new HashMap<>();
+    static {
+        FIRST_SET.put("key", "value");
+        FIRST_SET.put(null, null);
+    }
+
+    private static final Node LEADER = new Node(1, "broker1", 9092);
+    private static final Node REPLICA = new Node(1, "broker2", 9093);
+
+    private static final PartitionInfo TPINFO0 = new PartitionInfo(TOPIC, 0, LEADER, new Node[]{REPLICA}, new Node[]{REPLICA});
+    private static final PartitionInfo TPINFO1 = new PartitionInfo(TOPIC, 1, LEADER, new Node[]{REPLICA}, new Node[]{REPLICA});
+
+    private static final String TP0_KEY = "TP0KEY";
+    private static final String TP1_KEY = "TP1KEY";
+    private static final String TP0_VALUE = "VAL0";
+    private static final String TP1_VALUE = "VAL1";
+    private static final String TP0_VALUE_NEW = "VAL0_NEW";
+    private static final String TP1_VALUE_NEW = "VAL1_NEW";
+
+    private Time time = new MockTime();
+    private KafkaBasedLog<String, String> store;
+
+    @Mock
+    private KafkaProducer<String, String> producer;
+    private MockConsumer<String, String> consumer;
+
+    private List<ConsumerRecord<String, String>> consumedRecords = new ArrayList<>();
+    private Callback<ConsumerRecord<String, String>> consumedCallback = new Callback<ConsumerRecord<String, String>>() {
+        @Override
+        public void onCompletion(Throwable error, ConsumerRecord<String, String> record) {
+            consumedRecords.add(record);
+        }
+    };
+
+    @Before
+    public void setUp() throws Exception {
+        store = PowerMock.createPartialMock(KafkaBasedLog.class, new String[]{"createConsumer", "createProducer"},
+                TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, consumedCallback, time);
+        consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        consumer.updatePartitions(TOPIC, Arrays.asList(TPINFO0, TPINFO1));
+        Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
+        beginningOffsets.put(TP0, 0L);
+        beginningOffsets.put(TP1, 0L);
+        consumer.updateBeginningOffsets(beginningOffsets);
+    }
+
+    @Test
+    public void testStartStop() throws Exception {
+        expectStart();
+        expectStop();
+
+        PowerMock.replayAll();
+
+        Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(TP0, 0L);
+        endOffsets.put(TP1, 0L);
+        consumer.updateEndOffsets(endOffsets);
+        store.start();
+        assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
+
+        store.stop();
+
+        assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
+        assertTrue(consumer.closed());
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testReloadOnStart() throws Exception {
+        expectStart();
+        expectStop();
+
+        PowerMock.replayAll();
+
+        Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(TP0, 1L);
+        endOffsets.put(TP1, 1L);
+        consumer.updateEndOffsets(endOffsets);
+        final CountDownLatch finishedLatch = new CountDownLatch(1);
+        consumer.schedulePollTask(new Runnable() { // Use first poll task to setup sequence of remaining responses to polls
+            @Override
+            public void run() {
+                // Should keep polling until it reaches current log end offset for all partitions. Should handle
+                // as many empty polls as needed
+                consumer.scheduleNopPollTask();
+                consumer.scheduleNopPollTask();
+                consumer.schedulePollTask(new Runnable() {
+                    @Override
+                    public void run() {
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, TP0_KEY, TP0_VALUE));
+                    }
+                });
+                consumer.scheduleNopPollTask();
+                consumer.scheduleNopPollTask();
+                consumer.schedulePollTask(new Runnable() {
+                    @Override
+                    public void run() {
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, TP1_KEY, TP1_VALUE));
+                    }
+                });
+                consumer.schedulePollTask(new Runnable() {
+                    @Override
+                    public void run() {
+                        finishedLatch.countDown();
+                    }
+                });
+            }
+        });
+        store.start();
+        assertTrue(finishedLatch.await(10000, TimeUnit.MILLISECONDS));
+
+        assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
+        assertEquals(2, consumedRecords.size());
+        assertEquals(TP0_VALUE, consumedRecords.get(0).value());
+        assertEquals(TP1_VALUE, consumedRecords.get(1).value());
+
+        store.stop();
+
+        assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
+        assertTrue(consumer.closed());
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testSendAndReadToEnd() throws Exception {
+        expectStart();
+        TestFuture<RecordMetadata> tp0Future = new TestFuture<>();
+        ProducerRecord<String, String> tp0Record = new ProducerRecord<>(TOPIC, TP0_KEY, TP0_VALUE);
+        Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture();
+        EasyMock.expect(producer.send(EasyMock.eq(tp0Record), EasyMock.capture(callback0))).andReturn(tp0Future);
+        TestFuture<RecordMetadata> tp1Future = new TestFuture<>();
+        ProducerRecord<String, String> tp1Record = new ProducerRecord<>(TOPIC, TP1_KEY, TP1_VALUE);
+        Capture<org.apache.kafka.clients.producer.Callback> callback1 = EasyMock.newCapture();
+        EasyMock.expect(producer.send(EasyMock.eq(tp1Record), EasyMock.capture(callback1))).andReturn(tp1Future);
+
+        // Producer flushes when read to log end is called
+        producer.flush();
+        PowerMock.expectLastCall();
+
+        expectStop();
+
+        PowerMock.replayAll();
+
+        Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(TP0, 0L);
+        endOffsets.put(TP1, 0L);
+        consumer.updateEndOffsets(endOffsets);
+        store.start();
+        assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
+        assertEquals(0L, consumer.position(TP0));
+        assertEquals(0L, consumer.position(TP1));
+
+        // Set some keys
+        final AtomicInteger invoked = new AtomicInteger(0);
+        org.apache.kafka.clients.producer.Callback producerCallback = new org.apache.kafka.clients.producer.Callback() {
+            @Override
+            public void onCompletion(RecordMetadata metadata, Exception exception) {
+                invoked.incrementAndGet();
+            }
+        };
+        store.send(TP0_KEY, TP0_VALUE, producerCallback);
+        store.send(TP1_KEY, TP1_VALUE, producerCallback);
+        assertEquals(0, invoked.get());
+        tp1Future.resolve((RecordMetadata) null); // Output not used, so safe to not return a real value for testing
+        callback1.getValue().onCompletion(null, null);
+        assertEquals(1, invoked.get());
+        tp0Future.resolve((RecordMetadata) null);
+        callback0.getValue().onCompletion(null, null);
+        assertEquals(2, invoked.get());
+
+        // Now we should have to wait for the records to be read back when we call readToEnd()
+        final AtomicBoolean getInvokedAndPassed = new AtomicBoolean(false);
+        final FutureCallback<Void> readEndFutureCallback = new FutureCallback<>(new Callback<Void>() {
+            @Override
+            public void onCompletion(Throwable error, Void result) {
+                assertEquals(4, consumedRecords.size());
+                assertEquals(TP0_VALUE_NEW, consumedRecords.get(2).value());
+                assertEquals(TP1_VALUE_NEW, consumedRecords.get(3).value());
+                getInvokedAndPassed.set(true);
+            }
+        });
+        consumer.schedulePollTask(new Runnable() {
+            @Override
+            public void run() {
+                // Once we're synchronized in a poll, start the read to end and schedule the exact set of poll events
+                // that should follow. This readToEnd call will immediately wake up this consumer.poll() call without
+                // returning any data.
+                store.readToEnd(readEndFutureCallback);
+
+                // Needs to seek to end to find end offsets
+                consumer.schedulePollTask(new Runnable() {
+                    @Override
+                    public void run() {
+                        Map<TopicPartition, Long> newEndOffsets = new HashMap<>();
+                        newEndOffsets.put(TP0, 2L);
+                        newEndOffsets.put(TP1, 2L);
+                        consumer.updateEndOffsets(newEndOffsets);
+                    }
+                });
+
+                // Should keep polling until it reaches current log end offset for all partitions
+                consumer.scheduleNopPollTask();
+                consumer.scheduleNopPollTask();
+                consumer.scheduleNopPollTask();
+                consumer.schedulePollTask(new Runnable() {
+                    @Override
+                    public void run() {
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, TP0_KEY, TP0_VALUE));
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 1, TP0_KEY, TP0_VALUE_NEW));
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, TP1_KEY, TP1_VALUE));
+                    }
+                });
+
+                consumer.schedulePollTask(new Runnable() {
+                    @Override
+                    public void run() {
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 1, TP1_KEY, TP1_VALUE_NEW));
+                    }
+                });
+
+                // Already have FutureCallback that should be invoked/awaited, so no need for follow up finishedLatch
+            }
+        });
+        readEndFutureCallback.get(10000, TimeUnit.MILLISECONDS);
+        assertTrue(getInvokedAndPassed.get());
+
+        // Cleanup
+        store.stop();
+
+        assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
+        assertTrue(consumer.closed());
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testConsumerError() throws Exception {
+        expectStart();
+        expectStop();
+
+        PowerMock.replayAll();
+
+        final CountDownLatch finishedLatch = new CountDownLatch(1);
+        Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(TP0, 1L);
+        endOffsets.put(TP1, 1L);
+        consumer.updateEndOffsets(endOffsets);
+        consumer.schedulePollTask(new Runnable() {
+            @Override
+            public void run() {
+                // Trigger exception
+                consumer.schedulePollTask(new Runnable() {
+                    @Override
+                    public void run() {
+                        consumer.setException(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.exception());
+                    }
+                });
+
+                // Should keep polling until it reaches current log end offset for all partitions
+                consumer.scheduleNopPollTask();
+                consumer.scheduleNopPollTask();
+                consumer.schedulePollTask(new Runnable() {
+                    @Override
+                    public void run() {
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, TP0_KEY, TP0_VALUE_NEW));
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, TP0_KEY, TP0_VALUE_NEW));
+                    }
+                });
+
+                consumer.schedulePollTask(new Runnable() {
+                    @Override
+                    public void run() {
+                        finishedLatch.countDown();
+                    }
+                });
+            }
+        });
+        store.start();
+        assertTrue(finishedLatch.await(10000, TimeUnit.MILLISECONDS));
+        assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
+        assertEquals(1L, consumer.position(TP0));
+
+        store.stop();
+
+        assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
+        assertTrue(consumer.closed());
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testProducerError() throws Exception {
+        expectStart();
+        TestFuture<RecordMetadata> tp0Future = new TestFuture<>();
+        ProducerRecord<String, String> tp0Record = new ProducerRecord<>(TOPIC, TP0_KEY, TP0_VALUE);
+        Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture();
+        EasyMock.expect(producer.send(EasyMock.eq(tp0Record), EasyMock.capture(callback0))).andReturn(tp0Future);
+
+        expectStop();
+
+        PowerMock.replayAll();
+
+        Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(TP0, 0L);
+        endOffsets.put(TP1, 0L);
+        consumer.updateEndOffsets(endOffsets);
+        store.start();
+        assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
+        assertEquals(0L, consumer.position(TP0));
+        assertEquals(0L, consumer.position(TP1));
+
+        final AtomicReference<Throwable> setException = new AtomicReference<>();
+        store.send(TP0_KEY, TP0_VALUE, new org.apache.kafka.clients.producer.Callback() {
+            @Override
+            public void onCompletion(RecordMetadata metadata, Exception exception) {
+                assertNull(setException.get()); // Should only be invoked once
+                setException.set(exception);
+            }
+        });
+        KafkaException exc = new LeaderNotAvailableException("Error");
+        tp0Future.resolve(exc);
+        callback0.getValue().onCompletion(null, exc);
+        assertNotNull(setException.get());
+
+        store.stop();
+
+        assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
+        assertTrue(consumer.closed());
+        PowerMock.verifyAll();
+    }
+
+
+    private void expectStart() throws Exception {
+        PowerMock.expectPrivate(store, "createProducer")
+                .andReturn(producer);
+        PowerMock.expectPrivate(store, "createConsumer")
+                .andReturn(consumer);
+    }
+
+    private void expectStop() {
+        producer.close();
+        PowerMock.expectLastCall();
+        // MockConsumer close is checked after test.
+    }
+
+    private static ByteBuffer buffer(String v) {
+        return ByteBuffer.wrap(v.getBytes());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/test/java/org/apache/kafka/connect/util/MockTime.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/MockTime.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/MockTime.java
new file mode 100644
index 0000000..85f6895
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/MockTime.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.utils.Time;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A clock that you can manually advance by calling sleep
+ */
+public class MockTime implements Time {
+
+    private long nanos = 0;
+
+    public MockTime() {
+        this.nanos = System.nanoTime();
+    }
+
+    @Override
+    public long milliseconds() {
+        return TimeUnit.MILLISECONDS.convert(this.nanos, TimeUnit.NANOSECONDS);
+    }
+
+    @Override
+    public long nanoseconds() {
+        return nanos;
+    }
+
+    @Override
+    public void sleep(long ms) {
+        this.nanos += TimeUnit.NANOSECONDS.convert(ms, TimeUnit.MILLISECONDS);
+    }
+
+}
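A minimal illustration of how a manually advanced clock like this is typically driven from a test (assumed usage, not part of the patch):

    MockTime time = new MockTime();
    long before = time.milliseconds();
    time.sleep(5000);                                  // advances the clock by 5 seconds without blocking the test thread
    assertEquals(before + 5000, time.milliseconds());  // code under test observes the simulated passage of time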

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/test/java/org/apache/kafka/connect/util/ShutdownableThreadTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ShutdownableThreadTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ShutdownableThreadTest.java
new file mode 100644
index 0000000..5c3c224
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ShutdownableThreadTest.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.util;
+
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ShutdownableThreadTest {
+
+    @Test
+    public void testGracefulShutdown() throws InterruptedException {
+        ShutdownableThread thread = new ShutdownableThread("graceful") {
+            @Override
+            public void execute() {
+                while (getRunning()) {
+                    try {
+                        Thread.sleep(1);
+                    } catch (InterruptedException e) {
+                        // Ignore
+                    }
+                }
+            }
+        };
+        thread.start();
+        Thread.sleep(10);
+        assertTrue(thread.gracefulShutdown(1000, TimeUnit.MILLISECONDS));
+    }
+
+    @Test
+    public void testForcibleShutdown() throws InterruptedException {
+        final CountDownLatch startedLatch = new CountDownLatch(1);
+        ShutdownableThread thread = new ShutdownableThread("forcible") {
+            @Override
+            public void execute() {
+                try {
+                    startedLatch.countDown();
+                    Thread.sleep(100000);
+                } catch (InterruptedException e) {
+                    // Ignore
+                }
+            }
+        };
+        thread.start();
+        startedLatch.await();
+        thread.forceShutdown();
+        // Not all threads can be forcibly stopped since interrupt() doesn't work on threads in
+        // certain conditions, but in this case we know the thread is interruptible so we should be
+        // able to join() it
+        thread.join(1000);
+        assertFalse(thread.isAlive());
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/test/java/org/apache/kafka/connect/util/TestBackgroundThreadExceptionHandler.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TestBackgroundThreadExceptionHandler.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TestBackgroundThreadExceptionHandler.java
new file mode 100644
index 0000000..12bac98
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TestBackgroundThreadExceptionHandler.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.util;
+
+/**
+ * An UncaughtExceptionHandler that can be registered with one or more threads which tracks the
+ * first exception so the main thread can check for uncaught exceptions.
+ */
+public class TestBackgroundThreadExceptionHandler implements Thread.UncaughtExceptionHandler {
+    private Throwable firstException = null;
+
+    @Override
+    public void uncaughtException(Thread t, Throwable e) {
+        if (this.firstException == null)
+            this.firstException = e;
+    }
+
+    public void verifyNoExceptions() {
+        if (this.firstException != null)
+            throw new AssertionError(this.firstException);
+    }
+}
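
A brief usage sketch (hypothetical, not from the patch): install the handler on any background thread a test starts, then assert at the end that nothing blew up.

    // Hypothetical test snippet showing the intended usage of the handler above.
    TestBackgroundThreadExceptionHandler handler = new TestBackgroundThreadExceptionHandler();

    Thread worker = new Thread(new Runnable() {
        @Override
        public void run() {
            // Any exception escaping here is recorded by the handler instead of being lost.
            doBackgroundWork(); // hypothetical helper
        }
    });
    worker.setUncaughtExceptionHandler(handler);
    worker.start();
    worker.join();

    // Rethrows the first recorded exception as an AssertionError, failing the test.
    handler.verifyNoExceptions();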

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/test/java/org/apache/kafka/connect/util/TestFuture.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TestFuture.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TestFuture.java
new file mode 100644
index 0000000..3683f91
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TestFuture.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.util;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class TestFuture<T> implements Future<T> {
+    private volatile boolean resolved;
+    private T result;
+    private Throwable exception;
+    private CountDownLatch getCalledLatch;
+
+    private volatile boolean resolveOnGet;
+    private T resolveOnGetResult;
+    private Throwable resolveOnGetException;
+
+    public TestFuture() {
+        resolved = false;
+        getCalledLatch = new CountDownLatch(1);
+
+        resolveOnGet = false;
+        resolveOnGetResult = null;
+        resolveOnGetException = null;
+    }
+
+    public void resolve(T val) {
+        this.result = val;
+        resolved = true;
+        synchronized (this) {
+            this.notifyAll();
+        }
+    }
+
+    public void resolve(Throwable t) {
+        exception = t;
+        resolved = true;
+        synchronized (this) {
+            this.notifyAll();
+        }
+    }
+
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning) {
+        return false;
+    }
+
+    @Override
+    public boolean isCancelled() {
+        return false;
+    }
+
+    @Override
+    public boolean isDone() {
+        return resolved;
+    }
+
+    @Override
+    public T get() throws InterruptedException, ExecutionException {
+        getCalledLatch.countDown();
+        while (true) {
+            try {
+                return get(Integer.MAX_VALUE, TimeUnit.DAYS);
+            } catch (TimeoutException e) {
+                // ignore
+            }
+        }
+    }
+
+    @Override
+    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+        getCalledLatch.countDown();
+
+        if (resolveOnGet) {
+            if (resolveOnGetException != null)
+                resolve(resolveOnGetException);
+            else
+                resolve(resolveOnGetResult);
+        }
+
+        synchronized (this) {
+            while (!resolved) {
+                this.wait(TimeUnit.MILLISECONDS.convert(timeout, unit));
+            }
+        }
+
+        if (exception != null) {
+            if (exception instanceof TimeoutException)
+                throw (TimeoutException) exception;
+            else if (exception instanceof InterruptedException)
+                throw (InterruptedException) exception;
+            else
+                throw new ExecutionException(exception);
+        }
+        return result;
+    }
+
+    /**
+     * Set a flag to resolve the future as soon as one of the get() methods has been called. Returns immediately.
+     * @param val the value to return from the future
+     */
+    public void resolveOnGet(T val) {
+        resolveOnGet = true;
+        resolveOnGetResult = val;
+    }
+
+    /**
+     * Set a flag to resolve the future as soon as one of the get() methods has been called. Returns immediately.
+     * @param t the exception to return from the future
+     */
+    public void resolveOnGet(Throwable t) {
+        resolveOnGet = true;
+        resolveOnGetException = t;
+    }
+
+    /**
+     * Block, waiting for another thread to call one of the get() methods, and then immediately resolve the future with
+     * the specified value.
+     * @param val the value to return from the future
+     */
+    public void waitForGetAndResolve(T val) {
+        waitForGet();
+        resolve(val);
+    }
+
+    /**
+     * Block, waiting for another thread to call one of the get() methods, and then immediately resolve the future with
+     * the specified exception.
+     * @param t the exception to use to resolve the future
+     */
+    public void waitForGetAndResolve(Throwable t) {
+        waitForGet();
+        resolve(t);
+    }
+
+    private void waitForGet() {
+        try {
+            getCalledLatch.await();
+        } catch (InterruptedException e) {
+            throw new RuntimeException("Unexpected interruption: ", e);
+        }
+    }
+}
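
To see how this helper is meant to be driven (a hypothetical test body, not part of the patch): the test thread coordinates with a worker that blocks in get().

    // Hypothetical test snippet: resolve the future only once another thread is blocked in get().
    final TestFuture<String> future = new TestFuture<>();

    Thread worker = new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                System.out.println("got: " + future.get());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    });
    worker.start();

    // Blocks until the worker has entered get(), then resolves it with a value;
    // resolveOnGet("done") could be used instead to pre-arrange the resolution without blocking.
    future.waitForGetAndResolve("done");
    worker.join();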

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/test/java/org/apache/kafka/connect/util/ThreadedTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ThreadedTest.java
new file mode 100644
index 0000000..0241ea3
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ThreadedTest.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.util;
+
+import org.junit.After;
+import org.junit.Before;
+
+/**
+ * Base class for tests that use threads. It sets up uncaught exception handlers for all known
+ * thread classes and checks for errors at the end of the test so that failures in background
+ * threads will cause the test to fail.
+ */
+public class ThreadedTest {
+
+    protected TestBackgroundThreadExceptionHandler backgroundThreadExceptionHandler;
+
+    @Before
+    public void setup() {
+        backgroundThreadExceptionHandler = new TestBackgroundThreadExceptionHandler();
+        ShutdownableThread.funcaughtExceptionHandler = backgroundThreadExceptionHandler;
+    }
+
+    @After
+    public void teardown() {
+        backgroundThreadExceptionHandler.verifyNoExceptions();
+        ShutdownableThread.funcaughtExceptionHandler = null;
+    }
+}
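
Tests use this by subclassing rather than instantiating; a minimal hypothetical subclass looks like the sketch below, where any uncaught exception in a ShutdownableThread surfaces as an AssertionError from teardown().

    import org.junit.Test;
    import java.util.concurrent.TimeUnit;

    // Hypothetical test inheriting the background-thread exception checks from ThreadedTest.
    public class ExampleWorkerTest extends ThreadedTest {
        @Test
        public void testWorkerShutsDownCleanly() throws Exception {
            ShutdownableThread worker = new ShutdownableThread("example-worker") {
                @Override
                public void execute() {
                    // If this body threw, teardown()'s verifyNoExceptions() would fail the test.
                }
            };
            worker.start();
            worker.gracefulShutdown(1000, TimeUnit.MILLISECONDS);
        }
    }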

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/resources/log4j.properties b/connect/runtime/src/test/resources/log4j.properties
new file mode 100644
index 0000000..d5e90fe
--- /dev/null
+++ b/connect/runtime/src/test/resources/log4j.properties
@@ -0,0 +1,23 @@
+##
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+log4j.rootLogger=OFF, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
+
+log4j.logger.org.apache.kafka=ERROR

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java
deleted file mode 100644
index 4d0e1bd..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.connector;
-
-import org.apache.kafka.common.annotation.InterfaceStability;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * <p>
- * Connectors manage integration of Copycat with another system, either as an input that ingests
- * data into Kafka or an output that passes data to an external system. Implementations should
- * not use this class directly; they should inherit from SourceConnector or SinkConnector.
- * </p>
- * <p>
- * Connectors have two primary tasks. First, given some configuration, they are responsible for
- * creating configurations for a set of {@link Task}s that split up the data processing. For
- * example, a database Connector might create Tasks by dividing the set of tables evenly among
- * tasks. Second, they are responsible for monitoring inputs for changes that require
- * reconfiguration and notifying the Copycat runtime via the ConnectorContext. Continuing the
- * previous example, the connector might periodically check for new tables and notify Copycat of
- * additions and deletions. Copycat will then request new configurations and update the running
- * Tasks.
- * </p>
- */
-@InterfaceStability.Unstable
-public abstract class Connector {
-
-    protected ConnectorContext context;
-
-    /**
-     * Get the version of this connector.
-     *
-     * @return the version, formatted as a String
-     */
-    public abstract String version();
-
-    /**
-     * Initialize this connector, using the provided ConnectorContext to notify the runtime of
-     * input configuration changes.
-     * @param ctx context object used to interact with the Copycat runtime
-     */
-    public void initialize(ConnectorContext ctx) {
-        context = ctx;
-    }
-
-    /**
-     * <p>
-     * Initialize this connector, using the provided ConnectorContext to notify the runtime of
-     * input configuration changes and using the provided set of Task configurations.
-     * This version is only used to recover from failures.
-     * </p>
-     * <p>
-     * The default implementation ignores the provided Task configurations. During recovery, Copycat will request
-     * an updated set of configurations and update the running Tasks appropriately. However, Connectors should
-     * implement special handling of this case if it will avoid unnecessary changes to running Tasks.
-     * </p>
-     *
-     * @param ctx context object used to interact with the Copycat runtime
-     * @param taskConfigs existing task configurations, which may be used when generating new task configs to avoid
-     *                    churn in partition to task assignments
-     */
-    public void initialize(ConnectorContext ctx, List<Map<String, String>> taskConfigs) {
-        context = ctx;
-        // Ignore taskConfigs. May result in more churn of tasks during recovery if updated configs
-        // are very different, but reduces the difficulty of implementing a Connector
-    }
-
-    /**
-     * Start this Connector. This method will only be called on a clean Connector, i.e. it has
-     * either just been instantiated and initialized or {@link #stop()} has been invoked.
-     *
-     * @param props configuration settings
-     */
-    public abstract void start(Map<String, String> props);
-
-    /**
-     * Reconfigure this Connector. Most implementations will not override this, using the default
-     * implementation that calls {@link #stop()} followed by {@link #start(Map)}.
-     * Implementations only need to override this if they want to handle this process more
-     * efficiently, e.g. without shutting down network connections to the external system.
-     *
-     * @param props new configuration settings
-     */
-    public void reconfigure(Map<String, String> props) {
-        stop();
-        start(props);
-    }
-
-    /**
-     * Returns the Task implementation for this Connector.
-     */
-    public abstract Class<? extends Task> taskClass();
-
-    /**
-     * Returns a set of configurations for Tasks based on the current configuration,
-     * producing at most count configurations.
-     *
-     * @param maxTasks maximum number of configurations to generate
-     * @return configurations for Tasks
-     */
-    public abstract List<Map<String, String>> taskConfigs(int maxTasks);
-
-    /**
-     * Stop this connector.
-     */
-    public abstract void stop();
-}
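
This commit removes the copycat-packaged copy of the Connector API; the same abstract class carries on under the org.apache.kafka.connect packages added elsewhere in this commit. As a reminder of the contract described in the javadoc above, here is a hedged sketch of a trivial implementation (class names and config keys are hypothetical; real connectors inherit from SourceConnector or SinkConnector rather than extending Connector directly):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical connector: fan a comma-separated "files" setting out across tasks.
    public class FileListConnector extends Connector {
        private Map<String, String> config;

        @Override
        public String version() {
            return "0.1.0";
        }

        @Override
        public void start(Map<String, String> props) {
            this.config = props;
        }

        @Override
        public Class<? extends Task> taskClass() {
            return FileListTask.class;
        }

        @Override
        public List<Map<String, String>> taskConfigs(int maxTasks) {
            // Round-robin the files across at most maxTasks task configs (partitioning is illustrative only).
            List<String> files = Arrays.asList(config.get("files").split(","));
            int numTasks = Math.min(files.size(), maxTasks);
            List<Map<String, String>> configs = new ArrayList<>();
            for (int i = 0; i < numTasks; i++)
                configs.add(new HashMap<>(config));
            for (int i = 0; i < files.size(); i++) {
                Map<String, String> cfg = configs.get(i % numTasks);
                String existing = cfg.get("task.files"); // hypothetical per-task key
                cfg.put("task.files", existing == null ? files.get(i) : existing + "," + files.get(i));
            }
            return configs;
        }

        @Override
        public void stop() {
            // Nothing to clean up in this sketch.
        }

        // Hypothetical companion task; a real one would implement SourceTask or SinkTask.
        public static class FileListTask implements Task {
            @Override public String version() { return "0.1.0"; }
            @Override public void start(Map<String, String> props) { }
            @Override public void stop() { }
        }
    }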

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/api/src/main/java/org/apache/kafka/copycat/connector/ConnectorContext.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/ConnectorContext.java b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/ConnectorContext.java
deleted file mode 100644
index ecba69a..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/ConnectorContext.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.connector;
-
-import org.apache.kafka.common.annotation.InterfaceStability;
-
-/**
- * ConnectorContext allows Connectors to proactively interact with the Copycat runtime.
- */
-@InterfaceStability.Unstable
-public interface ConnectorContext {
-    /**
-     * Requests that the runtime reconfigure the Tasks for this source. This should be used to
-     * indicate to the runtime that something about the input/output has changed (e.g. partitions
-     * added/removed) and the running Tasks will need to be modified.
-     */
-    void requestTaskReconfiguration();
-}
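
As a sketch of how the single method above is typically used (hypothetical monitor, not part of the patch): a connector watches the external system from a background thread and asks the runtime to regenerate task configs when its view changes.

    import java.util.Collections;
    import java.util.List;

    // Hypothetical monitor a connector might run to react to changes in the external system.
    public class TableMonitor extends Thread {
        private final ConnectorContext context;
        private volatile List<String> lastTables = Collections.emptyList();

        public TableMonitor(ConnectorContext context) {
            this.context = context;
        }

        @Override
        public void run() {
            while (!isInterrupted()) {
                List<String> tables = listTables();
                if (!tables.equals(lastTables)) {
                    lastTables = tables;
                    // The runtime responds by calling the connector's taskConfigs() again
                    // and reconfiguring the running Tasks.
                    context.requestTaskReconfiguration();
                }
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    return;
                }
            }
        }

        // Stand-in for a real query against the external system.
        private List<String> listTables() {
            return Collections.emptyList();
        }
    }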

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/api/src/main/java/org/apache/kafka/copycat/connector/CopycatRecord.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/CopycatRecord.java b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/CopycatRecord.java
deleted file mode 100644
index 0d3e8dc..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/CopycatRecord.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.connector;
-
-import org.apache.kafka.common.annotation.InterfaceStability;
-import org.apache.kafka.copycat.data.Schema;
-
-/**
- * <p>
- * Base class for records containing data to be copied to/from Kafka. This corresponds closely to
- * Kafka's ProducerRecord and ConsumerRecord classes, and holds the data that may be used by both
- * sources and sinks (topic, kafkaPartition, key, value). Although both implementations include a
- * notion of offset, it is not included here because they differ in type.
- * </p>
- */
-@InterfaceStability.Unstable
-public abstract class CopycatRecord {
-    private final String topic;
-    private final Integer kafkaPartition;
-    private final Schema keySchema;
-    private final Object key;
-    private final Schema valueSchema;
-    private final Object value;
-
-    public CopycatRecord(String topic, Integer kafkaPartition, Schema valueSchema, Object value) {
-        this(topic, kafkaPartition, null, null, valueSchema, value);
-    }
-
-    public CopycatRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value) {
-        this.topic = topic;
-        this.kafkaPartition = kafkaPartition;
-        this.keySchema = keySchema;
-        this.key = key;
-        this.valueSchema = valueSchema;
-        this.value = value;
-    }
-
-    public String topic() {
-        return topic;
-    }
-
-    public Integer kafkaPartition() {
-        return kafkaPartition;
-    }
-
-    public Object key() {
-        return key;
-    }
-
-    public Schema keySchema() {
-        return keySchema;
-    }
-
-    public Object value() {
-        return value;
-    }
-
-    public Schema valueSchema() {
-        return valueSchema;
-    }
-
-    @Override
-    public String toString() {
-        return "CopycatRecord{" +
-                "topic='" + topic + '\'' +
-                ", kafkaPartition=" + kafkaPartition +
-                ", key=" + key +
-                ", value=" + value +
-                '}';
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o)
-            return true;
-        if (o == null || getClass() != o.getClass())
-            return false;
-
-        CopycatRecord that = (CopycatRecord) o;
-
-        if (kafkaPartition != null ? !kafkaPartition.equals(that.kafkaPartition) : that.kafkaPartition != null)
-            return false;
-        if (topic != null ? !topic.equals(that.topic) : that.topic != null)
-            return false;
-        if (keySchema != null ? !keySchema.equals(that.keySchema) : that.keySchema != null)
-            return false;
-        if (key != null ? !key.equals(that.key) : that.key != null)
-            return false;
-        if (valueSchema != null ? !valueSchema.equals(that.valueSchema) : that.valueSchema != null)
-            return false;
-        if (value != null ? !value.equals(that.value) : that.value != null)
-            return false;
-
-        return true;
-    }
-
-    @Override
-    public int hashCode() {
-        int result = topic != null ? topic.hashCode() : 0;
-        result = 31 * result + (kafkaPartition != null ? kafkaPartition.hashCode() : 0);
-        result = 31 * result + (keySchema != null ? keySchema.hashCode() : 0);
-        result = 31 * result + (key != null ? key.hashCode() : 0);
-        result = 31 * result + (valueSchema != null ? valueSchema.hashCode() : 0);
-        result = 31 * result + (value != null ? value.hashCode() : 0);
-        return result;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Task.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Task.java b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Task.java
deleted file mode 100644
index cb8b719..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Task.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.connector;
-
-import org.apache.kafka.common.annotation.InterfaceStability;
-
-import java.util.Map;
-
-/**
- * <p>
- * Tasks contain the code that actually copies data to/from another system. They receive
- * a configuration from their parent Connector, assigning them a fraction of a Copycat job's work.
- * The Copycat framework then pushes/pulls data from the Task. The Task must also be able to
- * respond to reconfiguration requests.
- * </p>
- * <p>
- * Task only contains the minimal shared functionality between
- * {@link org.apache.kafka.copycat.source.SourceTask} and
- * {@link org.apache.kafka.copycat.sink.SinkTask}.
- * </p>
- */
-@InterfaceStability.Unstable
-public interface Task {
-    /**
-     * Get the version of this task. Usually this should be the same as the corresponding {@link Connector} class's version.
-     *
-     * @return the version, formatted as a String
-     */
-    String version();
-
-    /**
-     * Start the Task
-     * @param props initial configuration
-     */
-    void start(Map<String, String> props);
-
-    /**
-     * Stop this task.
-     */
-    void stop();
-}
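
The interface above only defines the lifecycle; the data-transfer methods live in SourceTask and SinkTask. A minimal hypothetical implementation and the lifecycle a caller applies to it, as a sketch only:

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical no-op task showing just the lifecycle contract defined by the interface above.
    public class LoggingTask implements Task {
        @Override
        public String version() {
            // Convention from the javadoc: report the same version as the parent Connector.
            return "0.1.0";
        }

        @Override
        public void start(Map<String, String> props) {
            System.out.println("started with " + props.size() + " settings");
        }

        @Override
        public void stop() {
            System.out.println("stopped");
        }

        public static void main(String[] args) {
            // Mimic the lifecycle the framework applies: start with config, later stop.
            Task task = new LoggingTask();
            Map<String, String> props = new HashMap<>();
            props.put("greeting", "hello"); // hypothetical setting
            task.start(props);
            task.stop();
        }
    }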
