http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStoreTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStoreTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStoreTest.java
deleted file mode 100644
index 69d9ab4..0000000
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStoreTest.java
+++ /dev/null
@@ -1,357 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.storage;
-
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.copycat.errors.CopycatException;
-import org.apache.kafka.copycat.util.Callback;
-import org.apache.kafka.copycat.util.KafkaBasedLog;
-import org.easymock.Capture;
-import org.easymock.EasyMock;
-import org.easymock.IAnswer;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.easymock.PowerMock;
-import org.powermock.api.easymock.annotation.Mock;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.powermock.reflect.Whitebox;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(KafkaOffsetBackingStore.class)
-@PowerMockIgnore("javax.management.*")
-public class KafkaOffsetBackingStoreTest {
-    private static final String TOPIC = "copycat-offsets";
-    private static final Map<String, String> DEFAULT_PROPS = new HashMap<>();
-    static {
-        DEFAULT_PROPS.put(KafkaOffsetBackingStore.OFFSET_STORAGE_TOPIC_CONFIG, TOPIC);
-        DEFAULT_PROPS.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9093");
-    }
-    private static final Map<ByteBuffer, ByteBuffer> FIRST_SET = new HashMap<>();
-    static {
-        FIRST_SET.put(buffer("key"), buffer("value"));
-        FIRST_SET.put(null, null);
-    }
-
-    private static final ByteBuffer TP0_KEY = buffer("TP0KEY");
-    private static final ByteBuffer TP1_KEY = buffer("TP1KEY");
-    private static final ByteBuffer TP2_KEY = buffer("TP2KEY");
-    private static final ByteBuffer TP0_VALUE = buffer("VAL0");
-    private static final ByteBuffer TP1_VALUE = buffer("VAL1");
-    private static final ByteBuffer TP2_VALUE = buffer("VAL2");
-    private static final ByteBuffer TP0_VALUE_NEW = buffer("VAL0_NEW");
-    private static final ByteBuffer TP1_VALUE_NEW = buffer("VAL1_NEW");
-
-    @Mock
-    KafkaBasedLog<byte[], byte[]> storeLog;
-    private KafkaOffsetBackingStore store;
-
-    private Capture<String> capturedTopic = EasyMock.newCapture();
-    private Capture<Map<String, Object>> capturedProducerProps = EasyMock.newCapture();
-    private Capture<Map<String, Object>> capturedConsumerProps = EasyMock.newCapture();
-    private Capture<Callback<ConsumerRecord<byte[], byte[]>>> capturedConsumedCallback = EasyMock.newCapture();
-
-    @Before
-    public void setUp() throws Exception {
-        store = PowerMock.createPartialMockAndInvokeDefaultConstructor(KafkaOffsetBackingStore.class, new String[]{"createKafkaBasedLog"});
-    }
-
-    @Test(expected = CopycatException.class)
-    public void testMissingTopic() {
-        store = new KafkaOffsetBackingStore();
-        store.configure(Collections.<String, Object>emptyMap());
-    }
-
-    @Test
-    public void testStartStop() throws Exception {
-        expectConfigure();
-        expectStart(Collections.EMPTY_LIST);
-        expectStop();
-
-        PowerMock.replayAll();
-
-        store.configure(DEFAULT_PROPS);
-        assertEquals(TOPIC, capturedTopic.getValue());
-        assertEquals("org.apache.kafka.common.serialization.ByteArraySerializer", capturedProducerProps.getValue().get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
-        assertEquals("org.apache.kafka.common.serialization.ByteArraySerializer", capturedProducerProps.getValue().get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
-        assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", capturedConsumerProps.getValue().get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
-        assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", capturedConsumerProps.getValue().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
-
-        store.start();
-        store.stop();
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testReloadOnStart() throws Exception {
-        expectConfigure();
-        expectStart(Arrays.asList(
-                new ConsumerRecord<>(TOPIC, 0, 0, TP0_KEY.array(), TP0_VALUE.array()),
-                new ConsumerRecord<>(TOPIC, 1, 0, TP1_KEY.array(), TP1_VALUE.array()),
-                new ConsumerRecord<>(TOPIC, 0, 1, TP0_KEY.array(), TP0_VALUE_NEW.array()),
-                new ConsumerRecord<>(TOPIC, 1, 1, TP1_KEY.array(), TP1_VALUE_NEW.array())
-        ));
-        expectStop();
-
-        PowerMock.replayAll();
-
-        store.configure(DEFAULT_PROPS);
-        store.start();
-        HashMap<ByteBuffer, ByteBuffer> data = Whitebox.getInternalState(store, "data");
-        assertEquals(TP0_VALUE_NEW, data.get(TP0_KEY));
-        assertEquals(TP1_VALUE_NEW, data.get(TP1_KEY));
-
-        store.stop();
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testGetSet() throws Exception {
-        expectConfigure();
-        expectStart(Collections.EMPTY_LIST);
-        expectStop();
-
-        // First get() against an empty store
-        final Capture<Callback<Void>> firstGetReadToEndCallback = EasyMock.newCapture();
-        storeLog.readToEnd(EasyMock.capture(firstGetReadToEndCallback));
-        PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
-            @Override
-            public Object answer() throws Throwable {
-                firstGetReadToEndCallback.getValue().onCompletion(null, null);
-                return null;
-            }
-        });
-
-        // Set offsets
-        Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture();
-        storeLog.send(EasyMock.aryEq(TP0_KEY.array()), EasyMock.aryEq(TP0_VALUE.array()), EasyMock.capture(callback0));
-        PowerMock.expectLastCall();
-        Capture<org.apache.kafka.clients.producer.Callback> callback1 = EasyMock.newCapture();
-        storeLog.send(EasyMock.aryEq(TP1_KEY.array()), EasyMock.aryEq(TP1_VALUE.array()), EasyMock.capture(callback1));
-        PowerMock.expectLastCall();
-
-        // Second get() should get the produced data and return the new values
-        final Capture<Callback<Void>> secondGetReadToEndCallback = EasyMock.newCapture();
-        storeLog.readToEnd(EasyMock.capture(secondGetReadToEndCallback));
-        PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
-            @Override
-            public Object answer() throws Throwable {
-                capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 0, TP0_KEY.array(), TP0_VALUE.array()));
-                capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 0, TP1_KEY.array(), TP1_VALUE.array()));
-                secondGetReadToEndCallback.getValue().onCompletion(null, null);
-                return null;
-            }
-        });
-
-        // Third get() should pick up data produced by someone else and return those values
-        final Capture<Callback<Void>> thirdGetReadToEndCallback = EasyMock.newCapture();
-        storeLog.readToEnd(EasyMock.capture(thirdGetReadToEndCallback));
-        PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
-            @Override
-            public Object answer() throws Throwable {
-                capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 1, TP0_KEY.array(), TP0_VALUE_NEW.array()));
-                capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 1, TP1_KEY.array(), TP1_VALUE_NEW.array()));
-                thirdGetReadToEndCallback.getValue().onCompletion(null, null);
-                return null;
-            }
-        });
-
-        PowerMock.replayAll();
-
-
-
-        store.configure(DEFAULT_PROPS);
-        store.start();
-
-        // Getting from empty store should return nulls
-        final AtomicBoolean getInvokedAndPassed = new AtomicBoolean(false);
-        store.get(Arrays.asList(TP0_KEY, TP1_KEY), new Callback<Map<ByteBuffer, ByteBuffer>>() {
-            @Override
-            public void onCompletion(Throwable error, Map<ByteBuffer, ByteBuffer> result) {
-                // Since we didn't read them yet, these will be null
-                assertEquals(null, result.get(TP0_KEY));
-                assertEquals(null, result.get(TP1_KEY));
-                getInvokedAndPassed.set(true);
-            }
-        }).get(10000, TimeUnit.MILLISECONDS);
-        assertTrue(getInvokedAndPassed.get());
-
-        // Set some offsets
-        Map<ByteBuffer, ByteBuffer> toSet = new HashMap<>();
-        toSet.put(TP0_KEY, TP0_VALUE);
-        toSet.put(TP1_KEY, TP1_VALUE);
-        final AtomicBoolean invoked = new AtomicBoolean(false);
-        Future<Void> setFuture = store.set(toSet, new Callback<Void>() {
-            @Override
-            public void onCompletion(Throwable error, Void result) {
-                invoked.set(true);
-            }
-        });
-        assertFalse(setFuture.isDone());
-        // Out of order callbacks shouldn't matter, should still require all to be invoked before invoking the callback
-        // for the store's set callback
-        callback1.getValue().onCompletion(null, null);
-        assertFalse(invoked.get());
-        callback0.getValue().onCompletion(null, null);
-        setFuture.get(10000, TimeUnit.MILLISECONDS);
-        assertTrue(invoked.get());
-
-        // Getting data should read to end of our published data and return it
-        final AtomicBoolean secondGetInvokedAndPassed = new AtomicBoolean(false);
-        store.get(Arrays.asList(TP0_KEY, TP1_KEY), new Callback<Map<ByteBuffer, ByteBuffer>>() {
-            @Override
-            public void onCompletion(Throwable error, Map<ByteBuffer, ByteBuffer> result) {
-                assertEquals(TP0_VALUE, result.get(TP0_KEY));
-                assertEquals(TP1_VALUE, result.get(TP1_KEY));
-                secondGetInvokedAndPassed.set(true);
-            }
-        }).get(10000, TimeUnit.MILLISECONDS);
-        assertTrue(secondGetInvokedAndPassed.get());
-
-        // Getting data should read to end of our published data and return it
-        final AtomicBoolean thirdGetInvokedAndPassed = new AtomicBoolean(false);
-        store.get(Arrays.asList(TP0_KEY, TP1_KEY), new Callback<Map<ByteBuffer, ByteBuffer>>() {
-            @Override
-            public void onCompletion(Throwable error, Map<ByteBuffer, ByteBuffer> result) {
-                assertEquals(TP0_VALUE_NEW, result.get(TP0_KEY));
-                assertEquals(TP1_VALUE_NEW, result.get(TP1_KEY));
-                thirdGetInvokedAndPassed.set(true);
-            }
-        }).get(10000, TimeUnit.MILLISECONDS);
-        assertTrue(thirdGetInvokedAndPassed.get());
-
-        store.stop();
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testSetFailure() throws Exception {
-        expectConfigure();
-        expectStart(Collections.EMPTY_LIST);
-        expectStop();
-
-        // Set offsets
-        Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture();
-        storeLog.send(EasyMock.aryEq(TP0_KEY.array()), EasyMock.aryEq(TP0_VALUE.array()), EasyMock.capture(callback0));
-        PowerMock.expectLastCall();
-        Capture<org.apache.kafka.clients.producer.Callback> callback1 = EasyMock.newCapture();
-        storeLog.send(EasyMock.aryEq(TP1_KEY.array()), EasyMock.aryEq(TP1_VALUE.array()), EasyMock.capture(callback1));
-        PowerMock.expectLastCall();
-        Capture<org.apache.kafka.clients.producer.Callback> callback2 = EasyMock.newCapture();
-        storeLog.send(EasyMock.aryEq(TP2_KEY.array()), EasyMock.aryEq(TP2_VALUE.array()), EasyMock.capture(callback2));
-        PowerMock.expectLastCall();
-
-        PowerMock.replayAll();
-
-
-
-        store.configure(DEFAULT_PROPS);
-        store.start();
-
-        // Set some offsets
-        Map<ByteBuffer, ByteBuffer> toSet = new HashMap<>();
-        toSet.put(TP0_KEY, TP0_VALUE);
-        toSet.put(TP1_KEY, TP1_VALUE);
-        toSet.put(TP2_KEY, TP2_VALUE);
-        final AtomicBoolean invoked = new AtomicBoolean(false);
-        final AtomicBoolean invokedFailure = new AtomicBoolean(false);
-        Future<Void> setFuture = store.set(toSet, new Callback<Void>() {
-            @Override
-            public void onCompletion(Throwable error, Void result) {
-                invoked.set(true);
-                if (error != null)
-                    invokedFailure.set(true);
-            }
-        });
-        assertFalse(setFuture.isDone());
-        // Out of order callbacks shouldn't matter, should still require all to be invoked before invoking the callback
-        // for the store's set callback
-        callback1.getValue().onCompletion(null, null);
-        assertFalse(invoked.get());
-        callback2.getValue().onCompletion(null, new KafkaException("bogus error"));
-        assertTrue(invoked.get());
-        assertTrue(invokedFailure.get());
-        callback0.getValue().onCompletion(null, null);
-        try {
-            setFuture.get(10000, TimeUnit.MILLISECONDS);
-            fail("Should have seen KafkaException thrown when waiting on KafkaOffsetBackingStore.set() future");
-        } catch (ExecutionException e) {
-            // expected
-            assertNotNull(e.getCause());
-            assertTrue(e.getCause() instanceof KafkaException);
-        }
-
-        store.stop();
-
-        PowerMock.verifyAll();
-    }
-
-    private void expectConfigure() throws Exception {
-        PowerMock.expectPrivate(store, "createKafkaBasedLog", EasyMock.capture(capturedTopic), EasyMock.capture(capturedProducerProps),
-                EasyMock.capture(capturedConsumerProps), EasyMock.capture(capturedConsumedCallback))
-                .andReturn(storeLog);
-    }
-
-    private void expectStart(final List<ConsumerRecord<byte[], byte[]>> preexistingRecords) throws Exception {
-        storeLog.start();
-        PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
-            @Override
-            public Object answer() throws Throwable {
-                for (ConsumerRecord<byte[], byte[]> rec : preexistingRecords)
-                    capturedConsumedCallback.getValue().onCompletion(null, rec);
-                return null;
-            }
-        });
-    }
-
-    private void expectStop() {
-        storeLog.stop();
-        PowerMock.expectLastCall();
-    }
-
-    private static ByteBuffer buffer(String v) {
-        return ByteBuffer.wrap(v.getBytes());
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java
deleted file mode 100644
index 3dd0b52..0000000
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.storage;
-
-import org.apache.kafka.copycat.errors.CopycatException;
-import org.apache.kafka.copycat.util.Callback;
-import org.easymock.Capture;
-import org.easymock.EasyMock;
-import org.easymock.IAnswer;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.easymock.PowerMock;
-import org.powermock.api.easymock.annotation.Mock;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.*;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-@RunWith(PowerMockRunner.class)
-public class OffsetStorageWriterTest {
-    private static final String NAMESPACE = "namespace";
-    // Copycat format - any types should be accepted here
-    private static final Map<String, String> OFFSET_KEY = Collections.singletonMap("key", "key");
-    private static final Map<String, Integer> OFFSET_VALUE = Collections.singletonMap("key", 12);
-
-    // Serialized
-    private static final byte[] OFFSET_KEY_SERIALIZED = "key-serialized".getBytes();
-    private static final byte[] OFFSET_VALUE_SERIALIZED = "value-serialized".getBytes();
-
-    @Mock private OffsetBackingStore store;
-    @Mock private Converter keyConverter;
-    @Mock private Converter valueConverter;
-    private OffsetStorageWriter writer;
-
-    private static Exception exception = new RuntimeException("error");
-
-    private ExecutorService service;
-
-    @Before
-    public void setup() {
-        writer = new OffsetStorageWriter(store, NAMESPACE, keyConverter, valueConverter);
-        service = Executors.newFixedThreadPool(1);
-    }
-
-    @After
-    public void teardown() {
-        service.shutdownNow();
-    }
-
-    @Test
-    public void testWriteFlush() throws Exception {
-        @SuppressWarnings("unchecked")
-        Callback<Void> callback = PowerMock.createMock(Callback.class);
-        expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, callback, false, null);
-
-        PowerMock.replayAll();
-
-        writer.offset(OFFSET_KEY, OFFSET_VALUE);
-
-        assertTrue(writer.beginFlush());
-        writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
-
-        PowerMock.verifyAll();
-    }
-
-    // It should be possible to set offset values to null
-    @Test
-    public void testWriteNullValueFlush() throws Exception {
-        @SuppressWarnings("unchecked")
-        Callback<Void> callback = PowerMock.createMock(Callback.class);
-        expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, null, null, callback, false, null);
-
-        PowerMock.replayAll();
-
-        writer.offset(OFFSET_KEY, null);
-
-        assertTrue(writer.beginFlush());
-        writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
-
-        PowerMock.verifyAll();
-    }
-
-    // It should be possible to use null keys. These aren't actually stored as null since the key is wrapped to include
-    // info about the namespace (connector)
-    @Test
-    public void testWriteNullKeyFlush() throws Exception {
-        @SuppressWarnings("unchecked")
-        Callback<Void> callback = PowerMock.createMock(Callback.class);
-        expectStore(null, null, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, callback, false, null);
-
-        PowerMock.replayAll();
-
-        writer.offset(null, OFFSET_VALUE);
-
-        assertTrue(writer.beginFlush());
-        writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testNoOffsetsToFlush() {
-        // If no offsets are flushed, we should finish immediately and not have made any calls to the
-        // underlying storage layer
-
-        PowerMock.replayAll();
-
-        // Should not return a future
-        assertFalse(writer.beginFlush());
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testFlushFailureReplacesOffsets() throws Exception {
-        // When a flush fails, we shouldn't just lose the offsets. Instead, they should be restored
-        // such that a subsequent flush will write them.
-
-        @SuppressWarnings("unchecked")
-        final Callback<Void> callback = PowerMock.createMock(Callback.class);
-        // First time the write fails
-        expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, callback, true, null);
-        // Second time it succeeds
-        expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, callback, false, null);
-        // Third time it has no data to flush so we won't get past beginFlush()
-
-        PowerMock.replayAll();
-
-        writer.offset(OFFSET_KEY, OFFSET_VALUE);
-        assertTrue(writer.beginFlush());
-        writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
-        assertTrue(writer.beginFlush());
-        writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
-        assertFalse(writer.beginFlush());
-
-        PowerMock.verifyAll();
-    }
-
-    @Test(expected = CopycatException.class)
-    public void testAlreadyFlushing() throws Exception {
-        @SuppressWarnings("unchecked")
-        final Callback<Void> callback = PowerMock.createMock(Callback.class);
-        // Trigger the send, but don't invoke the callback so we'll still be mid-flush
-        CountDownLatch allowStoreCompleteCountdown = new CountDownLatch(1);
-        expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, null, false, allowStoreCompleteCountdown);
-
-        PowerMock.replayAll();
-
-        writer.offset(OFFSET_KEY, OFFSET_VALUE);
-        assertTrue(writer.beginFlush());
-        writer.doFlush(callback);
-        assertTrue(writer.beginFlush()); // should throw
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testCancelBeforeAwaitFlush() {
-        PowerMock.replayAll();
-
-        writer.offset(OFFSET_KEY, OFFSET_VALUE);
-        assertTrue(writer.beginFlush());
-        writer.cancelFlush();
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testCancelAfterAwaitFlush() throws Exception {
-        @SuppressWarnings("unchecked")
-        Callback<Void> callback = PowerMock.createMock(Callback.class);
-        CountDownLatch allowStoreCompleteCountdown = new CountDownLatch(1);
-        // In this test, the write should be cancelled so the callback will not be invoked and is not
-        // passed to the expectStore call
-        expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, null, false, allowStoreCompleteCountdown);
-
-        PowerMock.replayAll();
-
-        writer.offset(OFFSET_KEY, OFFSET_VALUE);
-        assertTrue(writer.beginFlush());
-        // Start the flush, then immediately cancel before allowing the mocked store request to finish
-        Future<Void> flushFuture = writer.doFlush(callback);
-        writer.cancelFlush();
-        allowStoreCompleteCountdown.countDown();
-        flushFuture.get(1000, TimeUnit.MILLISECONDS);
-
-        PowerMock.verifyAll();
-    }
-
-    /**
-     * Expect a request to store data to the underlying OffsetBackingStore.
-     *
-     * @param key the key for the offset
-     * @param keySerialized serialized version of the key
-     * @param value the value for the offset
-     * @param valueSerialized serialized version of the value
-     * @param callback the callback to invoke when completed, or null if the callback isn't
-     *                 expected to be invoked
-     * @param fail if true, complete the store request with an error instead of success
-     * @param waitForCompletion if non-null, a CountDownLatch that should be awaited on before
-     *                          invoking the callback. A (generous) timeout is still imposed to
-     *                          ensure tests complete.
-     * @return the captured set of ByteBuffer key-value pairs passed to the storage layer
-     */
-    private void expectStore(Map<String, String> key, byte[] keySerialized,
-                             Map<String, Integer> value, byte[] valueSerialized,
-                             final Callback<Void> callback,
-                             final boolean fail,
-                             final CountDownLatch waitForCompletion) {
-        List<Object> keyWrapped = Arrays.asList(NAMESPACE, key);
-        EasyMock.expect(keyConverter.fromCopycatData(NAMESPACE, null, keyWrapped)).andReturn(keySerialized);
-        EasyMock.expect(valueConverter.fromCopycatData(NAMESPACE, null, value)).andReturn(valueSerialized);
-
-        final Capture<Callback<Void>> storeCallback = Capture.newInstance();
-        final Map<ByteBuffer, ByteBuffer> offsetsSerialized = Collections.singletonMap(
-                keySerialized == null ? null : ByteBuffer.wrap(keySerialized),
-                valueSerialized == null ? null : ByteBuffer.wrap(valueSerialized));
-        EasyMock.expect(store.set(EasyMock.eq(offsetsSerialized), EasyMock.capture(storeCallback)))
-                .andAnswer(new IAnswer<Future<Void>>() {
-                    @Override
-                    public Future<Void> answer() throws Throwable {
-                        return service.submit(new Callable<Void>() {
-                            @Override
-                            public Void call() throws Exception {
-                                if (waitForCompletion != null)
-                                    assertTrue(waitForCompletion.await(10000, TimeUnit.MILLISECONDS));
-
-                                if (fail) {
-                                    storeCallback.getValue().onCompletion(exception, null);
-                                } else {
-                                    storeCallback.getValue().onCompletion(null, null);
-                                }
-                                return null;
-                            }
-                        });
-                    }
-                });
-        if (callback != null) {
-            if (fail) {
-                callback.onCompletion(EasyMock.eq(exception), EasyMock.eq((Void) null));
-            } else {
-                callback.onCompletion(null, null);
-            }
-        }
-        PowerMock.expectLastCall();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/ByteArrayProducerRecordEquals.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/ByteArrayProducerRecordEquals.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/ByteArrayProducerRecordEquals.java
deleted file mode 100644
index 929ea85..0000000
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/ByteArrayProducerRecordEquals.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.util;
-
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.easymock.EasyMock;
-import org.easymock.IArgumentMatcher;
-
-import java.util.Arrays;
-
-public class ByteArrayProducerRecordEquals implements IArgumentMatcher {
-    private ProducerRecord<byte[], byte[]> record;
-
-    public static ProducerRecord<byte[], byte[]> eqProducerRecord(ProducerRecord<byte[], byte[]> in) {
-        EasyMock.reportMatcher(new ByteArrayProducerRecordEquals(in));
-        return null;
-    }
-
-    public ByteArrayProducerRecordEquals(ProducerRecord<byte[], byte[]> record) {
-        this.record = record;
-    }
-
-    @Override
-    public boolean matches(Object argument) {
-        if (!(argument instanceof ProducerRecord))
-            return false;
-        ProducerRecord<byte[], byte[]> other = (ProducerRecord<byte[], byte[]>) argument;
-        return record.topic().equals(other.topic()) &&
-                record.partition() != null ? record.partition().equals(other.partition()) : other.partition() == null &&
-                record.key() != null ? Arrays.equals(record.key(), other.key()) : other.key() == null &&
-                record.value() != null ? Arrays.equals(record.value(), other.value()) : other.value() == null;
-    }
-
-    @Override
-    public void appendTo(StringBuffer buffer) {
-        buffer.append(record.toString());
-    }
-}

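For reference, a minimal usage sketch (not part of the commit above) of how a byte[]-aware
matcher like the deleted ByteArrayProducerRecordEquals is typically wired into an EasyMock
expectation. The mocked Producer and the record contents are illustrative assumptions, not
taken from the Kafka sources:

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.easymock.EasyMock;

    import static org.apache.kafka.copycat.util.ByteArrayProducerRecordEquals.eqProducerRecord;

    public class ByteArrayProducerRecordEqualsSketch {
        @SuppressWarnings("unchecked")
        public static void main(String[] args) {
            // byte[] keys and values use identity equality, so EasyMock.eq(record) would not
            // match an equivalent record; the custom matcher compares array contents instead.
            Producer<byte[], byte[]> producer = EasyMock.createMock(Producer.class);
            ProducerRecord<byte[], byte[]> expected =
                    new ProducerRecord<>("copycat-offsets", "key".getBytes(), "value".getBytes());
            EasyMock.expect(producer.send(eqProducerRecord(expected), EasyMock.<Callback>anyObject()))
                    .andReturn(null);
            EasyMock.replay(producer);

            // An equivalent record (same topic, equal byte contents) satisfies the expectation.
            producer.send(new ProducerRecord<byte[], byte[]>("copycat-offsets",
                    "key".getBytes(), "value".getBytes()), null);
            EasyMock.verify(producer);
        }
    }
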
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/KafkaBasedLogTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/KafkaBasedLogTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/KafkaBasedLogTest.java
deleted file mode 100644
index eb62c9e..0000000
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/KafkaBasedLogTest.java
+++ /dev/null
@@ -1,437 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.util;
-
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.MockConsumer;
-import org.apache.kafka.clients.consumer.OffsetResetStrategy;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.LeaderNotAvailableException;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.utils.Time;
-import org.easymock.Capture;
-import org.easymock.EasyMock;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.easymock.PowerMock;
-import org.powermock.api.easymock.annotation.Mock;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.powermock.reflect.Whitebox;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(KafkaBasedLog.class)
-@PowerMockIgnore("javax.management.*")
-public class KafkaBasedLogTest {
-
-    private static final String TOPIC = "copycat-log";
-    private static final TopicPartition TP0 = new TopicPartition(TOPIC, 0);
-    private static final TopicPartition TP1 = new TopicPartition(TOPIC, 1);
-    private static final Map<String, Object> PRODUCER_PROPS = new HashMap<>();
-    static {
-        PRODUCER_PROPS.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9093");
-        PRODUCER_PROPS.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
-        PRODUCER_PROPS.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
-    }
-    private static final Map<String, Object> CONSUMER_PROPS = new HashMap<>();
-    static {
-        CONSUMER_PROPS.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9093");
-        CONSUMER_PROPS.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
-        CONSUMER_PROPS.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
-    }
-
-    private static final Set<TopicPartition> CONSUMER_ASSIGNMENT = new HashSet<>(Arrays.asList(TP0, TP1));
-    private static final Map<String, String> FIRST_SET = new HashMap<>();
-    static {
-        FIRST_SET.put("key", "value");
-        FIRST_SET.put(null, null);
-    }
-
-    private static final Node LEADER = new Node(1, "broker1", 9092);
-    private static final Node REPLICA = new Node(1, "broker2", 9093);
-
-    private static final PartitionInfo TPINFO0 = new PartitionInfo(TOPIC, 0, LEADER, new Node[]{REPLICA}, new Node[]{REPLICA});
-    private static final PartitionInfo TPINFO1 = new PartitionInfo(TOPIC, 1, LEADER, new Node[]{REPLICA}, new Node[]{REPLICA});
-
-    private static final String TP0_KEY = "TP0KEY";
-    private static final String TP1_KEY = "TP1KEY";
-    private static final String TP0_VALUE = "VAL0";
-    private static final String TP1_VALUE = "VAL1";
-    private static final String TP0_VALUE_NEW = "VAL0_NEW";
-    private static final String TP1_VALUE_NEW = "VAL1_NEW";
-
-    private Time time = new MockTime();
-    private KafkaBasedLog<String, String> store;
-
-    @Mock
-    private KafkaProducer<String, String> producer;
-    private MockConsumer<String, String> consumer;
-
-    private List<ConsumerRecord<String, String>> consumedRecords = new ArrayList<>();
-    private Callback<ConsumerRecord<String, String>> consumedCallback = new Callback<ConsumerRecord<String, String>>() {
-        @Override
-        public void onCompletion(Throwable error, ConsumerRecord<String, String> record) {
-            consumedRecords.add(record);
-        }
-    };
-
-    @Before
-    public void setUp() throws Exception {
-        store = PowerMock.createPartialMock(KafkaBasedLog.class, new String[]{"createConsumer", "createProducer"},
-                TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, consumedCallback, time);
-        consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-        consumer.updatePartitions(TOPIC, Arrays.asList(TPINFO0, TPINFO1));
-        Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
-        beginningOffsets.put(TP0, 0L);
-        beginningOffsets.put(TP1, 0L);
-        consumer.updateBeginningOffsets(beginningOffsets);
-    }
-
-    @Test
-    public void testStartStop() throws Exception {
-        expectStart();
-        expectStop();
-
-        PowerMock.replayAll();
-
-        Map<TopicPartition, Long> endOffsets = new HashMap<>();
-        endOffsets.put(TP0, 0L);
-        endOffsets.put(TP1, 0L);
-        consumer.updateEndOffsets(endOffsets);
-        store.start();
-        assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
-
-        store.stop();
-
-        assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
-        assertTrue(consumer.closed());
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testReloadOnStart() throws Exception {
-        expectStart();
-        expectStop();
-
-        PowerMock.replayAll();
-
-        Map<TopicPartition, Long> endOffsets = new HashMap<>();
-        endOffsets.put(TP0, 1L);
-        endOffsets.put(TP1, 1L);
-        consumer.updateEndOffsets(endOffsets);
-        final CountDownLatch finishedLatch = new CountDownLatch(1);
-        consumer.schedulePollTask(new Runnable() { // Use first poll task to setup sequence of remaining responses to polls
-            @Override
-            public void run() {
-                // Should keep polling until it reaches current log end offset for all partitions. Should handle
-                // as many empty polls as needed
-                consumer.scheduleNopPollTask();
-                consumer.scheduleNopPollTask();
-                consumer.schedulePollTask(new Runnable() {
-                    @Override
-                    public void run() {
-                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, TP0_KEY, TP0_VALUE));
-                    }
-                });
-                consumer.scheduleNopPollTask();
-                consumer.scheduleNopPollTask();
-                consumer.schedulePollTask(new Runnable() {
-                    @Override
-                    public void run() {
-                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, TP1_KEY, TP1_VALUE));
-                    }
-                });
-                consumer.schedulePollTask(new Runnable() {
-                    @Override
-                    public void run() {
-                        finishedLatch.countDown();
-                    }
-                });
-            }
-        });
-        store.start();
-        assertTrue(finishedLatch.await(10000, TimeUnit.MILLISECONDS));
-
-        assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
-        assertEquals(2, consumedRecords.size());
-        assertEquals(TP0_VALUE, consumedRecords.get(0).value());
-        assertEquals(TP1_VALUE, consumedRecords.get(1).value());
-
-        store.stop();
-
-        assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
-        assertTrue(consumer.closed());
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testSendAndReadToEnd() throws Exception {
-        expectStart();
-        TestFuture<RecordMetadata> tp0Future = new TestFuture<>();
-        ProducerRecord<String, String> tp0Record = new ProducerRecord<>(TOPIC, TP0_KEY, TP0_VALUE);
-        Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture();
-        EasyMock.expect(producer.send(EasyMock.eq(tp0Record), EasyMock.capture(callback0))).andReturn(tp0Future);
-        TestFuture<RecordMetadata> tp1Future = new TestFuture<>();
-        ProducerRecord<String, String> tp1Record = new ProducerRecord<>(TOPIC, TP1_KEY, TP1_VALUE);
-        Capture<org.apache.kafka.clients.producer.Callback> callback1 = EasyMock.newCapture();
-        EasyMock.expect(producer.send(EasyMock.eq(tp1Record), EasyMock.capture(callback1))).andReturn(tp1Future);
-
-        // Producer flushes when read to log end is called
-        producer.flush();
-        PowerMock.expectLastCall();
-
-        expectStop();
-
-        PowerMock.replayAll();
-
-        Map<TopicPartition, Long> endOffsets = new HashMap<>();
-        endOffsets.put(TP0, 0L);
-        endOffsets.put(TP1, 0L);
-        consumer.updateEndOffsets(endOffsets);
-        store.start();
-        assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
-        assertEquals(0L, consumer.position(TP0));
-        assertEquals(0L, consumer.position(TP1));
-
-        // Set some keys
-        final AtomicInteger invoked = new AtomicInteger(0);
-        org.apache.kafka.clients.producer.Callback producerCallback = new org.apache.kafka.clients.producer.Callback() {
-            @Override
-            public void onCompletion(RecordMetadata metadata, Exception exception) {
-                invoked.incrementAndGet();
-            }
-        };
-        store.send(TP0_KEY, TP0_VALUE, producerCallback);
-        store.send(TP1_KEY, TP1_VALUE, producerCallback);
-        assertEquals(0, invoked.get());
-        tp1Future.resolve((RecordMetadata) null); // Output not used, so safe to not return a real value for testing
-        callback1.getValue().onCompletion(null, null);
-        assertEquals(1, invoked.get());
-        tp0Future.resolve((RecordMetadata) null);
-        callback0.getValue().onCompletion(null, null);
-        assertEquals(2, invoked.get());
-
-        // Now we should have to wait for the records to be read back when we call readToEnd()
-        final AtomicBoolean getInvokedAndPassed = new AtomicBoolean(false);
-        final FutureCallback<Void> readEndFutureCallback = new FutureCallback<>(new Callback<Void>() {
-            @Override
-            public void onCompletion(Throwable error, Void result) {
-                assertEquals(4, consumedRecords.size());
-                assertEquals(TP0_VALUE_NEW, consumedRecords.get(2).value());
-                assertEquals(TP1_VALUE_NEW, consumedRecords.get(3).value());
-                getInvokedAndPassed.set(true);
-            }
-        });
-        consumer.schedulePollTask(new Runnable() {
-            @Override
-            public void run() {
-                // Once we're synchronized in a poll, start the read to end and schedule the exact set of poll events
-                // that should follow. This readToEnd call will immediately wakeup this consumer.poll() call without
-                // returning any data.
-                store.readToEnd(readEndFutureCallback);
-
-                // Needs to seek to end to find end offsets
-                consumer.schedulePollTask(new Runnable() {
-                    @Override
-                    public void run() {
-                        Map<TopicPartition, Long> newEndOffsets = new HashMap<>();
-                        newEndOffsets.put(TP0, 2L);
-                        newEndOffsets.put(TP1, 2L);
-                        consumer.updateEndOffsets(newEndOffsets);
-                    }
-                });
-
-                // Should keep polling until it reaches current log end offset for all partitions
-                consumer.scheduleNopPollTask();
-                consumer.scheduleNopPollTask();
-                consumer.scheduleNopPollTask();
-                consumer.schedulePollTask(new Runnable() {
-                    @Override
-                    public void run() {
-                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, TP0_KEY, TP0_VALUE));
-                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 1, TP0_KEY, TP0_VALUE_NEW));
-                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, TP1_KEY, TP1_VALUE));
-                    }
-                });
-
-                consumer.schedulePollTask(new Runnable() {
-                    @Override
-                    public void run() {
-                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 1, TP1_KEY, TP1_VALUE_NEW));
-                    }
-                });
-
-                // Already have FutureCallback that should be invoked/awaited, so no need for follow up finishedLatch
-            }
-        });
-        readEndFutureCallback.get(10000, TimeUnit.MILLISECONDS);
-        assertTrue(getInvokedAndPassed.get());
-
-        // Cleanup
-        store.stop();
-
-        assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
-        assertTrue(consumer.closed());
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testConsumerError() throws Exception {
-        expectStart();
-        expectStop();
-
-        PowerMock.replayAll();
-
-        final CountDownLatch finishedLatch = new CountDownLatch(1);
-        Map<TopicPartition, Long> endOffsets = new HashMap<>();
-        endOffsets.put(TP0, 1L);
-        endOffsets.put(TP1, 1L);
-        consumer.updateEndOffsets(endOffsets);
-        consumer.schedulePollTask(new Runnable() {
-            @Override
-            public void run() {
-                // Trigger exception
-                consumer.schedulePollTask(new Runnable() {
-                    @Override
-                    public void run() {
-                        consumer.setException(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.exception());
-                    }
-                });
-
-                // Should keep polling until it reaches current log end offset for all partitions
-                consumer.scheduleNopPollTask();
-                consumer.scheduleNopPollTask();
-                consumer.schedulePollTask(new Runnable() {
-                    @Override
-                    public void run() {
-                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, TP0_KEY, TP0_VALUE_NEW));
-                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, TP0_KEY, TP0_VALUE_NEW));
-                    }
-                });
-
-                consumer.schedulePollTask(new Runnable() {
-                    @Override
-                    public void run() {
-                        finishedLatch.countDown();
-                    }
-                });
-            }
-        });
-        store.start();
-        assertTrue(finishedLatch.await(10000, TimeUnit.MILLISECONDS));
-        assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
-        assertEquals(1L, consumer.position(TP0));
-
-        store.stop();
-
-        assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
-        assertTrue(consumer.closed());
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testProducerError() throws Exception {
-        expectStart();
-        TestFuture<RecordMetadata> tp0Future = new TestFuture<>();
-        ProducerRecord<String, String> tp0Record = new ProducerRecord<>(TOPIC, TP0_KEY, TP0_VALUE);
-        Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture();
-        EasyMock.expect(producer.send(EasyMock.eq(tp0Record), EasyMock.capture(callback0))).andReturn(tp0Future);
-
-        expectStop();
-
-        PowerMock.replayAll();
-
-        Map<TopicPartition, Long> endOffsets = new HashMap<>();
-        endOffsets.put(TP0, 0L);
-        endOffsets.put(TP1, 0L);
-        consumer.updateEndOffsets(endOffsets);
-        store.start();
-        assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
-        assertEquals(0L, consumer.position(TP0));
-        assertEquals(0L, consumer.position(TP1));
-
-        final AtomicReference<Throwable> setException = new AtomicReference<>();
-        store.send(TP0_KEY, TP0_VALUE, new org.apache.kafka.clients.producer.Callback() {
-            @Override
-            public void onCompletion(RecordMetadata metadata, Exception exception) {
-                assertNull(setException.get()); // Should only be invoked once
-                setException.set(exception);
-            }
-        });
-        KafkaException exc = new LeaderNotAvailableException("Error");
-        tp0Future.resolve(exc);
-        callback0.getValue().onCompletion(null, exc);
-        assertNotNull(setException.get());
-
-        store.stop();
-
-        assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
-        assertTrue(consumer.closed());
-        PowerMock.verifyAll();
-    }
-
-
-    private void expectStart() throws Exception {
-        PowerMock.expectPrivate(store, "createProducer")
-                .andReturn(producer);
-        PowerMock.expectPrivate(store, "createConsumer")
-                .andReturn(consumer);
-    }
-
-    private void expectStop() {
-        producer.close();
-        PowerMock.expectLastCall();
-        // MockConsumer close is checked after test.
-    }
-
-    private static ByteBuffer buffer(String v) {
-        return ByteBuffer.wrap(v.getBytes());
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/MockTime.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/MockTime.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/MockTime.java
deleted file mode 100644
index 53149db..0000000
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/MockTime.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-package org.apache.kafka.copycat.util;
-
-import org.apache.kafka.common.utils.Time;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * A clock that you can manually advance by calling sleep
- */
-public class MockTime implements Time {
-
-    private long nanos = 0;
-
-    public MockTime() {
-        this.nanos = System.nanoTime();
-    }
-
-    @Override
-    public long milliseconds() {
-        return TimeUnit.MILLISECONDS.convert(this.nanos, TimeUnit.NANOSECONDS);
-    }
-
-    @Override
-    public long nanoseconds() {
-        return nanos;
-    }
-
-    @Override
-    public void sleep(long ms) {
-        this.nanos += TimeUnit.NANOSECONDS.convert(ms, TimeUnit.MILLISECONDS);
-    }
-
-}

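A small, hypothetical sketch (not part of the commit above) showing how the MockTime clock
deleted here behaves: sleep() only advances the internal counter, so a test controls elapsed
time deterministically and never blocks. It assumes the MockTime class above is on the
classpath in org.apache.kafka.copycat.util:

    import org.apache.kafka.copycat.util.MockTime;

    public class MockTimeSketch {
        public static void main(String[] args) {
            MockTime time = new MockTime();      // seeded from System.nanoTime()
            long before = time.milliseconds();
            time.sleep(500);                     // advances the clock by 500 ms and returns immediately
            long after = time.milliseconds();
            if (after - before != 500)
                throw new AssertionError("expected the mock clock to advance by exactly 500 ms");
        }
    }
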
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/ShutdownableThreadTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/ShutdownableThreadTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/ShutdownableThreadTest.java
deleted file mode 100644
index 4880ca1..0000000
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/ShutdownableThreadTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.util;
-
-import org.junit.Test;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class ShutdownableThreadTest {
-
-    @Test
-    public void testGracefulShutdown() throws InterruptedException {
-        ShutdownableThread thread = new ShutdownableThread("graceful") {
-            @Override
-            public void execute() {
-                while (getRunning()) {
-                    try {
-                        Thread.sleep(1);
-                    } catch (InterruptedException e) {
-                        // Ignore
-                    }
-                }
-            }
-        };
-        thread.start();
-        Thread.sleep(10);
-        assertTrue(thread.gracefulShutdown(1000, TimeUnit.MILLISECONDS));
-    }
-
-    @Test
-    public void testForcibleShutdown() throws InterruptedException {
-        final CountDownLatch startedLatch = new CountDownLatch(1);
-        ShutdownableThread thread = new ShutdownableThread("forcible") {
-            @Override
-            public void execute() {
-                try {
-                    startedLatch.countDown();
-                    Thread.sleep(100000);
-                } catch (InterruptedException e) {
-                    // Ignore
-                }
-            }
-        };
-        thread.start();
-        startedLatch.await();
-        thread.forceShutdown();
-        // Not all threads can be forcibly stopped since interrupt() doesn't work on threads in
-        // certain conditions, but in this case we know the thread is interruptible so we should be
-        // able to join() it
-        thread.join(1000);
-        assertFalse(thread.isAlive());
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/TestBackgroundThreadExceptionHandler.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/TestBackgroundThreadExceptionHandler.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/TestBackgroundThreadExceptionHandler.java
deleted file mode 100644
index 5dc6d33..0000000
--- 
a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/TestBackgroundThreadExceptionHandler.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.util;
-
-/**
- * An UncaughtExceptionHandler that can be registered with one or more threads 
which tracks the
- * first exception so the main thread can check for uncaught exceptions.
- */
-public class TestBackgroundThreadExceptionHandler implements 
Thread.UncaughtExceptionHandler {
-    private Throwable firstException = null;
-
-    @Override
-    public void uncaughtException(Thread t, Throwable e) {
-        if (this.firstException == null)
-            this.firstException = e;
-    }
-
-    public void verifyNoExceptions() {
-        if (this.firstException != null)
-            throw new AssertionError(this.firstException);
-    }
-}
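
TestBackgroundThreadExceptionHandler plugs into the standard
Thread.UncaughtExceptionHandler hook. A short usage sketch (the worker body and
class name below are illustrative, not taken from this patch):

    import org.apache.kafka.copycat.util.TestBackgroundThreadExceptionHandler;

    public class HandlerUsageExample {
        public static void main(String[] args) throws InterruptedException {
            TestBackgroundThreadExceptionHandler handler = new TestBackgroundThreadExceptionHandler();

            Thread worker = new Thread(new Runnable() {
                @Override
                public void run() {
                    throw new IllegalStateException("background failure");
                }
            });
            // Route uncaught exceptions from the worker thread into the handler.
            worker.setUncaughtExceptionHandler(handler);
            worker.start();
            worker.join();

            // Re-throws the first captured exception as an AssertionError,
            // which would fail the calling test.
            handler.verifyNoExceptions();
        }
    }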

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/TestFuture.java
----------------------------------------------------------------------
diff --git 
a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/TestFuture.java 
b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/TestFuture.java
deleted file mode 100644
index 55e24c8..0000000
--- 
a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/TestFuture.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.util;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-public class TestFuture<T> implements Future<T> {
-    private volatile boolean resolved;
-    private T result;
-    private Throwable exception;
-    private CountDownLatch getCalledLatch;
-
-    private volatile boolean resolveOnGet;
-    private T resolveOnGetResult;
-    private Throwable resolveOnGetException;
-
-    public TestFuture() {
-        resolved = false;
-        getCalledLatch = new CountDownLatch(1);
-
-        resolveOnGet = false;
-        resolveOnGetResult = null;
-        resolveOnGetException = null;
-    }
-
-    public void resolve(T val) {
-        this.result = val;
-        resolved = true;
-        synchronized (this) {
-            this.notifyAll();
-        }
-    }
-
-    public void resolve(Throwable t) {
-        exception = t;
-        resolved = true;
-        synchronized (this) {
-            this.notifyAll();
-        }
-    }
-
-    @Override
-    public boolean cancel(boolean mayInterruptIfRunning) {
-        return false;
-    }
-
-    @Override
-    public boolean isCancelled() {
-        return false;
-    }
-
-    @Override
-    public boolean isDone() {
-        return resolved;
-    }
-
-    @Override
-    public T get() throws InterruptedException, ExecutionException {
-        getCalledLatch.countDown();
-        while (true) {
-            try {
-                return get(Integer.MAX_VALUE, TimeUnit.DAYS);
-            } catch (TimeoutException e) {
-                // ignore
-            }
-        }
-    }
-
-    @Override
-    public T get(long timeout, TimeUnit unit) throws InterruptedException, 
ExecutionException, TimeoutException {
-        getCalledLatch.countDown();
-
-        if (resolveOnGet) {
-            if (resolveOnGetException != null)
-                resolve(resolveOnGetException);
-            else
-                resolve(resolveOnGetResult);
-        }
-
-        synchronized (this) {
-            while (!resolved) {
-                this.wait(TimeUnit.MILLISECONDS.convert(timeout, unit));
-            }
-        }
-
-        if (exception != null) {
-            if (exception instanceof TimeoutException)
-                throw (TimeoutException) exception;
-            else if (exception instanceof InterruptedException)
-                throw (InterruptedException) exception;
-            else
-                throw new ExecutionException(exception);
-        }
-        return result;
-    }
-
-    /**
-     * Set a flag to resolve the future as soon as one of the get() methods 
has been called. Returns immediately.
-     * @param val the value to return from the future
-     */
-    public void resolveOnGet(T val) {
-        resolveOnGet = true;
-        resolveOnGetResult = val;
-    }
-
-    /**
-     * Set a flag to resolve the future as soon as one of the get() methods 
has been called. Returns immediately.
-     * @param t the exception to return from the future
-     */
-    public void resolveOnGet(Throwable t) {
-        resolveOnGet = true;
-        resolveOnGetException = t;
-    }
-
-    /**
-     * Block, waiting for another thread to call one of the get() methods, and 
then immediately resolve the future with
-     * the specified value.
-     * @param val the value to return from the future
-     */
-    public void waitForGetAndResolve(T val) {
-        waitForGet();
-        resolve(val);
-    }
-
-    /**
-     * Block, waiting for another thread to call one of the get() methods, and 
then immediately resolve the future with
-     * the specified value.
-     * @param t the exception to use to resolve the future
-     */
-    public void waitForGetAndResolve(Throwable t) {
-        waitForGet();
-        resolve(t);
-    }
-
-    private void waitForGet() {
-        try {
-            getCalledLatch.await();
-        } catch (InterruptedException e) {
-            throw new RuntimeException("Unexpected interruption: ", e);
-        }
-    }
-}
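
TestFuture gives a test full control over when a Future completes. A sketch of the
two usage patterns described in its javadoc (values and class name are
illustrative):

    import java.util.concurrent.ExecutionException;

    import org.apache.kafka.copycat.util.TestFuture;

    public class TestFutureUsageExample {
        public static void main(String[] args) throws Exception {
            // Pattern 1: resolve lazily, as soon as the code under test calls get().
            TestFuture<String> lazy = new TestFuture<>();
            lazy.resolveOnGet("done");
            System.out.println(lazy.get()); // prints "done"

            // Pattern 2: block until another thread calls get(), then fail the future.
            final TestFuture<String> failing = new TestFuture<>();
            Thread caller = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        failing.get();
                    } catch (ExecutionException e) {
                        System.out.println("expected failure: " + e.getCause());
                    } catch (InterruptedException e) {
                        // ignored in this sketch
                    }
                }
            });
            caller.start();
            failing.waitForGetAndResolve(new RuntimeException("simulated send failure"));
            caller.join();
        }
    }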

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/ThreadedTest.java
----------------------------------------------------------------------
diff --git 
a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/ThreadedTest.java 
b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/ThreadedTest.java
deleted file mode 100644
index ed99247..0000000
--- 
a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/ThreadedTest.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.util;
-
-import org.junit.After;
-import org.junit.Before;
-
-/**
- * Base class for tests that use threads. It sets up uncaught exception 
handlers for all known
- * thread classes and checks for errors at the end of the test so that 
failures in background
- * threads will cause the test to fail.
- */
-public class ThreadedTest {
-
-    protected TestBackgroundThreadExceptionHandler 
backgroundThreadExceptionHandler;
-
-    @Before
-    public void setup() {
-        backgroundThreadExceptionHandler = new 
TestBackgroundThreadExceptionHandler();
-        ShutdownableThread.funcaughtExceptionHandler = 
backgroundThreadExceptionHandler;
-    }
-
-    @After
-    public void teardown() {
-        backgroundThreadExceptionHandler.verifyNoExceptions();
-        ShutdownableThread.funcaughtExceptionHandler = null;
-    }
-}
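
A test class only needs to extend ThreadedTest to get this behaviour: setup()
installs the shared handler and teardown() fails the test if any background thread
threw. A minimal hedged sketch (the worker body is illustrative and assumes
ShutdownableThread reports uncaught exceptions through the static handler field
set above):

    package org.apache.kafka.copycat.util;

    import java.util.concurrent.TimeUnit;

    import org.junit.Test;

    import static org.junit.Assert.assertTrue;

    public class ExampleWorkerTest extends ThreadedTest {

        @Test
        public void testWorkerRunsCleanly() throws InterruptedException {
            ShutdownableThread worker = new ShutdownableThread("example-worker") {
                @Override
                public void execute() {
                    // Any uncaught exception thrown here would be captured by the
                    // handler installed in setup() and surface in teardown().
                }
            };
            worker.start();
            assertTrue(worker.gracefulShutdown(1000, TimeUnit.MILLISECONDS));
        }
    }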

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/resources/log4j.properties 
b/copycat/runtime/src/test/resources/log4j.properties
deleted file mode 100644
index d5e90fe..0000000
--- a/copycat/runtime/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,23 +0,0 @@
-##
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-##
-log4j.rootLogger=OFF, stdout
-
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
-
-log4j.logger.org.apache.kafka=ERROR

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala 
b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
index c1278e4..5be410c 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
@@ -164,7 +164,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     assertEquals(Errors.NONE.code, joinGroupResult.errorCode)
 
     EasyMock.reset(replicaManager)
-    val otherJoinGroupResult = joinGroup(groupId, otherMemberId, 
DefaultSessionTimeout, "copycat", protocols)
+    val otherJoinGroupResult = joinGroup(groupId, otherMemberId, 
DefaultSessionTimeout, "connect", protocols)
     assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL.code, 
otherJoinGroupResult.errorCode)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/docs/security.html
----------------------------------------------------------------------
diff --git a/docs/security.html b/docs/security.html
index d250553..80e30bc 100644
--- a/docs/security.html
+++ b/docs/security.html
@@ -224,7 +224,7 @@ Apache Kafka allows clients to connect over SSL. By default 
SSL is disabled but
             </ol>
         </li>
         <li><b><a name="jaas_client">Creating Client Side JAAS 
Config</a></b><br>
-        Clients (producers, consumers, copycat workers, etc) will authenticate 
to the cluster with their own principal (usually with the same name as the user 
used for running the client), so obtain or create these principals as needed. 
Then create a JAAS file as follows:
+        Clients (producers, consumers, connect workers, etc) will authenticate 
to the cluster with their own principal (usually with the same name as the user 
used for running the client), so obtain or create these principals as needed. 
Then create a JAAS file as follows:
             <pre>
                 KafkaClient {
                     com.sun.security.auth.module.Krb5LoginModule required

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index 357305b..0875611 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -15,4 +15,4 @@
 
 apply from: file('scala.gradle')
 include 'core', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 
'examples', 'clients', 'tools', 'streams', 'log4j-appender',
-        'copycat:api', 'copycat:runtime', 'copycat:json', 'copycat:file'
\ No newline at end of file
+        'connect:api', 'connect:runtime', 'connect:json', 'connect:file'
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/tests/kafkatest/services/connect.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/connect.py 
b/tests/kafkatest/services/connect.py
new file mode 100644
index 0000000..fbac565
--- /dev/null
+++ b/tests/kafkatest/services/connect.py
@@ -0,0 +1,191 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.services.service import Service
+from ducktape.utils.util import wait_until
+from ducktape.errors import DucktapeError
+
+from kafkatest.services.kafka.directory import kafka_dir
+import signal, random, requests
+
+class ConnectServiceBase(Service):
+    """Base class for Kafka Connect services providing some common settings 
and functionality"""
+
+    logs = {
+        "kafka_log": {
+            "path": "/mnt/connect.log",
+            "collect_default": True},
+    }
+
+    def __init__(self, context, num_nodes, kafka, files):
+        super(ConnectServiceBase, self).__init__(context, num_nodes)
+        self.kafka = kafka
+        self.files = files
+
+    def pids(self, node):
+        """Return process ids for Kafka Connect processes."""
+        try:
+            return [pid for pid in node.account.ssh_capture("cat 
/mnt/connect.pid", callback=int)]
+        except:
+            return []
+
+    def set_configs(self, config_template_func, 
connector_config_templates=None):
+        """
+        Set configurations for the worker and the connector to run on
+        it. These are not provided in the constructor because the worker
+        config generally needs access to ZK/Kafka services to
+        create the configuration.
+        """
+        self.config_template_func = config_template_func
+        self.connector_config_templates = connector_config_templates
+
+    def stop_node(self, node, clean_shutdown=True):
+        pids = self.pids(node)
+        sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL
+
+        for pid in pids:
+            node.account.signal(pid, sig, allow_fail=False)
+        for pid in pids:
+            wait_until(lambda: not node.account.alive(pid), timeout_sec=10, 
err_msg="Kafka Connect standalone process took too long to exit")
+
+        node.account.ssh("rm -f /mnt/connect.pid", allow_fail=False)
+
+    def restart(self):
+        # We don't want to do any clean up here, just restart the process.
+        for node in self.nodes:
+            self.stop_node(node)
+            self.start_node(node)
+
+    def clean_node(self, node):
+        if len(self.pids(node)) > 0:
+            self.logger.warn("%s %s was still alive at cleanup time. Killing 
forcefully..." %
+                             (self.__class__.__name__, node.account))
+        for pid in self.pids(node):
+            node.account.signal(pid, signal.SIGKILL, allow_fail=False)
+
+        node.account.ssh("rm -rf /mnt/connect.pid /mnt/connect.log 
/mnt/connect.properties  " + " ".join(self.config_filenames() + self.files), 
allow_fail=False)
+
+    def config_filenames(self):
+        return ["/mnt/connect-connector-" + str(idx) + ".properties" for idx, 
template in enumerate(self.connector_config_templates or [])]
+
+
+    def list_connectors(self, node=None):
+        return self._rest('/connectors', node=node)
+
+    def create_connector(self, config, node=None):
+        create_request = {
+            'name': config['name'],
+            'config': config
+        }
+        return self._rest('/connectors', create_request, node=node, 
method="POST")
+
+    def get_connector(self, name, node=None):
+        return self._rest('/connectors/' + name, node=node)
+
+    def get_connector_config(self, name, node=None):
+        return self._rest('/connectors/' + name + '/config', node=node)
+
+    def set_connector_config(self, name, config, node=None):
+        return self._rest('/connectors/' + name + '/config', config, 
node=node, method="PUT")
+
+    def get_connector_tasks(self, name, node=None):
+        return self._rest('/connectors/' + name + '/tasks', node=node)
+
+    def delete_connector(self, name, node=None):
+        return self._rest('/connectors/' + name, node=node, method="DELETE")
+
+    def _rest(self, path, body=None, node=None, method="GET"):
+        if node is None:
+            node = random.choice(self.nodes)
+
+        meth = getattr(requests, method.lower())
+        url = self._base_url(node) + path
+        resp = meth(url, json=body)
+        self.logger.debug("%s %s response: %d", url, method, resp.status_code)
+        if resp.status_code > 400:
+            raise ConnectRestError(resp.status_code, resp.text, resp.url)
+        if resp.status_code == 204:
+            return None
+        else:
+            return resp.json()
+
+
+    def _base_url(self, node):
+        return 'http://' + node.account.hostname + ':' + '8083'
+
+class ConnectStandaloneService(ConnectServiceBase):
+    """Runs Kafka Connect in standalone mode."""
+
+    def __init__(self, context, kafka, files):
+        super(ConnectStandaloneService, self).__init__(context, 1, kafka, 
files)
+
+    # For convenience since this service only makes sense with a single node
+    @property
+    def node(self):
+        return self.nodes[0]
+
+    def start_node(self, node):
+        node.account.create_file("/mnt/connect.properties", 
self.config_template_func(node))
+        remote_connector_configs = []
+        for idx, template in enumerate(self.connector_config_templates):
+            target_file = "/mnt/connect-connector-" + str(idx) + ".properties"
+            node.account.create_file(target_file, template)
+            remote_connector_configs.append(target_file)
+
+        self.logger.info("Starting Kafka Connect standalone process")
+        with node.account.monitor_log("/mnt/connect.log") as monitor:
+            node.account.ssh("/opt/%s/bin/connect-standalone.sh 
/mnt/connect.properties " % kafka_dir(node) +
+                             " ".join(remote_connector_configs) +
+                             " 1>> /mnt/connect.log 2>> /mnt/connect.log & 
echo $! > /mnt/connect.pid")
+            monitor.wait_until('Kafka Connect started', timeout_sec=10, 
err_msg="Never saw message indicating Kafka Connect finished startup")
+
+        if len(self.pids(node)) == 0:
+            raise RuntimeError("No process ids recorded")
+
+
+class ConnectDistributedService(ConnectServiceBase):
+    """Runs Kafka Connect in distributed mode."""
+
+    def __init__(self, context, num_nodes, kafka, files, 
offsets_topic="connect-offsets", configs_topic="connect-configs"):
+        super(ConnectDistributedService, self).__init__(context, num_nodes, 
kafka, files)
+        self.offsets_topic = offsets_topic
+        self.configs_topic = configs_topic
+
+    def start_node(self, node):
+        node.account.create_file("/mnt/connect.properties", 
self.config_template_func(node))
+        if self.connector_config_templates:
+            raise DucktapeError("Config files are not valid in distributed 
mode, submit connectors via the REST API")
+
+        self.logger.info("Starting Kafka Connect distributed process")
+        with node.account.monitor_log("/mnt/connect.log") as monitor:
+            cmd = "/opt/%s/bin/connect-distributed.sh /mnt/connect.properties 
" % kafka_dir(node)
+            cmd += " 1>> /mnt/connect.log 2>> /mnt/connect.log & echo $! > 
/mnt/connect.pid"
+            node.account.ssh(cmd)
+            monitor.wait_until('Kafka Connect started', timeout_sec=10, 
err_msg="Never saw message indicating Kafka Connect finished startup")
+
+        if len(self.pids(node)) == 0:
+            raise RuntimeError("No process ids recorded")
+
+
+
+
+class ConnectRestError(RuntimeError):
+    def __init__(self, status, msg, url):
+        self.status = status
+        self.message = msg
+        self.url = url
+
+    def __unicode__(self):
+        return "Kafka Connect REST call failed: returned " + self.status + " 
for " + self.url + ". Response: " + self.message
\ No newline at end of file
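
The new ducktape service drives Connect workers entirely through the worker REST
API on port 8083 (the /connectors endpoints wrapped by _rest() above). For
reference, a hedged sketch of the same create-connector call made directly from
Java; the worker address, connector name and config keys are illustrative
assumptions, not taken from this patch:

    import java.io.OutputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;

    public class ConnectRestExample {
        public static void main(String[] args) throws Exception {
            // POST /connectors with {"name": ..., "config": {...}}, mirroring
            // ConnectServiceBase.create_connector().
            String body = "{\"name\": \"local-file-source\", \"config\": {"
                    + "\"name\": \"local-file-source\", "
                    + "\"connector.class\": \"FileStreamSource\", "
                    + "\"tasks.max\": \"1\", "
                    + "\"file\": \"/tmp/input.txt\", "
                    + "\"topic\": \"connect-test\"}}";

            URL url = new URL("http://localhost:8083/connectors");
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("POST");
            conn.setRequestProperty("Content-Type", "application/json");
            conn.setDoOutput(true);
            try (OutputStream out = conn.getOutputStream()) {
                out.write(body.getBytes(StandardCharsets.UTF_8));
            }
            // The Python helper above treats status codes over 400 as errors.
            System.out.println("HTTP " + conn.getResponseCode());
            conn.disconnect();
        }
    }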

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/tests/kafkatest/services/copycat.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/copycat.py 
b/tests/kafkatest/services/copycat.py
deleted file mode 100644
index 5ad9dd5..0000000
--- a/tests/kafkatest/services/copycat.py
+++ /dev/null
@@ -1,191 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.services.service import Service
-from ducktape.utils.util import wait_until
-from ducktape.errors import DucktapeError
-
-from kafkatest.services.kafka.directory import kafka_dir
-import signal, random, requests
-
-class CopycatServiceBase(Service):
-    """Base class for Copycat services providing some common settings and 
functionality"""
-
-    logs = {
-        "kafka_log": {
-            "path": "/mnt/copycat.log",
-            "collect_default": True},
-    }
-
-    def __init__(self, context, num_nodes, kafka, files):
-        super(CopycatServiceBase, self).__init__(context, num_nodes)
-        self.kafka = kafka
-        self.files = files
-
-    def pids(self, node):
-        """Return process ids for Copycat processes."""
-        try:
-            return [pid for pid in node.account.ssh_capture("cat 
/mnt/copycat.pid", callback=int)]
-        except:
-            return []
-
-    def set_configs(self, config_template_func, 
connector_config_templates=None):
-        """
-        Set configurations for the worker and the connector to run on
-        it. These are not provided in the constructor because the worker
-        config generally needs access to ZK/Kafka services to
-        create the configuration.
-        """
-        self.config_template_func = config_template_func
-        self.connector_config_templates = connector_config_templates
-
-    def stop_node(self, node, clean_shutdown=True):
-        pids = self.pids(node)
-        sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL
-
-        for pid in pids:
-            node.account.signal(pid, sig, allow_fail=False)
-        for pid in pids:
-            wait_until(lambda: not node.account.alive(pid), timeout_sec=10, 
err_msg="Copycat standalone process took too long to exit")
-
-        node.account.ssh("rm -f /mnt/copycat.pid", allow_fail=False)
-
-    def restart(self):
-        # We don't want to do any clean up here, just restart the process.
-        for node in self.nodes:
-            self.stop_node(node)
-            self.start_node(node)
-
-    def clean_node(self, node):
-        if len(self.pids(node)) > 0:
-            self.logger.warn("%s %s was still alive at cleanup time. Killing 
forcefully..." %
-                             (self.__class__.__name__, node.account))
-        for pid in self.pids(node):
-            node.account.signal(pid, signal.SIGKILL, allow_fail=False)
-
-        node.account.ssh("rm -rf /mnt/copycat.pid /mnt/copycat.log 
/mnt/copycat.properties  " + " ".join(self.config_filenames() + self.files), 
allow_fail=False)
-
-    def config_filenames(self):
-        return ["/mnt/copycat-connector-" + str(idx) + ".properties" for idx, 
template in enumerate(self.connector_config_templates or [])]
-
-
-    def list_connectors(self, node=None):
-        return self._rest('/connectors', node=node)
-
-    def create_connector(self, config, node=None):
-        create_request = {
-            'name': config['name'],
-            'config': config
-        }
-        return self._rest('/connectors', create_request, node=node, 
method="POST")
-
-    def get_connector(self, name, node=None):
-        return self._rest('/connectors/' + name, node=node)
-
-    def get_connector_config(self, name, node=None):
-        return self._rest('/connectors/' + name + '/config', node=node)
-
-    def set_connector_config(self, name, config, node=None):
-        return self._rest('/connectors/' + name + '/config', config, 
node=node, method="PUT")
-
-    def get_connector_tasks(self, name, node=None):
-        return self._rest('/connectors/' + name + '/tasks', node=node)
-
-    def delete_connector(self, name, node=None):
-        return self._rest('/connectors/' + name, node=node, method="DELETE")
-
-    def _rest(self, path, body=None, node=None, method="GET"):
-        if node is None:
-            node = random.choice(self.nodes)
-
-        meth = getattr(requests, method.lower())
-        url = self._base_url(node) + path
-        resp = meth(url, json=body)
-        self.logger.debug("%s %s response: %d", url, method, resp.status_code)
-        if resp.status_code > 400:
-            raise CopycatRestError(resp.status_code, resp.text, resp.url)
-        if resp.status_code == 204:
-            return None
-        else:
-            return resp.json()
-
-
-    def _base_url(self, node):
-        return 'http://' + node.account.hostname + ':' + '8083'
-
-class CopycatStandaloneService(CopycatServiceBase):
-    """Runs Copycat in standalone mode."""
-
-    def __init__(self, context, kafka, files):
-        super(CopycatStandaloneService, self).__init__(context, 1, kafka, 
files)
-
-    # For convenience since this service only makes sense with a single node
-    @property
-    def node(self):
-        return self.nodes[0]
-
-    def start_node(self, node):
-        node.account.create_file("/mnt/copycat.properties", 
self.config_template_func(node))
-        remote_connector_configs = []
-        for idx, template in enumerate(self.connector_config_templates):
-            target_file = "/mnt/copycat-connector-" + str(idx) + ".properties"
-            node.account.create_file(target_file, template)
-            remote_connector_configs.append(target_file)
-
-        self.logger.info("Starting Copycat standalone process")
-        with node.account.monitor_log("/mnt/copycat.log") as monitor:
-            node.account.ssh("/opt/%s/bin/copycat-standalone.sh 
/mnt/copycat.properties " % kafka_dir(node) +
-                             " ".join(remote_connector_configs) +
-                             " 1>> /mnt/copycat.log 2>> /mnt/copycat.log & 
echo $! > /mnt/copycat.pid")
-            monitor.wait_until('Copycat started', timeout_sec=10, 
err_msg="Never saw message indicating Copycat finished startup")
-
-        if len(self.pids(node)) == 0:
-            raise RuntimeError("No process ids recorded")
-
-
-class CopycatDistributedService(CopycatServiceBase):
-    """Runs Copycat in distributed mode."""
-
-    def __init__(self, context, num_nodes, kafka, files, 
offsets_topic="copycat-offsets", configs_topic="copycat-configs"):
-        super(CopycatDistributedService, self).__init__(context, num_nodes, 
kafka, files)
-        self.offsets_topic = offsets_topic
-        self.configs_topic = configs_topic
-
-    def start_node(self, node):
-        node.account.create_file("/mnt/copycat.properties", 
self.config_template_func(node))
-        if self.connector_config_templates:
-            raise DucktapeError("Config files are not valid in distributed 
mode, submit connectors via the REST API")
-
-        self.logger.info("Starting Copycat distributed process")
-        with node.account.monitor_log("/mnt/copycat.log") as monitor:
-            cmd = "/opt/%s/bin/copycat-distributed.sh /mnt/copycat.properties 
" % kafka_dir(node)
-            cmd += " 1>> /mnt/copycat.log 2>> /mnt/copycat.log & echo $! > 
/mnt/copycat.pid"
-            node.account.ssh(cmd)
-            monitor.wait_until('Copycat started', timeout_sec=10, 
err_msg="Never saw message indicating Copycat finished startup")
-
-        if len(self.pids(node)) == 0:
-            raise RuntimeError("No process ids recorded")
-
-
-
-
-class CopycatRestError(RuntimeError):
-    def __init__(self, status, msg, url):
-        self.status = status
-        self.message = msg
-        self.url = url
-
-    def __unicode__(self):
-        return "Copycat REST call failed: returned " + self.status + " for " + 
self.url + ". Response: " + self.message
\ No newline at end of file
