http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
new file mode 100644
index 0000000..65bd9d0
--- /dev/null
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConvertingFutureCallback;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * <p>
+ *     Implementation of OffsetBackingStore that uses a Kafka topic to store 
offset data.
+ * </p>
+ * <p>
+ *     Internally, this implementation both produces to and consumes from a 
Kafka topic which stores the offsets.
+ *     It accepts producer and consumer overrides via its configuration but 
forces some settings to specific values
+ *     to ensure correct behavior (e.g. acks, auto.offset.reset).
+ * </p>
+ */
+public class KafkaOffsetBackingStore implements OffsetBackingStore {
+    private static final Logger log = 
LoggerFactory.getLogger(KafkaOffsetBackingStore.class);
+
+    public final static String OFFSET_STORAGE_TOPIC_CONFIG = 
"offset.storage.topic";
+
+    private KafkaBasedLog<byte[], byte[]> offsetLog;
+    private HashMap<ByteBuffer, ByteBuffer> data;
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        String topic = (String) configs.get(OFFSET_STORAGE_TOPIC_CONFIG);
+        if (topic == null)
+            throw new ConnectException("Offset storage topic must be 
specified");
+
+        data = new HashMap<>();
+
+        Map<String, Object> producerProps = new HashMap<>();
+        producerProps.putAll(configs);
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArraySerializer");
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArraySerializer");
+        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
+
+        Map<String, Object> consumerProps = new HashMap<>();
+        consumerProps.putAll(configs);
+        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+
+        offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, 
consumedCallback);
+    }
+
+    @Override
+    public void start() {
+        log.info("Starting KafkaOffsetBackingStore");
+        offsetLog.start();
+        log.info("Finished reading offsets topic and starting 
KafkaOffsetBackingStore");
+    }
+
+    @Override
+    public void stop() {
+        log.info("Stopping KafkaOffsetBackingStore");
+        offsetLog.stop();
+        log.info("Stopped KafkaOffsetBackingStore");
+    }
+
+    @Override
+    public Future<Map<ByteBuffer, ByteBuffer>> get(final 
Collection<ByteBuffer> keys,
+                                                   final 
Callback<Map<ByteBuffer, ByteBuffer>> callback) {
+        ConvertingFutureCallback<Void, Map<ByteBuffer, ByteBuffer>> future = 
new ConvertingFutureCallback<Void, Map<ByteBuffer, ByteBuffer>>(callback) {
+            @Override
+            public Map<ByteBuffer, ByteBuffer> convert(Void result) {
+                Map<ByteBuffer, ByteBuffer> values = new HashMap<>();
+                for (ByteBuffer key : keys)
+                    values.put(key, data.get(key));
+                return values;
+            }
+        };
+        // This operation may be relatively (but not too) expensive since it 
always requires checking end offsets, even
+        // if we've already read up to the end. However, it also should not be 
common (offsets should only be read when
+        // resetting a task). Always requiring that we read to the end is 
simpler than trying to differentiate when it
+        // is safe not to (which should only be if we *know* we've maintained 
ownership since the last write).
+        offsetLog.readToEnd(future);
+        return future;
+    }
+
+    @Override
+    public Future<Void> set(final Map<ByteBuffer, ByteBuffer> values, final 
Callback<Void> callback) {
+        SetCallbackFuture producerCallback = new 
SetCallbackFuture(values.size(), callback);
+
+        for (Map.Entry<ByteBuffer, ByteBuffer> entry : values.entrySet())
+            offsetLog.send(entry.getKey().array(), entry.getValue().array(), 
producerCallback);
+
+        return producerCallback;
+    }
+
+    private final Callback<ConsumerRecord<byte[], byte[]>> consumedCallback = 
new Callback<ConsumerRecord<byte[], byte[]>>() {
+        @Override
+        public void onCompletion(Throwable error, ConsumerRecord<byte[], 
byte[]> record) {
+            ByteBuffer key = record.key() != null ? 
ByteBuffer.wrap(record.key()) : null;
+            ByteBuffer value = record.value() != null ? 
ByteBuffer.wrap(record.value()) : null;
+            data.put(key, value);
+        }
+    };
+
+    private KafkaBasedLog<byte[], byte[]> createKafkaBasedLog(String topic, 
Map<String, Object> producerProps,
+                                                              Map<String, 
Object> consumerProps, Callback<ConsumerRecord<byte[], byte[]>> 
consumedCallback) {
+        return new KafkaBasedLog<>(topic, producerProps, consumerProps, 
consumedCallback, new SystemTime());
+    }
+
+    private static class SetCallbackFuture implements 
org.apache.kafka.clients.producer.Callback, Future<Void> {
+        private int numLeft;
+        private boolean completed = false;
+        private Throwable exception = null;
+        private final Callback<Void> callback;
+
+        public SetCallbackFuture(int numRecords, Callback<Void> callback) {
+            numLeft = numRecords;
+            this.callback = callback;
+        }
+
+        @Override
+        public synchronized void onCompletion(RecordMetadata metadata, 
Exception exception) {
+            if (exception != null) {
+                if (!completed) {
+                    this.exception = exception;
+                    callback.onCompletion(exception, null);
+                    completed = true;
+                    this.notify();
+                }
+                return;
+            }
+
+            numLeft -= 1;
+            if (numLeft == 0) {
+                callback.onCompletion(null, null);
+                completed = true;
+                this.notify();
+            }
+        }
+
+        @Override
+        public synchronized boolean cancel(boolean mayInterruptIfRunning) {
+            return false;
+        }
+
+        @Override
+        public synchronized boolean isCancelled() {
+            return false;
+        }
+
+        @Override
+        public synchronized boolean isDone() {
+            return completed;
+        }
+
+        @Override
+        public synchronized Void get() throws InterruptedException, 
ExecutionException {
+            while (!completed) {
+                this.wait();
+            }
+            if (exception != null)
+                throw new ExecutionException(exception);
+            return null;
+        }
+
+        @Override
+        public synchronized Void get(long timeout, TimeUnit unit) throws 
InterruptedException, ExecutionException, TimeoutException {
+            long started = System.currentTimeMillis();
+            long limit = started + unit.toMillis(timeout);
+            while (!completed) {
+                long leftMs = limit - System.currentTimeMillis();
+                if (leftMs < 0)
+                    throw new TimeoutException("KafkaOffsetBackingStore Future 
timed out.");
+                this.wait(leftMs);
+            }
+            if (exception != null)
+                throw new ExecutionException(exception);
+            return null;
+        }
+    }
+
+
+}
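
A minimal usage sketch for the class above (not part of the patch). The broker address,
topic name and key/value layout are illustrative assumptions; in practice callers go
through OffsetStorageWriter/OffsetStorageReader rather than raw ByteBuffers.

    import org.apache.kafka.connect.storage.KafkaOffsetBackingStore;
    import org.apache.kafka.connect.util.Callback;

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    public class KafkaOffsetBackingStoreExample {
        public static void main(String[] args) throws Exception {
            // Assumed broker and topic; these configs are copied into the internal
            // producer and consumer created by the store.
            Map<String, Object> configs = new HashMap<>();
            configs.put("bootstrap.servers", "localhost:9092");
            configs.put(KafkaOffsetBackingStore.OFFSET_STORAGE_TOPIC_CONFIG, "connect-offsets");

            KafkaOffsetBackingStore store = new KafkaOffsetBackingStore();
            store.configure(configs);
            store.start();   // replays the topic so the in-memory map is current

            ByteBuffer key = ByteBuffer.wrap("my-connector".getBytes(StandardCharsets.UTF_8));
            ByteBuffer value = ByteBuffer.wrap("42".getBytes(StandardCharsets.UTF_8));

            // set() needs a non-null callback: SetCallbackFuture invokes it unconditionally.
            store.set(Collections.singletonMap(key, value), new Callback<Void>() {
                @Override
                public void onCompletion(Throwable error, Void result) {
                    if (error != null)
                        error.printStackTrace();
                }
            }).get();

            // get() always re-reads to the end of the topic before answering.
            Map<ByteBuffer, ByteBuffer> read = store.get(Collections.singleton(key), null).get();
            System.out.println("Read back " + read.size() + " entries");

            store.stop();
        }
    }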

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java
new file mode 100644
index 0000000..d62e38f
--- /dev/null
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.connect.util.Callback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+/**
+ * Implementation of OffsetBackingStore that doesn't actually persist any 
data. To ensure this
+ * behaves similarly to a real backing store, operations are executed 
asynchronously on a
+ * background thread.
+ */
+public class MemoryOffsetBackingStore implements OffsetBackingStore {
+    private static final Logger log = 
LoggerFactory.getLogger(MemoryOffsetBackingStore.class);
+
+    protected Map<ByteBuffer, ByteBuffer> data = new HashMap<>();
+    protected ExecutorService executor = Executors.newSingleThreadExecutor();
+
+    public MemoryOffsetBackingStore() {
+
+    }
+
+    @Override
+    public void configure(Map<String, ?> props) {
+    }
+
+    @Override
+    public synchronized void start() {
+    }
+
+    @Override
+    public synchronized void stop() {
+        // Nothing to do since this doesn't maintain any outstanding 
connections/data
+    }
+
+    @Override
+    public Future<Map<ByteBuffer, ByteBuffer>> get(
+            final Collection<ByteBuffer> keys,
+            final Callback<Map<ByteBuffer, ByteBuffer>> callback) {
+        return executor.submit(new Callable<Map<ByteBuffer, ByteBuffer>>() {
+            @Override
+            public Map<ByteBuffer, ByteBuffer> call() throws Exception {
+                Map<ByteBuffer, ByteBuffer> result = new HashMap<>();
+                synchronized (MemoryOffsetBackingStore.this) {
+                    for (ByteBuffer key : keys) {
+                        result.put(key, data.get(key));
+                    }
+                }
+                if (callback != null)
+                    callback.onCompletion(null, result);
+                return result;
+            }
+        });
+
+    }
+
+    @Override
+    public Future<Void> set(final Map<ByteBuffer, ByteBuffer> values,
+                            final Callback<Void> callback) {
+        return executor.submit(new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                synchronized (MemoryOffsetBackingStore.this) {
+                    for (Map.Entry<ByteBuffer, ByteBuffer> entry : 
values.entrySet()) {
+                        data.put(entry.getKey(), entry.getValue());
+                    }
+                    save();
+                }
+                if (callback != null)
+                    callback.onCompletion(null, null);
+                return null;
+            }
+        });
+    }
+
+    // Hook to allow subclasses to persist data
+    protected void save() {
+
+    }
+}
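
A sketch of how the protected save() hook and data map are intended to be used by a
subclass that does persist offsets. The file format here (Java serialization of a
byte[] map) is an assumption chosen for brevity, not something the patch prescribes.

    import org.apache.kafka.connect.errors.ConnectException;
    import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;

    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.ObjectOutputStream;
    import java.nio.ByteBuffer;
    import java.util.HashMap;
    import java.util.Map;

    public class FileBackedOffsetStore extends MemoryOffsetBackingStore {
        private final File file;

        public FileBackedOffsetStore(File file) {
            this.file = file;
        }

        // set() calls save() while holding the store's monitor, so reading 'data' here is safe.
        @Override
        protected void save() {
            try (ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream(file))) {
                Map<byte[], byte[]> raw = new HashMap<>();
                for (Map.Entry<ByteBuffer, ByteBuffer> entry : data.entrySet()) {
                    byte[] key = entry.getKey() != null ? entry.getKey().array() : null;
                    byte[] value = entry.getValue() != null ? entry.getValue().array() : null;
                    raw.put(key, value);
                }
                out.writeObject(raw);
            } catch (IOException e) {
                throw new ConnectException("Failed to persist offsets to " + file, e);
            }
        }
    }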

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetBackingStore.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetBackingStore.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetBackingStore.java
new file mode 100644
index 0000000..83fdb53
--- /dev/null
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetBackingStore.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.connect.util.Callback;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+/**
+ * <p>
+ * OffsetBackingStore is an interface for storage backends that store 
key-value data. The backing
+ * store doesn't need to handle serialization or deserialization. It only 
needs to support
+ * reading/writing bytes. Since these operations are expected to require network
+ * round trips, only bulk operations are supported.
+ * </p>
+ * <p>
+ * Since OffsetBackingStore is a shared resource that may be used by many 
OffsetStorage instances
+ * that are associated with individual tasks, the caller must be sure keys 
include information about the
+ * connector so that the shared namespace does not result in conflicting keys.
+ * </p>
+ */
+public interface OffsetBackingStore extends Configurable {
+
+    /**
+     * Start this offset store.
+     */
+    public void start();
+
+    /**
+     * Stop the backing store. Implementations should attempt to shutdown 
gracefully, but not block
+     * indefinitely.
+     */
+    public void stop();
+
+    /**
+     * Get the values for the specified keys
+     * @param keys list of keys to look up
+     * @param callback callback to invoke on completion
+     * @return future for the resulting map from key to value
+     */
+    public Future<Map<ByteBuffer, ByteBuffer>> get(
+            Collection<ByteBuffer> keys,
+            Callback<Map<ByteBuffer, ByteBuffer>> callback);
+
+    /**
+     * Set the specified keys and values.
+     * @param values map from key to value
+     * @param callback callback to invoke on completion
+     * @return void future for the operation
+     */
+    public Future<Void> set(Map<ByteBuffer, ByteBuffer> values,
+                            Callback<Void> callback);
+}
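
The warning above about conflicting keys is what the higher-level classes in this patch
address by serializing every key as [namespace, partitionMap]. A sketch of that
convention; JsonConverter and the "schemas.enable" setting are assumptions used purely
to make the example concrete, any Converter would do.

    import org.apache.kafka.connect.json.JsonConverter;
    import org.apache.kafka.connect.storage.Converter;

    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Map;

    public class NamespacedKeyExample {
        public static ByteBuffer offsetStoreKey(String connectorNamespace, Map<String, Object> partition) {
            Converter keyConverter = new JsonConverter();
            keyConverter.configure(Collections.singletonMap("schemas.enable", false), true);

            // Prepending the namespace keeps two connectors with identical partition maps
            // from clobbering each other's entries in the shared backing store.
            byte[] serialized = keyConverter.fromConnectData(connectorNamespace, null,
                    Arrays.asList(connectorNamespace, partition));
            return serialized != null ? ByteBuffer.wrap(serialized) : null;
        }
    }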

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java
new file mode 100644
index 0000000..23c1019
--- /dev/null
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Implementation of OffsetStorageReader. Unlike OffsetStorageWriter, which is used directly as a
+ * concrete class, the OffsetStorageReader interface is separate from this implementation only
+ * because it needs to be included in the public API package.
+ */
+public class OffsetStorageReaderImpl implements OffsetStorageReader {
+    private static final Logger log = 
LoggerFactory.getLogger(OffsetStorageReaderImpl.class);
+
+    private final OffsetBackingStore backingStore;
+    private final String namespace;
+    private final Converter keyConverter;
+    private final Converter valueConverter;
+
+    public OffsetStorageReaderImpl(OffsetBackingStore backingStore, String 
namespace,
+                                   Converter keyConverter, Converter 
valueConverter) {
+        this.backingStore = backingStore;
+        this.namespace = namespace;
+        this.keyConverter = keyConverter;
+        this.valueConverter = valueConverter;
+    }
+
+    @Override
+    public <T> Map<String, Object> offset(Map<String, T> partition) {
+        return offsets(Arrays.asList(partition)).get(partition);
+    }
+
+    @Override
+    public <T> Map<Map<String, T>, Map<String, Object>> 
offsets(Collection<Map<String, T>> partitions) {
+        // Serialize keys so backing store can work with them
+        Map<ByteBuffer, Map<String, T>> serializedToOriginal = new 
HashMap<>(partitions.size());
+        for (Map<String, T> key : partitions) {
+            try {
+                // Offsets are treated as schemaless, their format is only 
validated here (and the returned value below)
+                OffsetUtils.validateFormat(key);
+                byte[] keySerialized = keyConverter.fromConnectData(namespace, 
null, Arrays.asList(namespace, key));
+                ByteBuffer keyBuffer = (keySerialized != null) ? 
ByteBuffer.wrap(keySerialized) : null;
+                serializedToOriginal.put(keyBuffer, key);
+            } catch (Throwable t) {
+                log.error("CRITICAL: Failed to serialize partition key when 
getting offsets for task with "
+                        + "namespace {}. No value for this data will be 
returned, which may break the "
+                        + "task or cause it to skip some data.", namespace, t);
+            }
+        }
+
+        // Get serialized key -> serialized value from backing store
+        Map<ByteBuffer, ByteBuffer> raw;
+        try {
+            raw = backingStore.get(serializedToOriginal.keySet(), null).get();
+        } catch (Exception e) {
+            log.error("Failed to fetch offsets from namespace {}: ", 
namespace, e);
+            throw new ConnectException("Failed to fetch offsets.", e);
+        }
+
+        // Deserialize all the values and map back to the original keys
+        Map<Map<String, T>, Map<String, Object>> result = new 
HashMap<>(partitions.size());
+        for (Map.Entry<ByteBuffer, ByteBuffer> rawEntry : raw.entrySet()) {
+            try {
+                // Since null could be a valid key, explicitly check whether 
map contains the key
+                if (!serializedToOriginal.containsKey(rawEntry.getKey())) {
+                    log.error("Should be able to map {} back to a requested 
partition-offset key, backing "
+                            + "store may have returned invalid data", 
rawEntry.getKey());
+                    continue;
+                }
+                Map<String, T> origKey = 
serializedToOriginal.get(rawEntry.getKey());
+                SchemaAndValue deserializedSchemaAndValue = 
valueConverter.toConnectData(namespace, rawEntry.getValue() != null ? 
rawEntry.getValue().array() : null);
+                Object deserializedValue = deserializedSchemaAndValue.value();
+                OffsetUtils.validateFormat(deserializedValue);
+
+                result.put(origKey, (Map<String, Object>) deserializedValue);
+            } catch (Throwable t) {
+                log.error("CRITICAL: Failed to deserialize offset data when 
getting offsets for task with"
+                        + " namespace {}. No value for this data will be 
returned, which may break the "
+                        + "task or cause it to skip some data. This could 
either be due to an error in "
+                        + "the connector implementation or incompatible 
schema.", namespace, t);
+            }
+        }
+
+        return result;
+    }
+}
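
A sketch of how a source task would typically consult the reader when it starts up. The
partition map layout ("filename"/"position") is an arbitrary example, and the backing
store and converters are assumed to be wired up elsewhere.

    import org.apache.kafka.connect.storage.Converter;
    import org.apache.kafka.connect.storage.OffsetBackingStore;
    import org.apache.kafka.connect.storage.OffsetStorageReader;
    import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;

    import java.util.Collections;
    import java.util.Map;

    public class ReaderUsageExample {
        public static long lastPosition(OffsetBackingStore backingStore,
                                        Converter keyConverter, Converter valueConverter) {
            OffsetStorageReader reader = new OffsetStorageReaderImpl(
                    backingStore, "my-connector", keyConverter, valueConverter);

            Map<String, String> partition = Collections.singletonMap("filename", "/var/log/app.log");
            Map<String, Object> offset = reader.offset(partition);

            // A null or empty offset simply means nothing has been stored for this partition yet.
            Object position = offset == null ? null : offset.get("position");
            return position == null ? 0L : ((Number) position).longValue();
        }
    }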

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
new file mode 100644
index 0000000..58376e5
--- /dev/null
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.util.Callback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.Future;
+
+/**
+ * <p>
+ * OffsetStorageWriter is a buffered writer that wraps the simple 
OffsetBackingStore interface.
+ * It maintains a copy of the key-value data in memory and buffers writes. It 
allows you to take
+ * a snapshot, which can then be asynchronously flushed to the backing store 
while new writes
+ * continue to be processed. This allows Kafka Connect to process offset 
commits in the background
+ * while continuing to process messages.
+ * </p>
+ * <p>
+ * Connect uses an OffsetStorage implementation to save state about the 
current progress of
+ * source (import to Kafka) jobs, which may have many input partitions and 
"offsets" may not be as
+ * simple as they are for Kafka partitions or files. Offset storage is not 
required for sink jobs
+ * because they can use Kafka's native offset storage (or the sink data store 
can handle offset
+ * storage to achieve exactly once semantics).
+ * </p>
+ * <p>
+ * Both partitions and offsets are generic data objects. This allows different 
connectors to use
+ * whatever representation they need, even arbitrarily complex records. These 
are translated
+ * internally into the serialized form the OffsetBackingStore uses.
+ * </p>
+ * <p>
+ * Note that this only provides write functionality. This is intentional to 
ensure stale data is
+ * never read. Offset data should only be read during startup or 
reconfiguration of a task. By
+ * always serving those requests by reading the values from the backing store, 
we ensure we never
+ * accidentally use stale data. (One example of how this can occur: a task is 
processing input
+ * partition A, writing offsets; a reconfiguration reassigns partition A elsewhere; a later
+ * reconfiguration reassigns partition A back to this node, but by then the locally held
+ * offset data is out of date). Since these offsets are created and managed by the connector 
itself, there's no way
+ * for the offset management layer to know which keys are "owned" by which 
tasks at any given
+ * time.
+ * </p>
+ * <p>
+ * This class is not thread-safe. It should only be accessed from a Task's 
processing thread.
+ * </p>
+ */
+public class OffsetStorageWriter {
+    private static final Logger log = 
LoggerFactory.getLogger(OffsetStorageWriter.class);
+
+    private final OffsetBackingStore backingStore;
+    private final Converter keyConverter;
+    private final Converter valueConverter;
+    private final String namespace;
+    // Offset data in Connect format
+    private Map<Map<String, Object>, Map<String, Object>> data = new 
HashMap<>();
+
+    // Not synchronized, should only be accessed by flush thread
+    private Map<Map<String, Object>, Map<String, Object>> toFlush = null;
+    // Unique ID for each flush request to handle callbacks after timeouts
+    private long currentFlushId = 0;
+
+    public OffsetStorageWriter(OffsetBackingStore backingStore,
+                               String namespace, Converter keyConverter, 
Converter valueConverter) {
+        this.backingStore = backingStore;
+        this.namespace = namespace;
+        this.keyConverter = keyConverter;
+        this.valueConverter = valueConverter;
+    }
+
+    /**
+     * Set an offset for a partition using Connect data values
+     * @param partition the partition to store an offset for
+     * @param offset the offset
+     */
+    public synchronized void offset(Map<String, ?> partition, Map<String, ?> 
offset) {
+        data.put((Map<String, Object>) partition, (Map<String, Object>) 
offset);
+    }
+
+    private boolean flushing() {
+        return toFlush != null;
+    }
+
+    /**
+     * Performs the first step of a flush operation, snapshotting the current 
state. This does not
+     * actually initiate the flush with the underlying storage.
+     *
+     * @return true if a flush was initiated, false if no data was available
+     */
+    public synchronized boolean beginFlush() {
+        if (flushing()) {
+            log.error("Invalid call to OffsetStorageWriter flush() while 
already flushing, the "
+                    + "framework should not allow this");
+            throw new ConnectException("OffsetStorageWriter is already 
flushing");
+        }
+
+        if (data.isEmpty())
+            return false;
+
+        assert !flushing();
+        toFlush = data;
+        data = new HashMap<>();
+        return true;
+    }
+
+    /**
+     * Flush the current offsets and clear them from this writer. This is 
non-blocking: it
+     * moves the current set of offsets out of the way, serializes the data, 
and asynchronously
+     * writes the data to the backing store. If no offsets need to be written, 
the callback is
+     * still invoked, but no Future is returned.
+     *
+     * @return a Future, or null if there are no offsets to commitOffsets
+     */
+    public Future<Void> doFlush(final Callback<Void> callback) {
+        final long flushId = currentFlushId;
+
+        // Serialize
+        Map<ByteBuffer, ByteBuffer> offsetsSerialized;
+        try {
+            offsetsSerialized = new HashMap<>();
+            for (Map.Entry<Map<String, Object>, Map<String, Object>> entry : 
toFlush.entrySet()) {
+                // Offsets are specified as schemaless to the converter, using 
whatever internal schema is appropriate
+                // for that data. The only enforcement of the format is here.
+                OffsetUtils.validateFormat(entry.getKey());
+                OffsetUtils.validateFormat(entry.getValue());
+                // When serializing the key, we add in the namespace 
information so the key is [namespace, real key]
+                byte[] key = keyConverter.fromConnectData(namespace, null, 
Arrays.asList(namespace, entry.getKey()));
+                ByteBuffer keyBuffer = (key != null) ? ByteBuffer.wrap(key) : 
null;
+                byte[] value = valueConverter.fromConnectData(namespace, null, 
entry.getValue());
+                ByteBuffer valueBuffer = (value != null) ? 
ByteBuffer.wrap(value) : null;
+                offsetsSerialized.put(keyBuffer, valueBuffer);
+            }
+        } catch (Throwable t) {
+            // Must handle errors properly here or the writer will be left 
mid-flush forever and be
+            // unable to make progress.
+            log.error("CRITICAL: Failed to serialize offset data, making it 
impossible to commit "
+                    + "offsets under namespace {}. This likely won't recover 
unless the "
+                    + "unserializable partition or offset information is 
overwritten.", namespace);
+            log.error("Cause of serialization failure:", t);
+            callback.onCompletion(t, null);
+            return null;
+        }
+
+        // And submit the data
+        log.debug("Submitting {} entries to backing store", 
offsetsSerialized.size());
+        return backingStore.set(offsetsSerialized, new Callback<Void>() {
+            @Override
+            public void onCompletion(Throwable error, Void result) {
+                boolean isCurrent = handleFinishWrite(flushId, error, result);
+                if (isCurrent && callback != null)
+                    callback.onCompletion(error, result);
+            }
+        });
+    }
+
+    /**
+     * Cancel a flush that has been initiated by {@link #beginFlush}. Use this if an operation
+     * performed between {@link #beginFlush} and {@link #doFlush} failed, or if a flush submitted via
+     * {@link #doFlush} needs to be abandoned (e.g. because it timed out); the snapshotted offsets are
+     * merged back into the buffer so they can be retried on the next flush.
+     */
+    public synchronized void cancelFlush() {
+        // Verify we're still flushing data to handle a race between 
cancelFlush() calls from up the
+        // call stack and callbacks from the write request to underlying 
storage
+        if (flushing()) {
+            // Just recombine the data and place it back in the primary storage
+            toFlush.putAll(data);
+            data = toFlush;
+            currentFlushId++;
+            toFlush = null;
+        }
+    }
+
+    /**
+     * Handle completion of a write. Returns true if this callback is for the 
current flush
+     * operation, false if it's for an old operation that should now be 
ignored.
+     */
+    private synchronized boolean handleFinishWrite(long flushId, Throwable 
error, Void result) {
+        // Callbacks need to be handled carefully since the flush operation 
may have already timed
+        // out and been cancelled.
+        if (flushId != currentFlushId)
+            return false;
+
+        if (error != null) {
+            cancelFlush();
+        } else {
+            currentFlushId++;
+            toFlush = null;
+        }
+        return true;
+    }
+}
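
A sketch of the commit cycle these methods are designed for, as used by the code that
periodically commits source task offsets. The writer, timeout and error handling are
placeholders for whatever the calling code uses.

    import org.apache.kafka.connect.storage.OffsetStorageWriter;
    import org.apache.kafka.connect.util.Callback;

    import java.util.Map;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;

    public class OffsetCommitExample {
        public static void commit(OffsetStorageWriter writer,
                                  Map<String, Object> partition, Map<String, Object> offset,
                                  long timeoutMs) {
            writer.offset(partition, offset);       // buffered in memory, cheap to call per record

            if (!writer.beginFlush())               // snapshot; false means nothing to commit
                return;

            Future<Void> flush = writer.doFlush(new Callback<Void>() {
                @Override
                public void onCompletion(Throwable error, Void result) {
                    if (error != null)
                        System.err.println("Offset commit failed: " + error);
                }
            });

            // doFlush() returns null if serialization failed; the callback has already fired.
            if (flush == null)
                return;

            try {
                flush.get(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (Exception e) {
                // Timed out or failed: merge the snapshot back so the offsets are retried on
                // the next commit. handleFinishWrite() ignores any late callback via the flush id.
                writer.cancelFlush();
            }
        }
    }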

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java
new file mode 100644
index 0000000..f31715a
--- /dev/null
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.connect.data.ConnectSchema;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.errors.DataException;
+
+import java.util.Map;
+
+public class OffsetUtils {
+    public static void validateFormat(Object offsetData) {
+        if (offsetData == null)
+            return;
+
+        if (!(offsetData instanceof Map))
+            throw new DataException("Offsets must be specified as a Map");
+        validateFormat((Map<Object, Object>) offsetData);
+    }
+
+    public static <K, V> void validateFormat(Map<K, V> offsetData) {
+        // Both keys and values for offsets may be null. For values, this is a useful way to delete offsets or indicate
+        // that there is no usable concept of offsets in your source system.
+        if (offsetData == null)
+            return;
+
+        for (Map.Entry<K, V> entry : offsetData.entrySet()) {
+            if (!(entry.getKey() instanceof String))
+                throw new DataException("Offsets may only use String keys");
+
+            Object value = entry.getValue();
+            if (value == null)
+                continue;
+            Schema.Type schemaType = 
ConnectSchema.schemaType(value.getClass());
+            if (!schemaType.isPrimitive())
+                throw new DataException("Offsets may only contain primitive 
types as values, but field " + entry.getKey() + " contains " + schemaType);
+        }
+    }
+}
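
A small sketch of what validateFormat() accepts and rejects under the rules above
(String keys; primitive or null values):

    import org.apache.kafka.connect.errors.DataException;
    import org.apache.kafka.connect.storage.OffsetUtils;

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    public class OffsetFormatExample {
        public static void main(String[] args) {
            // Accepted: String keys with primitive values, or no offset map at all.
            OffsetUtils.validateFormat(Collections.singletonMap("position", 42L));
            OffsetUtils.validateFormat((Map<String, Object>) null);

            // Rejected: non-String keys.
            Map<Object, Object> badKey = new HashMap<>();
            badKey.put(1, "value");
            try {
                OffsetUtils.validateFormat(badKey);
            } catch (DataException e) {
                System.out.println("Rejected as expected: " + e.getMessage());
            }
        }
    }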

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/util/Callback.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/Callback.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/Callback.java
new file mode 100644
index 0000000..0db2ce2
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/Callback.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.util;
+
+/**
+ * Generic interface for callbacks
+ */
+public interface Callback<V> {
+    /**
+     * Invoked upon completion of the operation.
+     *
+     * @param error the error that caused the operation to fail, or null if no 
error occurred
+     * @param result the return value, or null if the operation failed
+     */
+    void onCompletion(Throwable error, V result);
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectorTaskId.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectorTaskId.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectorTaskId.java
new file mode 100644
index 0000000..8176d82
--- /dev/null
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectorTaskId.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.util;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.io.Serializable;
+
+/**
+ * Unique ID for a single task. It includes a unique connector ID and a task 
ID that is unique within
+ * the connector.
+ */
+public class ConnectorTaskId implements Serializable, 
Comparable<ConnectorTaskId> {
+    private final String connector;
+    private final int task;
+
+    @JsonCreator
+    public ConnectorTaskId(@JsonProperty("connector") String connector, 
@JsonProperty("task") int task) {
+        this.connector = connector;
+        this.task = task;
+    }
+
+    @JsonProperty
+    public String connector() {
+        return connector;
+    }
+
+    @JsonProperty
+    public int task() {
+        return task;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        ConnectorTaskId that = (ConnectorTaskId) o;
+
+        if (task != that.task)
+            return false;
+        if (connector != null ? !connector.equals(that.connector) : 
that.connector != null)
+            return false;
+
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = connector != null ? connector.hashCode() : 0;
+        result = 31 * result + task;
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return connector + '-' + task;
+    }
+
+    @Override
+    public int compareTo(ConnectorTaskId o) {
+        int connectorCmp = connector.compareTo(o.connector);
+        if (connectorCmp != 0)
+            return connectorCmp;
+        return ((Integer) task).compareTo(o.task);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConvertingFutureCallback.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConvertingFutureCallback.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConvertingFutureCallback.java
new file mode 100644
index 0000000..88bc9a1
--- /dev/null
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConvertingFutureCallback.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.util;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public abstract class ConvertingFutureCallback<U, T> implements Callback<U>, 
Future<T> {
+
+    private Callback<T> underlying;
+    private CountDownLatch finishedLatch;
+    private T result = null;
+    private Throwable exception = null;
+
+    public ConvertingFutureCallback(Callback<T> underlying) {
+        this.underlying = underlying;
+        this.finishedLatch = new CountDownLatch(1);
+    }
+
+    public abstract T convert(U result);
+
+    @Override
+    public void onCompletion(Throwable error, U result) {
+        this.exception = error;
+        this.result = convert(result);
+        if (underlying != null)
+            underlying.onCompletion(error, this.result);
+        finishedLatch.countDown();
+    }
+
+    @Override
+    public boolean cancel(boolean b) {
+        return false;
+    }
+
+    @Override
+    public boolean isCancelled() {
+        return false;
+    }
+
+    @Override
+    public boolean isDone() {
+        return finishedLatch.getCount() == 0;
+    }
+
+    @Override
+    public T get() throws InterruptedException, ExecutionException {
+        finishedLatch.await();
+        return result();
+    }
+
+    @Override
+    public T get(long l, TimeUnit timeUnit)
+            throws InterruptedException, ExecutionException, TimeoutException {
+        if (!finishedLatch.await(l, timeUnit))
+            throw new TimeoutException("Timed out waiting for future");
+        return result();
+    }
+
+    private T result() throws ExecutionException {
+        if (exception != null) {
+            throw new ExecutionException(exception);
+        }
+        return result;
+    }
+}
+
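
A sketch of the intended usage pattern: an asynchronous operation that produces a byte[]
is exposed to callers as a Future<String> by overriding convert(). The byte[]-to-String
conversion is just an illustration.

    import org.apache.kafka.connect.util.ConvertingFutureCallback;

    import java.nio.charset.StandardCharsets;

    public class ConvertingFutureCallbackExample {
        public static void main(String[] args) throws Exception {
            ConvertingFutureCallback<byte[], String> future =
                    new ConvertingFutureCallback<byte[], String>(null) {
                        @Override
                        public String convert(byte[] result) {
                            return result == null ? null : new String(result, StandardCharsets.UTF_8);
                        }
                    };

            // Normally some asynchronous operation completes the callback; done inline here.
            future.onCompletion(null, "hello".getBytes(StandardCharsets.UTF_8));

            System.out.println(future.get());   // prints "hello"
        }
    }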

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/util/FutureCallback.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/FutureCallback.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/FutureCallback.java
new file mode 100644
index 0000000..5b0522a
--- /dev/null
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/FutureCallback.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.util;
+
+public class FutureCallback<T> extends ConvertingFutureCallback<T, T> {
+
+    public FutureCallback(Callback<T> underlying) {
+        super(underlying);
+    }
+
+    public FutureCallback() {
+        super(null);
+    }
+
+    @Override
+    public T convert(T result) {
+        return result;
+    }
+}
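
And a sketch showing how FutureCallback lets a caller block on one of the asynchronous
APIs above; the backing store and key set are assumed to come from the surrounding code.

    import org.apache.kafka.connect.storage.OffsetBackingStore;
    import org.apache.kafka.connect.util.FutureCallback;

    import java.nio.ByteBuffer;
    import java.util.Collection;
    import java.util.Map;
    import java.util.concurrent.TimeUnit;

    public class FutureCallbackExample {
        public static Map<ByteBuffer, ByteBuffer> readBlocking(OffsetBackingStore store,
                                                               Collection<ByteBuffer> keys)
                throws Exception {
            // Acts both as the completion callback and as a Future the caller can wait on.
            FutureCallback<Map<ByteBuffer, ByteBuffer>> callback = new FutureCallback<>();
            store.get(keys, callback);
            return callback.get(10, TimeUnit.SECONDS);
        }
    }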

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
new file mode 100644
index 0000000..3b37076
--- /dev/null
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.Future;
+
+/**
+ * <p>
+ *     KafkaBasedLog provides a generic implementation of a shared, compacted log of records stored in Kafka that all
+ *     clients need to consume and, at times, agree on how far they have read, i.e. that they have read to the end of the log.
+ * </p>
+ * <p>
+ *     This functionality is useful for storing different types of data that 
all clients may need to agree on --
+ *     offsets or config for example. This class runs a consumer in a 
background thread to continuously tail the target
+ *     topic, accepts write requests which it writes to the topic using an 
internal producer, and provides some helpful
+ *     utilities like checking the current log end offset and waiting until 
the current end of the log is reached.
+ * </p>
+ * <p>
+ *     To support different use cases, this class works with either single- or 
multi-partition topics.
+ * </p>
+ * <p>
+ *     Since this class is generic, it delegates the details of data storage 
via a callback that is invoked for each
+ *     record that is consumed from the topic. The invocation of callbacks is 
guaranteed to be serialized -- if the
+ *     calling class keeps track of state based on the log and only writes to 
it when consume callbacks are invoked
+ *     and only reads it in {@link #readToEnd(Callback)} callbacks then no 
additional synchronization will be required.
+ * </p>
+ */
+public class KafkaBasedLog<K, V> {
+    private static final Logger log = 
LoggerFactory.getLogger(KafkaBasedLog.class);
+    private static final long CREATE_TOPIC_TIMEOUT_MS = 30000;
+
+    private Time time;
+    private final String topic;
+    private final Map<String, Object> producerConfigs;
+    private final Map<String, Object> consumerConfigs;
+    private final Callback<ConsumerRecord<K, V>> consumedCallback;
+    private Consumer<K, V> consumer;
+    private Producer<K, V> producer;
+
+    private Thread thread;
+    private boolean stopRequested;
+    private Queue<Callback<Void>> readLogEndOffsetCallbacks;
+
+    /**
+     * Create a new KafkaBasedLog object. This does not start reading the log 
and writing is not permitted until
+     * {@link #start()} is invoked.
+     *
+     * @param topic the topic to treat as a log
+     * @param producerConfigs configuration options to use when creating the internal producer. At a minimum this must
+     *                        contain compatible serializer settings for the generic types used on this class. Some
+     *                        settings, such as the number of acks, will be overridden to ensure correct behavior of this
+     *                        class.
+     * @param consumerConfigs configuration options to use when creating the internal consumer. At a minimum this must
+     *                        contain compatible deserializer settings for the generic types used on this class. Some
+     *                        settings, such as the auto offset reset policy, will be overridden to ensure correct
+     *                        behavior of this class.
+     * @param consumedCallback callback to invoke for each {@link 
ConsumerRecord} consumed when tailing the log
+     * @param time Time interface
+     */
+    public KafkaBasedLog(String topic, Map<String, Object> producerConfigs, 
Map<String, Object> consumerConfigs,
+                         Callback<ConsumerRecord<K, V>> consumedCallback, Time 
time) {
+        this.topic = topic;
+        this.producerConfigs = producerConfigs;
+        this.consumerConfigs = consumerConfigs;
+        this.consumedCallback = consumedCallback;
+        this.stopRequested = false;
+        this.readLogEndOffsetCallbacks = new ArrayDeque<>();
+        this.time = time;
+    }
+
+    public void start() {
+        log.info("Starting KafkaBasedLog with topic " + topic);
+
+        producer = createProducer();
+        consumer = createConsumer();
+
+        List<TopicPartition> partitions = new ArrayList<>();
+
+        // Until we have admin utilities we can use to check for the existence 
of this topic and create it if it is missing,
+        // we rely on topic auto-creation
+        List<PartitionInfo> partitionInfos = null;
+        long started = time.milliseconds();
+        while (partitionInfos == null && time.milliseconds() - started < 
CREATE_TOPIC_TIMEOUT_MS) {
+            partitionInfos = consumer.partitionsFor(topic);
+            Utils.sleep(Math.min(time.milliseconds() - started, 1000));
+        }
+        if (partitionInfos == null)
+            throw new ConnectException("Could not look up partition metadata 
for offset backing store topic in" +
+                    " allotted period. This could indicate a connectivity 
issue, unavailable topic partitions, or if" +
+                    " this is your first use of the topic it may have taken 
too long to create.");
+
+        for (PartitionInfo partition : partitionInfos)
+            partitions.add(new TopicPartition(partition.topic(), 
partition.partition()));
+        consumer.assign(partitions);
+
+        readToLogEnd();
+
+        thread = new WorkThread();
+        thread.start();
+
+        log.info("Finished reading KafakBasedLog for topic " + topic);
+
+        log.info("Started KafakBasedLog for topic " + topic);
+    }
+
+    public void stop() {
+        log.info("Stopping KafkaBasedLog for topic " + topic);
+
+        synchronized (this) {
+            stopRequested = true;
+        }
+        consumer.wakeup();
+
+        try {
+            thread.join();
+        } catch (InterruptedException e) {
+            throw new ConnectException("Failed to stop KafkaBasedLog. Exiting 
without cleanly shutting " +
+                    "down it's producer and consumer.", e);
+        }
+
+        try {
+            producer.close();
+        } catch (KafkaException e) {
+            log.error("Failed to stop KafkaBasedLog producer", e);
+        }
+
+        try {
+            consumer.close();
+        } catch (KafkaException e) {
+            log.error("Failed to stop KafkaBasedLog consumer", e);
+        }
+
+        log.info("Stopped KafkaBasedLog for topic " + topic);
+    }
+
+    /**
+     * Flushes any outstanding writes and then reads to the current end of the log and invokes the specified callback.
+     * Note that this checks the current end offsets, reads to them, and invokes the callback regardless of whether
+     * additional records have been written to the log. If the caller needs to ensure they have truly reached the end
+     * of the log, they must ensure there are no other writers during this period.
+     *
+     * This waits until the end of all partitions has been reached.
+     *
+     * This method is asynchronous. If you need a synchronous version, pass an instance of
+     * {@link org.apache.kafka.connect.util.FutureCallback} as the {@code callback} parameter and wait on it to block.
+     *
+     * @param callback the callback to invoke once the end of the log has been 
reached.
+     */
+    public void readToEnd(Callback<Void> callback) {
+        producer.flush();
+        synchronized (this) {
+            readLogEndOffsetCallbacks.add(callback);
+        }
+        consumer.wakeup();
+    }
+
+    /**
+     * Same as {@link #readToEnd(Callback)} but provides a {@link Future} 
instead of using a callback.
+     * @return the future associated with the operation
+     */
+    public Future<Void> readToEnd() {
+        FutureCallback<Void> future = new FutureCallback<>(null);
+        readToEnd(future);
+        return future;
+    }
+
+    public void send(K key, V value) {
+        send(key, value, null);
+    }
+
+    public void send(K key, V value, org.apache.kafka.clients.producer.Callback callback) {
+        producer.send(new ProducerRecord<>(topic, key, value), callback);
+    }
+
+
+    private Producer<K, V> createProducer() {
+        // Always require producer acks to all to ensure durable writes
+        producerConfigs.put(ProducerConfig.ACKS_CONFIG, "all");
+        return new KafkaProducer<>(producerConfigs);
+    }
+
+    private Consumer<K, V> createConsumer() {
+        // Always force reset to the beginning of the log since this class wants to consume all available log data
+        consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        return new KafkaConsumer<>(consumerConfigs);
+    }
+
+    private void poll(long timeoutMs) {
+        try {
+            ConsumerRecords<K, V> records = consumer.poll(timeoutMs);
+            for (ConsumerRecord<K, V> record : records)
+                consumedCallback.onCompletion(null, record);
+        } catch (WakeupException e) {
+            // Expected on get() or stop(). The calling code should handle this
+            throw e;
+        } catch (KafkaException e) {
+            log.error("Error polling: " + e);
+        }
+    }
+
+    private void readToLogEnd() {
+        log.trace("Reading to end of offset log");
+
+        Set<TopicPartition> assignment = consumer.assignment();
+
+        // This approach to getting the current end offset is hacky until we have an API for looking these up directly
+        Map<TopicPartition, Long> offsets = new HashMap<>();
+        for (TopicPartition tp : assignment) {
+            long offset = consumer.position(tp);
+            offsets.put(tp, offset);
+            consumer.seekToEnd(tp);
+        }
+
+        Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        try {
+            poll(0);
+        } finally {
+            // If there is an exception, even a possibly expected one like WakeupException, we need to make sure
+            // the consumer's position is reset or it'll get into an inconsistent state.
+            for (TopicPartition tp : assignment) {
+                long startOffset = offsets.get(tp);
+                long endOffset = consumer.position(tp);
+                if (endOffset > startOffset) {
+                    endOffsets.put(tp, endOffset);
+                    consumer.seek(tp, startOffset);
+                }
+                log.trace("Reading to end of log for {}: starting offset {} to ending offset {}", tp, startOffset, endOffset);
+            }
+        }
+
+        while (!endOffsets.isEmpty()) {
+            poll(Integer.MAX_VALUE);
+
+            Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator();
+            while (it.hasNext()) {
+                Map.Entry<TopicPartition, Long> entry = it.next();
+                if (consumer.position(entry.getKey()) >= entry.getValue())
+                    it.remove();
+                else
+                    break;
+            }
+        }
+    }
+
+
+    private class WorkThread extends Thread {
+        @Override
+        public void run() {
+            try {
+                while (true) {
+                    int numCallbacks;
+                    synchronized (KafkaBasedLog.this) {
+                        if (stopRequested)
+                            break;
+                        numCallbacks = readLogEndOffsetCallbacks.size();
+                    }
+
+                    if (numCallbacks > 0) {
+                        try {
+                            readToLogEnd();
+                        } catch (WakeupException e) {
+                            // Either received another get() call and need to retry reading to end of log or stop() was
+                            // called. Both are handled by restarting this loop.
+                            continue;
+                        }
+                    }
+
+                    synchronized (KafkaBasedLog.this) {
+                        // Only invoke exactly the number of callbacks we found before triggering the read to log end
+                        // since it is possible for another write + readToEnd to sneak in in the meantime
+                        for (int i = 0; i < numCallbacks; i++) {
+                            Callback<Void> cb = readLogEndOffsetCallbacks.poll();
+                            cb.onCompletion(null, null);
+                        }
+                    }
+
+                    try {
+                        poll(Integer.MAX_VALUE);
+                    } catch (WakeupException e) {
+                        // See previous comment, both possible causes of this wakeup are handled by starting this loop again
+                        continue;
+                    }
+                }
+            } catch (Throwable t) {
+                log.error("Unexpected exception in KafkaBasedLog's work 
thread", t);
+            }
+        }
+    }
+}
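
A minimal usage sketch of the read-to-end pattern KafkaBasedLog exposes, assuming an already configured and started KafkaBasedLog<byte[], byte[]> (its constructor is not shown in this hunk); the wrapper class and method names below are hypothetical, only send(), readToEnd(Callback) and readToEnd() come from the patch:

    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.connect.util.FutureCallback;
    import org.apache.kafka.connect.util.KafkaBasedLog;

    public class KafkaBasedLogReadToEndSketch {
        // "offsetLog" is assumed to be started already; send() is asynchronous,
        // so readToEnd() is what guarantees the write has been read back.
        static void sendAndWait(KafkaBasedLog<byte[], byte[]> offsetLog) throws Exception {
            offsetLog.send("key".getBytes(), "value".getBytes());

            // Option 1: block on the Future-returning variant.
            offsetLog.readToEnd().get(30, TimeUnit.SECONDS);

            // Option 2: pass a FutureCallback explicitly, as the javadoc above describes.
            FutureCallback<Void> cb = new FutureCallback<>(null);
            offsetLog.readToEnd(cb);
            cb.get(30, TimeUnit.SECONDS);
        }
    }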

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/util/ShutdownableThread.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ShutdownableThread.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ShutdownableThread.java
new file mode 100644
index 0000000..01dac90
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ShutdownableThread.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * <p>
+ * Thread class with support for triggering graceful and forcible shutdown. In graceful shutdown,
+ * a flag is set, which the thread should detect and try to exit gracefully from. In forcible
+ * shutdown, the thread is interrupted. These can be combined to give a thread a chance to exit
+ * gracefully, but then force it to exit if it takes too long.
+ * </p>
+ * <p>
+ * Implementations should override the {@link #execute} method and check {@link #getRunning} to
+ * determine whether they should try to gracefully exit.
+ * </p>
+ */
+public abstract class ShutdownableThread extends Thread {
+    private static final Logger log = LoggerFactory.getLogger(ShutdownableThread.class);
+
+    private AtomicBoolean isRunning = new AtomicBoolean(true);
+    private CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+    /**
+     * An UncaughtExceptionHandler to register on every instance of this class. This is useful for
+     * testing, where AssertionExceptions in the thread may not cause the test to fail. Since one
+     * instance is used for all threads, it must be thread-safe.
+     */
+    volatile public static UncaughtExceptionHandler funcaughtExceptionHandler = null;
+
+    public ShutdownableThread(String name) {
+        // The default is daemon=true so that these threads will not prevent shutdown. We use this
+        // default because these threads may be running user code that does not clean up properly,
+        // even when we attempt to forcibly shut them down.
+        this(name, true);
+    }
+
+    public ShutdownableThread(String name, boolean daemon) {
+        super(name);
+        this.setDaemon(daemon);
+        if (funcaughtExceptionHandler != null)
+            this.setUncaughtExceptionHandler(funcaughtExceptionHandler);
+    }
+
+    /**
+     * Implementations should override this method with the main body for the thread.
+     */
+    public abstract void execute();
+
+    /**
+     * Returns true if the thread hasn't exited yet and none of the shutdown methods have been
+     * invoked
+     */
+    public boolean getRunning() {
+        return isRunning.get();
+    }
+
+    @Override
+    public void run() {
+        try {
+            execute();
+        } catch (Error | RuntimeException e) {
+            log.error("Thread {} exiting with uncaught exception: ", 
getName(), e);
+            throw e;
+        } finally {
+            shutdownLatch.countDown();
+        }
+    }
+
+    /**
+     * Shutdown the thread, first trying to shut down gracefully using the specified timeout, then
+     * forcibly interrupting the thread.
+     * @param gracefulTimeout the maximum time to wait for a graceful exit
+     * @param unit the time unit of the timeout argument
+     */
+    public void shutdown(long gracefulTimeout, TimeUnit unit)
+            throws InterruptedException {
+        boolean success = gracefulShutdown(gracefulTimeout, unit);
+        if (!success)
+            forceShutdown();
+    }
+
+    /**
+     * Attempt graceful shutdown
+     * @param timeout the maximum time to wait
+     * @param unit the time unit of the timeout argument
+     * @return true if successful, false if the timeout elapsed
+     */
+    public boolean gracefulShutdown(long timeout, TimeUnit unit) throws InterruptedException {
+        startGracefulShutdown();
+        return awaitShutdown(timeout, unit);
+    }
+
+    /**
+     * Start shutting down this thread gracefully, but do not block waiting for it to exit.
+     */
+    public void startGracefulShutdown() {
+        log.info("Starting graceful shutdown of thread {}", getName());
+        isRunning.set(false);
+    }
+
+    /**
+     * Awaits shutdown of this thread, waiting up to the timeout.
+     * @param timeout the maximum time to wait
+     * @param unit the time unit of the timeout argument
+     * @return true if successful, false if the timeout elapsed
+     * @throws InterruptedException
+     */
+    public boolean awaitShutdown(long timeout, TimeUnit unit) throws InterruptedException {
+        return shutdownLatch.await(timeout, unit);
+    }
+
+    /**
+     * Immediately tries to force the thread to shut down by interrupting it. This does not try to
+     * wait for the thread to truly exit because forcible shutdown is not always possible. By
+     * default, threads are marked as daemon threads so they will not prevent the process from
+     * exiting.
+     */
+    public void forceShutdown() throws InterruptedException {
+        log.info("Forcing shutdown of thread {}", getName());
+        isRunning.set(false);
+        interrupt();
+    }
+}
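
As a sketch of the contract the class javadoc describes (hypothetical subclass and names, illustrative only): execute() polls getRunning() to exit gracefully, and treats interruption as the forcible-shutdown signal delivered by forceShutdown().

    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.connect.util.ShutdownableThread;

    public class PollingWorkerSketch extends ShutdownableThread {
        public PollingWorkerSketch() {
            super("polling-worker-sketch");
        }

        @Override
        public void execute() {
            while (getRunning()) {
                try {
                    Thread.sleep(100); // stand-in for one unit of real work
                } catch (InterruptedException e) {
                    return; // forceShutdown() interrupts the thread; exit promptly
                }
            }
        }

        public static void main(String[] args) throws InterruptedException {
            PollingWorkerSketch worker = new PollingWorkerSketch();
            worker.start();
            // Allow up to 5 seconds for a graceful exit, then interrupt.
            worker.shutdown(5, TimeUnit.SECONDS);
        }
    }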

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
new file mode 100644
index 0000000..62ec5a8
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.errors.RetriableException;
+import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.apache.kafka.connect.util.MockTime;
+import org.easymock.Capture;
+import org.easymock.CaptureType;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(WorkerSinkTask.class)
+@PowerMockIgnore("javax.management.*")
+public class WorkerSinkTaskTest {
+    // These are fixed to keep this code simpler. In this example we assume byte[] raw values
+    // with mix of integer/string in Connect
+    private static final String TOPIC = "test";
+    private static final int PARTITION = 12;
+    private static final int PARTITION2 = 13;
+    private static final long FIRST_OFFSET = 45;
+    private static final Schema KEY_SCHEMA = Schema.INT32_SCHEMA;
+    private static final int KEY = 12;
+    private static final Schema VALUE_SCHEMA = Schema.STRING_SCHEMA;
+    private static final String VALUE = "VALUE";
+    private static final byte[] RAW_KEY = "key".getBytes();
+    private static final byte[] RAW_VALUE = "value".getBytes();
+
+    private static final TopicPartition TOPIC_PARTITION = new TopicPartition(TOPIC, PARTITION);
+    private static final TopicPartition TOPIC_PARTITION2 = new TopicPartition(TOPIC, PARTITION2);
+
+    private static final Map<String, String> TASK_PROPS = new HashMap<>();
+    static {
+        TASK_PROPS.put(SinkConnector.TOPICS_CONFIG, TOPIC);
+    }
+
+
+    private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
+    private Time time;
+    private WorkerSinkTask workerTask;
+    @Mock
+    private SinkTask sinkTask;
+    private Capture<WorkerSinkTaskContext> sinkTaskContext = EasyMock.newCapture();
+    private WorkerConfig workerConfig;
+    @Mock
+    private Converter keyConverter;
+    @Mock
+    private Converter valueConverter;
+    @Mock
+    private WorkerSinkTaskThread workerThread;
+    @Mock
+    private KafkaConsumer<byte[], byte[]> consumer;
+    private Capture<ConsumerRebalanceListener> rebalanceListener = EasyMock.newCapture();
+
+    private long recordsReturned;
+
+    @Before
+    public void setUp() {
+        time = new MockTime();
+        Map<String, String> workerProps = new HashMap<>();
+        workerProps.put("key.converter", 
"org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("value.converter", 
"org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.key.converter", 
"org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.value.converter", 
"org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.key.converter.schemas.enable", "false");
+        workerProps.put("internal.value.converter.schemas.enable", "false");
+        workerConfig = new StandaloneConfig(workerProps);
+        workerTask = PowerMock.createPartialMock(
+                WorkerSinkTask.class, new String[]{"createConsumer", "createWorkerThread"},
+                taskId, sinkTask, workerConfig, keyConverter, valueConverter, time);
+
+        recordsReturned = 0;
+    }
+
+    @Test
+    public void testPollRedelivery() throws Exception {
+        expectInitializeTask();
+
+        // If a retriable exception is thrown, we should redeliver the same batch, pausing the consumer in the meantime
+        expectConsumerPoll(1);
+        expectConvertMessages(1);
+        Capture<Collection<SinkRecord>> records = EasyMock.newCapture(CaptureType.ALL);
+        sinkTask.put(EasyMock.capture(records));
+        EasyMock.expectLastCall().andThrow(new RetriableException("retry"));
+        // Pause
+        EasyMock.expect(consumer.assignment()).andReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2)));
+        consumer.pause(TOPIC_PARTITION);
+        PowerMock.expectLastCall();
+        consumer.pause(TOPIC_PARTITION2);
+        PowerMock.expectLastCall();
+
+        // Retry delivery should succeed
+        expectConsumerPoll(0);
+        sinkTask.put(EasyMock.capture(records));
+        EasyMock.expectLastCall();
+        // And unpause
+        EasyMock.expect(consumer.assignment()).andReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2)));
+        consumer.resume(TOPIC_PARTITION);
+        PowerMock.expectLastCall();
+        consumer.resume(TOPIC_PARTITION2);
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        workerTask.start(TASK_PROPS);
+        workerTask.joinConsumerGroupAndStart();
+        workerTask.poll(Long.MAX_VALUE);
+        workerTask.poll(Long.MAX_VALUE);
+
+        PowerMock.verifyAll();
+    }
+
+
+    private void expectInitializeTask() throws Exception {
+        PowerMock.expectPrivate(workerTask, "createConsumer").andReturn(consumer);
+        PowerMock.expectPrivate(workerTask, "createWorkerThread")
+                .andReturn(workerThread);
+        workerThread.start();
+        PowerMock.expectLastCall();
+
+        consumer.subscribe(EasyMock.eq(Arrays.asList(TOPIC)), EasyMock.capture(rebalanceListener));
+        PowerMock.expectLastCall();
+
+        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(new IAnswer<ConsumerRecords<byte[], byte[]>>() {
+            @Override
+            public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
+                rebalanceListener.getValue().onPartitionsAssigned(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2));
+                return ConsumerRecords.empty();
+            }
+        });
+        EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET);
+        EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET);
+
+        sinkTask.initialize(EasyMock.capture(sinkTaskContext));
+        PowerMock.expectLastCall();
+        sinkTask.start(TASK_PROPS);
+        PowerMock.expectLastCall();
+    }
+
+    private void expectConsumerPoll(final int numMessages) {
+        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
+                new IAnswer<ConsumerRecords<byte[], byte[]>>() {
+                    @Override
+                    public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
+                        List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
+                        for (int i = 0; i < numMessages; i++)
+                            records.add(new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned + i, RAW_KEY, RAW_VALUE));
+                        recordsReturned += numMessages;
+                        return new ConsumerRecords<>(
+                                numMessages > 0 ?
+                                        Collections.singletonMap(new TopicPartition(TOPIC, PARTITION), records) :
+                                        Collections.<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>emptyMap()
+                        );
+                    }
+                });
+    }
+
+    private void expectConvertMessages(final int numMessages) {
+        EasyMock.expect(keyConverter.toConnectData(TOPIC, RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY)).times(numMessages);
+        EasyMock.expect(valueConverter.toConnectData(TOPIC, RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)).times(numMessages);
+    }
+}
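
The redelivery behavior testPollRedelivery() verifies can be summarized in a simplified, hypothetical sketch (PartitionPauser stands in for the consumer calls mocked above; this is not the WorkerSinkTask implementation): on a RetriableException from SinkTask.put(), the batch is retained and the assigned partitions are paused, then the same batch is redelivered on the next poll, and the partitions are resumed once put() succeeds.

    import java.util.Collection;
    import java.util.List;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.connect.errors.RetriableException;
    import org.apache.kafka.connect.sink.SinkRecord;
    import org.apache.kafka.connect.sink.SinkTask;

    public class RedeliverySketch {
        // Stand-in for the consumer operations the test mocks (assignment/pause/resume).
        interface PartitionPauser {
            Collection<TopicPartition> assignment();
            void pause(Collection<TopicPartition> partitions);
            void resume(Collection<TopicPartition> partitions);
        }

        private List<SinkRecord> pendingBatch; // retained across polls while paused
        private boolean paused;

        void deliver(SinkTask task, List<SinkRecord> batch, PartitionPauser consumer) {
            if (pendingBatch == null)
                pendingBatch = batch; // fresh batch; otherwise redeliver the retained one
            try {
                task.put(pendingBatch);
                pendingBatch = null; // delivered; safe to fetch more records
                if (paused) {
                    consumer.resume(consumer.assignment());
                    paused = false;
                }
            } catch (RetriableException e) {
                if (!paused) {
                    consumer.pause(consumer.assignment());
                    paused = true;
                }
                // pendingBatch is kept and redelivered on the next call
            }
        }
    }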
