Repository: kafka
Updated Branches:
  refs/heads/trunk 30736da65 -> 56623efd7


KAFKA-4667: Connect uses AdminClient to create internal topics when needed 
(KIP-154)

The backing stores for offsets, status, and configs now attempt to use the new 
AdminClient to look up the internal topics and create them if they don’t yet 
exist. If the necessary APIs are not available in the connected broker, the 
stores fall back to the old behavior of relying upon auto-created topics. Kafka 
Connect requires a minimum of Apache Kafka 0.10.0.1-cp1, and the AdminClient 
can work with all versions since 0.10.0.0.
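
The fallback described above can be sketched roughly as follows. This is a minimal illustration, not the code in this commit: `TopicCreator`, `UnsupportedApiException`, and `InternalTopicInitializer` are hypothetical names standing in for the AdminClient-based lookup/creation and for the error seen when a broker lacks the required API.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the error raised when the broker lacks the topic-creation API.
class UnsupportedApiException extends RuntimeException {
    UnsupportedApiException(String message) {
        super(message);
    }
}

// Hypothetical abstraction over "look up the topic and create it if it is missing".
interface TopicCreator {
    /** Returns true if the topic was created, false if it already existed. */
    boolean createIfMissing(String topic);
}

class InternalTopicInitializer {
    enum Outcome { CREATED, ALREADY_EXISTS, LEFT_TO_AUTO_CREATE }

    /**
     * Tries to create the topic explicitly. If the broker does not support the
     * required API, leaves the topic to broker-side auto-creation (the old behavior).
     */
    static Outcome ensureTopic(TopicCreator creator, String topic) {
        try {
            return creator.createIfMissing(topic) ? Outcome.CREATED : Outcome.ALREADY_EXISTS;
        } catch (UnsupportedApiException e) {
            return Outcome.LEFT_TO_AUTO_CREATE;
        }
    }

    public static void main(String[] args) {
        List<String> topics = Arrays.asList("connect-offsets", "connect-configs", "connect-status");
        // Simulated broker that supports creation; pretend the config topic already exists.
        TopicCreator newBroker = topic -> !topic.equals("connect-configs");
        // Simulated older broker that rejects the request outright.
        TopicCreator oldBroker = topic -> {
            throw new UnsupportedApiException("topic creation API not supported");
        };
        for (String topic : topics) {
            System.out.println(topic + ": new broker=" + ensureTopic(newBroker, topic)
                    + ", old broker=" + ensureTopic(oldBroker, topic));
        }
    }
}
```

The point of the sketch is only that an unsupported API is treated as a signal to fall back rather than as a fatal error.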

All three of Connect’s internal topics are created as compacted topics, and 
new distributed worker configuration properties control the replication factor 
for all three topics and the number of partitions for the offsets and status 
topics; the config topic always uses a single partition, which cannot be 
set via configuration. All of these new configuration properties have 
sensible defaults, meaning users can upgrade without having to change any of 
the existing configurations. In most situations, existing Connect deployments 
will have already created the storage topics prior to upgrading.

The replication factor defaults to 3, so anyone running a Kafka cluster with 
fewer than 3 brokers will receive an error unless they explicitly set the 
replication factor for the three internal topics. This is the desired 
behavior, since it signals to users that they are not using sufficient 
replication for production use.
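
As a rough illustration of how these properties resolve, here is a minimal sketch. The property names are the ones added to `connect-distributed.properties` in this commit and the replication default of 3 is stated above; the partition defaults of 25 and 5 are an assumption taken from the commented-out sample values in that file, and the `InternalTopicSettings` class itself is hypothetical.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

class InternalTopicSettings {
    static final short DEFAULT_REPLICATION_FACTOR = 3; // default stated in the commit message
    static final int ASSUMED_OFFSET_PARTITIONS = 25;   // assumption: from the sample properties file
    static final int ASSUMED_STATUS_PARTITIONS = 5;    // assumption: from the sample properties file

    final int partitions;
    final short replicationFactor;

    InternalTopicSettings(int partitions, short replicationFactor) {
        this.partitions = partitions;
        this.replicationFactor = replicationFactor;
    }

    /** Resolves the settings for the offset, config, and status topics from worker properties. */
    static Map<String, InternalTopicSettings> resolve(Map<String, String> workerProps) {
        Map<String, InternalTopicSettings> result = new LinkedHashMap<>();
        result.put("offset", new InternalTopicSettings(
                intProp(workerProps, "offset.storage.partitions", ASSUMED_OFFSET_PARTITIONS),
                replication(workerProps, "offset")));
        // The config topic always has exactly one partition; no property can change that.
        result.put("config", new InternalTopicSettings(1, replication(workerProps, "config")));
        result.put("status", new InternalTopicSettings(
                intProp(workerProps, "status.storage.partitions", ASSUMED_STATUS_PARTITIONS),
                replication(workerProps, "status")));
        return result;
    }

    private static int intProp(Map<String, String> props, String key, int defaultValue) {
        String value = props.get(key);
        return value == null ? defaultValue : Integer.parseInt(value.trim());
    }

    private static short replication(Map<String, String> props, String prefix) {
        return (short) intProp(props, prefix + ".storage.replication.factor", DEFAULT_REPLICATION_FACTOR);
    }

    public static void main(String[] args) {
        // A single-broker setup must lower the replication factor explicitly.
        Map<String, String> props = new HashMap<>();
        props.put("offset.storage.replication.factor", "1");
        props.put("config.storage.replication.factor", "1");
        props.put("status.storage.replication.factor", "1");
        for (Map.Entry<String, InternalTopicSettings> e : resolve(props).entrySet()) {
            System.out.println(e.getKey() + ": partitions=" + e.getValue().partitions
                    + ", replicationFactor=" + e.getValue().replicationFactor);
        }
    }
}
```

With no overrides, every topic resolves to a replication factor of 3, which is what causes the error on clusters with fewer than 3 brokers.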

The integration tests use a cluster with a single broker, so they were changed 
to explicitly specify a replication factor of 1 and a single partition.

The `KafkaAdminClientTest` was refactored to extract a utility for setting up a 
`KafkaAdminClient` with a `MockClient` for unit tests.

Author: Randall Hauch <rha...@gmail.com>

Reviewers: Ewen Cheslack-Postava <e...@confluent.io>

Closes #2984 from rhauch/kafka-4667


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/56623efd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/56623efd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/56623efd

Branch: refs/heads/trunk
Commit: 56623efd73ec77e68cf35021d18d630b27062e82
Parents: 30736da
Author: Randall Hauch <rha...@gmail.com>
Authored: Thu May 18 16:02:29 2017 -0700
Committer: Ewen Cheslack-Postava <m...@ewencp.org>
Committed: Thu May 18 16:02:29 2017 -0700

----------------------------------------------------------------------
 .../apache/kafka/clients/admin/NewTopic.java    |  20 ++
 .../clients/admin/KafkaAdminClientTest.java     | 151 +++++------
 .../clients/admin/MockKafkaAdminClientEnv.java  |  87 ++++++
 .../java/org/apache/kafka/test/TestUtils.java   |   9 +
 config/connect-distributed.properties           |  37 ++-
 .../runtime/distributed/DistributedConfig.java  |  68 ++++-
 .../storage/KafkaConfigBackingStore.java        |  27 +-
 .../storage/KafkaOffsetBackingStore.java        |  33 ++-
 .../storage/KafkaStatusBackingStore.java        |  28 +-
 .../kafka/connect/util/KafkaBasedLog.java       |  16 +-
 .../apache/kafka/connect/util/TopicAdmin.java   | 263 +++++++++++++++++++
 .../storage/KafkaConfigBackingStoreTest.java    |  11 +-
 .../storage/KafkaOffsetBackingStoreTest.java    |  16 +-
 .../kafka/connect/util/KafkaBasedLogTest.java   |   9 +-
 .../kafka/connect/util/TopicAdminTest.java      | 158 +++++++++++
 .../tests/connect/connect_distributed_test.py   |   8 +-
 .../tests/connect/connect_rest_test.py          |   5 +
 .../templates/connect-distributed.properties    |   5 +
 18 files changed, 828 insertions(+), 123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/56623efd/clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java b/clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java
index a1f6fb5..f6460de 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java
@@ -56,6 +56,14 @@ public class NewTopic {
         return name;
     }
 
+    public int partitions() {
+        return numPartitions;
+    }
+
+    public short replicationFactor() {
+        return replicationFactor;
+    }
+
     /**
      * Set the configuration to use on the new topic.
      *
@@ -82,4 +90,16 @@ public class NewTopic {
             }
         }
     }
+
+    @Override
+    public String toString() {
+        StringBuilder bld = new StringBuilder();
+        bld.append("(name=").append(name).
+                append(", numPartitions=").append(numPartitions).
+                append(", replicationFactor=").append(replicationFactor).
+                append(", replicasAssignments=").append(replicasAssignments).
+                append(", configs=").append(configs).
+                append(")");
+        return bld.toString();
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/56623efd/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index e432c0a..62c7dde 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -16,8 +16,6 @@
  */
 package org.apache.kafka.clients.admin;
 
-import org.apache.kafka.clients.Metadata;
-import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.admin.DeleteAclsResults.FilterResults;
 import org.apache.kafka.common.Cluster;
@@ -35,7 +33,9 @@ import org.apache.kafka.common.requests.DeleteAclsResponse;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse;
 import org.apache.kafka.common.requests.DescribeAclsResponse;
-import org.apache.kafka.common.utils.Time;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -48,15 +48,13 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.Timeout;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
+
 /**
  * A unit test for KafkaAdminClient.
  *
@@ -95,7 +93,7 @@ public class KafkaAdminClientTest {
         assertEquals("Null exception.", KafkaAdminClient.prettyPrintException(null));
         assertEquals("TimeoutException", KafkaAdminClient.prettyPrintException(new TimeoutException()));
         assertEquals("TimeoutException: The foobar timed out.",
-            KafkaAdminClient.prettyPrintException(new TimeoutException("The foobar timed out.")));
+                KafkaAdminClient.prettyPrintException(new TimeoutException("The foobar timed out.")));
     }
 
     private static Map<String, Object> newStrMap(String... vals) {
@@ -124,54 +122,36 @@ public class KafkaAdminClientTest {
             ids.add(id);
         }
         assertEquals("myCustomId",
-            KafkaAdminClient.generateClientId(newConfMap(AdminClientConfig.CLIENT_ID_CONFIG, "myCustomId")));
+                KafkaAdminClient.generateClientId(newConfMap(AdminClientConfig.CLIENT_ID_CONFIG, "myCustomId")));
     }
 
-    private static class MockKafkaAdminClientContext implements AutoCloseable {
-        final static String CLUSTER_ID = "mockClusterId";
-        final AdminClientConfig adminClientConfig;
-        final Metadata metadata;
-        final HashMap<Integer, Node> nodes;
-        final MockClient mockClient;
-        final AdminClient client;
-        Cluster cluster;
-
-        MockKafkaAdminClientContext(Map<String, Object> config) {
-            this.adminClientConfig = new AdminClientConfig(config);
-            this.metadata = new Metadata(adminClientConfig.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
-                adminClientConfig.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG));
-            this.nodes = new HashMap<Integer, Node>();
-            this.nodes.put(0, new Node(0, "localhost", 8121));
-            this.nodes.put(1, new Node(1, "localhost", 8122));
-            this.nodes.put(2, new Node(2, "localhost", 8123));
-            this.mockClient = new MockClient(Time.SYSTEM, this.metadata);
-            this.client = KafkaAdminClient.create(adminClientConfig, mockClient, metadata);
-            this.cluster = new Cluster(CLUSTER_ID,  nodes.values(),
+    private static MockKafkaAdminClientEnv mockClientEnv(String... configVals) {
+        HashMap<Integer, Node> nodes = new HashMap<>();
+        nodes.put(0, new Node(0, "localhost", 8121));
+        nodes.put(1, new Node(1, "localhost", 8122));
+        nodes.put(2, new Node(2, "localhost", 8123));
+        Cluster cluster = new Cluster("mockClusterId", nodes.values(),
                 Collections.<PartitionInfo>emptySet(), Collections.<String>emptySet(),
                 Collections.<String>emptySet(), nodes.get(0));
-        }
-
-        @Override
-        public void close() {
-            this.client.close();
-        }
+        return new MockKafkaAdminClientEnv(cluster, configVals);
     }
 
     @Test
     public void testCloseAdminClient() throws Exception {
-        new MockKafkaAdminClientContext(newStrMap()).close();
+        try (MockKafkaAdminClientEnv env = mockClientEnv()) {
+        }
     }
 
     private static void assertFutureError(Future<?> future, Class<? extends Throwable> exceptionClass)
-        throws InterruptedException {
+            throws InterruptedException {
         try {
             future.get();
             fail("Expected a " + exceptionClass.getSimpleName() + " exception, but got success.");
         } catch (ExecutionException ee) {
             Throwable cause = ee.getCause();
             assertEquals("Expected a " + exceptionClass.getSimpleName() + " exception, but got " +
-                cause.getClass().getSimpleName(),
-                exceptionClass, cause.getClass());
+                            cause.getClass().getSimpleName(),
+                    exceptionClass, cause.getClass());
         }
     }
 
@@ -180,34 +160,27 @@ public class KafkaAdminClientTest {
      */
     @Test
     public void testTimeoutWithoutMetadata() throws Exception {
-        try (MockKafkaAdminClientContext ctx = new MockKafkaAdminClientContext(newStrMap(
-            AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10"))) {
-            ctx.mockClient.setNodeApiVersions(NodeApiVersions.create());
-            ctx.mockClient.setNode(new Node(0, "localhost", 8121));
-            ctx.mockClient.prepareResponse(new CreateTopicsResponse(new HashMap<String, ApiError>() {{
-                    put("myTopic", new ApiError(Errors.NONE, ""));
-                }}));
-            KafkaFuture<Void> future = ctx.client.
-                createTopics(Collections.singleton(new NewTopic("myTopic", new HashMap<Integer, List<Integer>>() {{
-                        put(0, Arrays.asList(new Integer[]{0, 1, 2}));
-                    }})), new CreateTopicsOptions().timeoutMs(1000)).all();
+        try (MockKafkaAdminClientEnv env = mockClientEnv(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().setNode(new Node(0, "localhost", 8121));
+            env.kafkaClient().prepareResponse(new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
+            KafkaFuture<Void> future = env.adminClient().createTopics(
+                    Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(Integer.valueOf(0), Arrays.asList(new Integer[]{0, 1, 2})))),
+                    new CreateTopicsOptions().timeoutMs(1000)).all();
             assertFutureError(future, TimeoutException.class);
         }
     }
 
     @Test
     public void testCreateTopics() throws Exception {
-        try (MockKafkaAdminClientContext ctx = new MockKafkaAdminClientContext(newStrMap())) {
-            ctx.mockClient.setNodeApiVersions(NodeApiVersions.create());
-            ctx.mockClient.prepareMetadataUpdate(ctx.cluster, Collections.<String>emptySet());
-            ctx.mockClient.setNode(ctx.nodes.get(0));
-            ctx.mockClient.prepareResponse(new CreateTopicsResponse(new HashMap<String, ApiError>() {{
-                    put("myTopic", new ApiError(Errors.NONE, ""));
-                }}));
-            KafkaFuture<Void> future = ctx.client.
-                createTopics(Collections.singleton(new NewTopic("myTopic", new HashMap<Integer, List<Integer>>() {{
-                        put(0, Arrays.asList(new Integer[]{0, 1, 2}));
-                    }})), new CreateTopicsOptions().timeoutMs(10000)).all();
+        try (MockKafkaAdminClientEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
+            env.kafkaClient().setNode(env.cluster().controller());
+            env.kafkaClient().prepareResponse(new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
+            KafkaFuture<Void> future = env.adminClient().createTopics(
+                    Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(Integer.valueOf(0), Arrays.asList(new Integer[]{0, 1, 2})))),
+                    new CreateTopicsOptions().timeoutMs(10000)).all();
             future.get();
         }
     }
@@ -223,45 +196,45 @@ public class KafkaAdminClientTest {
 
     @Test
     public void testDescribeAcls() throws Exception {
-        try (MockKafkaAdminClientContext ctx = new MockKafkaAdminClientContext(newStrMap())) {
-            ctx.mockClient.setNodeApiVersions(NodeApiVersions.create());
-            ctx.mockClient.prepareMetadataUpdate(ctx.cluster, Collections.<String>emptySet());
-            ctx.mockClient.setNode(ctx.nodes.get(0));
+        try (MockKafkaAdminClientEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
+            env.kafkaClient().setNode(env.cluster().controller());
 
             // Test a call where we get back ACL1 and ACL2.
-            ctx.mockClient.prepareResponse(new DescribeAclsResponse(0, null,
+            env.kafkaClient().prepareResponse(new DescribeAclsResponse(0, null,
                 new ArrayList<AclBinding>() {{
                         add(ACL1);
                         add(ACL2);
                     }}));
-            assertCollectionIs(ctx.client.describeAcls(FILTER1).all().get(), ACL1, ACL2);
+            assertCollectionIs(env.adminClient().describeAcls(FILTER1).all().get(), ACL1, ACL2);
 
             // Test a call where we get back no results.
-            ctx.mockClient.prepareResponse(new DescribeAclsResponse(0, null,
+            env.kafkaClient().prepareResponse(new DescribeAclsResponse(0, null,
                 Collections.<AclBinding>emptySet()));
-            assertTrue(ctx.client.describeAcls(FILTER2).all().get().isEmpty());
+            assertTrue(env.adminClient().describeAcls(FILTER2).all().get().isEmpty());
 
             // Test a call where we get back an error.
-            ctx.mockClient.prepareResponse(new DescribeAclsResponse(0,
+            env.kafkaClient().prepareResponse(new DescribeAclsResponse(0,
                 new SecurityDisabledException("Security is disabled"), Collections.<AclBinding>emptySet()));
-            assertFutureError(ctx.client.describeAcls(FILTER2).all(), SecurityDisabledException.class);
+            assertFutureError(env.adminClient().describeAcls(FILTER2).all(), SecurityDisabledException.class);
         }
     }
 
     @Test
     public void testCreateAcls() throws Exception {
-        try (MockKafkaAdminClientContext ctx = new MockKafkaAdminClientContext(newStrMap())) {
-            ctx.mockClient.setNodeApiVersions(NodeApiVersions.create());
-            ctx.mockClient.prepareMetadataUpdate(ctx.cluster, Collections.<String>emptySet());
-            ctx.mockClient.setNode(ctx.nodes.get(0));
+        try (MockKafkaAdminClientEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
+            env.kafkaClient().setNode(env.cluster().controller());
 
             // Test a call where we successfully create two ACLs.
-            ctx.mockClient.prepareResponse(new CreateAclsResponse(0,
+            env.kafkaClient().prepareResponse(new CreateAclsResponse(0,
                 new ArrayList<AclCreationResponse>() {{
                         add(new AclCreationResponse(null));
                         add(new AclCreationResponse(null));
                     }}));
-            CreateAclsResults results = ctx.client.createAcls(new ArrayList<AclBinding>() {{
+            CreateAclsResults results = env.adminClient().createAcls(new ArrayList<AclBinding>() {{
                         add(ACL1);
                         add(ACL2);
                     }});
@@ -272,12 +245,12 @@ public class KafkaAdminClientTest {
             results.all().get();
 
             // Test a call where we fail to create one ACL.
-            ctx.mockClient.prepareResponse(new CreateAclsResponse(0,
+            env.kafkaClient().prepareResponse(new CreateAclsResponse(0,
                     new ArrayList<AclCreationResponse>() {{
                         add(new AclCreationResponse(new SecurityDisabledException("Security is disabled")));
                         add(new AclCreationResponse(null));
                     }}));
-            results = ctx.client.createAcls(new ArrayList<AclBinding>() {{
+            results = env.adminClient().createAcls(new ArrayList<AclBinding>() {{
                     add(ACL1);
                     add(ACL2);
                 }});
@@ -290,13 +263,13 @@ public class KafkaAdminClientTest {
 
     @Test
     public void testDeleteAcls() throws Exception {
-        try (MockKafkaAdminClientContext ctx = new MockKafkaAdminClientContext(newStrMap())) {
-            ctx.mockClient.setNodeApiVersions(NodeApiVersions.create());
-            ctx.mockClient.prepareMetadataUpdate(ctx.cluster, Collections.<String>emptySet());
-            ctx.mockClient.setNode(ctx.nodes.get(0));
+        try (MockKafkaAdminClientEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
+            env.kafkaClient().setNode(env.cluster().controller());
 
             // Test a call where one filter has an error.
-            ctx.mockClient.prepareResponse(new DeleteAclsResponse(0, new ArrayList<AclFilterResponse>() {{
+            env.kafkaClient().prepareResponse(new DeleteAclsResponse(0, new ArrayList<AclFilterResponse>() {{
                     add(new AclFilterResponse(null,
                             new ArrayList<AclDeletionResult>() {{
                                 add(new AclDeletionResult(null, ACL1));
@@ -305,7 +278,7 @@ public class KafkaAdminClientTest {
                     add(new AclFilterResponse(new SecurityDisabledException("No security"),
                         Collections.<AclDeletionResult>emptySet()));
                 }}));
-            DeleteAclsResults results = ctx.client.deleteAcls(new ArrayList<AclBindingFilter>() {{
+            DeleteAclsResults results = env.adminClient().deleteAcls(new ArrayList<AclBindingFilter>() {{
                         add(FILTER1);
                         add(FILTER2);
                     }});
@@ -320,7 +293,7 @@ public class KafkaAdminClientTest {
             assertFutureError(results.all(), SecurityDisabledException.class);
 
             // Test a call where one deletion result has an error.
-            ctx.mockClient.prepareResponse(new DeleteAclsResponse(0, new ArrayList<AclFilterResponse>() {{
+            env.kafkaClient().prepareResponse(new DeleteAclsResponse(0, new ArrayList<AclFilterResponse>() {{
                     add(new AclFilterResponse(null,
                         new ArrayList<AclDeletionResult>() {{
                                 add(new AclDeletionResult(null, ACL1));
@@ -328,7 +301,7 @@ public class KafkaAdminClientTest {
                             }}));
                     add(new AclFilterResponse(null, Collections.<AclDeletionResult>emptySet()));
                 }}));
-            results = ctx.client.deleteAcls(
+            results = env.adminClient().deleteAcls(
                     new ArrayList<AclBindingFilter>() {{
                             add(FILTER1);
                             add(FILTER2);
@@ -337,7 +310,7 @@ public class KafkaAdminClientTest {
             assertFutureError(results.all(), SecurityDisabledException.class);
 
             // Test a call where there are no errors.
-            ctx.mockClient.prepareResponse(new DeleteAclsResponse(0, new ArrayList<AclFilterResponse>() {{
+            env.kafkaClient().prepareResponse(new DeleteAclsResponse(0, new ArrayList<AclFilterResponse>() {{
                     add(new AclFilterResponse(null,
                         new ArrayList<AclDeletionResult>() {{
                                 add(new AclDeletionResult(null, ACL1));
@@ -347,7 +320,7 @@ public class KafkaAdminClientTest {
                                 add(new AclDeletionResult(null, ACL2));
                             }}));
                 }}));
-            results = ctx.client.deleteAcls(
+            results = env.adminClient().deleteAcls(
                     new ArrayList<AclBindingFilter>() {{
                         add(FILTER1);
                         add(FILTER2);

http://git-wip-us.apache.org/repos/asf/kafka/blob/56623efd/clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java
new file mode 100644
index 0000000..1fa0249
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.utils.Time;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Simple utility for setting up a mock {@link KafkaAdminClient} that uses a {@link MockClient} for a supplied
+ * {@link Cluster}. Create a {@link Cluster} manually or use {@link org.apache.kafka.test.TestUtils} methods to
+ * easily create a simple cluster.
+ * <p>
+ * To use in a test, create an instance and prepare its {@link #kafkaClient() MockClient} with the expected responses
+ * for the {@link AdminClient}. Then, use the {@link #adminClient() AdminClient} in the test, which will then use the MockClient
+ * and receive the responses you provided.
+ * <p>
+ * When finished, be sure to {@link #close() close} the environment object.
+ */
+public class MockKafkaAdminClientEnv implements AutoCloseable {
+    private final AdminClientConfig adminClientConfig;
+    private final Metadata metadata;
+    private final MockClient mockClient;
+    private final KafkaAdminClient client;
+    private final Cluster cluster;
+
+    public MockKafkaAdminClientEnv(Cluster cluster, String...vals) {
+        this(cluster, newStrMap(vals));
+    }
+
+    public MockKafkaAdminClientEnv(Cluster cluster, Map<String, Object> config) {
+        this.adminClientConfig = new AdminClientConfig(config);
+        this.cluster = cluster;
+        this.metadata = new Metadata(adminClientConfig.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
+                adminClientConfig.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG));
+        this.mockClient = new MockClient(Time.SYSTEM, this.metadata);
+        this.client = KafkaAdminClient.create(adminClientConfig, mockClient, metadata);
+    }
+
+    public Cluster cluster() {
+        return cluster;
+    }
+
+    public AdminClient adminClient() {
+        return client;
+    }
+
+    public MockClient kafkaClient() {
+        return mockClient;
+    }
+
+    @Override
+    public void close() {
+        this.client.close();
+    }
+
+    private static Map<String, Object> newStrMap(String... vals) {
+        Map<String, Object> map = new HashMap<>();
+        map.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8121");
+        map.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000");
+        if (vals.length % 2 != 0) {
+            throw new IllegalStateException();
+        }
+        for (int i = 0; i < vals.length; i += 2) {
+            map.put(vals[i], vals[i + 1]);
+        }
+        return map;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/56623efd/clients/src/test/java/org/apache/kafka/test/TestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index 1019282..c1fc675 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -33,6 +33,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -69,6 +70,10 @@ public class TestUtils {
     public static final Random RANDOM = new Random();
     public static final long DEFAULT_MAX_WAIT_MS = 15000;
 
+    public static Cluster singletonCluster() {
+        return clusterWith(1);
+    }
+
     public static Cluster singletonCluster(final Map<String, Integer> topicPartitionCounts) {
         return clusterWith(1, topicPartitionCounts);
     }
@@ -77,6 +82,10 @@ public class TestUtils {
         return clusterWith(1, topic, partitions);
     }
 
+    public static Cluster clusterWith(int nodes) {
+        return clusterWith(nodes, new HashMap<String, Integer>());
+    }
+
     public static Cluster clusterWith(final int nodes, final Map<String, Integer> topicPartitionCounts) {
         final Node[] ns = new Node[nodes];
         for (int i = 0; i < nodes; i++)

http://git-wip-us.apache.org/repos/asf/kafka/blob/56623efd/config/connect-distributed.properties
----------------------------------------------------------------------
diff --git a/config/connect-distributed.properties b/config/connect-distributed.properties
index 752e1f5..5f3f358 100644
--- a/config/connect-distributed.properties
+++ b/config/connect-distributed.properties
@@ -15,7 +15,11 @@
 # limitations under the License.
 ##
 
-# These are defaults. This file just demonstrates how to override some settings.
+# This file contains some of the configurations for the Kafka Connect distributed worker. This file is intended
+# to be used with the examples, and some settings may differ from those used in a production system, especially
+# the `bootstrap.servers` and those specifying replication factors.
+
+# A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
 bootstrap.servers=localhost:9092
 
 # unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
@@ -30,22 +34,41 @@ value.converter=org.apache.kafka.connect.json.JsonConverter
 key.converter.schemas.enable=true
 value.converter.schemas.enable=true
 
-# The internal converter used for offsets and config data is configurable and must be specified, but most users will
-# always want to use the built-in default. Offset and config data is never visible outside of Kafka Connect in this format.
+# The internal converter used for offsets, config, and status data is configurable and must be specified, but most users will
+# always want to use the built-in default. Offset, config, and status data is never visible outside of Kafka Connect in this format.
 internal.key.converter=org.apache.kafka.connect.json.JsonConverter
 internal.value.converter=org.apache.kafka.connect.json.JsonConverter
 internal.key.converter.schemas.enable=false
 internal.value.converter.schemas.enable=false
 
-# Topic to use for storing offsets. This topic should have many partitions and be replicated.
+# Topic to use for storing offsets. This topic should have many partitions and be replicated and compacted.
+# Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
+# the topic before starting Kafka Connect if a specific topic configuration is needed.
+# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
+# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
+# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
 offset.storage.topic=connect-offsets
+offset.storage.replication.factor=1
+#offset.storage.partitions=25
 
-# Topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated topic.
-# You may need to manually create the topic to ensure single partition for the config topic as auto created topics may have multiple partitions.
+# Topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated,
+# and compacted topic. Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
+# the topic before starting Kafka Connect if a specific topic configuration is needed.
+# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
+# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
+# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
 config.storage.topic=connect-configs
+config.storage.replication.factor=1
 
-# Topic to use for storing statuses. This topic can have multiple partitions and should be replicated.
+# Topic to use for storing statuses. This topic can have multiple partitions and should be replicated and compacted.
+# Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
+# the topic before starting Kafka Connect if a specific topic configuration is needed.
+# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
+# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
+# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
 status.storage.topic=connect-status
+status.storage.replication.factor=1
+#status.storage.partitions=5
 
 # Flush much faster than normal, which is useful for testing/debugging
 offset.flush.interval.ms=10000
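
The comments in this example file tie the replication factor to the number of brokers. A minimal, JDK-only sketch of that constraint follows. The class `ReplicationCheck` and its methods are hypothetical names used for illustration and are not part of Kafka Connect; the property names and the default of 3 are taken from this commit.

```java
import java.util.Properties;

// Hypothetical helper, not part of Kafka Connect: finds the largest replication
// factor requested for the three internal topics in a worker configuration.
class ReplicationCheck {
    // Default taken from the DistributedConfig changes in this commit.
    static final short DEFAULT_REPLICATION_FACTOR = 3;

    static final String[] KEYS = {
        "offset.storage.replication.factor",
        "config.storage.replication.factor",
        "status.storage.replication.factor"
    };

    static short maxReplicationFactor(Properties workerProps) {
        short max = 0;
        for (String key : KEYS) {
            String raw = workerProps.getProperty(key);
            // Fall back to the default when the property is not set.
            short value = raw == null ? DEFAULT_REPLICATION_FACTOR : Short.parseShort(raw.trim());
            if (value > max) max = value;
        }
        return max;
    }

    // A topic cannot have more replicas than there are brokers in the cluster.
    static boolean fitsCluster(Properties workerProps, int brokerCount) {
        return maxReplicationFactor(workerProps) <= brokerCount;
    }

    public static void main(String[] args) {
        Properties example = new Properties();
        for (String key : KEYS) example.setProperty(key, "1");
        System.out.println(fitsCluster(example, 1));          // explicit factor of 1, one broker
        System.out.println(fitsCluster(new Properties(), 1)); // defaults only, one broker
    }
}
```

With the explicit factor of 1 the single-broker check passes; with only the defaults it does not, which matches the reason this example file overrides the default.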

http://git-wip-us.apache.org/repos/asf/kafka/blob/56623efd/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
index 2d77928..eb3b876 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
@@ -88,19 +88,49 @@ public class DistributedConfig extends WorkerConfig {
      * <code>offset.storage.topic</code>
      */
     public static final String OFFSET_STORAGE_TOPIC_CONFIG = 
"offset.storage.topic";
-    private static final String OFFSET_STORAGE_TOPIC_CONFIG_DOC = "kafka topic 
to store connector offsets in";
+    private static final String OFFSET_STORAGE_TOPIC_CONFIG_DOC = "The name of 
the Kafka topic where connector offsets are stored";
+
+    /**
+     * <code>offset.storage.partitions</code>
+     */
+    public static final String OFFSET_STORAGE_PARTITIONS_CONFIG = 
"offset.storage.partitions";
+    private static final String OFFSET_STORAGE_PARTITIONS_CONFIG_DOC = "The 
number of partitions used when creating the offset storage topic";
+
+    /**
+     * <code>offset.storage.replication.factor</code>
+     */
+    public static final String OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG = 
"offset.storage.replication.factor";
+    private static final String OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG_DOC = 
"Replication factor used when creating the offset storage topic";
 
     /**
      * <code>config.storage.topic</code>
      */
     public static final String CONFIG_TOPIC_CONFIG = "config.storage.topic";
-    private static final String CONFIG_TOPIC_CONFIG_DOC = "kafka topic to 
store configs";
+    private static final String CONFIG_TOPIC_CONFIG_DOC = "The name of the 
Kafka topic where connector configurations are stored";
+
+    /**
+     * <code>config.storage.replication.factor</code>
+     */
+    public static final String CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG = 
"config.storage.replication.factor";
+    private static final String CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG_DOC = 
"Replication factor used when creating the configuration storage topic";
 
     /**
      * <code>status.storage.topic</code>
      */
     public static final String STATUS_STORAGE_TOPIC_CONFIG = 
"status.storage.topic";
-    public static final String STATUS_STORAGE_TOPIC_CONFIG_DOC = "kafka topic 
to track connector and task status";
+    public static final String STATUS_STORAGE_TOPIC_CONFIG_DOC = "The name of 
the Kafka topic where connector and task status are stored";
+
+    /**
+     * <code>status.storage.partitions</code>
+     */
+    public static final String STATUS_STORAGE_PARTITIONS_CONFIG = 
"status.storage.partitions";
+    private static final String STATUS_STORAGE_PARTITIONS_CONFIG_DOC = "The 
number of partitions used when creating the status storage topic";
+
+    /**
+     * <code>status.storage.replication.factor</code>
+     */
+    public static final String STATUS_STORAGE_REPLICATION_FACTOR_CONFIG = 
"status.storage.replication.factor";
+    private static final String STATUS_STORAGE_REPLICATION_FACTOR_CONFIG_DOC = 
"Replication factor used when creating the status storage topic";
 
     static {
         CONFIG = baseConfigDef()
@@ -209,14 +239,44 @@ public class DistributedConfig extends WorkerConfig {
                         ConfigDef.Type.STRING,
                         ConfigDef.Importance.HIGH,
                         OFFSET_STORAGE_TOPIC_CONFIG_DOC)
+                .define(OFFSET_STORAGE_PARTITIONS_CONFIG,
+                        ConfigDef.Type.INT,
+                        25,
+                        atLeast(1),
+                        ConfigDef.Importance.LOW,
+                        OFFSET_STORAGE_PARTITIONS_CONFIG_DOC)
+                .define(OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG,
+                        ConfigDef.Type.SHORT,
+                        (short) 3,
+                        atLeast(1),
+                        ConfigDef.Importance.LOW,
+                        OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG_DOC)
                 .define(CONFIG_TOPIC_CONFIG,
                         ConfigDef.Type.STRING,
                         ConfigDef.Importance.HIGH,
                         CONFIG_TOPIC_CONFIG_DOC)
+                .define(CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG,
+                        ConfigDef.Type.SHORT,
+                        (short) 3,
+                        atLeast(1),
+                        ConfigDef.Importance.LOW,
+                        CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG_DOC)
                 .define(STATUS_STORAGE_TOPIC_CONFIG,
                         ConfigDef.Type.STRING,
                         ConfigDef.Importance.HIGH,
-                        STATUS_STORAGE_TOPIC_CONFIG_DOC);
+                        STATUS_STORAGE_TOPIC_CONFIG_DOC)
+                .define(STATUS_STORAGE_PARTITIONS_CONFIG,
+                        ConfigDef.Type.INT,
+                        5,
+                        atLeast(1),
+                        ConfigDef.Importance.LOW,
+                        STATUS_STORAGE_PARTITIONS_CONFIG_DOC)
+                .define(STATUS_STORAGE_REPLICATION_FACTOR_CONFIG,
+                        ConfigDef.Type.SHORT,
+                        (short) 3,
+                        atLeast(1),
+                        ConfigDef.Importance.LOW,
+                        STATUS_STORAGE_REPLICATION_FACTOR_CONFIG_DOC);
     }
 
     public DistributedConfig(Map<String, String> props) {
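
The new partition settings are defined with a default and an `atLeast(1)` bound. A rough sketch of how such a setting might resolve, assuming string-valued worker properties: `BoundedIntSetting` is a hypothetical stand-in and not the `ConfigDef` API, and it throws `IllegalArgumentException` where the real code would report a configuration error. The names and defaults (25 and 5) come from the diff above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a config entry with a default value and a lower bound.
class BoundedIntSetting {
    final String name;
    final int defaultValue;
    final int min;

    BoundedIntSetting(String name, int defaultValue, int min) {
        this.name = name;
        this.defaultValue = defaultValue;
        this.min = min;
    }

    // Returns the configured value, or the default when the property is absent;
    // rejects values below the lower bound.
    int resolve(Map<String, String> props) {
        String raw = props.get(name);
        int value = raw == null ? defaultValue : Integer.parseInt(raw.trim());
        if (value < min)
            throw new IllegalArgumentException(name + " must be at least " + min + " but was " + value);
        return value;
    }

    public static void main(String[] args) {
        BoundedIntSetting offsets = new BoundedIntSetting("offset.storage.partitions", 25, 1);
        BoundedIntSetting status = new BoundedIntSetting("status.storage.partitions", 5, 1);
        Map<String, String> props = new HashMap<>();
        props.put("status.storage.partitions", "10");
        System.out.println(offsets.resolve(props)); // property absent, default applies
        System.out.println(status.resolve(props));  // explicit value applies
    }
}
```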

http://git-wip-us.apache.org/repos/asf/kafka/blob/56623efd/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index c30e840..dd69c30 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.storage;
 
+import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -39,6 +40,7 @@ import 
org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.util.Callback;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TopicAdmin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -403,7 +405,7 @@ public class KafkaConfigBackingStore implements 
ConfigBackingStore {
     }
 
     // package private for testing
-    KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, 
WorkerConfig config) {
+    KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, 
final WorkerConfig config) {
         Map<String, Object> producerProps = new HashMap<>();
         producerProps.putAll(config.originals());
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
@@ -415,12 +417,29 @@ public class KafkaConfigBackingStore implements 
ConfigBackingStore {
         consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
 
-        return createKafkaBasedLog(topic, producerProps, consumerProps, new 
ConsumeCallback());
+        Map<String, Object> adminProps = new HashMap<>(config.originals());
+        NewTopic topicDescription = TopicAdmin.defineTopic(topic).
+                compacted().
+                partitions(1).
+                replicationFactor(config.getShort(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG)).
+                build();
+
+        return createKafkaBasedLog(topic, producerProps, consumerProps, new 
ConsumeCallback(), topicDescription, adminProps);
     }
 
     private KafkaBasedLog<String, byte[]> createKafkaBasedLog(String topic, 
Map<String, Object> producerProps,
-                                                              Map<String, 
Object> consumerProps, Callback<ConsumerRecord<String, byte[]>> 
consumedCallback) {
-        return new KafkaBasedLog<>(topic, producerProps, consumerProps, 
consumedCallback, Time.SYSTEM);
+                                                              Map<String, 
Object> consumerProps,
+                                                              
Callback<ConsumerRecord<String, byte[]>> consumedCallback,
+                                                              final NewTopic 
topicDescription, final Map<String, Object> adminProps) {
+        Runnable createTopics = new Runnable() {
+            @Override
+            public void run() {
+                try (TopicAdmin admin = new TopicAdmin(adminProps)) {
+                    admin.createTopics(topicDescription);
+                }
+            }
+        };
+        return new KafkaBasedLog<>(topic, producerProps, consumerProps, 
consumedCallback, Time.SYSTEM, createTopics);
     }
 
     @SuppressWarnings("unchecked")
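
The config store fixes its topic at one partition and marks it compacted through a fluent builder. A JDK-only imitation of that pattern is sketched here so the shape of the call chain is easy to see. `TopicSpec` and its `Builder` are hypothetical and only mimic the chain used in the diff; the `cleanup.policy=compact` setting is the one `TopicAdmin` applies in this commit.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, JDK-only imitation of the fluent topic definition used in the diff.
class TopicSpec {
    final String name;
    final int partitions;
    final short replicationFactor;
    final Map<String, String> configs;

    private TopicSpec(String name, int partitions, short replicationFactor, Map<String, String> configs) {
        this.name = name;
        this.partitions = partitions;
        this.replicationFactor = replicationFactor;
        this.configs = configs;
    }

    static Builder define(String name) {
        return new Builder(name);
    }

    static class Builder {
        private final String name;
        private int partitions;
        private short replicationFactor;
        private final Map<String, String> configs = new HashMap<>();

        Builder(String name) {
            this.name = name;
        }

        Builder partitions(int partitions) {
            this.partitions = partitions;
            return this;
        }

        Builder replicationFactor(short replicationFactor) {
            this.replicationFactor = replicationFactor;
            return this;
        }

        // Compaction is expressed as a topic-level config entry.
        Builder compacted() {
            configs.put("cleanup.policy", "compact");
            return this;
        }

        TopicSpec build() {
            // Copy the map so later builder calls cannot alter a built spec.
            return new TopicSpec(name, partitions, replicationFactor, new HashMap<>(configs));
        }
    }

    public static void main(String[] args) {
        TopicSpec spec = TopicSpec.define("connect-configs")
                .compacted()
                .partitions(1)
                .replicationFactor((short) 3)
                .build();
        System.out.println(spec.name + " " + spec.partitions + " " + spec.replicationFactor + " " + spec.configs);
    }
}
```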

http://git-wip-us.apache.org/repos/asf/kafka/blob/56623efd/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
index d927de7..cb4f089 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.storage;
 
+import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -29,6 +30,7 @@ import 
org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.util.Callback;
 import org.apache.kafka.connect.util.ConvertingFutureCallback;
 import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TopicAdmin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,7 +60,7 @@ public class KafkaOffsetBackingStore implements 
OffsetBackingStore {
     private HashMap<ByteBuffer, ByteBuffer> data;
 
     @Override
-    public void configure(WorkerConfig config) {
+    public void configure(final WorkerConfig config) {
         String topic = 
config.getString(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG);
         if (topic.equals(""))
             throw new ConfigException("Offset storage topic must be 
specified");
@@ -76,7 +78,29 @@ public class KafkaOffsetBackingStore implements 
OffsetBackingStore {
         consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
 
-        offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, 
consumedCallback);
+        Map<String, Object> adminProps = new HashMap<>(config.originals());
+        NewTopic topicDescription = TopicAdmin.defineTopic(topic).
+                compacted().
+                partitions(config.getInt(DistributedConfig.OFFSET_STORAGE_PARTITIONS_CONFIG)).
+                replicationFactor(config.getShort(DistributedConfig.OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG)).
+                build();
+
+        offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, 
consumedCallback, topicDescription, adminProps);
+    }
+
+    private KafkaBasedLog<byte[], byte[]> createKafkaBasedLog(String topic, 
Map<String, Object> producerProps,
+                                                              Map<String, 
Object> consumerProps,
+                                                              
Callback<ConsumerRecord<byte[], byte[]>> consumedCallback,
+                                                              final NewTopic 
topicDescription, final Map<String, Object> adminProps) {
+        Runnable createTopics = new Runnable() {
+            @Override
+            public void run() {
+                try (TopicAdmin admin = new TopicAdmin(adminProps)) {
+                    admin.createTopics(topicDescription);
+                }
+            }
+        };
+        return new KafkaBasedLog<>(topic, producerProps, consumerProps, 
consumedCallback, Time.SYSTEM, createTopics);
     }
 
     @Override
@@ -135,11 +159,6 @@ public class KafkaOffsetBackingStore implements 
OffsetBackingStore {
         }
     };
 
-    private KafkaBasedLog<byte[], byte[]> createKafkaBasedLog(String topic, 
Map<String, Object> producerProps,
-                                                              Map<String, 
Object> consumerProps, Callback<ConsumerRecord<byte[], byte[]>> 
consumedCallback) {
-        return new KafkaBasedLog<>(topic, producerProps, consumerProps, 
consumedCallback, Time.SYSTEM);
-    }
-
     private static class SetCallbackFuture implements 
org.apache.kafka.clients.producer.Callback, Future<Void> {
         private int numLeft;
         private boolean completed = false;

http://git-wip-us.apache.org/repos/asf/kafka/blob/56623efd/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
index 137d7d1..11f951a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.storage;
 
+import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -41,6 +42,7 @@ import org.apache.kafka.connect.util.Callback;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.KafkaBasedLog;
 import org.apache.kafka.connect.util.Table;
+import org.apache.kafka.connect.util.TopicAdmin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -117,7 +119,7 @@ public class KafkaStatusBackingStore implements 
StatusBackingStore {
     }
 
     @Override
-    public void configure(WorkerConfig config) {
+    public void configure(final WorkerConfig config) {
         this.topic = 
config.getString(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG);
         if (topic.equals(""))
             throw new ConfigException("Must specify topic for connector 
status.");
@@ -133,13 +135,35 @@ public class KafkaStatusBackingStore implements 
StatusBackingStore {
         consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
 
+        Map<String, Object> adminProps = new HashMap<>(config.originals());
+        NewTopic topicDescription = TopicAdmin.defineTopic(topic).
+                compacted().
+                partitions(config.getInt(DistributedConfig.STATUS_STORAGE_PARTITIONS_CONFIG)).
+                replicationFactor(config.getShort(DistributedConfig.STATUS_STORAGE_REPLICATION_FACTOR_CONFIG)).
+                build();
+
         Callback<ConsumerRecord<String, byte[]>> readCallback = new 
Callback<ConsumerRecord<String, byte[]>>() {
             @Override
             public void onCompletion(Throwable error, ConsumerRecord<String, 
byte[]> record) {
                 read(record);
             }
         };
-        this.kafkaLog = new KafkaBasedLog<>(topic, producerProps, 
consumerProps, readCallback, time);
+        this.kafkaLog = createKafkaBasedLog(topic, producerProps, 
consumerProps, readCallback, topicDescription, adminProps);
+    }
+
+    private KafkaBasedLog<String, byte[]> createKafkaBasedLog(String topic, 
Map<String, Object> producerProps,
+                                                              Map<String, 
Object> consumerProps,
+                                                              
Callback<ConsumerRecord<String, byte[]>> consumedCallback,
+                                                              final NewTopic 
topicDescription, final Map<String, Object> adminProps) {
+        Runnable createTopics = new Runnable() {
+            @Override
+            public void run() {
+                try (TopicAdmin admin = new TopicAdmin(adminProps)) {
+                    admin.createTopics(topicDescription);
+                }
+            }
+        };
+        return new KafkaBasedLog<>(topic, producerProps, consumerProps, 
consumedCallback, time, createTopics);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/56623efd/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index df8eefc..0e190bc 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -81,6 +81,7 @@ public class KafkaBasedLog<K, V> {
     private Thread thread;
     private boolean stopRequested;
     private Queue<Callback<Void>> readLogEndOffsetCallbacks;
+    private Runnable initializer;
 
     /**
      * Create a new KafkaBasedLog object. This does not start reading the log 
and writing is not permitted until
@@ -97,12 +98,14 @@ public class KafkaBasedLog<K, V> {
      *                        behavior of this class.
      * @param consumedCallback callback to invoke for each {@link 
ConsumerRecord} consumed when tailing the log
      * @param time Time interface
+     * @param initializer the component that should be run when this log is 
{@link #start() started}; may be null
      */
     public KafkaBasedLog(String topic,
                          Map<String, Object> producerConfigs,
                          Map<String, Object> consumerConfigs,
                          Callback<ConsumerRecord<K, V>> consumedCallback,
-                         Time time) {
+                         Time time,
+                         Runnable initializer) {
         this.topic = topic;
         this.producerConfigs = producerConfigs;
         this.consumerConfigs = consumerConfigs;
@@ -110,18 +113,23 @@ public class KafkaBasedLog<K, V> {
         this.stopRequested = false;
         this.readLogEndOffsetCallbacks = new ArrayDeque<>();
         this.time = time;
+        this.initializer = initializer != null ? initializer : new Runnable() {
+            @Override
+            public void run() {
+            }
+        };
     }
 
     public void start() {
         log.info("Starting KafkaBasedLog with topic " + topic);
 
+        initializer.run();
         producer = createProducer();
         consumer = createConsumer();
 
         List<TopicPartition> partitions = new ArrayList<>();
 
-        // Until we have admin utilities we can use to check for the existence 
of this topic and create it if it is missing,
-        // we rely on topic auto-creation
+        // We expect that the topics will have been created either manually by 
the user or automatically by the herder
         List<PartitionInfo> partitionInfos = null;
         long started = time.milliseconds();
         while (partitionInfos == null && time.milliseconds() - started < 
CREATE_TOPIC_TIMEOUT_MS) {
@@ -327,4 +335,4 @@ public class KafkaBasedLog<K, V> {
             }
         }
     }
-}
+}
\ No newline at end of file
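
The `KafkaBasedLog` change runs an optional initializer at the top of `start()` and replaces a null initializer with a no-op. The ordering can be sketched in isolation as follows. `StartableLog` is a hypothetical class, the "clients" event merely stands in for creating the producer and consumer, and the lambda syntax assumes Java 8 or later (the commit itself uses anonymous `Runnable` classes).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of the start-up ordering introduced in KafkaBasedLog.
class StartableLog {
    private final Runnable initializer;
    private final List<String> events;

    StartableLog(List<String> events, Runnable initializer) {
        this.events = events;
        // A null initializer becomes a no-op so start() never needs a null check.
        this.initializer = initializer != null ? initializer : () -> { };
    }

    void start() {
        initializer.run();     // for example, create the topic if it is missing
        events.add("clients"); // stands in for createProducer() and createConsumer()
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        new StartableLog(events, () -> events.add("topic")).start(); // initializer runs first
        new StartableLog(events, null).start();                      // null initializer is skipped
        System.out.println(events);
    }
}
```

Running the initializer before the clients are created means the topic creation attempt happens before the consumer first asks for partition metadata.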

http://git-wip-us.apache.org/repos/asf/kafka/blob/56623efd/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
new file mode 100644
index 0000000..a3b4218
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.CreateTopicsOptions;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Utility to simplify creating and managing topics via the {@link 
org.apache.kafka.clients.admin.AdminClient}.
+ */
+public class TopicAdmin implements AutoCloseable {
+
+    private static final String CLEANUP_POLICY_CONFIG = "cleanup.policy";
+    private static final String CLEANUP_POLICY_COMPACT = "compact";
+
+    private static final String MIN_INSYNC_REPLICAS_CONFIG = 
"min.insync.replicas";
+
+    private static final String UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG = 
"unclean.leader.election.enable";
+
+    /**
+     * A builder of {@link NewTopic} instances.
+     */
+    public static class NewTopicBuilder {
+        private String name;
+        private int numPartitions;
+        private short replicationFactor;
+        private Map<String, String> configs = new HashMap<>();
+
+        NewTopicBuilder(String name) {
+            this.name = name;
+        }
+
+        /**
+         * Specify the desired number of partitions for the topic.
+         *
+         * @param numPartitions the desired number of partitions; must be 
positive
+         * @return this builder to allow methods to be chained; never null
+         */
+        public NewTopicBuilder partitions(int numPartitions) {
+            this.numPartitions = numPartitions;
+            return this;
+        }
+
+        /**
+         * Specify the desired replication factor for the topic.
+         *
+         * @param replicationFactor the desired replication factor; must be 
positive
+         * @return this builder to allow methods to be chained; never null
+         */
+        public NewTopicBuilder replicationFactor(short replicationFactor) {
+            this.replicationFactor = replicationFactor;
+            return this;
+        }
+
+        /**
+         * Specify that the topic should be compacted.
+         *
+         * @return this builder to allow methods to be chained; never null
+         */
+        public NewTopicBuilder compacted() {
+            this.configs.put(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT);
+            return this;
+        }
+
+        /**
+         * Specify the minimum number of in-sync replicas required for this 
topic.
+         *
+         * @param minInSyncReplicas the minimum number of in-sync replicas 
allowed for the topic; must be positive
+         * @return this builder to allow methods to be chained; never null
+         */
+        public NewTopicBuilder minInSyncReplicas(short minInSyncReplicas) {
+            this.configs.put(MIN_INSYNC_REPLICAS_CONFIG, 
Short.toString(minInSyncReplicas));
+            return this;
+        }
+
+        /**
+         * Specify whether the broker is allowed to elect a leader that was 
not an in-sync replica when no ISRs
+         * are available.
+         *
+         * @param allow true if unclean leaders can be elected, or false if 
they are not allowed
+         * @return this builder to allow methods to be chained; never null
+         */
+        public NewTopicBuilder uncleanLeaderElection(boolean allow) {
+            this.configs.put(UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, 
Boolean.toString(allow));
+            return this;
+        }
+
+        /**
+         * Specify the configuration properties for the topic, overwriting any 
previously-set properties.
+         *
+         * @param configs the desired topic configuration properties, or null 
if all existing properties should be cleared
+         * @return this builder to allow methods to be chained; never null
+         */
+        public NewTopicBuilder config(Map<String, Object> configs) {
+            if (configs != null) {
+                for (Map.Entry<String, Object> entry : configs.entrySet()) {
+                    Object value = entry.getValue();
+                    this.configs.put(entry.getKey(), value != null ? 
value.toString() : null);
+                }
+            } else {
+                this.configs.clear();
+            }
+            return this;
+        }
+
+        /**
+         * Build the {@link NewTopic} representation.
+         *
+         * @return the topic description; never null
+         */
+        public NewTopic build() {
+            return new NewTopic(name, numPartitions, 
replicationFactor).configs(configs);
+        }
+    }
+
+    /**
+     * Obtain a {@link NewTopicBuilder builder} to define a {@link NewTopic}.
+     *
+     * @param topicName the name of the topic
+     * @return the {@link NewTopic} description of the topic; never null
+     */
+    public static NewTopicBuilder defineTopic(String topicName) {
+        return new NewTopicBuilder(topicName);
+    }
+
+    private static final Logger log = 
LoggerFactory.getLogger(TopicAdmin.class);
+    private final Map<String, Object> adminConfig;
+    private final AdminClient admin;
+
+    /**
+     * Create a new topic admin component with the given configuration.
+     *
+     * @param adminConfig the configuration for the {@link AdminClient}
+     */
+    public TopicAdmin(Map<String, Object> adminConfig) {
+        this(adminConfig, AdminClient.create(adminConfig));
+    }
+
+    // visible for testing
+    TopicAdmin(Map<String, Object> adminConfig, AdminClient adminClient) {
+        this.admin = adminClient;
+        this.adminConfig = adminConfig != null ? adminConfig : 
Collections.<String, Object>emptyMap();
+    }
+
+   /**
+     * Attempt to create the topic described by the given definition, 
returning true if the topic was created or false
+     * if the topic already existed.
+     *
+     * @param topic the specification of the topic
+     * @return true if the topic was created or false if the topic already 
existed.
+     * @throws ConnectException            if an error occurs, the operation 
takes too long, or the thread is interrupted while
+     *                                     attempting to perform this operation
+     * @throws UnsupportedVersionException if the broker does not support the 
necessary APIs to perform this request
+     */
+    public boolean createTopic(NewTopic topic) {
+        if (topic == null) return false;
+        Set<String> newTopicNames = createTopics(topic);
+        return newTopicNames.contains(topic.name());
+    }
+
+    /**
+     * Attempt to create the topics described by the given definitions, 
returning all of the names of those topics that
+     * were created by this request. Any existing topics with the same name 
are unchanged, and the names of such topics
+     * are excluded from the result.
+     * <p>
+     * If multiple topic definitions have the same topic name, the last one 
with that name will be used.
+     * </p>
+     *
+     * @param topics the specifications of the topics
+     * @return the names of the topics that were created by this operation; 
never null but possibly empty
+     * @throws ConnectException            if an error occurs, the operation 
takes too long, or the thread is interrupted while
+     *                                     attempting to perform this operation
+     * @throws UnsupportedVersionException if the broker does not support the 
necessary APIs to perform this request
+     */
+    public Set<String> createTopics(NewTopic... topics) {
+        Map<String, NewTopic> topicsByName = new HashMap<>();
+        if (topics != null) {
+            for (NewTopic topic : topics) {
+                if (topic != null) topicsByName.put(topic.name(), topic);
+            }
+        }
+        if (topicsByName.isEmpty()) return Collections.emptySet();
+        String bootstrapServers = bootstrapServers();
+        String topicNameList = Utils.join(topicsByName.keySet(), "', '");
+
+        // Attempt to create any missing topics
+        CreateTopicsOptions args = new CreateTopicsOptions().validateOnly(false);
+        Map<String, KafkaFuture<Void>> newResults = admin.createTopics(topicsByName.values(), args).results();
+
+        // Iterate over each future so that we can handle individual failures like when some topics already exist
+        Set<String> newlyCreatedTopicNames = new HashSet<>();
+        for (Map.Entry<String, KafkaFuture<Void>> entry : newResults.entrySet()) {
+            String topic = entry.getKey();
+            try {
+                entry.getValue().get();
+                log.info("Created topic {} on brokers at {}", topicsByName.get(topic), bootstrapServers);
+                newlyCreatedTopicNames.add(topic);
+            } catch (ExecutionException e) {
+                Throwable cause = e.getCause();
+                if (cause instanceof TopicExistsException) {
+                    log.debug("Found existing topic '{}' on the brokers at {}", topic, bootstrapServers);
+                    continue;
+                }
+                if (cause instanceof UnsupportedVersionException) {
+                    log.error("Unable to use Kafka admin client to create topic descriptions for '{}' using the brokers at {}", topicNameList, bootstrapServers);
+                    throw (UnsupportedVersionException) cause;
+                }
+                if (cause instanceof TimeoutException) {
+                    // Timed out waiting for the operation to complete
+                    throw new ConnectException("Timed out while checking for or creating topic(s) '" + topicNameList + "'." +
+                            " This could indicate a connectivity issue, unavailable topic partitions, or if" +
+                            " this is your first use of the topic it may have taken too long to create.", cause);
+                }
+                throw new ConnectException("Error while attempting to create/find topic(s) '" + topicNameList + "'", e);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new ConnectException("Interrupted while attempting to create/find topic(s) '" + topicNameList + "'", e);
+            }
+        }
+        return newlyCreatedTopicNames;
+    }
+
+    @Override
+    public void close() {
+        admin.close();
+    }
+
+    private String bootstrapServers() {
+        Object servers = adminConfig.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG);
+        return servers != null ? servers.toString() : "<unknown>";
+    }
+}
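
The per-topic handling in createTopics() above can be illustrated without a
broker. The following is only a minimal sketch, not the committed code: it
swaps KafkaFuture for java.util.concurrent.CompletableFuture, and the names
TopicExistsSketchException and TopicCreationSketch are hypothetical stand-ins
introduced for illustration. It shows the same idea of waiting on each future
separately, so that an "already exists" failure for one topic does not hide
the successful creation of another.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical stand-in for org.apache.kafka.common.errors.TopicExistsException.
class TopicExistsSketchException extends RuntimeException {
    TopicExistsSketchException(String message) {
        super(message);
    }
}

class TopicCreationSketch {

    /**
     * Waits on each per-topic future and returns only the names whose creation
     * succeeded. A "topic exists" failure is skipped; any other failure aborts.
     */
    static <F extends Future<Void>> Set<String> newlyCreated(Map<String, F> results) {
        Set<String> created = new TreeSet<>();
        for (Map.Entry<String, F> entry : results.entrySet()) {
            try {
                entry.getValue().get();
                created.add(entry.getKey());
            } catch (ExecutionException e) {
                // An existing topic is not an error; it is simply left out of the result
                if (e.getCause() instanceof TopicExistsSketchException) continue;
                throw new IllegalStateException("Error creating topic '" + entry.getKey() + "'", e.getCause());
            } catch (InterruptedException e) {
                // Restore the interrupt flag before reporting the failure
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while creating topics", e);
            }
        }
        return created;
    }

    public static void main(String[] args) {
        Map<String, CompletableFuture<Void>> results = new LinkedHashMap<>();
        // Simulate a topic that was created successfully
        results.put("connect-offsets", CompletableFuture.completedFuture(null));
        // Simulate a topic that already existed on the broker
        CompletableFuture<Void> exists = new CompletableFuture<>();
        exists.completeExceptionally(new TopicExistsSketchException("already exists"));
        results.put("connect-configs", exists);
        System.out.println(newlyCreated(results)); // prints [connect-offsets]
    }
}
```

Because the result contains only newly created names, a single-topic wrapper
such as createTopic() can report "created" versus "already existed" by checking
whether its topic name is in the returned set.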

http://git-wip-us.apache.org/repos/asf/kafka/blob/56623efd/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index 9d0f23e..9da574d 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.connect.storage;
 
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -65,12 +66,14 @@ import static org.junit.Assert.assertTrue;
 @SuppressWarnings("unchecked")
 public class KafkaConfigBackingStoreTest {
     private static final String TOPIC = "connect-configs";
+    private static final short TOPIC_REPLICATION_FACTOR = 5;
     private static final Map<String, String> DEFAULT_CONFIG_STORAGE_PROPS = new HashMap<>();
     private static final DistributedConfig DEFAULT_DISTRIBUTED_CONFIG;
 
     static {
         DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.CONFIG_TOPIC_CONFIG, TOPIC);
         DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "connect-offsets");
+        DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG, Short.toString(TOPIC_REPLICATION_FACTOR));
         DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.GROUP_ID_CONFIG, "connect");
         DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "status-topic");
         DEFAULT_CONFIG_STORAGE_PROPS.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9093");
@@ -136,6 +139,8 @@ public class KafkaConfigBackingStoreTest {
     private Capture<String> capturedTopic = EasyMock.newCapture();
     private Capture<Map<String, Object>> capturedProducerProps = EasyMock.newCapture();
     private Capture<Map<String, Object>> capturedConsumerProps = EasyMock.newCapture();
+    private Capture<Map<String, Object>> capturedAdminProps = EasyMock.newCapture();
+    private Capture<NewTopic> capturedNewTopic = EasyMock.newCapture();
     private Capture<Callback<ConsumerRecord<String, byte[]>>> capturedConsumedCallback = EasyMock.newCapture();
 
     private long logOffset = 0;
@@ -163,6 +168,9 @@ public class KafkaConfigBackingStoreTest {
         assertEquals("org.apache.kafka.common.serialization.StringDeserializer", capturedConsumerProps.getValue().get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
         assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", capturedConsumerProps.getValue().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
 
+        assertEquals(TOPIC, capturedNewTopic.getValue().name());
+        assertEquals(1, capturedNewTopic.getValue().partitions());
+        assertEquals(TOPIC_REPLICATION_FACTOR, capturedNewTopic.getValue().replicationFactor());
         configStorage.start();
         configStorage.stop();
 
@@ -747,7 +755,8 @@ public class KafkaConfigBackingStoreTest {
     private void expectConfigure() throws Exception {
         PowerMock.expectPrivate(configStorage, "createKafkaBasedLog",
                 EasyMock.capture(capturedTopic), EasyMock.capture(capturedProducerProps),
-                EasyMock.capture(capturedConsumerProps), EasyMock.capture(capturedConsumedCallback))
+                EasyMock.capture(capturedConsumerProps), EasyMock.capture(capturedConsumedCallback),
+                EasyMock.capture(capturedNewTopic), EasyMock.capture(capturedAdminProps))
                 .andReturn(storeLog);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/56623efd/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
index f22b328..70d7f40 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.connect.storage;
 
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -62,13 +63,17 @@ import static org.junit.Assert.fail;
 @SuppressWarnings("unchecked")
 public class KafkaOffsetBackingStoreTest {
     private static final String TOPIC = "connect-offsets";
+    private static final short TOPIC_PARTITIONS = 2;
+    private static final short TOPIC_REPLICATION_FACTOR = 5;
     private static final Map<String, String> DEFAULT_PROPS = new HashMap<>();
     private static final DistributedConfig DEFAULT_DISTRIBUTED_CONFIG;
     static {
         DEFAULT_PROPS.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9093");
         DEFAULT_PROPS.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, TOPIC);
+        DEFAULT_PROPS.put(DistributedConfig.OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG, Short.toString(TOPIC_REPLICATION_FACTOR));
+        DEFAULT_PROPS.put(DistributedConfig.OFFSET_STORAGE_PARTITIONS_CONFIG, Integer.toString(TOPIC_PARTITIONS));
         DEFAULT_PROPS.put(DistributedConfig.CONFIG_TOPIC_CONFIG, "connect-configs");
-        DEFAULT_PROPS.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "connect-offsets");
+        DEFAULT_PROPS.put(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG, Short.toString(TOPIC_REPLICATION_FACTOR));
         DEFAULT_PROPS.put(DistributedConfig.GROUP_ID_CONFIG, "connect");
         DEFAULT_PROPS.put(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "status-topic");
         DEFAULT_PROPS.put(DistributedConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
@@ -100,6 +105,8 @@ public class KafkaOffsetBackingStoreTest {
     private Capture<String> capturedTopic = EasyMock.newCapture();
     private Capture<Map<String, Object>> capturedProducerProps = EasyMock.newCapture();
     private Capture<Map<String, Object>> capturedConsumerProps = EasyMock.newCapture();
+    private Capture<Map<String, Object>> capturedAdminProps = EasyMock.newCapture();
+    private Capture<NewTopic> capturedNewTopic = EasyMock.newCapture();
     private Capture<Callback<ConsumerRecord<byte[], byte[]>>> capturedConsumedCallback = EasyMock.newCapture();
 
     @Before
@@ -122,6 +129,10 @@ public class KafkaOffsetBackingStoreTest {
         assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", capturedConsumerProps.getValue().get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
         assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", capturedConsumerProps.getValue().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
 
+        assertEquals(TOPIC, capturedNewTopic.getValue().name());
+        assertEquals(TOPIC_PARTITIONS, capturedNewTopic.getValue().partitions());
+        assertEquals(TOPIC_REPLICATION_FACTOR, capturedNewTopic.getValue().replicationFactor());
+
         store.start();
         store.stop();
 
@@ -402,7 +413,8 @@ public class KafkaOffsetBackingStoreTest {
 
     private void expectConfigure() throws Exception {
         PowerMock.expectPrivate(store, "createKafkaBasedLog", EasyMock.capture(capturedTopic), EasyMock.capture(capturedProducerProps),
-                EasyMock.capture(capturedConsumerProps), EasyMock.capture(capturedConsumedCallback))
+                EasyMock.capture(capturedConsumerProps), EasyMock.capture(capturedConsumedCallback),
+                EasyMock.capture(capturedNewTopic), EasyMock.capture(capturedAdminProps))
                 .andReturn(storeLog);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/56623efd/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
index f734032..b2c164d 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
@@ -111,6 +111,8 @@ public class KafkaBasedLogTest {
     private KafkaBasedLog<String, String> store;
 
     @Mock
+    private Runnable initializer;
+    @Mock
     private KafkaProducer<String, String> producer;
     private MockConsumer<String, String> consumer;
 
@@ -131,7 +133,7 @@ public class KafkaBasedLogTest {
     @Before
     public void setUp() throws Exception {
         store = PowerMock.createPartialMock(KafkaBasedLog.class, new String[]{"createConsumer", "createProducer"},
-                TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, consumedCallback, time);
+                TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, consumedCallback, time, initializer);
         consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
         consumer.updatePartitions(TOPIC, Arrays.asList(TPINFO0, TPINFO1));
         Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
@@ -462,6 +464,9 @@ public class KafkaBasedLogTest {
 
 
     private void expectStart() throws Exception {
+        initializer.run();
+        EasyMock.expectLastCall().times(1);
+
         PowerMock.expectPrivate(store, "createProducer")
                 .andReturn(producer);
         PowerMock.expectPrivate(store, "createConsumer")
@@ -478,4 +483,4 @@ public class KafkaBasedLogTest {
         return ByteBuffer.wrap(v.getBytes());
     }
 
-}
+}
\ No newline at end of file
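
The KafkaBasedLogTest change above passes a Runnable "initializer" into the
log and expects it to be run once when the log starts. A plausible reading,
given the commit message, is that the initializer is where the backing stores
ensure their topic exists before any clients are created. The sketch below
only illustrates that hook pattern under this assumption; the class
InitializingLogSketch and the step names are hypothetical and are not part of
KafkaBasedLog.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for a log that accepts a start-up initializer.
class InitializingLogSketch {
    private final Runnable initializer;
    private final List<String> steps = new ArrayList<>();

    InitializingLogSketch(Runnable initializer) {
        this.initializer = initializer;
    }

    // Runs the initializer (if any) first, then records the hypothetical client-creation step.
    void start() {
        if (initializer != null) initializer.run();
        steps.add("create-clients");
    }

    List<String> steps() {
        return steps;
    }

    public static void main(String[] args) {
        List<String> order = new ArrayList<>();
        // The initializer here only records that it ran; a real one might create the topic
        InitializingLogSketch log = new InitializingLogSketch(() -> order.add("ensure-topic"));
        log.start();
        order.addAll(log.steps());
        System.out.println(order); // prints [ensure-topic, create-clients]
    }
}
```

Keeping the hook as a plain Runnable means the log does not need to know
anything about topic administration, which is also what makes it easy to mock
in the test.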

http://git-wip-us.apache.org/repos/asf/kafka/blob/56623efd/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
new file mode 100644
index 0000000..bafbce8
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.admin.MockKafkaAdminClientEnv;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.requests.CreateTopicsResponse;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TopicAdminTest {
+
+    /**
+     * 0.10.x clients can't talk with 0.9.x brokers, and 0.10.0.0 introduced the new protocol with API versions.
+     * That means we can simulate an API version mismatch.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void returnNullWithApiVersionMismatch() {
+        final NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
+        boolean internal = false;
+        Cluster cluster = createCluster(1);
+        try (MockKafkaAdminClientEnv env = new MockKafkaAdminClientEnv(cluster)) {
+            env.kafkaClient().setNode(cluster.controller());
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
+            env.kafkaClient().prepareResponse(createTopicResponseWithUnsupportedVersion(newTopic));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            admin.createTopic(newTopic);
+            fail();
+        } catch (UnsupportedVersionException e) {
+            // expected
+        }
+    }
+
+    @Test
+    public void shouldNotCreateTopicWhenItAlreadyExists() {
+        NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
+        Cluster cluster = createCluster(1);
+        try (MockKafkaAdminClientEnv env = new MockKafkaAdminClientEnv(cluster)) {
+            env.kafkaClient().setNode(cluster.controller());
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
+            env.kafkaClient().prepareResponse(createTopicResponseWithAlreadyExists(newTopic));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            boolean created = admin.createTopic(newTopic);
+            assertFalse(created);
+        }
+    }
+
+    @Test
+    public void shouldCreateTopicWhenItDoesNotExist() {
+        NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
+        Cluster cluster = createCluster(1);
+        try (MockKafkaAdminClientEnv env = new MockKafkaAdminClientEnv(cluster)) {
+            env.kafkaClient().setNode(cluster.controller());
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
+            env.kafkaClient().prepareResponse(createTopicResponse(newTopic));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            boolean created = admin.createTopic(newTopic);
+            assertTrue(created);
+        }
+    }
+
+    @Test
+    public void shouldCreateOneTopicWhenProvidedMultipleDefinitionsWithSameTopicName() {
+        NewTopic newTopic1 = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
+        NewTopic newTopic2 = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
+        Cluster cluster = createCluster(1);
+        try (MockKafkaAdminClientEnv env = new MockKafkaAdminClientEnv(cluster)) {
+            env.kafkaClient().setNode(cluster.controller());
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
+            env.kafkaClient().prepareResponse(createTopicResponse(newTopic1));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            Set<String> newTopicNames = admin.createTopics(newTopic1, newTopic2);
+            assertEquals(1, newTopicNames.size());
+            assertEquals(newTopic2.name(), newTopicNames.iterator().next());
+        }
+    }
+
+    @Test
+    public void shouldReturnFalseWhenSuppliedNullTopicDescription() {
+        Cluster cluster = createCluster(1);
+        try (MockKafkaAdminClientEnv env = new MockKafkaAdminClientEnv(cluster)) {
+            env.kafkaClient().setNode(cluster.controller());
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            boolean created = admin.createTopic(null);
+            assertFalse(created);
+        }
+    }
+
+    private Cluster createCluster(int numNodes) {
+        HashMap<Integer, Node> nodes = new HashMap<>();
+        for (int i = 0; i != numNodes; ++i) {
+            nodes.put(i, new Node(i, "localhost", 8121 + i));
+        }
+        Cluster cluster = new Cluster("mockClusterId", nodes.values(),
+                Collections.<PartitionInfo>emptySet(), Collections.<String>emptySet(),
+                Collections.<String>emptySet(), nodes.get(0));
+        return cluster;
+    }
+
+    private CreateTopicsResponse createTopicResponse(NewTopic... topics) {
+        return createTopicResponse(new ApiError(Errors.NONE, ""), topics);
+    }
+
+    private CreateTopicsResponse createTopicResponseWithAlreadyExists(NewTopic... topics) {
+        return createTopicResponse(new ApiError(Errors.TOPIC_ALREADY_EXISTS, "Topic already exists"), topics);
+    }
+
+    private CreateTopicsResponse createTopicResponseWithUnsupportedVersion(NewTopic... topics) {
+        return createTopicResponse(new ApiError(Errors.UNSUPPORTED_VERSION, "This version of the API is not supported"), topics);
+    }
+
+    private CreateTopicsResponse createTopicResponse(ApiError error, NewTopic... topics) {
+        if (error == null) error = new ApiError(Errors.NONE, "");
+        Map<String, ApiError> topicResults = new HashMap<>();
+        for (NewTopic topic : topics) {
+            topicResults.put(topic.name(), error);
+        }
+        return new CreateTopicsResponse(topicResults);
+    }
+}
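
The tests above build their topic definitions with the fluent call chain
TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build(). The
builder itself is not shown in this part of the diff, so the following is a
rough sketch of how such a builder could look rather than the committed
implementation. TopicSpecSketch, its fields, and its defaults are hypothetical;
the one detail taken from Kafka itself is that a compacted topic is expressed
through the topic-level setting cleanup.policy=compact.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for a topic definition and its fluent builder.
class TopicSpecSketch {
    final String name;
    final int partitions;
    final short replicationFactor;
    final Map<String, String> configs;

    private TopicSpecSketch(String name, int partitions, short replicationFactor, Map<String, String> configs) {
        this.name = name;
        this.partitions = partitions;
        this.replicationFactor = replicationFactor;
        this.configs = Collections.unmodifiableMap(new HashMap<>(configs));
    }

    static Builder defineTopic(String name) {
        return new Builder(name);
    }

    static class Builder {
        private final String name;
        // Defaults below are assumptions made for this sketch only
        private int partitions = 1;
        private short replicationFactor = 3;
        private final Map<String, String> configs = new HashMap<>();

        Builder(String name) {
            this.name = name;
        }

        Builder partitions(int numPartitions) {
            this.partitions = numPartitions;
            return this;
        }

        Builder replicationFactor(short factor) {
            this.replicationFactor = factor;
            return this;
        }

        // Marks the topic as log-compacted via the standard topic-level config
        Builder compacted() {
            configs.put("cleanup.policy", "compact");
            return this;
        }

        TopicSpecSketch build() {
            return new TopicSpecSketch(name, partitions, replicationFactor, configs);
        }
    }

    public static void main(String[] args) {
        TopicSpecSketch spec = TopicSpecSketch.defineTopic("myTopic")
                .partitions(1)
                .replicationFactor((short) 1)
                .compacted()
                .build();
        System.out.println(spec.name + " " + spec.partitions + " " + spec.configs);
    }
}
```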

http://git-wip-us.apache.org/repos/asf/kafka/blob/56623efd/tests/kafkatest/tests/connect/connect_distributed_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py
index c298fb1..8fbc13b 100644
--- a/tests/kafkatest/tests/connect/connect_distributed_test.py
+++ b/tests/kafkatest/tests/connect/connect_distributed_test.py
@@ -45,8 +45,13 @@ class ConnectDistributedTest(Test):
 
     TOPIC = "test"
     OFFSETS_TOPIC = "connect-offsets"
+    OFFSETS_REPLICATION_FACTOR = "1"
+    OFFSETS_PARTITIONS = "1"
     CONFIG_TOPIC = "connect-configs"
+    CONFIG_REPLICATION_FACTOR = "1"
     STATUS_TOPIC = "connect-status"
+    STATUS_REPLICATION_FACTOR = "1"
+    STATUS_PARTITIONS = "1"
 
     # Since tasks can be assigned to any node and we're testing with files, we need to make sure the content is the same
     # across all nodes.
@@ -70,11 +75,12 @@ class ConnectDistributedTest(Test):
         self.key_converter = "org.apache.kafka.connect.json.JsonConverter"
         self.value_converter = "org.apache.kafka.connect.json.JsonConverter"
         self.schemas = True
+        self.broker_config_overrides = [["auto.create.topics.enable","false"]]
 
     def setup_services(self, security_protocol=SecurityConfig.PLAINTEXT, timestamp_type=None):
         self.kafka = KafkaService(self.test_context, self.num_brokers, self.zk,
                                   security_protocol=security_protocol, interbroker_security_protocol=security_protocol,
-                                  topics=self.topics)
+                                  topics=self.topics, server_prop_overides=self.broker_config_overrides)
         if timestamp_type is not None:
             for node in self.kafka.nodes:
                 node.config[config_property.MESSAGE_TIMESTAMP_TYPE] = timestamp_type

http://git-wip-us.apache.org/repos/asf/kafka/blob/56623efd/tests/kafkatest/tests/connect/connect_rest_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/connect_rest_test.py b/tests/kafkatest/tests/connect/connect_rest_test.py
index 999d773..105f196 100644
--- a/tests/kafkatest/tests/connect/connect_rest_test.py
+++ b/tests/kafkatest/tests/connect/connect_rest_test.py
@@ -40,8 +40,13 @@ class ConnectRestApiTest(KafkaTest):
 
     TOPIC = "test"
     OFFSETS_TOPIC = "connect-offsets"
+    OFFSETS_REPLICATION_FACTOR = "1"
+    OFFSETS_PARTITIONS = "1"
     CONFIG_TOPIC = "connect-configs"
+    CONFIG_REPLICATION_FACTOR = "1"
     STATUS_TOPIC = "connect-status"
+    STATUS_REPLICATION_FACTOR = "1"
+    STATUS_PARTITIONS = "1"
 
     # Since tasks can be assigned to any node and we're testing with files, we need to make sure the content is the same
     # across all nodes.

http://git-wip-us.apache.org/repos/asf/kafka/blob/56623efd/tests/kafkatest/tests/connect/templates/connect-distributed.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/templates/connect-distributed.properties b/tests/kafkatest/tests/connect/templates/connect-distributed.properties
index 48f5f78..6660e6c 100644
--- a/tests/kafkatest/tests/connect/templates/connect-distributed.properties
+++ b/tests/kafkatest/tests/connect/templates/connect-distributed.properties
@@ -35,8 +35,13 @@ internal.key.converter.schemas.enable=false
 internal.value.converter.schemas.enable=false
 
 offset.storage.topic={{ OFFSETS_TOPIC }}
+offset.storage.replication.factor={{ OFFSETS_REPLICATION_FACTOR }}
+offset.storage.partitions={{ OFFSETS_PARTITIONS }}
 config.storage.topic={{ CONFIG_TOPIC }}
+config.storage.replication.factor={{ CONFIG_REPLICATION_FACTOR }}
 status.storage.topic={{ STATUS_TOPIC }}
+status.storage.replication.factor={{ STATUS_REPLICATION_FACTOR }}
+status.storage.partitions={{ STATUS_PARTITIONS }}
 
 # Make sure data gets flushed frequently so tests don't have to wait to ensure they see data in output systems
 offset.flush.interval.ms=5000
