http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/CreateConnectorRequest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/CreateConnectorRequest.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/CreateConnectorRequest.java
new file mode 100644
index 0000000..c8e0f6f
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/CreateConnectorRequest.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime.rest.entities;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Map;
+import java.util.Objects;
+
+public class CreateConnectorRequest {
+    private final String name;
+    private final Map<String, String> config;
+
+    @JsonCreator
+    public CreateConnectorRequest(@JsonProperty("name") String name, @JsonProperty("config") Map<String, String> config) {
+        this.name = name;
+        this.config = config;
+    }
+
+    @JsonProperty
+    public String name() {
+        return name;
+    }
+
+    @JsonProperty
+    public Map<String, String> config() {
+        return config;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        CreateConnectorRequest that = (CreateConnectorRequest) o;
+        return Objects.equals(name, that.name) &&
+                Objects.equals(config, that.config);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(name, config);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ErrorMessage.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ErrorMessage.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ErrorMessage.java
new file mode 100644
index 0000000..493b00d
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ErrorMessage.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime.rest.entities;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+/**
+ * Standard error format for all REST API failures. These are generated automatically by
+ * {@link ConnectExceptionMapper} in response to uncaught {@link ConnectException}s.
+ */
+public class ErrorMessage {
+    private final int errorCode;
+    private final String message;
+
+    @JsonCreator
+    public ErrorMessage(@JsonProperty("error_code") int errorCode, @JsonProperty("message") String message) {
+        this.errorCode = errorCode;
+        this.message = message;
+    }
+
+    @JsonProperty("error_code")
+    public int errorCode() {
+        return errorCode;
+    }
+
+    @JsonProperty
+    public String message() {
+        return message;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        ErrorMessage that = (ErrorMessage) o;
+        return Objects.equals(errorCode, that.errorCode) &&
+                Objects.equals(message, that.message);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(errorCode, message);
+    }
+}
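
For illustration, a minimal sketch (not from the patch itself) of the JSON shape the Jackson annotations above produce, assuming a plain ObjectMapper:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.kafka.connect.runtime.rest.entities.ErrorMessage;

    public class ErrorMessageJsonSketch {
        public static void main(String[] args) throws Exception {
            ObjectMapper mapper = new ObjectMapper();
            // "error_code" comes from the explicit @JsonProperty value; "message" from the accessor name.
            String json = mapper.writeValueAsString(new ErrorMessage(404, "Connector foo not found"));
            System.out.println(json); // roughly: {"error_code":404,"message":"Connector foo not found"}
        }
    }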

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ServerInfo.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ServerInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ServerInfo.java
new file mode 100644
index 0000000..1d5e8ba
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ServerInfo.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime.rest.entities;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.common.utils.AppInfoParser;
+
+public class ServerInfo {
+    private String version;
+    private String commit;
+
+    public ServerInfo() {
+        version = AppInfoParser.getVersion();
+        commit = AppInfoParser.getCommitId();
+    }
+
+    @JsonProperty
+    public String version() {
+        return version;
+    }
+
+    @JsonProperty
+    public String commit() {
+        return commit;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/TaskInfo.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/TaskInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/TaskInfo.java
new file mode 100644
index 0000000..3d443a5
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/TaskInfo.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime.rest.entities;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+
+import java.util.Map;
+import java.util.Objects;
+
+public class TaskInfo {
+    private final ConnectorTaskId id;
+    private final Map<String, String> config;
+
+    public TaskInfo(ConnectorTaskId id, Map<String, String> config) {
+        this.id = id;
+        this.config = config;
+    }
+
+    @JsonProperty
+    public ConnectorTaskId id() {
+        return id;
+    }
+
+    @JsonProperty
+    public Map<String, String> config() {
+        return config;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        TaskInfo taskInfo = (TaskInfo) o;
+        return Objects.equals(id, taskInfo.id) &&
+                Objects.equals(config, taskInfo.config);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(id, config);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java
new file mode 100644
index 0000000..67c38e7
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime.rest.errors;
+
+import org.apache.kafka.connect.errors.AlreadyExistsException;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.errors.NotFoundException;
+import org.apache.kafka.connect.runtime.rest.entities.ErrorMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.Response;
+import javax.ws.rs.ext.ExceptionMapper;
+
+public class ConnectExceptionMapper implements ExceptionMapper<ConnectException> {
+    private static final Logger log = LoggerFactory.getLogger(ConnectExceptionMapper.class);
+
+    @Override
+    public Response toResponse(ConnectException exception) {
+        log.debug("Uncaught exception in REST call: ", exception);
+
+        if (exception instanceof ConnectRestException) {
+            ConnectRestException restException = (ConnectRestException) exception;
+            return Response.status(restException.statusCode())
+                    .entity(new ErrorMessage(restException.errorCode(), restException.getMessage()))
+                    .build();
+        }
+
+        if (exception instanceof NotFoundException) {
+            return Response.status(Response.Status.NOT_FOUND)
+                    .entity(new ErrorMessage(Response.Status.NOT_FOUND.getStatusCode(), exception.getMessage()))
+                    .build();
+        }
+
+        if (exception instanceof AlreadyExistsException) {
+            return Response.status(Response.Status.CONFLICT)
+                    .entity(new ErrorMessage(Response.Status.CONFLICT.getStatusCode(), exception.getMessage()))
+                    .build();
+        }
+
+        return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
+                .entity(new ErrorMessage(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), exception.getMessage()))
+                .build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectRestException.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectRestException.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectRestException.java
new file mode 100644
index 0000000..5dcbcf4
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectRestException.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime.rest.errors;
+
+import org.apache.kafka.connect.errors.ConnectException;
+
+import javax.ws.rs.core.Response;
+
+public class ConnectRestException extends ConnectException {
+    private final int statusCode;
+    private final int errorCode;
+
+    public ConnectRestException(int statusCode, int errorCode, String message, Throwable t) {
+        super(message, t);
+        this.statusCode = statusCode;
+        this.errorCode = errorCode;
+    }
+
+    public ConnectRestException(Response.Status status, int errorCode, String message, Throwable t) {
+        this(status.getStatusCode(), errorCode, message, t);
+    }
+
+    public ConnectRestException(int statusCode, int errorCode, String message) {
+        this(statusCode, errorCode, message, null);
+    }
+
+    public ConnectRestException(Response.Status status, int errorCode, String message) {
+        this(status, errorCode, message, null);
+    }
+
+    public ConnectRestException(int statusCode, String message, Throwable t) {
+        this(statusCode, statusCode, message, t);
+    }
+
+    public ConnectRestException(Response.Status status, String message, Throwable t) {
+        this(status, status.getStatusCode(), message, t);
+    }
+
+    public ConnectRestException(int statusCode, String message) {
+        this(statusCode, statusCode, message, null);
+    }
+
+    public ConnectRestException(Response.Status status, String message) {
+        this(status.getStatusCode(), status.getStatusCode(), message, null);
+    }
+
+
+    public int statusCode() {
+        return statusCode;
+    }
+
+    public int errorCode() {
+        return errorCode;
+    }
+}
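
A minimal sketch of how the mapper and exception above fit together; the resource method here is hypothetical, not part of this commit:

    // Hypothetical resource method, for illustration only.
    @GET
    @Path("/{connector}/example")
    public void example(@PathParam("connector") String connector) {
        if (connector.isEmpty())
            // ConnectExceptionMapper turns this into an HTTP 400 whose body is an ErrorMessage
            // with error_code 400 and the message below.
            throw new ConnectRestException(Response.Status.BAD_REQUEST, "Connector name must not be empty");
    }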

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
new file mode 100644
index 0000000..cea4360
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime.rest.resources;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
+import org.apache.kafka.connect.runtime.rest.RestServer;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
+import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
+import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
+import org.apache.kafka.connect.util.FutureCallback;
+
+import javax.servlet.ServletContext;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.net.URI;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+@Path("/connectors")
+@Produces(MediaType.APPLICATION_JSON)
+@Consumes(MediaType.APPLICATION_JSON)
+public class ConnectorsResource {
+    // TODO: This should not be so long. However, rebalances may be long, since they may have to wait a full
+    // session timeout to complete, during which we cannot serve some requests. Ideally we could reduce this, but
+    // we need to consider all possible scenarios in which this could fail. It might be ok to fail with a timeout in rare cases,
+    // but currently a worker simply leaving the group can take this long as well.
+    private static final long REQUEST_TIMEOUT_MS = 90 * 1000;
+
+    private final Herder herder;
+    @javax.ws.rs.core.Context
+    private ServletContext context;
+
+    public ConnectorsResource(Herder herder) {
+        this.herder = herder;
+    }
+
+    @GET
+    @Path("/")
+    public Collection<String> listConnectors() throws Throwable {
+        FutureCallback<Collection<String>> cb = new FutureCallback<>();
+        herder.connectors(cb);
+        return completeOrForwardRequest(cb, "/connectors", "GET", null, new TypeReference<Collection<String>>() {
+        });
+    }
+
+    @POST
+    @Path("/")
+    public Response createConnector(final CreateConnectorRequest createRequest) throws Throwable {
+        String name = createRequest.name();
+        Map<String, String> configs = createRequest.config();
+        if (!configs.containsKey(ConnectorConfig.NAME_CONFIG))
+            configs.put(ConnectorConfig.NAME_CONFIG, name);
+
+        FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
+        herder.putConnectorConfig(name, configs, false, cb);
+        Herder.Created<ConnectorInfo> info = completeOrForwardRequest(cb, "/connectors", "POST", createRequest,
+                new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator());
+        return Response.created(URI.create("/connectors/" + name)).entity(info.result()).build();
+    }
+
+    @GET
+    @Path("/{connector}")
+    public ConnectorInfo getConnector(final @PathParam("connector") String connector) throws Throwable {
+        FutureCallback<ConnectorInfo> cb = new FutureCallback<>();
+        herder.connectorInfo(connector, cb);
+        return completeOrForwardRequest(cb, "/connectors/" + connector, "GET", null, new TypeReference<ConnectorInfo>() {
+        });
+    }
+
+    @GET
+    @Path("/{connector}/config")
+    public Map<String, String> getConnectorConfig(final @PathParam("connector") String connector) throws Throwable {
+        FutureCallback<Map<String, String>> cb = new FutureCallback<>();
+        herder.connectorConfig(connector, cb);
+        return completeOrForwardRequest(cb, "/connectors/" + connector + "/config", "GET", null, new TypeReference<Map<String, String>>() {
+        });
+    }
+
+    @PUT
+    @Path("/{connector}/config")
+    public Response putConnectorConfig(final @PathParam("connector") String connector,
+                                   final Map<String, String> connectorConfig) throws Throwable {
+        FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
+        herder.putConnectorConfig(connector, connectorConfig, true, cb);
+        Herder.Created<ConnectorInfo> createdInfo = completeOrForwardRequest(cb, "/connectors/" + connector + "/config",
+                "PUT", connectorConfig, new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator());
+        Response.ResponseBuilder response;
+        if (createdInfo.created())
+            response = Response.created(URI.create("/connectors/" + connector));
+        else
+            response = Response.ok();
+        return response.entity(createdInfo.result()).build();
+    }
+
+    @GET
+    @Path("/{connector}/tasks")
+    public List<TaskInfo> getTaskConfigs(final @PathParam("connector") String connector) throws Throwable {
+        FutureCallback<List<TaskInfo>> cb = new FutureCallback<>();
+        herder.taskConfigs(connector, cb);
+        return completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "GET", null, new TypeReference<List<TaskInfo>>() {
+        });
+    }
+
+    @POST
+    @Path("/{connector}/tasks")
+    public void putTaskConfigs(final @PathParam("connector") String connector,
+                               final List<Map<String, String>> taskConfigs) throws Throwable {
+        FutureCallback<Void> cb = new FutureCallback<>();
+        herder.putTaskConfigs(connector, taskConfigs, cb);
+        completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "POST", taskConfigs);
+    }
+
+    @DELETE
+    @Path("/{connector}")
+    public void destroyConnector(final @PathParam("connector") String connector) throws Throwable {
+        FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
+        herder.putConnectorConfig(connector, null, true, cb);
+        completeOrForwardRequest(cb, "/connectors/" + connector, "DELETE", null);
+    }
+
+    // Wait for a FutureCallback to complete. If it succeeds, return the parsed response. If it fails, try to forward the
+    // request to the leader.
+    private <T, U> T completeOrForwardRequest(
+            FutureCallback<T> cb, String path, String method, Object body, TypeReference<U> resultType,
+            Translator<T, U> translator) throws Throwable {
+        try {
+            return cb.get(REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof NotLeaderException) {
+                NotLeaderException notLeaderError = (NotLeaderException) e.getCause();
+                return translator.translate(RestServer.httpRequest(RestServer.urlJoin(notLeaderError.leaderUrl(), path), method, body, resultType));
+            }
+
+            throw e.getCause();
+        } catch (TimeoutException e) {
+            // This timeout is for the operation itself. None of the timeout error codes are relevant, so internal server
+            // error is the best option
+            throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), "Request timed out");
+        } catch (InterruptedException e) {
+            throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), "Request interrupted");
+        }
+    }
+
+    private <T> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method, Object body, TypeReference<T> resultType) throws Throwable {
+        return completeOrForwardRequest(cb, path, method, body, resultType, new IdentityTranslator<T>());
+    }
+
+    private <T> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method, Object body) throws Throwable {
+        return completeOrForwardRequest(cb, path, method, body, null, new IdentityTranslator<T>());
+    }
+
+    private interface Translator<T, U> {
+        T translate(RestServer.HttpResponse<U> response);
+    }
+
+    private class IdentityTranslator<T> implements Translator<T, T> {
+        @Override
+        public T translate(RestServer.HttpResponse<T> response) {
+            return response.body();
+        }
+    }
+
+    private class CreatedConnectorInfoTranslator implements Translator<Herder.Created<ConnectorInfo>, ConnectorInfo> {
+        @Override
+        public Herder.Created<ConnectorInfo> translate(RestServer.HttpResponse<ConnectorInfo> response) {
+            boolean created = response.status() == 201;
+            return new Herder.Created<>(created, response.body());
+        }
+    }
+}
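
A small standalone client sketch exercising the POST /connectors endpoint defined above; the worker URL, port, and connector class are placeholders rather than values taken from this patch:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.io.OutputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.util.HashMap;
    import java.util.Map;

    public class CreateConnectorClientSketch {
        public static void main(String[] args) throws Exception {
            Map<String, String> config = new HashMap<>();
            config.put("connector.class", "org.example.MySourceConnector"); // placeholder connector class
            config.put("tasks.max", "1");

            Map<String, Object> request = new HashMap<>();
            request.put("name", "example-connector");
            request.put("config", config);
            byte[] body = new ObjectMapper().writeValueAsBytes(request);

            HttpURLConnection conn = (HttpURLConnection) new URL("http://localhost:8083/connectors").openConnection();
            conn.setRequestMethod("POST");
            conn.setRequestProperty("Content-Type", "application/json");
            conn.setDoOutput(true);
            try (OutputStream os = conn.getOutputStream()) {
                os.write(body);
            }
            // 201 Created on success; failures come back as the ErrorMessage JSON described earlier.
            System.out.println("HTTP " + conn.getResponseCode());
        }
    }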

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java
new file mode 100644
index 0000000..3364ffd
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime.rest.resources;
+
+import org.apache.kafka.connect.runtime.rest.entities.ServerInfo;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+
+@Path("/")
+@Produces(MediaType.APPLICATION_JSON)
+public class RootResource {
+
+    @GET
+    @Path("/")
+    public ServerInfo serverInfo() {
+        return new ServerInfo();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneConfig.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneConfig.java
new file mode 100644
index 0000000..7cefe22
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneConfig.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime.standalone;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+
+import java.util.Map;
+
+public class StandaloneConfig extends WorkerConfig {
+    private static final ConfigDef CONFIG;
+
+    static {
+        CONFIG = baseConfigDef();
+    }
+
+    public StandaloneConfig(Map<String, String> props) {
+        super(CONFIG, props);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
new file mode 100644
index 0000000..89847ab
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime.standalone;
+
+import org.apache.kafka.connect.errors.AlreadyExistsException;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.errors.NotFoundException;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.HerderConnectorContext;
+import org.apache.kafka.connect.runtime.TaskConfig;
+import org.apache.kafka.connect.runtime.Worker;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Single process, in-memory "herder". Useful for a standalone Kafka Connect process.
+ */
+public class StandaloneHerder implements Herder {
+    private static final Logger log = LoggerFactory.getLogger(StandaloneHerder.class);
+
+    private final Worker worker;
+    private HashMap<String, ConnectorState> connectors = new HashMap<>();
+
+    public StandaloneHerder(Worker worker) {
+        this.worker = worker;
+    }
+
+    public synchronized void start() {
+        log.info("Herder starting");
+        log.info("Herder started");
+    }
+
+    public synchronized void stop() {
+        log.info("Herder stopping");
+
+        // There's no coordination/hand-off to do here since this is all standalone. Instead, we
+        // should just clean up the stuff we normally would, i.e. cleanly checkpoint and shutdown all
+        // the tasks.
+        for (String connName : new HashSet<>(connectors.keySet())) {
+            removeConnectorTasks(connName);
+            try {
+                worker.stopConnector(connName);
+            } catch (ConnectException e) {
+                log.error("Error shutting down connector {}: ", connName, e);
+            }
+        }
+        connectors.clear();
+
+        log.info("Herder stopped");
+    }
+
+    @Override
+    public synchronized void connectors(Callback<Collection<String>> callback) {
+        callback.onCompletion(null, new ArrayList<>(connectors.keySet()));
+    }
+
+    @Override
+    public synchronized void connectorInfo(String connName, Callback<ConnectorInfo> callback) {
+        ConnectorState state = connectors.get(connName);
+        if (state == null) {
+            callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null);
+            return;
+        }
+        callback.onCompletion(null, createConnectorInfo(state));
+    }
+
+    private ConnectorInfo createConnectorInfo(ConnectorState state) {
+        if (state == null)
+            return null;
+
+        List<ConnectorTaskId> taskIds = new ArrayList<>();
+        for (int i = 0; i < state.taskConfigs.size(); i++)
+            taskIds.add(new ConnectorTaskId(state.name, i));
+        return new ConnectorInfo(state.name, state.configOriginals, taskIds);
+    }
+
+    @Override
+    public void connectorConfig(String connName, final Callback<Map<String, String>> callback) {
+        // Subset of connectorInfo, so piggy back on that implementation
+        connectorInfo(connName, new Callback<ConnectorInfo>() {
+            @Override
+            public void onCompletion(Throwable error, ConnectorInfo result) {
+                if (error != null) {
+                    callback.onCompletion(error, null);
+                    return;
+                }
+                callback.onCompletion(null, result.config());
+            }
+        });
+    }
+
+    @Override
+    public synchronized void putConnectorConfig(String connName, final Map<String, String> config,
+                                                boolean allowReplace,
+                                                final Callback<Created<ConnectorInfo>> callback) {
+        try {
+            boolean created = false;
+            if (connectors.containsKey(connName)) {
+                if (!allowReplace) {
+                    callback.onCompletion(new AlreadyExistsException("Connector " + connName + " already exists"), null);
+                    return;
+                }
+                if (config == null) // Deletion, kill tasks as well
+                    removeConnectorTasks(connName);
+                worker.stopConnector(connName);
+                if (config == null)
+                    connectors.remove(connName);
+            } else {
+                if (config == null) {
+                    // Deletion, must already exist
+                    callback.onCompletion(new NotFoundException("Connector " + connName + " not found", null), null);
+                    return;
+                }
+                created = true;
+            }
+            if (config != null) {
+                startConnector(config);
+                updateConnectorTasks(connName);
+            }
+            if (config != null)
+                callback.onCompletion(null, new Created<>(created, createConnectorInfo(connectors.get(connName))));
+            else
+                callback.onCompletion(null, new Created<ConnectorInfo>(false, null));
+        } catch (ConnectException e) {
+            callback.onCompletion(e, null);
+        }
+
+    }
+
+    @Override
+    public synchronized void requestTaskReconfiguration(String connName) {
+        if (!worker.connectorNames().contains(connName)) {
+            log.error("Task that requested reconfiguration does not exist: {}", connName);
+            return;
+        }
+        updateConnectorTasks(connName);
+    }
+
+    @Override
+    public synchronized void taskConfigs(String connName, Callback<List<TaskInfo>> callback) {
+        ConnectorState state = connectors.get(connName);
+        if (state == null) {
+            callback.onCompletion(new NotFoundException("Connector " + connName + " not found", null), null);
+            return;
+        }
+
+        List<TaskInfo> result = new ArrayList<>();
+        for (int i = 0; i < state.taskConfigs.size(); i++) {
+            TaskInfo info = new TaskInfo(new ConnectorTaskId(connName, i), state.taskConfigs.get(i));
+            result.add(info);
+        }
+        callback.onCompletion(null, result);
+    }
+
+    @Override
+    public void putTaskConfigs(String connName, List<Map<String, String>> configs, Callback<Void> callback) {
+        throw new UnsupportedOperationException("Kafka Connect in standalone mode does not support externally setting task configurations.");
+    }
+
+    /**
+     * Start a connector in the worker and record its state.
+     * @param connectorProps new connector configuration
+     * @return the connector name
+     */
+    private String startConnector(Map<String, String> connectorProps) {
+        ConnectorConfig connConfig = new ConnectorConfig(connectorProps);
+        String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
+        ConnectorState state = connectors.get(connName);
+        worker.addConnector(connConfig, new HerderConnectorContext(this, connName));
+        if (state == null) {
+            connectors.put(connName, new ConnectorState(connectorProps, connConfig));
+        } else {
+            state.configOriginals = connectorProps;
+            state.config = connConfig;
+        }
+        return connName;
+    }
+
+
+    private List<Map<String, String>> recomputeTaskConfigs(String connName) {
+        ConnectorState state = connectors.get(connName);
+        return worker.connectorTaskConfigs(connName,
+                state.config.getInt(ConnectorConfig.TASKS_MAX_CONFIG),
+                state.config.getList(ConnectorConfig.TOPICS_CONFIG));
+    }
+
+    private void createConnectorTasks(String connName) {
+        ConnectorState state = connectors.get(connName);
+        int index = 0;
+        for (Map<String, String> taskConfigMap : state.taskConfigs) {
+            ConnectorTaskId taskId = new ConnectorTaskId(connName, index);
+            TaskConfig config = new TaskConfig(taskConfigMap);
+            try {
+                worker.addTask(taskId, config);
+            } catch (Throwable e) {
+                log.error("Failed to add task {}: ", taskId, e);
+                // Swallow this so we can continue updating the rest of the tasks
+                // FIXME what's the proper response? Kill all the tasks? Consider this the same as a task
+                // that died after starting successfully.
+            }
+            index++;
+        }
+    }
+
+    private void removeConnectorTasks(String connName) {
+        ConnectorState state = connectors.get(connName);
+        for (int i = 0; i < state.taskConfigs.size(); i++) {
+            ConnectorTaskId taskId = new ConnectorTaskId(connName, i);
+            try {
+                worker.stopTask(taskId);
+            } catch (ConnectException e) {
+                log.error("Failed to stop task {}: ", taskId, e);
+                // Swallow this so we can continue stopping the rest of the tasks
+                // FIXME: Forcibly kill the task?
+            }
+        }
+        state.taskConfigs = new ArrayList<>();
+    }
+
+    private void updateConnectorTasks(String connName) {
+        List<Map<String, String>> newTaskConfigs = recomputeTaskConfigs(connName);
+        ConnectorState state = connectors.get(connName);
+        if (!newTaskConfigs.equals(state.taskConfigs)) {
+            removeConnectorTasks(connName);
+            state.taskConfigs = newTaskConfigs;
+            createConnectorTasks(connName);
+        }
+    }
+
+
+    private static class ConnectorState {
+        public String name;
+        public Map<String, String> configOriginals;
+        public ConnectorConfig config;
+        List<Map<String, String>> taskConfigs;
+
+        public ConnectorState(Map<String, String> configOriginals, ConnectorConfig config) {
+            this.name = config.getString(ConnectorConfig.NAME_CONFIG);
+            this.configOriginals = configOriginals;
+            this.config = config;
+            this.taskConfigs = new ArrayList<>();
+        }
+    }
+}
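
A minimal sketch of driving the herder above directly, mirroring how ConnectorsResource uses FutureCallback; the Worker instance and the connector class name are assumed/placeholder, not taken from this patch:

    // Sketch only: assumes 'worker' is an already-started Worker instance.
    public static void createExampleConnector(Worker worker) throws Exception {
        StandaloneHerder herder = new StandaloneHerder(worker);
        herder.start();

        Map<String, String> config = new HashMap<>();
        config.put(ConnectorConfig.NAME_CONFIG, "example-connector");
        config.put("connector.class", "org.example.MySourceConnector"); // placeholder
        config.put("tasks.max", "1");

        FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
        herder.putConnectorConfig("example-connector", config, false, cb);
        Herder.Created<ConnectorInfo> created = cb.get(10, TimeUnit.SECONDS);
        System.out.println("created=" + created.created());
    }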

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java
new file mode 100644
index 0000000..1d1f8ef
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Implementation of OffsetBackingStore that saves data locally to a file. To ensure this behaves
+ * similarly to a real backing store, operations are executed asynchronously on a background thread.
+ */
+public class FileOffsetBackingStore extends MemoryOffsetBackingStore {
+    private static final Logger log = LoggerFactory.getLogger(FileOffsetBackingStore.class);
+
+    public final static String OFFSET_STORAGE_FILE_FILENAME_CONFIG = "offset.storage.file.filename";
+    private File file;
+
+    public FileOffsetBackingStore() {
+
+    }
+
+    @Override
+    public void configure(Map<String, ?> props) {
+        super.configure(props);
+        String filename = (String) props.get(OFFSET_STORAGE_FILE_FILENAME_CONFIG);
+        file = new File(filename);
+    }
+
+    @Override
+    public synchronized void start() {
+        super.start();
+        log.info("Starting FileOffsetBackingStore with file {}", file);
+        load();
+    }
+
+    @Override
+    public synchronized void stop() {
+        super.stop();
+        // Nothing to do since this doesn't maintain any outstanding connections/data
+        log.info("Stopped FileOffsetBackingStore");
+    }
+
+    @SuppressWarnings("unchecked")
+    private void load() {
+        try {
+            ObjectInputStream is = new ObjectInputStream(new FileInputStream(file));
+            Object obj = is.readObject();
+            if (!(obj instanceof HashMap))
+                throw new ConnectException("Expected HashMap but found " + obj.getClass());
+            Map<byte[], byte[]> raw = (Map<byte[], byte[]>) obj;
+            data = new HashMap<>();
+            for (Map.Entry<byte[], byte[]> mapEntry : raw.entrySet()) {
+                ByteBuffer key = (mapEntry.getKey() != null) ? ByteBuffer.wrap(mapEntry.getKey()) : null;
+                ByteBuffer value = (mapEntry.getValue() != null) ? ByteBuffer.wrap(mapEntry.getValue()) : null;
+                data.put(key, value);
+            }
+            is.close();
+        } catch (FileNotFoundException | EOFException e) {
+            // FileNotFoundException: Ignore, may be new.
+            // EOFException: Ignore, this means the file was empty or truncated
+        } catch (IOException | ClassNotFoundException e) {
+            throw new ConnectException(e);
+        }
+    }
+
+    protected void save() {
+        try {
+            ObjectOutputStream os = new ObjectOutputStream(new FileOutputStream(file));
+            Map<byte[], byte[]> raw = new HashMap<>();
+            for (Map.Entry<ByteBuffer, ByteBuffer> mapEntry : data.entrySet()) {
+                byte[] key = (mapEntry.getKey() != null) ? mapEntry.getKey().array() : null;
+                byte[] value = (mapEntry.getValue() != null) ? mapEntry.getValue().array() : null;
+                raw.put(key, value);
+            }
+            os.writeObject(raw);
+            os.close();
+        } catch (IOException e) {
+            throw new ConnectException(e);
+        }
+    }
+}
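
A minimal usage sketch for the store above; the file path is a placeholder, and the read/write methods inherited from MemoryOffsetBackingStore are not shown in this patch:

    Map<String, String> props = new HashMap<>();
    props.put(FileOffsetBackingStore.OFFSET_STORAGE_FILE_FILENAME_CONFIG, "/tmp/connect.offsets"); // placeholder path

    FileOffsetBackingStore store = new FileOffsetBackingStore();
    store.configure(props);  // resolves offset.storage.file.filename
    store.start();           // load() restores any previously saved offsets from the file
    // ... read and write offsets through the inherited OffsetBackingStore API ...
    store.stop();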

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java
new file mode 100644
index 0000000..4b60131
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java
@@ -0,0 +1,578 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.errors.DataException;
+import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * <p>
+ * Provides persistent storage of Kafka Connect connector configurations in a Kafka topic.
+ * </p>
+ * <p>
+ * This class manages both connector and task configurations. It tracks three types of configuration entries:
+ * <p/>
+ * 1. Connector config: map of string -> string configurations passed to the Connector class, with support for
+ * expanding this format if necessary. (Kafka key: connector-[connector-id]).
+ * These configs are *not* ephemeral. They represent the source of truth. If the entire Connect
+ * cluster goes down, this is all that is really needed to recover.
+ * 2. Task configs: map of string -> string configurations passed to the Task class, with support for expanding
+ * this format if necessary. (Kafka key: task-[connector-id]-[task-id]).
+ * These configs are ephemeral; they are stored here to a) disseminate them to all workers while
+ * ensuring agreement and b) to allow faster cluster/worker recovery since the common case
+ * of recovery (restoring a connector) will simply result in the same configuration as before
+ * the failure.
+ * 3. Task commit "configs": records indicating that previous task config entries should be committed and all task
+ * configs for a connector can be applied. (Kafka key: commit-[connector-id]).
+ * This config has two effects. First, it records the number of tasks the connector is currently
+ * running (and can therefore increase/decrease parallelism). Second, because each task config
+ * is stored separately but they need to be applied together to ensure each partition is assigned
+ * to a single task, this record also indicates that task configs for the specified connector
+ * can be "applied" or "committed".
+ * </p>
+ * <p>
+ * This configuration is expected to be stored in a *single partition* and *compacted* topic. Using a single partition
+ * ensures we can enforce ordering on messages, allowing Kafka to be used as a write ahead log. Compaction allows
+ * us to clean up outdated configurations over time. However, this combination has some important implications for
+ * the implementation of this class and the configuration state that it may expose.
+ * </p>
+ * <p>
+ * Connector configurations are independent of all other configs, so they are handled easily. Writing a single record
+ * is already atomic, so these can be applied as soon as they are read. One connector's config does not affect any
+ * others, and they do not need to coordinate with the connector's task configuration at all.
+ * </p>
+ * <p>
+ * The most obvious implication for task configs is the need for the commit messages. Because Kafka does not
+ * currently have multi-record transactions or support atomic batch record writes, task commit messages are required
+ * to ensure that readers do not end up using inconsistent configs. For example, consider if a connector wrote configs
+ * for its tasks, then was reconfigured and only managed to write updated configs for half its tasks. If task configs
+ * were applied immediately, you could be using half the old configs and half the new configs. In that condition, some
+ * partitions may be double-assigned because the old config and new config may use completely different assignments.
+ * Therefore, when reading the log, we must buffer config updates for a connector's tasks and only apply them atomically
+ * once a commit message has been read.
+ * </p>
+ * <p>
+ * However, there are also further challenges. This simple buffering approach would work fine as long as the entire log was
+ * always available, but we would like to be able to enable compaction so our configuration topic does not grow
+ * indefinitely. Compaction may break a normal log because old entries will suddenly go missing. A new worker reading
+ * from the beginning of the log in order to build up the full current configuration will see task commits, but some
+ * records required for those commits will have been removed because the same keys have subsequently been rewritten.
+ * For example, if you have a sequence of record keys [connector-foo-config, task-foo-1-config, task-foo-2-config,
+ * commit-foo (2 tasks), task-foo-1-config, commit-foo (1 task)], we can end up with a compacted log containing
+ * [connector-foo-config, task-foo-2-config, commit-foo (2 tasks), task-foo-1-config, commit-foo (1 task)]. When read
+ * back, the first commit will see an invalid state because the first task-foo-1-config has been cleaned up.
+ * </p>
+ * <p>
+ * Compaction can further complicate things if writing new task configs fails mid-write. Consider a similar scenario
+ * as the previous one, but in this case both the first and second update will write 2 task configs. However, the
+ * second write fails half of the way through:
+ * [connector-foo-config, task-foo-1-config, task-foo-2-config, commit-foo (2 tasks), task-foo-1-config]. Now compaction
+ * occurs and we're left with
+ * [connector-foo-config, task-foo-2-config, commit-foo (2 tasks), task-foo-1-config]. At the first commit, we don't
+ * have a complete set of configs. And because of the failure, there is no second commit. We are left in an inconsistent
+ * state with no obvious way to resolve the issue -- we can try to keep on reading, but the failed node may never
+ * recover and write the updated config. Meanwhile, other workers may have seen the entire log; they will see the second
+ * task-foo-1-config waiting to be applied, but will otherwise think everything is ok -- they have a valid set of task
+ * configs for connector "foo".
+ * </p>
+ * <p>
+ * Because we can encounter these inconsistencies and addressing them requires support from the rest of the system
+ * (resolving the task configuration inconsistencies requires support from the connector instance to regenerate updated
+ * configs), this class exposes not only the current set of configs, but also which connectors have inconsistent data.
+ * This allows users of this class (i.e., Herder implementations) to take action to resolve any inconsistencies. These
+ * inconsistencies should be rare (as described above, due to compaction combined with leader failures in the middle
+ * of updating task configurations).
+ * </p>
+ * <p>
+ * Note that the expectation is that this config storage system has only a single writer at a time.
+ * The caller (Herder) must ensure this is the case. In distributed mode this will require forwarding config change
+ * requests to the leader in the cluster (i.e. the worker group coordinated by the Kafka broker).
+ * </p>
+ * <p>
+ * Since processing of the config log occurs in a background thread, callers must take care when using accessors.
+ * To simplify handling this correctly, this class only exposes a mechanism to snapshot the current state of the cluster.
+ * Updates may continue to be applied (and callbacks invoked) in the background. Callers must take care that they are
+ * using a consistent snapshot and only update when it is safe. In particular, if task configs are updated which require
+ * synchronization across workers to commit offsets and update the configuration, callbacks and updates during the
+ * rebalance must be deferred.
+ * </p>
+ */
+public class KafkaConfigStorage {
+    private static final Logger log = 
LoggerFactory.getLogger(KafkaConfigStorage.class);
+
+    public static final String CONFIG_TOPIC_CONFIG = "config.storage.topic";
+
+    public static final String CONNECTOR_PREFIX = "connector-";
+
+    public static String CONNECTOR_KEY(String connectorName) {
+        return CONNECTOR_PREFIX + connectorName;
+    }
+
+    public static final String TASK_PREFIX = "task-";
+
+    public static String TASK_KEY(ConnectorTaskId taskId) {
+        return TASK_PREFIX + taskId.connector() + "-" + taskId.task();
+    }
+
+    public static final String COMMIT_TASKS_PREFIX = "commit-";
+
+    public static String COMMIT_TASKS_KEY(String connectorName) {
+        return COMMIT_TASKS_PREFIX + connectorName;
+    }
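+
+    // For example, a connector named "foo" with a task 1 produces the record keys
+    //   CONNECTOR_KEY("foo")                    -> "connector-foo"
+    //   TASK_KEY(new ConnectorTaskId("foo", 1)) -> "task-foo-1"
+    //   COMMIT_TASKS_KEY("foo")                 -> "commit-foo"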
+
+    // Note that using real serialization for values but ad hoc string serialization for keys
+    // isn't ideal. We use this approach because it avoids any potential problems with schema
+    // evolution or converter/serializer changes causing keys to change; we absolutely need the
+    // keys to remain precisely the same.
+    public static final Schema CONNECTOR_CONFIGURATION_V0 = 
SchemaBuilder.struct()
+            .field("properties", SchemaBuilder.map(Schema.STRING_SCHEMA, 
Schema.OPTIONAL_STRING_SCHEMA))
+            .build();
+    public static final Schema TASK_CONFIGURATION_V0 = 
CONNECTOR_CONFIGURATION_V0;
+    public static final Schema CONNECTOR_TASKS_COMMIT_V0 = 
SchemaBuilder.struct()
+            .field("tasks", Schema.INT32_SCHEMA)
+            .build();
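+
+    // For example (illustrative values), a connector or task config value wraps the raw
+    // properties map:
+    //   Struct value = new Struct(CONNECTOR_CONFIGURATION_V0);
+    //   value.put("properties", Collections.singletonMap("tasks.max", "2"));
+    // while a commit message only records the number of tasks:
+    //   new Struct(CONNECTOR_TASKS_COMMIT_V0).put("tasks", 2);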
+
+    private static final long READ_TO_END_TIMEOUT_MS = 30000;
+
+    private final Object lock;
+    private boolean starting;
+    private final Converter converter;
+    private final Callback<String> connectorConfigCallback;
+    private final Callback<List<ConnectorTaskId>> tasksConfigCallback;
+    private String topic;
+    // Data is passed to the log already serialized. We use a converter to translate between the
+    // generic Connect format and the serialized form.
+    private KafkaBasedLog<String, byte[]> configLog;
+    // Connector -> # of tasks
+    private Map<String, Integer> connectorTaskCounts = new HashMap<>();
+    // Connector and task configs: name or id -> config map
+    private Map<String, Map<String, String>> connectorConfigs = new 
HashMap<>();
+    private Map<ConnectorTaskId, Map<String, String>> taskConfigs = new 
HashMap<>();
+    // Set of connectors where we saw a task commit with an incomplete set of 
task config updates, indicating the data
+    // is in an inconsistent state and cannot safely be used until it has been refreshed.
+    private Set<String> inconsistent = new HashSet<>();
+    // The most recently read offset. This does not take into account deferred 
task updates/commits, so we may have
+    // outstanding data to be applied.
+    private long offset;
+
+    // Connector -> Map[ConnectorTaskId -> Configs]
+    private Map<String, Map<ConnectorTaskId, Map<String, String>>> 
deferredTaskUpdates = new HashMap<>();
+
+
+    public KafkaConfigStorage(Converter converter, Callback<String> 
connectorConfigCallback, Callback<List<ConnectorTaskId>> tasksConfigCallback) {
+        this.lock = new Object();
+        this.starting = false;
+        this.converter = converter;
+        this.connectorConfigCallback = connectorConfigCallback;
+        this.tasksConfigCallback = tasksConfigCallback;
+
+        offset = -1;
+    }
+
+    public void configure(Map<String, ?> configs) {
+        if (configs.get(CONFIG_TOPIC_CONFIG) == null)
+            throw new ConnectException("Must specify topic for connector 
configuration.");
+        topic = (String) configs.get(CONFIG_TOPIC_CONFIG);
+
+        Map<String, Object> producerProps = new HashMap<>();
+        producerProps.putAll(configs);
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer");
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArraySerializer");
+        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
+
+        Map<String, Object> consumerProps = new HashMap<>();
+        consumerProps.putAll(configs);
+        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringDeserializer");
+        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+
+        configLog = createKafkaBasedLog(topic, producerProps, consumerProps, 
consumedCallback);
+    }
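+
+    // Usage sketch (hypothetical values): the map passed to configure() must contain
+    // CONFIG_TOPIC_CONFIG; all entries are also passed through to the underlying producer and
+    // consumer, so it will typically carry the usual Kafka client settings as well, e.g.
+    //   Map<String, Object> props = new HashMap<>();
+    //   props.put(KafkaConfigStorage.CONFIG_TOPIC_CONFIG, "connect-configs");
+    //   props.put("bootstrap.servers", "broker:9092");
+    //   configStorage.configure(props);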
+
+    public void start() {
+        log.info("Starting KafkaConfigStorage");
+        // During startup, callbacks are *not* invoked. You can grab a 
snapshot after starting -- just take care that
+        // updates can continue to occur in the background
+        starting = true;
+        configLog.start();
+        starting = false;
+        log.info("Started KafkaConfigStorage");
+    }
+
+    public void stop() {
+        log.info("Closing KafkaConfigStorage");
+        configLog.stop();
+        log.info("Closed KafkaConfigStorage");
+    }
+
+    /**
+     * Get a snapshot of the current state of the cluster.
+     */
+    public ClusterConfigState snapshot() {
+        synchronized (lock) {
+            // Doing a shallow copy of the data is safe here because the 
complex nested data that is copied should all be
+            // immutable configs
+            return new ClusterConfigState(
+                    offset,
+                    new HashMap<>(connectorTaskCounts),
+                    new HashMap<>(connectorConfigs),
+                    new HashMap<>(taskConfigs),
+                    new HashSet<>(inconsistent)
+            );
+        }
+    }
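+
+    // Usage sketch (hypothetical caller code; the ClusterConfigState accessors shown are assumed):
+    //   ClusterConfigState state = configStorage.snapshot();
+    //   Map<String, String> fooConfig = state.connectorConfig("foo");
+    // The snapshot is an independent copy, so callers can read it without holding any lock even
+    // while the background thread continues to apply updates.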
+
+    /**
+     * Write this connector configuration to persistent storage and wait until 
it has been acknowledged and read back by
+     * tailing the Kafka log with a consumer.
+     *
+     * @param connector  name of the connector to write data for
+     * @param properties the configuration to write
+     */
+    public void putConnectorConfig(String connector, Map<String, String> 
properties) {
+        byte[] serializedConfig;
+        if (properties == null) {
+            serializedConfig = null;
+        } else {
+            Struct connectConfig = new Struct(CONNECTOR_CONFIGURATION_V0);
+            connectConfig.put("properties", properties);
+            serializedConfig = converter.fromConnectData(topic, 
CONNECTOR_CONFIGURATION_V0, connectConfig);
+        }
+
+        try {
+            configLog.send(CONNECTOR_KEY(connector), serializedConfig);
+            configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, 
TimeUnit.MILLISECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException 
e) {
+            log.error("Failed to write connector configuration to Kafka: ", e);
+            throw new ConnectException("Error writing connector configuration 
to Kafka", e);
+        }
+    }
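+
+    // For example, putConnectorConfig("foo", props) writes a record keyed "connector-foo" and
+    // blocks until it has been read back from the log; putConnectorConfig("foo", null) writes a
+    // null value, which the consumer callback below treats as deletion of the connector config.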
+
+    /**
+     * Write these task configurations and associated commit messages, unless 
an inconsistency is found that indicates
+     * that we would be leaving one of the referenced connectors with an 
inconsistent state.
+     *
+     * @param configs map containing task configurations
+     * @throws ConnectException if the task configurations do not resolve 
inconsistencies found in the existing root
+     *                          and task configurations.
+     */
+    public void putTaskConfigs(Map<ConnectorTaskId, Map<String, String>> 
configs) {
+        // Make sure we're at the end of the log. We should be the only 
writer, but we want to make sure we don't have
+        // any outstanding lagging data to consume.
+        try {
+            configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, 
TimeUnit.MILLISECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException 
e) {
+            log.error("Failed to write root configuration to Kafka: ", e);
+            throw new ConnectException("Error writing root configuration to 
Kafka", e);
+        }
+
+        // In theory, there is only a single writer and we shouldn't need this 
lock since the background thread should
+        // not invoke any callbacks that would conflict, but in practice this 
guards against inconsistencies due to
+        // the root config being updated.
+        Map<String, Integer> newTaskCounts = new HashMap<>();
+        synchronized (lock) {
+            // Validate tasks in this assignment. Any task configuration 
updates should include updates for *all* tasks
+            // in the connector -- we should have all task IDs 0 - N-1 within 
a connector if any task is included here
+            Map<String, Set<Integer>> updatedConfigIdsByConnector = 
taskIdsByConnector(configs);
+            for (Map.Entry<String, Set<Integer>> taskConfigSetEntry : 
updatedConfigIdsByConnector.entrySet()) {
+                if (!completeTaskIdSet(taskConfigSetEntry.getValue(), 
taskConfigSetEntry.getValue().size())) {
+                    log.error("Submitted task configuration contain invalid 
range of task IDs, ignoring this submission");
+                    throw new ConnectException("Error writing task 
configurations: found some connectors with invalid connectors");
+                }
+                newTaskCounts.put(taskConfigSetEntry.getKey(), 
taskConfigSetEntry.getValue().size());
+            }
+        }
+
+        // Start sending all the individual updates
+        for (Map.Entry<ConnectorTaskId, Map<String, String>> taskConfigEntry : 
configs.entrySet()) {
+            Struct connectConfig = new Struct(TASK_CONFIGURATION_V0);
+            connectConfig.put("properties", taskConfigEntry.getValue());
+            byte[] serializedConfig = converter.fromConnectData(topic, 
TASK_CONFIGURATION_V0, connectConfig);
+            configLog.send(TASK_KEY(taskConfigEntry.getKey()), 
serializedConfig);
+        }
+
+        // Finally, send the commit to update the number of tasks and apply 
the new configs, then wait until we read to
+        // the end of the log
+        try {
+            // Read to end to ensure all the task configs have been written
+            configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, 
TimeUnit.MILLISECONDS);
+
+            // Write all the commit messages
+            for (Map.Entry<String, Integer> taskCountEntry : 
newTaskCounts.entrySet()) {
+                Struct connectConfig = new Struct(CONNECTOR_TASKS_COMMIT_V0);
+                connectConfig.put("tasks", taskCountEntry.getValue());
+                byte[] serializedConfig = converter.fromConnectData(topic, 
CONNECTOR_TASKS_COMMIT_V0, connectConfig);
+                configLog.send(COMMIT_TASKS_KEY(taskCountEntry.getKey()), 
serializedConfig);
+            }
+
+            // Read to end to ensure all the commit messages have been written
+            configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, 
TimeUnit.MILLISECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException 
e) {
+            log.error("Failed to write root configuration to Kafka: ", e);
+            throw new ConnectException("Error writing root configuration to 
Kafka", e);
+        }
+    }
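+
+    // For example, committing two tasks for connector "foo" appends the task config records
+    // ("task-foo-0" and "task-foo-1") followed by a commit record "commit-foo" -> {tasks: 2},
+    // and only returns once the commit record has been read back from the log.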
+
+    public Future<Void> readToEnd() {
+        return configLog.readToEnd();
+    }
+
+    public void readToEnd(Callback<Void> cb) {
+        configLog.readToEnd(cb);
+    }
+
+    private KafkaBasedLog<String, byte[]> createKafkaBasedLog(String topic, 
Map<String, Object> producerProps,
+                                                              Map<String, 
Object> consumerProps, Callback<ConsumerRecord<String, byte[]>> 
consumedCallback) {
+        return new KafkaBasedLog<>(topic, producerProps, consumerProps, 
consumedCallback, new SystemTime());
+    }
+
+    private final Callback<ConsumerRecord<String, byte[]>> consumedCallback = 
new Callback<ConsumerRecord<String, byte[]>>() {
+        @Override
+        public void onCompletion(Throwable error, ConsumerRecord<String, 
byte[]> record) {
+            if (error != null) {
+                log.error("Unexpected in consumer callback for 
KafkaConfigStorage: ", error);
+                return;
+            }
+
+            final SchemaAndValue value;
+            try {
+                value = converter.toConnectData(topic, record.value());
+            } catch (DataException e) {
+                log.error("Failed to convert config data to Kafka Connect 
format: ", e);
+                return;
+            }
+            // Make the recorded offset match the API used for positions in 
the consumer -- return the offset of the
+            // *next record*, not the last one consumed.
+            offset = record.offset() + 1;
+
+            if (record.key().startsWith(CONNECTOR_PREFIX)) {
+                String connectorName = 
record.key().substring(CONNECTOR_PREFIX.length());
+                synchronized (lock) {
+                    if (value.value() == null) {
+                        // Connector deletion will be written as a null value
+                        connectorConfigs.remove(connectorName);
+                    } else {
+                        // Connector configs can be applied and callbacks 
invoked immediately
+                        if (!(value.value() instanceof Map)) {
+                            log.error("Found connector configuration (" + 
record.key() + ") in wrong format: " + value.value().getClass());
+                            return;
+                        }
+                        Object newConnectorConfig = ((Map<String, Object>) 
value.value()).get("properties");
+                        if (!(newConnectorConfig instanceof Map)) {
+                            log.error("Invalid data for connector config: 
properties field should be a Map but is " + newConnectorConfig.getClass());
+                            return;
+                        }
+                        connectorConfigs.put(connectorName, (Map<String, 
String>) newConnectorConfig);
+                    }
+                }
+                if (!starting)
+                    connectorConfigCallback.onCompletion(null, connectorName);
+            } else if (record.key().startsWith(TASK_PREFIX)) {
+                synchronized (lock) {
+                    ConnectorTaskId taskId = parseTaskId(record.key());
+                    if (taskId == null) {
+                        log.error("Ignoring task configuration because " + 
record.key() + " couldn't be parsed as a task config key");
+                        return;
+                    }
+                    if (!(value.value() instanceof Map)) {
+                        log.error("Ignoring task configuration because it is 
in the wrong format: " + value.value());
+                        return;
+                    }
+
+                    Object newTaskConfig = ((Map<String, Object>) 
value.value()).get("properties");
+                    if (!(newTaskConfig instanceof Map)) {
+                        log.error("Invalid data for task config: properties 
field should be a Map but is " + newTaskConfig.getClass());
+                        return;
+                    }
+
+                    Map<ConnectorTaskId, Map<String, String>> deferred = 
deferredTaskUpdates.get(taskId.connector());
+                    if (deferred == null) {
+                        deferred = new HashMap<>();
+                        deferredTaskUpdates.put(taskId.connector(), deferred);
+                    }
+                    deferred.put(taskId, (Map<String, String>) newTaskConfig);
+                }
+            } else if (record.key().startsWith(COMMIT_TASKS_PREFIX)) {
+                String connectorName = 
record.key().substring(COMMIT_TASKS_PREFIX.length());
+                List<ConnectorTaskId> updatedTasks = new ArrayList<>();
+                synchronized (lock) {
+                    // Apply any outstanding deferred task updates for the 
given connector. Note that just because we
+                    // encounter a commit message does not mean it will result 
in consistent output. In particular due to
+                    // compaction, there may be cases where records needed by a commit have already
+                    // been removed. For example, if we have the following sequence of writes:
+                    //
+                    // 1. Write connector "foo"'s config
+                    // 2. Write connector "foo", task 1's config <-- compacted
+                    // 3. Write connector "foo", task 2's config
+                    // 4. Write connector "foo" task commit message
+                    // 5. Write connector "foo", task 1's config
+                    // 6. Write connector "foo", task 2's config
+                    // 7. Write connector "foo" task commit message
+                    //
+                    // then when a new worker starts up, if message 2 had been 
compacted, then when message 4 is applied
+                    // "foo" will not have a complete set of configs. Only 
when message 7 is applied will the complete
+                    // configuration be available. Worse, if the leader died 
while writing messages 5, 6, and 7 such that
+                    // only 5 was written, then there may be nothing that will 
finish writing the configs and get the
+                    // log back into a consistent state.
+                    //
+                    // It is expected that the user of this class (i.e., the 
Herder) will take the necessary action to
+                    // resolve this (i.e., get the connector to recommit its 
configuration). This inconsistent state is
+                    // exposed in the snapshots provided via 
ClusterConfigState so they are easy to handle.
+                    if (!(value.value() instanceof Map)) { // Schema-less, so 
we get maps instead of structs
+                        log.error("Ignoring connector tasks configuration 
commit because it is in the wrong format: " + value.value());
+                        return;
+                    }
+
+                    Map<ConnectorTaskId, Map<String, String>> deferred = 
deferredTaskUpdates.get(connectorName);
+
+                    int newTaskCount = intValue(((Map<String, Object>) 
value.value()).get("tasks"));
+
+                    // Validate the configs we're supposed to update to ensure 
we're getting a complete configuration
+                    // update of all tasks that are expected based on the 
number of tasks in the commit message.
+                    Map<String, Set<Integer>> updatedConfigIdsByConnector = 
taskIdsByConnector(deferred);
+                    Set<Integer> taskIdSet = 
updatedConfigIdsByConnector.get(connectorName);
+                    if (!completeTaskIdSet(taskIdSet, newTaskCount)) {
+                        // Given the logic for writing commit messages, we 
should only hit this condition due to compacted
+                        // historical data, in which case we would not have 
applied any updates yet and there will be no
+                        // task config data already committed for the 
connector, so we shouldn't have to clear any data
+                        // out. All we need to do is add the flag marking it 
inconsistent.
+                        inconsistent.add(connectorName);
+                    } else {
+                        if (deferred != null) {
+                            taskConfigs.putAll(deferred);
+                            updatedTasks.addAll(taskConfigs.keySet());
+                        }
+                        inconsistent.remove(connectorName);
+                    }
+                    // Always clear the deferred entries, even if we didn't 
apply them. If they represented an inconsistent
+                    // update, then we need to see a completely fresh set of 
configs after this commit message, so we don't
+                    // want any of these outdated configs
+                    if (deferred != null)
+                        deferred.clear();
+
+                    connectorTaskCounts.put(connectorName, newTaskCount);
+                }
+
+                if (!starting)
+                    tasksConfigCallback.onCompletion(null, updatedTasks);
+            } else {
+                log.error("Discarding config update record with invalid key: " 
+ record.key());
+            }
+        }
+    };
+
+    private ConnectorTaskId parseTaskId(String key) {
+        String[] parts = key.split("-");
+        if (parts.length < 3) return null;
+
+        try {
+            int taskNum = Integer.parseInt(parts[parts.length - 1]);
+            String connectorName = Utils.join(Arrays.copyOfRange(parts, 1, 
parts.length - 1), "-");
+            return new ConnectorTaskId(connectorName, taskNum);
+        } catch (NumberFormatException e) {
+            return null;
+        }
+    }
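+
+    // For example, parseTaskId("task-foo-1") yields ConnectorTaskId("foo", 1) and
+    // parseTaskId("task-my-connector-2") yields ConnectorTaskId("my-connector", 2): everything
+    // between the "task-" prefix and the final numeric segment is treated as the connector name.
+    // A key whose final segment is not a number yields null.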
+
+    /**
+     * Given task configurations, get a set of integer task IDs organized by 
connector name.
+     */
+    private Map<String, Set<Integer>> taskIdsByConnector(Map<ConnectorTaskId, 
Map<String, String>> configs) {
+        Map<String, Set<Integer>> connectorTaskIds = new HashMap<>();
+        if (configs == null)
+            return connectorTaskIds;
+        for (Map.Entry<ConnectorTaskId, Map<String, String>> taskConfigEntry : 
configs.entrySet()) {
+            ConnectorTaskId taskId = taskConfigEntry.getKey();
+            if (!connectorTaskIds.containsKey(taskId.connector()))
+                connectorTaskIds.put(taskId.connector(), new 
TreeSet<Integer>());
+            connectorTaskIds.get(taskId.connector()).add(taskId.task());
+        }
+        return connectorTaskIds;
+    }
+
+    private boolean completeTaskIdSet(Set<Integer> idSet, int expectedSize) {
+        // Note that we do *not* check for the exact set. This is an important 
implication of compaction. If we start out
+        // with 2 tasks, then reduce to 1, we'll end up with log entries like:
+        //
+        // 1. Connector "foo" config
+        // 2. Connector "foo", task 1 config
+        // 3. Connector "foo", task 2 config
+        // 4. Connector "foo", commit 2 tasks
+        // 5. Connector "foo", task 1 config
+        // 6. Connector "foo", commit 1 tasks
+        //
+        // However, due to compaction we could end up with a log that looks 
like this:
+        //
+        // 1. Connector "foo" config
+        // 3. Connector "foo", task 2 config
+        // 5. Connector "foo", task 1 config
+        // 6. Connector "foo", commit 1 tasks
+        //
+        // which isn't incorrect, but would appear in this code to have an 
extra task configuration. Instead, we just
+        // validate that all the configs specified by the commit message are 
present. This should be fine because the
+        // logic for writing configs ensures all the task configs are written 
(and reads them back) before writing the
+        // commit message.
+
+        if (idSet.size() < expectedSize)
+            return false;
+
+        for (int i = 0; i < expectedSize; i++)
+            if (!idSet.contains(i))
+                return false;
+        return true;
+    }
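+
+    // For example, completeTaskIdSet({0, 1}, 2) and completeTaskIdSet({0, 1, 2}, 2) are both true
+    // (the extra ID is tolerated because of the compaction scenario above), while
+    // completeTaskIdSet({0, 2}, 2) is false because task 1's config is missing.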
+
+    // Convert an integer value extracted from a schemaless struct to an int. 
This handles potentially different
+    // encodings by different Converters.
+    private static int intValue(Object value) {
+        if (value instanceof Integer)
+            return (int) value;
+        else if (value instanceof Long)
+            return (int) (long) value;
+        else
+            throw new ConnectException("Expected integer value to be either 
Integer or Long");
+    }
+}
+
