http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/errors/AlreadyExistsException.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/errors/AlreadyExistsException.java b/connect/runtime/src/main/java/org/apache/kafka/connect/errors/AlreadyExistsException.java
new file mode 100644
index 0000000..6fdefdf
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/errors/AlreadyExistsException.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.errors;
+
+/**
+ * Indicates the operation tried to create an entity that already exists.
+ */
+public class AlreadyExistsException extends ConnectException {
+    public AlreadyExistsException(String s) {
+        super(s);
+    }
+
+    public AlreadyExistsException(String s, Throwable throwable) {
+        super(s, throwable);
+    }
+
+    public AlreadyExistsException(Throwable throwable) {
+        super(throwable);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/errors/NotFoundException.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/errors/NotFoundException.java b/connect/runtime/src/main/java/org/apache/kafka/connect/errors/NotFoundException.java
new file mode 100644
index 0000000..a3bbe91
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/errors/NotFoundException.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.errors;
+
+/**
+ * Indicates that an operation attempted to modify or delete a connector or task that is not present on the worker.
+ */
+public class NotFoundException extends ConnectException {
+    public NotFoundException(String s) {
+        super(s);
+    }
+
+    public NotFoundException(String s, Throwable throwable) {
+        super(s, throwable);
+    }
+
+    public NotFoundException(Throwable throwable) {
+        super(throwable);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/errors/RetriableException.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/errors/RetriableException.java b/connect/runtime/src/main/java/org/apache/kafka/connect/errors/RetriableException.java
new file mode 100644
index 0000000..1b5b07a
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/errors/RetriableException.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.errors;
+
+/**
+ * An exception that indicates the operation can be reattempted.
+ */
+public class RetriableException extends ConnectException {
+    public RetriableException(String s) {
+        super(s);
+    }
+
+    public RetriableException(String s, Throwable throwable) {
+        super(s, throwable);
+    }
+
+    public RetriableException(Throwable throwable) {
+        super(throwable);
+    }
+}
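
All three exception classes above extend ConnectException and differ only in how callers are expected to react: AlreadyExistsException and NotFoundException map naturally onto REST 409/404 responses, while RetriableException signals that the failed operation can simply be reattempted. A minimal, self-contained sketch of that division of labor (the registry class below is hypothetical, not part of this patch):

    import org.apache.kafka.connect.errors.AlreadyExistsException;
    import org.apache.kafka.connect.errors.NotFoundException;

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Hypothetical in-memory registry illustrating which exception fits where.
    public class ConnectorRegistry {
        private final Map<String, Map<String, String>> configs = new ConcurrentHashMap<>();

        public void create(String name, Map<String, String> config) {
            // putIfAbsent returns the previous value when the key is already taken
            if (configs.putIfAbsent(name, config) != null)
                throw new AlreadyExistsException("Connector " + name + " already exists");
        }

        public Map<String, String> get(String name) {
            Map<String, String> config = configs.get(name);
            if (config == null)
                throw new NotFoundException("Connector " + name + " not found");
            return config;
        }
    }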

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java
new file mode 100644
index 0000000..6611e5d
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.connect.runtime.rest.RestServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * This class ties together all the components of a Kafka Connect process (herder, worker,
+ * storage, command interface), managing their lifecycle.
+ */
+@InterfaceStability.Unstable
+public class Connect {
+    private static final Logger log = LoggerFactory.getLogger(Connect.class);
+
+    private final Worker worker;
+    private final Herder herder;
+    private final RestServer rest;
+    private final CountDownLatch startLatch = new CountDownLatch(1);
+    private final CountDownLatch stopLatch = new CountDownLatch(1);
+    private final AtomicBoolean shutdown = new AtomicBoolean(false);
+    private final ShutdownHook shutdownHook;
+
+    public Connect(Worker worker, Herder herder, RestServer rest) {
+        log.debug("Kafka Connect instance created");
+        this.worker = worker;
+        this.herder = herder;
+        this.rest = rest;
+        shutdownHook = new ShutdownHook();
+    }
+
+    public void start() {
+        log.info("Kafka Connect starting");
+        Runtime.getRuntime().addShutdownHook(shutdownHook);
+
+        worker.start();
+        herder.start();
+        rest.start(herder);
+
+        log.info("Kafka Connect started");
+
+        startLatch.countDown();
+    }
+
+    public void stop() {
+        boolean wasShuttingDown = shutdown.getAndSet(true);
+        if (!wasShuttingDown) {
+            log.info("Kafka Connect stopping");
+
+            rest.stop();
+            herder.stop();
+            worker.stop();
+
+            log.info("Kafka Connect stopped");
+        }
+
+        stopLatch.countDown();
+    }
+
+    public void awaitStop() {
+        try {
+            stopLatch.await();
+        } catch (InterruptedException e) {
+            log.error("Interrupted waiting for Kafka Connect to shutdown");
+        }
+    }
+
+    private class ShutdownHook extends Thread {
+        @Override
+        public void run() {
+            try {
+                startLatch.await();
+                Connect.this.stop();
+            } catch (InterruptedException e) {
+                log.error("Interrupted in shutdown hook while waiting for Kafka Connect startup to finish");
+            }
+        }
+    }
+}
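
Connect itself only sequences lifecycle: start() launches the worker, herder, and REST server and registers a shutdown hook, stop() tears them down in reverse order, and awaitStop() blocks until shutdown completes. A rough bootstrap sketch, assuming a Herder implementation, worker config, and offset store supplied from elsewhere in the runtime (none appear in this excerpt), and assuming RestServer takes the worker config in its constructor:

    import org.apache.kafka.connect.runtime.Connect;
    import org.apache.kafka.connect.runtime.Herder;
    import org.apache.kafka.connect.runtime.Worker;
    import org.apache.kafka.connect.runtime.WorkerConfig;
    import org.apache.kafka.connect.runtime.rest.RestServer;
    import org.apache.kafka.connect.storage.OffsetBackingStore;

    public class ConnectBootstrap {
        // Hypothetical wiring; the herder, config, and offset store are assumptions here.
        public static void run(WorkerConfig config, OffsetBackingStore offsetStore, Herder herder) {
            Worker worker = new Worker(config, offsetStore);
            RestServer rest = new RestServer(config); // assumed constructor
            Connect connect = new Connect(worker, herder, rest);
            connect.start();     // registers the shutdown hook, then starts worker, herder, REST
            connect.awaitStop(); // blocks until stop() completes (e.g. via the shutdown hook)
        }
    }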

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
new file mode 100644
index 0000000..77cfc8d
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * <p>
+ * Configuration options for Connectors. These only include Kafka Connect system-level configuration
+ * options (e.g. Connector class name, timeouts used by Connect to control the connector) but do
+ * not include Connector-specific options (e.g. database connection settings).
+ * </p>
+ * <p>
+ * Note that some of these options are not required for all connectors. For example, TOPICS_CONFIG
+ * is sink-specific.
+ * </p>
+ */
+public class ConnectorConfig extends AbstractConfig {
+
+    public static final String NAME_CONFIG = "name";
+    private static final String NAME_DOC = "Globally unique name to use for this connector.";
+
+    public static final String CONNECTOR_CLASS_CONFIG = "connector.class";
+    private static final String CONNECTOR_CLASS_DOC =
+            "Name of the class for this connector. Must be a subclass of org.apache.kafka.connect.connector.Connector";
+
+    public static final String TASKS_MAX_CONFIG = "tasks.max";
+    private static final String TASKS_MAX_DOC = "Maximum number of tasks to use for this connector.";
+    public static final int TASKS_MAX_DEFAULT = 1;
+
+    public static final String TOPICS_CONFIG = "topics";
+    private static final String TOPICS_DOC = "";
+    public static final String TOPICS_DEFAULT = "";
+
+    private static ConfigDef config;
+
+    static {
+        config = new ConfigDef()
+                .define(NAME_CONFIG, Type.STRING, Importance.HIGH, NAME_DOC)
+                .define(CONNECTOR_CLASS_CONFIG, Type.CLASS, Importance.HIGH, CONNECTOR_CLASS_DOC)
+                .define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, Importance.HIGH, TASKS_MAX_DOC)
+                .define(TOPICS_CONFIG, Type.LIST, TOPICS_DEFAULT, Importance.HIGH, TOPICS_DOC);
+    }
+
+    public ConnectorConfig() {
+        this(new HashMap<String, String>());
+    }
+
+    public ConnectorConfig(Map<String, String> props) {
+        super(config, props);
+    }
+}
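
For illustration, a ConnectorConfig is built from a plain string map and validates/converts each value against the ConfigDef above; the connector class and topic name below are made-up values:

    import org.apache.kafka.connect.runtime.ConnectorConfig;

    import java.util.HashMap;
    import java.util.Map;

    public class ConnectorConfigExample {
        public static void main(String[] args) {
            Map<String, String> props = new HashMap<>();
            props.put(ConnectorConfig.NAME_CONFIG, "local-file-sink");
            props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG,
                    "org.apache.kafka.connect.file.FileStreamSinkConnector");
            props.put(ConnectorConfig.TASKS_MAX_CONFIG, "2");
            props.put(ConnectorConfig.TOPICS_CONFIG, "connect-test"); // sink-specific
            ConnectorConfig config = new ConnectorConfig(props);
            // Values come back typed per the ConfigDef (TASKS_MAX_CONFIG is Type.INT).
            int maxTasks = config.getInt(ConnectorConfig.TASKS_MAX_CONFIG);
            System.out.println(config.getString(ConnectorConfig.NAME_CONFIG) + " -> " + maxTasks);
        }
    }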

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
new file mode 100644
index 0000000..fc0689c
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
+import org.apache.kafka.connect.util.Callback;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * <p>
+ * The herder interface tracks and manages workers and connectors. It is the main interface for external components
+ * to make changes to the state of the cluster. For example, in distributed mode, an implementation of this class
+ * knows how to accept a connector configuration, may need to route it to the current leader worker for the cluster so
+ * the config can be written to persistent storage, and then ensures the new connector is correctly instantiated on one
+ * of the workers.
+ * </p>
+ * <p>
+ * This class must implement all the actions that can be taken on the cluster (add/remove connectors, pause/resume tasks,
+ * get state of connectors and tasks, etc). The non-Java interfaces to the cluster (REST API and CLI) are very simple
+ * wrappers of the functionality provided by this interface.
+ * </p>
+ * <p>
+ * In standalone mode, the implementation of this class will be trivial because no coordination is needed. In that case,
+ * the implementation will mainly be delegating tasks directly to other components. For example, when creating a new
+ * connector in standalone mode, there is no need to persist the config, and the connector and its tasks must run in the
+ * same process, so the standalone herder implementation can immediately instantiate and start the connector and its
+ * tasks.
+ * </p>
+ */
+public interface Herder {
+
+    void start();
+
+    void stop();
+
+    /**
+     * Get a list of connectors currently running in this cluster. This is a full list of connectors in the cluster gathered
+     * from the current configuration.
+     *
+     * @return a list of connector names
+     * @throws org.apache.kafka.connect.runtime.distributed.NotLeaderException if this node cannot resolve the request
+     *         (e.g., because it has not joined the cluster or does not have configs in sync with the group) and it is
+     *         also not the leader
+     * @throws ConnectException if this node is the leader, but still cannot resolve the
+     *         request (e.g., it is not in sync with other workers' config state)
+     */
+    void connectors(Callback<Collection<String>> callback);
+
+    /**
+     * Get the definition and status of a connector.
+     */
+    void connectorInfo(String connName, Callback<ConnectorInfo> callback);
+
+    /**
+     * Get the configuration for a connector.
+     * @param connName name of the connector
+     * @param callback callback to invoke with the configuration
+     */
+    void connectorConfig(String connName, Callback<Map<String, String>> callback);
+
+    /**
+     * Set the configuration for a connector. This supports creation, update, and deletion.
+     * @param connName name of the connector
+     * @param config the connector's configuration, or null if deleting the connector
+     * @param allowReplace if true, allow overwriting previous configs; if false, throw AlreadyExistsException if a connector
+     *                     with the same name already exists
+     * @param callback callback to invoke when the configuration has been written
+     */
+    void putConnectorConfig(String connName, Map<String, String> config, boolean allowReplace, Callback<Created<ConnectorInfo>> callback);
+
+    /**
+     * Requests reconfiguration of the task. This should only be triggered by
+     * {@link HerderConnectorContext}.
+     *
+     * @param connName name of the connector that should be reconfigured
+     */
+    void requestTaskReconfiguration(String connName);
+
+    /**
+     * Get the configurations for the current set of tasks of a connector.
+     * @param connName connector to update
+     * @param callback callback to invoke upon completion
+     */
+    void taskConfigs(String connName, Callback<List<TaskInfo>> callback);
+
+    /**
+     * Set the configurations for the tasks of a connector. This should always include all tasks in the connector; if
+     * there are existing configurations and fewer are provided, this will reduce the number of tasks, and if more are
+     * provided it will increase the number of tasks.
+     * @param connName connector to update
+     * @param configs list of configurations
+     * @param callback callback to invoke upon completion
+     */
+    void putTaskConfigs(String connName, List<Map<String, String>> configs, Callback<Void> callback);
+
+
+    class Created<T> {
+        private final boolean created;
+        private final T result;
+
+        public Created(boolean created, T result) {
+            this.created = created;
+            this.result = result;
+        }
+
+        public boolean created() {
+            return created;
+        }
+
+        public T result() {
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            Created<?> created1 = (Created<?>) o;
+            return Objects.equals(created, created1.created) &&
+                    Objects.equals(result, created1.result);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(created, result);
+        }
+    }
+}
\ No newline at end of file
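
Every read and write on this interface is asynchronous and reports through Callback. A sketch of submitting a new connector config, assuming Callback declares onCompletion(Throwable error, T result) as elsewhere in the Connect runtime:

    import org.apache.kafka.connect.runtime.Herder;
    import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
    import org.apache.kafka.connect.util.Callback;

    import java.util.Map;

    public class HerderClientExample {
        public static void submit(Herder herder, String name, Map<String, String> config) {
            herder.putConnectorConfig(name, config, false, // false: fail if it already exists
                    new Callback<Herder.Created<ConnectorInfo>>() {
                        @Override
                        public void onCompletion(Throwable error, Herder.Created<ConnectorInfo> result) {
                            if (error != null)
                                error.printStackTrace(); // e.g. AlreadyExistsException
                            else
                                System.out.println("created=" + result.created());
                        }
                    });
        }
    }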

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderConnectorContext.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderConnectorContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderConnectorContext.java
new file mode 100644
index 0000000..070aa20
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderConnectorContext.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.connector.ConnectorContext;
+
+/**
+ * ConnectorContext for use with the StandaloneHerder, which maintains all connectors and tasks
+ * in a single process.
+ */
+public class HerderConnectorContext implements ConnectorContext {
+
+    private Herder herder;
+    private String connectorName;
+
+    public HerderConnectorContext(Herder herder, String connectorName) {
+        this.herder = herder;
+        this.connectorName = connectorName;
+    }
+
+    @Override
+    public void requestTaskReconfiguration() {
+        // This is trivial to forward since there is only one herder and it's in memory in this
+        // process
+        herder.requestTaskReconfiguration(connectorName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java
new file mode 100644
index 0000000..bee24e7
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * <p>
+ * Manages offset commit scheduling and execution for SourceTasks.
+ * </p>
+ * <p>
+ * Unlike sink tasks which directly manage their offset commits in the main poll() thread since
+ * they drive the event loop and control (for all intents and purposes) the timeouts, source
+ * tasks are at the whim of the connector and cannot be guaranteed to wake up on the necessary
+ * schedule. Instead, this class tracks all the active tasks, their schedule for commits, and
+ * ensures they are invoked in a timely fashion.
+ * </p>
+ */
+class SourceTaskOffsetCommitter {
+    private static final Logger log = LoggerFactory.getLogger(SourceTaskOffsetCommitter.class);
+
+    private Time time;
+    private WorkerConfig config;
+    private ScheduledExecutorService commitExecutorService = null;
+    private HashMap<ConnectorTaskId, ScheduledCommitTask> committers = new HashMap<>();
+
+    SourceTaskOffsetCommitter(Time time, WorkerConfig config) {
+        this.time = time;
+        this.config = config;
+        commitExecutorService = Executors.newSingleThreadScheduledExecutor();
+    }
+
+    public void close(long timeoutMs) {
+        commitExecutorService.shutdown();
+        try {
+            if (!commitExecutorService.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
+                log.error("Graceful shutdown of offset commitOffsets thread timed out.");
+            }
+        } catch (InterruptedException e) {
+            // ignore and allow to exit immediately
+        }
+    }
+
+    public void schedule(final ConnectorTaskId id, final WorkerSourceTask workerTask) {
+        synchronized (committers) {
+            long commitIntervalMs = config.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
+            ScheduledFuture<?> commitFuture = commitExecutorService.schedule(new Runnable() {
+                @Override
+                public void run() {
+                    commit(id, workerTask);
+                }
+            }, commitIntervalMs, TimeUnit.MILLISECONDS);
+            committers.put(id, new ScheduledCommitTask(commitFuture));
+        }
+    }
+
+    public void remove(ConnectorTaskId id) {
+        final ScheduledCommitTask task;
+        synchronized (committers) {
+            task = committers.remove(id);
+            task.cancelled = true;
+            task.commitFuture.cancel(false);
+        }
+        if (task.finishedLatch != null) {
+            try {
+                task.finishedLatch.await();
+            } catch (InterruptedException e) {
+                throw new ConnectException("Unexpected interruption in SourceTaskOffsetCommitter.", e);
+            }
+        }
+    }
+
+    public void commit(ConnectorTaskId id, WorkerSourceTask workerTask) {
+        final ScheduledCommitTask task;
+        synchronized (committers) {
+            task = committers.get(id);
+            if (task == null || task.cancelled)
+                return;
+            task.finishedLatch = new CountDownLatch(1);
+        }
+
+        try {
+            log.debug("Committing offsets for {}", workerTask);
+            boolean success = workerTask.commitOffsets();
+            if (!success) {
+                log.error("Failed to commit offsets for {}", workerTask);
+            }
+        } catch (Throwable t) {
+            // We're very careful about exceptions here since any uncaught exceptions in the commit
+            // thread would cause the fixed interval schedule on the ExecutorService to stop running
+            // for that task
+            log.error("Unhandled exception when committing {}: ", workerTask, t);
+        } finally {
+            synchronized (committers) {
+                task.finishedLatch.countDown();
+                if (!task.cancelled)
+                    schedule(id, workerTask);
+            }
+        }
+    }
+
+    private static class ScheduledCommitTask {
+        ScheduledFuture<?> commitFuture;
+        boolean cancelled;
+        CountDownLatch finishedLatch;
+
+        ScheduledCommitTask(ScheduledFuture<?> commitFuture) {
+            this.commitFuture = commitFuture;
+            this.cancelled = false;
+            this.finishedLatch = null;
+        }
+    }
+}
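
Note the scheduling pattern: rather than scheduleAtFixedRate(), each commit is a one-shot schedule() that re-arms itself in its finally block, so a slow or failing commit can never stack up concurrent executions for the same task. A stripped-down, self-contained illustration of that pattern (the println stands in for the real commitOffsets() call):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class RescheduleExample {
        private static final ScheduledExecutorService executor =
                Executors.newSingleThreadScheduledExecutor();

        static void schedule(final long intervalMs) {
            executor.schedule(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println("commit");  // stand-in for workerTask.commitOffsets()
                    } catch (Throwable t) {
                        t.printStackTrace();           // swallow so the chain keeps running
                    } finally {
                        schedule(intervalMs);          // re-arm only after this run completes
                    }
                }
            }, intervalMs, TimeUnit.MILLISECONDS);
        }

        public static void main(String[] args) {
            schedule(1000L);
        }
    }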

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskConfig.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskConfig.java
new file mode 100644
index 0000000..48cb4d8
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskConfig.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * <p>
+ * Configuration options for Tasks. These only include Kafka Connect system-level configuration
+ * options.
+ * </p>
+ */
+public class TaskConfig extends AbstractConfig {
+
+    public static final String TASK_CLASS_CONFIG = "task.class";
+    private static final String TASK_CLASS_DOC =
+            "Name of the class for this task. Must be a subclass of org.apache.kafka.connect.connector.Task";
+
+    private static ConfigDef config;
+
+    static {
+        config = new ConfigDef()
+                .define(TASK_CLASS_CONFIG, Type.CLASS, Importance.HIGH, TASK_CLASS_DOC);
+    }
+
+    public TaskConfig() {
+        this(new HashMap<String, String>());
+    }
+
+    public TaskConfig(Map<String, ?> props) {
+        super(config, props);
+    }
+}
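
Usage mirrors ConnectorConfig: the herder builds the raw map (including the task.class entry the worker injects) and TaskConfig validates it. A small example; the task class name is illustrative:

    import org.apache.kafka.connect.runtime.TaskConfig;

    import java.util.HashMap;
    import java.util.Map;

    public class TaskConfigExample {
        public static void main(String[] args) {
            Map<String, String> props = new HashMap<>();
            props.put(TaskConfig.TASK_CLASS_CONFIG,
                    "org.apache.kafka.connect.file.FileStreamSinkTask"); // illustrative
            TaskConfig config = new TaskConfig(props);
            System.out.println(config.getClass(TaskConfig.TASK_CLASS_CONFIG).getName());
        }
    }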

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
new file mode 100644
index 0000000..2e359d6
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.connector.ConnectorContext;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.source.SourceTask;
+import org.apache.kafka.connect.storage.*;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * <p>
+ * Worker runs a (dynamic) set of tasks in a set of threads, doing the work of actually moving
+ * data to/from Kafka.
+ * </p>
+ * <p>
+ * Since each task has a dedicated thread, this is mainly just a container for them.
+ * </p>
+ */
+public class Worker {
+    private static final Logger log = LoggerFactory.getLogger(Worker.class);
+
+    private Time time;
+    private WorkerConfig config;
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private Converter internalKeyConverter;
+    private Converter internalValueConverter;
+    private OffsetBackingStore offsetBackingStore;
+    private HashMap<String, Connector> connectors = new HashMap<>();
+    private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<>();
+    private KafkaProducer<byte[], byte[]> producer;
+    private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
+
+    public Worker(WorkerConfig config, OffsetBackingStore offsetBackingStore) {
+        this(new SystemTime(), config, offsetBackingStore);
+    }
+
+    @SuppressWarnings("unchecked")
+    public Worker(Time time, WorkerConfig config, OffsetBackingStore offsetBackingStore) {
+        this.time = time;
+        this.config = config;
+        this.keyConverter = config.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Converter.class);
+        this.keyConverter.configure(config.originalsWithPrefix("key.converter."), true);
+        this.valueConverter = config.getConfiguredInstance(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Converter.class);
+        this.valueConverter.configure(config.originalsWithPrefix("value.converter."), false);
+        this.internalKeyConverter = config.getConfiguredInstance(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, Converter.class);
+        this.internalKeyConverter.configure(config.originalsWithPrefix("internal.key.converter."), true);
+        this.internalValueConverter = config.getConfiguredInstance(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, Converter.class);
+        this.internalValueConverter.configure(config.originalsWithPrefix("internal.value.converter."), false);
+
+        this.offsetBackingStore = offsetBackingStore;
+        this.offsetBackingStore.configure(config.originals());
+    }
+
+    public void start() {
+        log.info("Worker starting");
+
+        Map<String, Object> producerProps = new HashMap<>();
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+        producerProps.putAll(config.unusedConfigs());
+
+        producer = new KafkaProducer<>(producerProps);
+
+        offsetBackingStore.start();
+        sourceTaskOffsetCommitter = new SourceTaskOffsetCommitter(time, config);
+
+        log.info("Worker started");
+    }
+
+    public void stop() {
+        log.info("Worker stopping");
+
+        long started = time.milliseconds();
+        long limit = started + config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG);
+
+        for (Map.Entry<String, Connector> entry : connectors.entrySet()) {
+            Connector conn = entry.getValue();
+            log.warn("Shutting down connector {} uncleanly; herder should have shut down connectors before the " +
+                    "Worker is stopped.", conn);
+            try {
+                conn.stop();
+            } catch (ConnectException e) {
+                log.error("Error while shutting down connector " + conn, e);
+            }
+        }
+
+        for (Map.Entry<ConnectorTaskId, WorkerTask> entry : tasks.entrySet()) {
+            WorkerTask task = entry.getValue();
+            log.warn("Shutting down task {} uncleanly; herder should have shut down "
+                    + "tasks before the Worker is stopped.", task);
+            try {
+                task.stop();
+            } catch (ConnectException e) {
+                log.error("Error while shutting down task " + task, e);
+            }
+        }
+
+        for (Map.Entry<ConnectorTaskId, WorkerTask> entry : tasks.entrySet()) {
+            WorkerTask task = entry.getValue();
+            log.debug("Waiting for task {} to finish shutting down", task);
+            if (!task.awaitStop(Math.max(limit - time.milliseconds(), 0)))
+                log.error("Graceful shutdown of task {} failed.", task);
+            task.close();
+        }
+
+        long timeoutMs = limit - time.milliseconds();
+        sourceTaskOffsetCommitter.close(timeoutMs);
+
+        offsetBackingStore.stop();
+
+        log.info("Worker stopped");
+    }
+
+    public WorkerConfig config() {
+        return config;
+    }
+
+    /**
+     * Add a new connector.
+     * @param connConfig connector configuration
+     * @param ctx context for the connector
+     */
+    public void addConnector(ConnectorConfig connConfig, ConnectorContext ctx) {
+        String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
+        Class<?> maybeConnClass = connConfig.getClass(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        log.info("Creating connector {} of type {}", connName, maybeConnClass.getName());
+
+        Class<? extends Connector> connClass;
+        try {
+            connClass = maybeConnClass.asSubclass(Connector.class);
+        } catch (ClassCastException e) {
+            throw new ConnectException("Specified class is not a subclass of Connector: " + maybeConnClass.getName());
+        }
+
+        if (connectors.containsKey(connName))
+            throw new ConnectException("Connector with name " + connName + " already exists");
+
+        final Connector connector = instantiateConnector(connClass);
+        log.info("Instantiated connector {} with version {} of type {}", connName, connector.version(), connClass.getName());
+        connector.initialize(ctx);
+        try {
+            connector.start(connConfig.originalsStrings());
+        } catch (ConnectException e) {
+            throw new ConnectException("Connector threw an exception while starting", e);
+        }
+
+        connectors.put(connName, connector);
+
+        log.info("Finished creating connector {}", connName);
+    }
+
+    private static Connector instantiateConnector(Class<? extends Connector> connClass) {
+        try {
+            return Utils.newInstance(connClass);
+        } catch (Throwable t) {
+            // Catches normal exceptions due to instantiation errors as well as any runtime errors that
+            // may be caused by user code
+            throw new ConnectException("Failed to create connector instance", t);
+        }
+    }
+
+    public List<Map<String, String>> connectorTaskConfigs(String connName, int maxTasks, List<String> sinkTopics) {
+        log.trace("Reconfiguring connector tasks for {}", connName);
+
+        Connector connector = connectors.get(connName);
+        if (connector == null)
+            throw new ConnectException("Connector " + connName + " not found in this worker.");
+
+        List<Map<String, String>> result = new ArrayList<>();
+        String taskClassName = connector.taskClass().getName();
+        for (Map<String, String> taskProps : connector.taskConfigs(maxTasks)) {
+            Map<String, String> taskConfig = new HashMap<>(taskProps); // Ensure we don't modify the connector's copy of the config
+            taskConfig.put(TaskConfig.TASK_CLASS_CONFIG, taskClassName);
+            if (sinkTopics != null)
+                taskConfig.put(SinkTask.TOPICS_CONFIG, Utils.join(sinkTopics, ","));
+            result.add(taskConfig);
+        }
+        return result;
+    }
+
+    public void stopConnector(String connName) {
+        log.info("Stopping connector {}", connName);
+
+        Connector connector = connectors.get(connName);
+        if (connector == null)
+            throw new ConnectException("Connector " + connName + " not found in this worker.");
+
+        try {
+            connector.stop();
+        } catch (ConnectException e) {
+            log.error("Error shutting down connector {}: ", connector, e);
+        }
+
+        connectors.remove(connName);
+
+        log.info("Stopped connector {}", connName);
+    }
+
+    /**
+     * Get the IDs of the connectors currently running in this worker.
+     */
+    public Set<String> connectorNames() {
+        return connectors.keySet();
+    }
+
+    /**
+     * Add a new task.
+     * @param id Globally unique ID for this task.
+     * @param taskConfig the parsed task configuration
+     */
+    public void addTask(ConnectorTaskId id, TaskConfig taskConfig) {
+        log.info("Creating task {}", id);
+
+        if (tasks.containsKey(id)) {
+            String msg = "Task already exists in this worker; the herder should not have requested "
+                    + "that this task be created: " + id;
+            log.error(msg);
+            throw new ConnectException(msg);
+        }
+
+        Class<? extends Task> taskClass = taskConfig.getClass(TaskConfig.TASK_CLASS_CONFIG).asSubclass(Task.class);
+        final Task task = instantiateTask(taskClass);
+        log.info("Instantiated task {} with version {} of type {}", id, task.version(), taskClass.getName());
+
+        // Decide which type of worker task we need based on the type of task.
+        final WorkerTask workerTask;
+        if (task instanceof SourceTask) {
+            SourceTask sourceTask = (SourceTask) task;
+            OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.connector(),
+                    internalKeyConverter, internalValueConverter);
+            OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(),
+                    internalKeyConverter, internalValueConverter);
+            workerTask = new WorkerSourceTask(id, sourceTask, keyConverter, valueConverter, producer,
+                    offsetReader, offsetWriter, config, time);
+        } else if (task instanceof SinkTask) {
+            workerTask = new WorkerSinkTask(id, (SinkTask) task, config, keyConverter, valueConverter, time);
+        } else {
+            log.error("Tasks must be a subclass of either SourceTask or SinkTask: {}", task);
+            throw new ConnectException("Tasks must be a subclass of either SourceTask or SinkTask");
+        }
+
+        // Start the task before modifying any state; any exceptions are caught higher up the
+        // call chain and there's no cleanup to do here
+        workerTask.start(taskConfig.originalsStrings());
+        if (task instanceof SourceTask) {
+            WorkerSourceTask workerSourceTask = (WorkerSourceTask) workerTask;
+            sourceTaskOffsetCommitter.schedule(id, workerSourceTask);
+        }
+        tasks.put(id, workerTask);
+    }
+
+    private static Task instantiateTask(Class<? extends Task> taskClass) {
+        try {
+            return Utils.newInstance(taskClass);
+        } catch (KafkaException e) {
+            throw new ConnectException("Task class not found", e);
+        }
+    }
+
+    public void stopTask(ConnectorTaskId id) {
+        log.info("Stopping task {}", id);
+
+        WorkerTask task = getTask(id);
+        if (task instanceof WorkerSourceTask)
+            sourceTaskOffsetCommitter.remove(id);
+        task.stop();
+        if (!task.awaitStop(config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG)))
+            log.error("Graceful stop of task {} failed.", task);
+        task.close();
+        tasks.remove(id);
+    }
+
+    /**
+     * Get the IDs of the tasks currently running in this worker.
+     */
+    public Set<ConnectorTaskId> taskIds() {
+        return tasks.keySet();
+    }
+
+    private WorkerTask getTask(ConnectorTaskId id) {
+        WorkerTask task = tasks.get(id);
+        if (task == null) {
+            log.error("Task not found: " + id);
+            throw new ConnectException("Task not found: " + id);
+        }
+        return task;
+    }
+
+    public Converter getInternalKeyConverter() {
+        return internalKeyConverter;
+    }
+
+    public Converter getInternalValueConverter() {
+        return internalValueConverter;
+    }
+}
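
The herder drives Worker through a fixed call sequence: addConnector(), then connectorTaskConfigs() to ask the connector for per-task properties, then one addTask() per returned config. A sketch of that sequence, assuming a started Worker and an available ConnectorContext implementation:

    import org.apache.kafka.connect.connector.ConnectorContext;
    import org.apache.kafka.connect.runtime.ConnectorConfig;
    import org.apache.kafka.connect.runtime.TaskConfig;
    import org.apache.kafka.connect.runtime.Worker;
    import org.apache.kafka.connect.util.ConnectorTaskId;

    import java.util.List;
    import java.util.Map;

    public class WorkerUsageExample {
        public static void deploy(Worker worker, ConnectorConfig connConfig,
                                  ConnectorContext ctx, List<String> sinkTopics) {
            String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
            worker.addConnector(connConfig, ctx);
            // Ask the connector how to split its work, capped at two tasks here.
            List<Map<String, String>> taskConfigs =
                    worker.connectorTaskConfigs(connName, 2, sinkTopics);
            int index = 0;
            for (Map<String, String> taskProps : taskConfigs)
                worker.addTask(new ConnectorTaskId(connName, index++), new TaskConfig(taskProps));
        }
    }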

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
new file mode 100644
index 0000000..4ecacbb
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+
+import java.util.Map;
+
+/**
+ * Common base class providing configuration for Kafka Connect workers, whether standalone or distributed.
+ */
+@InterfaceStability.Unstable
+public class WorkerConfig extends AbstractConfig {
+
+    public static final String CLUSTER_CONFIG = "cluster";
+    private static final String CLUSTER_CONFIG_DOC =
+            "ID for this cluster, which is used to provide a namespace so multiple Kafka Connect clusters "
+                    + "or instances may co-exist while sharing a single Kafka cluster.";
+    public static final String CLUSTER_DEFAULT = "connect";
+
+    public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
+    public static final String BOOTSTRAP_SERVERS_DOC
+            = "A list of host/port pairs to use for establishing the initial connection to the Kafka "
+            + "cluster. The client will make use of all servers irrespective of which servers are "
+            + "specified here for bootstrapping&mdash;this list only impacts the initial hosts used "
+            + "to discover the full set of servers. This list should be in the form "
+            + "<code>host1:port1,host2:port2,...</code>. Since these servers are just used for the "
+            + "initial connection to discover the full cluster membership (which may change "
+            + "dynamically), this list need not contain the full set of servers (you may want more "
+            + "than one, though, in case a server is down).";
+    public static final String BOOTSTRAP_SERVERS_DEFAULT = "localhost:9092";
+
+    public static final String KEY_CONVERTER_CLASS_CONFIG = "key.converter";
+    public static final String KEY_CONVERTER_CLASS_DOC =
+            "Converter class for key Connect data that implements the <code>Converter</code> interface.";
+
+    public static final String VALUE_CONVERTER_CLASS_CONFIG = "value.converter";
+    public static final String VALUE_CONVERTER_CLASS_DOC =
+            "Converter class for value Connect data that implements the <code>Converter</code> interface.";
+
+    public static final String INTERNAL_KEY_CONVERTER_CLASS_CONFIG = "internal.key.converter";
+    public static final String INTERNAL_KEY_CONVERTER_CLASS_DOC =
+            "Converter class for internal key Connect data that implements the <code>Converter</code> interface. Used for converting data like offsets and configs.";
+
+    public static final String INTERNAL_VALUE_CONVERTER_CLASS_CONFIG = "internal.value.converter";
+    public static final String INTERNAL_VALUE_CONVERTER_CLASS_DOC =
+            "Converter class for internal value Connect data that implements the <code>Converter</code> interface. Used for converting data like offsets and configs.";
+
+    public static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG
+            = "task.shutdown.graceful.timeout.ms";
+    private static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DOC =
+            "Amount of time to wait for tasks to shutdown gracefully. This is the total amount of time,"
+                    + " not per task. All tasks have shutdown triggered, then they are waited on sequentially.";
+    private static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DEFAULT = "5000";
+
+    public static final String OFFSET_COMMIT_INTERVAL_MS_CONFIG = "offset.flush.interval.ms";
+    private static final String OFFSET_COMMIT_INTERVAL_MS_DOC
+            = "Interval at which to try committing offsets for tasks.";
+    public static final long OFFSET_COMMIT_INTERVAL_MS_DEFAULT = 60000L;
+
+    public static final String OFFSET_COMMIT_TIMEOUT_MS_CONFIG = "offset.flush.timeout.ms";
+    private static final String OFFSET_COMMIT_TIMEOUT_MS_DOC
+            = "Maximum number of milliseconds to wait for records to flush and partition offset data to be"
+            + " committed to offset storage before cancelling the process and restoring the offset "
+            + "data to be committed in a future attempt.";
+    public static final long OFFSET_COMMIT_TIMEOUT_MS_DEFAULT = 5000L;
+
+    public static final String REST_HOST_NAME_CONFIG = "rest.host.name";
+    private static final String REST_HOST_NAME_DOC
+            = "Hostname for the REST API. If this is set, it will only bind to this interface.";
+
+    public static final String REST_PORT_CONFIG = "rest.port";
+    private static final String REST_PORT_DOC
+            = "Port for the REST API to listen on.";
+    public static final int REST_PORT_DEFAULT = 8083;
+
+    public static final String REST_ADVERTISED_HOST_NAME_CONFIG = "rest.advertised.host.name";
+    private static final String REST_ADVERTISED_HOST_NAME_DOC
+            = "If this is set, this is the hostname that will be given out to other workers to connect to.";
+
+    public static final String REST_ADVERTISED_PORT_CONFIG = "rest.advertised.port";
+    private static final String REST_ADVERTISED_PORT_DOC
+            = "If this is set, this is the port that will be given out to other workers to connect to.";
+
+    /**
+     * Get a basic ConfigDef for a WorkerConfig. This includes all the common settings. Subclasses can use this to
+     * bootstrap their own ConfigDef.
+     * @return a ConfigDef with all the common options specified
+     */
+    protected static ConfigDef baseConfigDef() {
+        return new ConfigDef()
+                .define(CLUSTER_CONFIG, Type.STRING, CLUSTER_DEFAULT, Importance.HIGH, CLUSTER_CONFIG_DOC)
+                .define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, BOOTSTRAP_SERVERS_DEFAULT,
+                        Importance.HIGH, BOOTSTRAP_SERVERS_DOC)
+                .define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS,
+                        Importance.HIGH, KEY_CONVERTER_CLASS_DOC)
+                .define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS,
+                        Importance.HIGH, VALUE_CONVERTER_CLASS_DOC)
+                .define(INTERNAL_KEY_CONVERTER_CLASS_CONFIG, Type.CLASS,
+                        Importance.HIGH, INTERNAL_KEY_CONVERTER_CLASS_DOC)
+                .define(INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS,
+                        Importance.HIGH, INTERNAL_VALUE_CONVERTER_CLASS_DOC)
+                .define(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG, Type.LONG,
+                        TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DEFAULT, Importance.LOW,
+                        TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DOC)
+                .define(OFFSET_COMMIT_INTERVAL_MS_CONFIG, Type.LONG, OFFSET_COMMIT_INTERVAL_MS_DEFAULT,
+                        Importance.LOW, OFFSET_COMMIT_INTERVAL_MS_DOC)
+                .define(OFFSET_COMMIT_TIMEOUT_MS_CONFIG, Type.LONG, OFFSET_COMMIT_TIMEOUT_MS_DEFAULT,
+                        Importance.LOW, OFFSET_COMMIT_TIMEOUT_MS_DOC)
+                .define(REST_HOST_NAME_CONFIG, Type.STRING, null, Importance.LOW, REST_HOST_NAME_DOC)
+                .define(REST_PORT_CONFIG, Type.INT, REST_PORT_DEFAULT, Importance.LOW, REST_PORT_DOC)
+                .define(REST_ADVERTISED_HOST_NAME_CONFIG, Type.STRING, null, Importance.LOW, REST_ADVERTISED_HOST_NAME_DOC)
+                .define(REST_ADVERTISED_PORT_CONFIG, Type.INT, null, Importance.LOW, REST_ADVERTISED_PORT_DOC);
+    }
+
+    public WorkerConfig(ConfigDef definition, Map<String, String> props) {
+        super(definition, props);
+    }
+}
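
baseConfigDef() is the extension point the javadoc describes: a subclass grabs the common ConfigDef and chains its own define() calls onto it. A hypothetical subclass (the offset-storage option shown is illustrative, not part of this patch):

    import org.apache.kafka.common.config.ConfigDef;
    import org.apache.kafka.connect.runtime.WorkerConfig;

    import java.util.Map;

    public class ExampleWorkerConfig extends WorkerConfig {
        public static final String OFFSET_STORAGE_FILE_CONFIG = "offset.storage.file.filename";

        // Extend the shared definitions with one mode-specific option.
        private static final ConfigDef CONFIG = baseConfigDef()
                .define(OFFSET_STORAGE_FILE_CONFIG, ConfigDef.Type.STRING, "",
                        ConfigDef.Importance.HIGH, "File to store offset data in.");

        public ExampleWorkerConfig(Map<String, String> props) {
            super(CONFIG, props);
        }
    }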

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
new file mode 100644
index 0000000..a4d4093
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -0,0 +1,370 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.errors.RetriableException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * WorkerTask that uses a SinkTask to export data from Kafka.
+ */
+class WorkerSinkTask implements WorkerTask {
+    private static final Logger log = LoggerFactory.getLogger(WorkerSinkTask.class);
+
+    private final ConnectorTaskId id;
+    private final SinkTask task;
+    private final WorkerConfig workerConfig;
+    private final Time time;
+    private final Converter keyConverter;
+    private final Converter valueConverter;
+    private WorkerSinkTaskThread workThread;
+    private Map<String, String> taskProps;
+    private KafkaConsumer<byte[], byte[]> consumer;
+    private WorkerSinkTaskContext context;
+    private boolean started;
+    private final List<SinkRecord> messageBatch;
+    private Map<TopicPartition, OffsetAndMetadata> lastCommittedOffsets;
+    private Map<TopicPartition, OffsetAndMetadata> currentOffsets;
+    private boolean pausedForRedelivery;
+
+    public WorkerSinkTask(ConnectorTaskId id, SinkTask task, WorkerConfig workerConfig,
+                          Converter keyConverter, Converter valueConverter, Time time) {
+        this.id = id;
+        this.task = task;
+        this.workerConfig = workerConfig;
+        this.keyConverter = keyConverter;
+        this.valueConverter = valueConverter;
+        this.time = time;
+        this.started = false;
+        this.messageBatch = new ArrayList<>();
+        this.currentOffsets = new HashMap<>();
+        this.pausedForRedelivery = false;
+    }
+
+    @Override
+    public void start(Map<String, String> props) {
+        taskProps = props;
+        consumer = createConsumer();
+        context = new WorkerSinkTaskContext(consumer);
+
+        workThread = createWorkerThread();
+        workThread.start();
+    }
+
+    @Override
+    public void stop() {
+        // Offset commit is handled upon exit in work thread
+        if (workThread != null)
+            workThread.startGracefulShutdown();
+        consumer.wakeup();
+    }
+
+    @Override
+    public boolean awaitStop(long timeoutMs) {
+        boolean success = true;
+        if (workThread != null) {
+            try {
+                success = workThread.awaitShutdown(timeoutMs, TimeUnit.MILLISECONDS);
+                if (!success)
+                    workThread.forceShutdown();
+            } catch (InterruptedException e) {
+                success = false;
+            }
+        }
+        task.stop();
+        return success;
+    }
+
+    @Override
+    public void close() {
+        // FIXME Kafka needs to add a timeout parameter here for us to properly obey the timeout
+        // passed in
+        if (consumer != null)
+            consumer.close();
+    }
+
+    /**
+     * Performs the initial join process for the consumer group, ensures we have an assignment, and
+     * initializes and starts the SinkTask.
+     *
+     * @return true if successful, false if joining the consumer group was interrupted
+     */
+    public boolean joinConsumerGroupAndStart() {
+        String topicsStr = taskProps.get(SinkTask.TOPICS_CONFIG);
+        if (topicsStr == null || topicsStr.isEmpty())
+            throw new ConnectException("Sink tasks require a list of topics.");
+        String[] topics = topicsStr.split(",");
+        log.debug("Task {} subscribing to topics {}", id, topics);
+        consumer.subscribe(Arrays.asList(topics), new HandleRebalance());
+
+        // Ensure we're in the group so that if start() wants to rewind offsets, it will have an assignment of partitions
+        // to work with. Any rewinding will be handled immediately when polling starts.
+        try {
+            consumer.poll(0);
+        } catch (WakeupException e) {
+            log.error("Sink task {} was stopped before completing join group. 
Task initialization and start is being skipped", this);
+            return false;
+        }
+        task.initialize(context);
+        task.start(taskProps);
+        log.info("Sink task {} finished initialization and start", this);
+        started = true;
+        return true;
+    }
+
+    /** Poll for new messages with the given timeout. Should only be invoked by the worker thread. */
+    public void poll(long timeoutMs) {
+        try {
+            rewind();
+            long retryTimeout = context.timeout();
+            if (retryTimeout > 0) {
+                timeoutMs = Math.min(timeoutMs, retryTimeout);
+                context.timeout(-1L);
+            }
+
+            log.trace("{} polling consumer with timeout {} ms", id, timeoutMs);
+            ConsumerRecords<byte[], byte[]> msgs = consumer.poll(timeoutMs);
+            assert messageBatch.isEmpty() || msgs.isEmpty();
+            log.trace("{} polling returned {} messages", id, msgs.count());
+
+            convertMessages(msgs);
+            deliverMessages();
+        } catch (WakeupException we) {
+            log.trace("{} consumer woken up", id);
+        }
+    }
+
+    /**
+     * Starts an offset commit by flushing outstanding messages from the task and then starting
+     * the write commit. This should only be invoked by the WorkerSinkTaskThread.
+     **/
+    public void commitOffsets(boolean sync, final int seqno) {
+        log.info("{} Committing offsets", this);
+
+        final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(currentOffsets);
+
+        try {
+            task.flush(offsets);
+        } catch (Throwable t) {
+            log.error("Commit of {} offsets failed due to exception while 
flushing: {}", this, t);
+            log.error("Rewinding offsets to last committed offsets");
+            for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : lastCommittedOffsets.entrySet()) {
+                log.debug("{} Rewinding topic partition {} to offset {}", id, entry.getKey(), entry.getValue().offset());
+                consumer.seek(entry.getKey(), entry.getValue().offset());
+            }
+            currentOffsets = new HashMap<>(lastCommittedOffsets);
+            workThread.onCommitCompleted(t, seqno);
+            return;
+        }
+
+        if (sync) {
+            try {
+                consumer.commitSync(offsets);
+                lastCommittedOffsets = offsets;
+                workThread.onCommitCompleted(null, seqno);
+            } catch (KafkaException e) {
+                workThread.onCommitCompleted(e, seqno);
+            }
+        } else {
+            OffsetCommitCallback cb = new OffsetCommitCallback() {
+                @Override
+                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception error) {
+                    // Only record the offsets as committed if the async commit actually succeeded
+                    if (error == null)
+                        lastCommittedOffsets = offsets;
+                    workThread.onCommitCompleted(error, seqno);
+                }
+            };
+            consumer.commitAsync(offsets, cb);
+        }
+    }
+
+    public Time time() {
+        return time;
+    }
+
+    public WorkerConfig workerConfig() {
+        return workerConfig;
+    }
+
+    private KafkaConsumer<byte[], byte[]> createConsumer() {
+        // Include any unknown worker configs so consumer configs can be set globally on the worker
+        // and passed through to the task
+        Map<String, Object> props = workerConfig.unusedConfigs();
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "connect-" + id.connector());
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+                Utils.join(workerConfig.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+
+        KafkaConsumer<byte[], byte[]> newConsumer;
+        try {
+            newConsumer = new KafkaConsumer<>(props);
+        } catch (Throwable t) {
+            throw new ConnectException("Failed to create consumer", t);
+        }
+
+        return newConsumer;
+    }
+
+    private WorkerSinkTaskThread createWorkerThread() {
+        return new WorkerSinkTaskThread(this, "WorkerSinkTask-" + id, time, 
workerConfig);
+    }
+
+    private void convertMessages(ConsumerRecords<byte[], byte[]> msgs) {
+        for (ConsumerRecord<byte[], byte[]> msg : msgs) {
+            log.trace("Consuming message with key {}, value {}", msg.key(), 
msg.value());
+            SchemaAndValue keyAndSchema = 
keyConverter.toConnectData(msg.topic(), msg.key());
+            SchemaAndValue valueAndSchema = 
valueConverter.toConnectData(msg.topic(), msg.value());
+            messageBatch.add(
+                    new SinkRecord(msg.topic(), msg.partition(),
+                            keyAndSchema.schema(), keyAndSchema.value(),
+                            valueAndSchema.schema(), valueAndSchema.value(),
+                            msg.offset())
+            );
+        }
+    }
+
+    private void deliverMessages() {
+        // Finally, deliver this batch to the sink
+        try {
+            // Since we reuse the messageBatch buffer, ensure we give the task its own copy
+            task.put(new ArrayList<>(messageBatch));
+            for (SinkRecord record : messageBatch)
+                currentOffsets.put(new TopicPartition(record.topic(), record.kafkaPartition()),
+                        new OffsetAndMetadata(record.kafkaOffset() + 1));
+            messageBatch.clear();
+            // If we had paused all consumer topic partitions to try to redeliver data, then we should resume any that
+            // the task had not explicitly paused
+            if (pausedForRedelivery) {
+                for (TopicPartition tp : consumer.assignment())
+                    if (!context.pausedPartitions().contains(tp))
+                        consumer.resume(tp);
+                pausedForRedelivery = false;
+            }
+        } catch (RetriableException e) {
+            log.error("RetriableException from SinkTask {}: {}", id, e);
+            // If we're retrying a previous batch, make sure we've paused all topic partitions so we don't get new data,
+            // but will still be able to poll in order to handle user-requested timeouts, keep group membership, etc.
+            pausedForRedelivery = true;
+            for (TopicPartition tp : consumer.assignment())
+                consumer.pause(tp);
+            // Let this exit normally; the batch will be reprocessed on the next loop.
+        } catch (Throwable t) {
+            log.error("Task {} threw an uncaught and unrecoverable exception", id);
+            log.error("Task is being killed and will not recover until manually restarted:", t);
+            throw new ConnectException("Exiting WorkerSinkTask due to unrecoverable exception.");
+        }
+        }
+    }
+
+    private void rewind() {
+        Map<TopicPartition, Long> offsets = context.offsets();
+        if (offsets.isEmpty()) {
+            return;
+        }
+        for (TopicPartition tp: offsets.keySet()) {
+            Long offset = offsets.get(tp);
+            if (offset != null) {
+                log.trace("Rewind {} to offset {}.", tp, offset);
+                consumer.seek(tp, offset);
+                lastCommittedOffsets.put(tp, new OffsetAndMetadata(offset));
+                currentOffsets.put(tp, new OffsetAndMetadata(offset));
+            }
+        }
+        context.clearOffsets();
+    }
+
+    private class HandleRebalance implements ConsumerRebalanceListener {
+        @Override
+        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+            lastCommittedOffsets = new HashMap<>();
+            currentOffsets = new HashMap<>();
+            for (TopicPartition tp : partitions) {
+                long pos = consumer.position(tp);
+                lastCommittedOffsets.put(tp, new OffsetAndMetadata(pos));
+                currentOffsets.put(tp, new OffsetAndMetadata(pos));
+                log.debug("{} assigned topic partition {} with offset {}", id, 
tp, pos);
+            }
+
+            // If we paused everything for redelivery (which is no longer relevant since we discarded the data), make
+            // sure anything we paused that the task didn't request to be paused *and* which we still own is resumed.
+            // Also make sure our tracking of paused partitions is updated to remove any partitions we no longer own.
+            if (pausedForRedelivery) {
+                pausedForRedelivery = false;
+                Set<TopicPartition> assigned = new HashSet<>(partitions);
+                Set<TopicPartition> taskPaused = context.pausedPartitions();
+
+                for (TopicPartition tp : partitions) {
+                    if (!taskPaused.contains(tp))
+                        consumer.resume(tp);
+                }
+
+                Iterator<TopicPartition> tpIter = taskPaused.iterator();
+                while (tpIter.hasNext()) {
+                    TopicPartition tp = tpIter.next();
+                    if (assigned.contains(tp))
+                        tpIter.remove();
+                }
+            }
+
+            // Instead of invoking the assignment callback on initialization, we guarantee the consumer is ready upon
+            // task start. Since this callback gets invoked during that initial setup before we've started the task, we
+            // need to guard against invoking the user's callback method during that period.
+            if (started)
+                task.onPartitionsAssigned(partitions);
+        }
+
+        @Override
+        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+            task.onPartitionsRevoked(partitions);
+            commitOffsets(true, -1);
+            // Make sure we don't have any leftover data since offsets will be reset to committed positions
+            messageBatch.clear();
+        }
+    }
+}
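
To make the contract this worker drives concrete, here is a hedged sketch of a trivial SinkTask (illustrative, not part of the patch): deliverMessages() above hands each converted batch to put(), and commitOffsets() calls flush() before committing the consumer offsets. Throwing RetriableException from put() would trigger the pause-and-redeliver path shown above.

    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.connect.sink.SinkRecord;
    import org.apache.kafka.connect.sink.SinkTask;

    import java.util.Collection;
    import java.util.Map;

    public class LoggingSinkTask extends SinkTask {
        @Override
        public String version() {
            return "0.0.1";
        }

        @Override
        public void start(Map<String, String> props) {
            // Invoked from joinConsumerGroupAndStart() once the consumer has joined the group
        }

        @Override
        public void put(Collection<SinkRecord> records) {
            // Each record carries the topic/partition/offset the worker later turns into commit offsets
            for (SinkRecord record : records)
                System.out.println(record.topic() + "-" + record.kafkaPartition()
                        + "@" + record.kafkaOffset() + ": " + record.value());
        }

        @Override
        public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
            // Nothing is buffered here, so there is nothing to flush before the offset commit
        }

        @Override
        public void stop() {
        }
    }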

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
new file mode 100644
index 0000000..06f4838
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at <p/> http://www.apache.org/licenses/LICENSE-2.0 <p/> Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.IllegalWorkerStateException;
+import org.apache.kafka.connect.sink.SinkTaskContext;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public class WorkerSinkTaskContext implements SinkTaskContext {
+    private Map<TopicPartition, Long> offsets;
+    private long timeoutMs;
+    private KafkaConsumer<byte[], byte[]> consumer;
+    private final Set<TopicPartition> pausedPartitions;
+
+    public WorkerSinkTaskContext(KafkaConsumer<byte[], byte[]> consumer) {
+        this.offsets = new HashMap<>();
+        this.timeoutMs = -1L;
+        this.consumer = consumer;
+        this.pausedPartitions = new HashSet<>();
+    }
+
+    @Override
+    public void offset(Map<TopicPartition, Long> offsets) {
+        this.offsets.putAll(offsets);
+    }
+
+    @Override
+    public void offset(TopicPartition tp, long offset) {
+        offsets.put(tp, offset);
+    }
+
+    public void clearOffsets() {
+        offsets.clear();
+    }
+
+    /**
+     * Get offsets that the SinkTask has submitted to be reset. Used by the Kafka Connect framework.
+     * @return the map of offsets
+     */
+    public Map<TopicPartition, Long> offsets() {
+        return offsets;
+    }
+
+    @Override
+    public void timeout(long timeoutMs) {
+        this.timeoutMs = timeoutMs;
+    }
+
+    /**
+     * Get the timeout in milliseconds set by SinkTasks. Used by the Kafka Connect framework.
+     * @return the backoff timeout in milliseconds.
+     */
+    public long timeout() {
+        return timeoutMs;
+    }
+
+    @Override
+    public Set<TopicPartition> assignment() {
+        if (consumer == null) {
+            throw new IllegalWorkerStateException("SinkTaskContext may not be 
used to look up partition assignment until the task is initialized");
+        }
+        return consumer.assignment();
+    }
+
+    @Override
+    public void pause(TopicPartition... partitions) {
+        if (consumer == null) {
+            throw new IllegalWorkerStateException("SinkTaskContext may not be 
used to pause consumption until the task is initialized");
+        }
+        try {
+            for (TopicPartition partition : partitions)
+                pausedPartitions.add(partition);
+            consumer.pause(partitions);
+        } catch (IllegalStateException e) {
+            throw new IllegalWorkerStateException("SinkTasks may not pause 
partitions that are not currently assigned to them.", e);
+        }
+    }
+
+    @Override
+    public void resume(TopicPartition... partitions) {
+        if (consumer == null) {
+            throw new IllegalWorkerStateException("SinkTaskContext may not be 
used to resume consumption until the task is initialized");
+        }
+        try {
+            for (TopicPartition partition : partitions)
+                pausedPartitions.remove(partition);
+            consumer.resume(partitions);
+        } catch (IllegalStateException e) {
+            throw new IllegalWorkerStateException("SinkTasks may not resume 
partitions that are not currently assigned to them.", e);
+        }
+    }
+
+    public Set<TopicPartition> pausedPartitions() {
+        return pausedPartitions;
+    }
+}
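
The context above is the lever a SinkTask has over its consumer. As a hedged sketch (again not part of the patch, and assuming the protected context field that SinkTask exposes after initialize()), a task might combine timeout() with RetriableException to back off while a downstream system recovers; the downstreamHealthy flag and the 5000 ms value are illustrative stand-ins. WorkerSinkTask.poll() caps its next consumer poll at context.timeout() before the failed batch is redelivered.

    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.connect.errors.RetriableException;
    import org.apache.kafka.connect.sink.SinkRecord;
    import org.apache.kafka.connect.sink.SinkTask;

    import java.util.Collection;
    import java.util.Map;

    public class BackoffSinkTask extends SinkTask {
        private volatile boolean downstreamHealthy = true; // stand-in for a real health check

        @Override
        public String version() {
            return "0.0.1";
        }

        @Override
        public void start(Map<String, String> props) {
        }

        @Override
        public void put(Collection<SinkRecord> records) {
            if (!downstreamHealthy) {
                // Ask the worker to wait before its next poll, then request redelivery of this batch
                context.timeout(5000L);
                throw new RetriableException("downstream unavailable, batch will be redelivered");
            }
            // ... write records to the downstream system ...
        }

        @Override
        public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
        }

        @Override
        public void stop() {
        }
    }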

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThread.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThread.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThread.java
new file mode 100644
index 0000000..e776f08
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThread.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.util.ShutdownableThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Worker thread for a WorkerSinkTask. These classes are very tightly coupled, but separated to
+ * simplify testing.
+ */
+class WorkerSinkTaskThread extends ShutdownableThread {
+    private static final Logger log = LoggerFactory.getLogger(WorkerSinkTask.class);
+
+    private final WorkerSinkTask task;
+    private long nextCommit;
+    private boolean committing;
+    private int commitSeqno;
+    private long commitStarted;
+    private int commitFailures;
+
+    public WorkerSinkTaskThread(WorkerSinkTask task, String name, Time time,
+                                WorkerConfig workerConfig) {
+        super(name);
+        this.task = task;
+        this.nextCommit = time.milliseconds() +
+                workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
+        this.committing = false;
+        this.commitSeqno = 0;
+        this.commitStarted = -1;
+        this.commitFailures = 0;
+    }
+
+    @Override
+    public void execute() {
+        // Try to join and start. If we're interrupted before this completes, bail.
+        if (!task.joinConsumerGroupAndStart())
+            return;
+
+        while (getRunning()) {
+            iteration();
+        }
+
+        // Make sure any uncommitted data has committed
+        task.commitOffsets(true, -1);
+    }
+
+    public void iteration() {
+        long now = task.time().milliseconds();
+
+        // Maybe commit
+        if (!committing && now >= nextCommit) {
+            synchronized (this) {
+                committing = true;
+                commitSeqno += 1;
+                commitStarted = now;
+            }
+            task.commitOffsets(false, commitSeqno);
+            nextCommit += task.workerConfig().getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
+        }
+
+        // Check for timed out commits
+        long commitTimeout = commitStarted + task.workerConfig().getLong(
+                WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
+        if (committing && now >= commitTimeout) {
+            log.warn("Commit of {} offsets timed out", this);
+            commitFailures++;
+            committing = false;
+        }
+
+        // And process messages
+        long timeoutMs = Math.max(nextCommit - now, 0);
+        task.poll(timeoutMs);
+    }
+
+    public void onCommitCompleted(Throwable error, long seqno) {
+        synchronized (this) {
+            if (commitSeqno != seqno) {
+                log.debug("Got callback for timed out commit {}: {}, but most 
recent commit is {}",
+                        this,
+                        seqno, commitSeqno);
+            } else {
+                if (error != null) {
+                    log.error("Commit of {} offsets threw an unexpected 
exception: ", this, error);
+                    commitFailures++;
+                } else {
+                    log.debug("Finished {} offset commit successfully in {} 
ms",
+                            this, task.time().milliseconds() - commitStarted);
+                    commitFailures = 0;
+                }
+                committing = false;
+            }
+        }
+    }
+
+    public int commitFailures() {
+        return commitFailures;
+    }
+}
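
Putting the three classes together, the lifecycle a Worker presumably drives looks roughly like the sketch below (a caller in the same package is assumed, since WorkerSinkTask is package-private; the 60 s run and 5000 ms grace period are illustrative values rather than the configured task.shutdown.graceful.timeout.ms):

    // Hedged sketch of the start/stop sequence implied by the WorkerTask methods above
    static void runAndStop(WorkerSinkTask task, java.util.Map<String, String> taskProps)
            throws InterruptedException {
        task.start(taskProps);           // spawns WorkerSinkTaskThread, which joins the group and starts the SinkTask
        Thread.sleep(60000L);            // ... poll -> convertMessages -> deliverMessages, with periodic commits ...
        task.stop();                     // requests graceful shutdown and wakes the consumer out of poll()
        if (!task.awaitStop(5000L))      // a final synchronous commit runs as the work thread exits
            System.err.println("Task did not stop within the grace period and was force-shut-down");
        task.close();                    // releases the underlying KafkaConsumer
    }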
