Repository: kafka
Updated Branches:
  refs/heads/trunk b8cf97686 -> 1a653c813

KAFKA-5704: Corrected Connect distributed startup behavior to allow older 
brokers to auto-create topics

When a Connect distributed worker starts up talking with broker versions
0.10.1.0 and later, it will use the AdminClient to look for the internal topics
and attempt to create them if they are missing. Although the AdminClient was
added in 0.11.0.0, it uses topic-creation APIs that have existed since
0.10.1.0. This feature works as expected when Connect uses a broker
version 0.10.1.0 or later.

However, when a Connect distributed worker starts up using a broker older than
0.10.1.0, the AdminClient is not able to find the required APIs and thus will
throw an UnsupportedVersionException. Unfortunately, this exception is not
caught and instead causes the Connect worker to fail even when the topics
already exist.

This change handles the UnsupportedVersionException by logging a debug message
and doing nothing. The existing producer logic will get information about the
topics, which will cause the broker to create them if they don’t exist and
broker auto-creation of topics is enabled. This is the same behavior that
existed prior to 0.11.0.0, and so this change restores that behavior for
brokers older than 0.10.1.0.
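
The fallback described above can be modeled with a minimal Python sketch. It
is not the Java code from this commit: UnsupportedVersionError, create_topics,
new_broker, old_broker and the topic names are all hypothetical stand-ins used
only to illustrate the control flow (attempt creation, and on an unsupported
API return an empty set instead of failing).

```python
class UnsupportedVersionError(Exception):
    """Hypothetical stand-in for Kafka's UnsupportedVersionException."""


def create_topics(admin_create, topic_names):
    """Return the set of topic names that were created.

    admin_create is a hypothetical callable standing in for the admin
    client request. It returns the names it created, or raises
    UnsupportedVersionError when the broker lacks the topic-creation API.
    """
    try:
        return set(admin_create(topic_names))
    except UnsupportedVersionError:
        # Older broker: create nothing and do not fail. The topics are
        # assumed to exist already or to be auto-created by the broker.
        return set()


def new_broker(names):
    # Simulates a broker that supports topic creation.
    return names


def old_broker(names):
    # Simulates a broker that does not support topic creation.
    raise UnsupportedVersionError("topic-creation API not available")


topics = ["connect-configs", "connect-offsets"]  # illustrative names only
created_on_new = create_topics(new_broker, topics)
created_on_old = create_topics(old_broker, topics)
```

With the older broker the caller sees an empty set rather than an exception,
so worker startup can continue and rely on broker auto-creation if enabled.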

This change also adds a system test that verifies Connect works with a variety 
of brokers and is able to run source and sink connectors. The test verifies 
that Connect can read from the internal topics when the connectors are 

Author: Randall Hauch <>

Reviewers: Ewen Cheslack-Postava <>

Closes #3641 from rhauch/kafka-5704


Branch: refs/heads/trunk
Commit: 1a653c813c842c0b67f26fb119d7727e272cf834
Parents: b8cf976
Author: Randall Hauch <>
Authored: Tue Aug 8 20:20:41 2017 -0700
Committer: Ewen Cheslack-Postava <>
Committed: Tue Aug 8 20:20:41 2017 -0700

 .../apache/kafka/connect/util/   | 10 +++---
 .../kafka/connect/util/      | 15 +++-----
 .../tests/connect/   | 36 ++++++++++++++++++--
 3 files changed, 44 insertions(+), 17 deletions(-)
diff --git 
index adc3378..5da4f2d 100644
@@ -195,13 +195,14 @@ public class TopicAdmin implements AutoCloseable {
      * are excluded from the result.
      * <p>
      * If multiple topic definitions have the same topic name, the last one 
with that name will be used.
-     * </p>
+     * <p>
+     * Apache Kafka added support for creating topics in 0.10.1.0, so this 
method works as expected with that and later versions.
+     * With brokers older than 0.10.1.0, this method is unable to create 
topics and always returns an empty set.
      * @param topics the specifications of the topics
      * @return the names of the topics that were created by this operation; 
never null but possibly empty
      * @throws ConnectException            if an error occurs, the operation 
takes too long, or the thread is interrupted while
      *                                     attempting to perform this operation
-     * @throws UnsupportedVersionException if the broker does not support the 
necessary APIs to perform this request
     public Set<String> createTopics(NewTopic... topics) {
         Map<String, NewTopic> topicsByName = new HashMap<>();
@@ -233,8 +234,9 @@ public class TopicAdmin implements AutoCloseable {
                 if (cause instanceof UnsupportedVersionException) {
-                    log.error("Unable to use Kafka admin client to create 
topic descriptions for '{}' using the brokers at {}", topicNameList, 
-                    throw (UnsupportedVersionException) cause;
+                    log.debug("Unable to use Kafka admin client to create 
topic descriptions for '{}' using the brokers at {}," +
+                                      "falling back to assume topic(s) exist 
or will be auto-created by the broker", topicNameList, bootstrapServers);
+                    return Collections.emptySet();
                 if (cause instanceof TimeoutException) {
                     // Timed out waiting for the operation to complete
diff --git 
index f90b77f..0a61d3e 100644
@@ -22,7 +22,6 @@ import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.common.requests.CreateTopicsResponse;
@@ -36,15 +35,13 @@ import java.util.Set;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static;
 public class TopicAdminTest {
-     * 0.10.x clients can't talk with 0.9.x brokers, and 0.10.0.0 introduced 
the new protocol with API versions.
-     * That means we can simulate an API version mismatch.
-     *
-     * @throws Exception
+     * 0.11.0.0 clients can talk with older brokers, but the CREATE_TOPIC API 
was added in 0.10.1.0. That means,
+     * if our TopicAdmin talks to a pre 0.10.1 broker, it should receive an 
UnsupportedVersionException, should
+     * create no topics, and return false.
     public void returnNullWithApiVersionMismatch() {
@@ -56,10 +53,8 @@ public class TopicAdminTest {
             TopicAdmin admin = new TopicAdmin(null, env.adminClient());
-            admin.createTopic(newTopic);
-            fail();
-        } catch (UnsupportedVersionException e) {
-            // expected
+            boolean created = admin.createTopic(newTopic);
+            assertFalse(created);
diff --git a/tests/kafkatest/tests/connect/ 
index 5c7793a..da7d1de 100644
--- a/tests/kafkatest/tests/connect/
+++ b/tests/kafkatest/tests/connect/
@@ -24,6 +24,7 @@ from import KafkaService, 
 from import ConnectDistributedService, 
VerifiableSource, VerifiableSink, ConnectRestError, MockSink, MockSource
 from import ConsoleConsumer
 from import SecurityConfig
+from kafkatest.version import DEV_BRANCH, LATEST_0_11_0, LATEST_0_10_2, 
LATEST_0_10_1, LATEST_0_10_0, LATEST_0_9, LATEST_0_8_2, KafkaVersion
 from collections import Counter, namedtuple
 import itertools
@@ -75,12 +76,12 @@ class ConnectDistributedTest(Test):
         self.key_converter = "org.apache.kafka.connect.json.JsonConverter"
         self.value_converter = "org.apache.kafka.connect.json.JsonConverter"
         self.schemas = True
-        self.broker_config_overrides = [["auto.create.topics.enable", "false"]]
-    def setup_services(self, security_protocol=SecurityConfig.PLAINTEXT, 
+    def setup_services(self, security_protocol=SecurityConfig.PLAINTEXT, 
timestamp_type=None, broker_version=DEV_BRANCH, auto_create_topics=False):
         self.kafka = KafkaService(self.test_context, self.num_brokers, self.zk,
-                                  topics=self.topics, 
+                                  topics=self.topics, version=broker_version,
server_prop_overides=[["auto.create.topics.enable", str(auto_create_topics)]])
         if timestamp_type is not None:
             for node in self.kafka.nodes:
                 node.config[config_property.MESSAGE_TIMESTAMP_TYPE] = 
@@ -502,6 +503,35 @@ class ConnectDistributedTest(Test):
             assert obj['payload']['content'] in self.FIRST_INPUT_LIST
             assert obj['payload'][ts_fieldname] == ts
+    @cluster(num_nodes=5)
+    @parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False, 
+    @parametrize(broker_version=str(LATEST_0_11_0), auto_create_topics=False, 
+    @parametrize(broker_version=str(LATEST_0_10_2), auto_create_topics=False, 
+    @parametrize(broker_version=str(LATEST_0_10_1), auto_create_topics=False, 
+    @parametrize(broker_version=str(LATEST_0_10_0), auto_create_topics=True, 
+    @parametrize(broker_version=str(LATEST_0_9), auto_create_topics=True, 
+    def test_broker_compatibility(self, broker_version, auto_create_topics, 
+        """
+        Verify that Connect will start up with various broker versions with 
various configurations. 
+        When Connect distributed starts up, it either creates internal topics 
(v0.10.1.0 and after) 
+        or relies upon the broker to auto-create the topics (v0.10.0.x and 
+        """
+        self.setup_services(broker_version=broker_version, 
auto_create_topics=auto_create_topics, security_protocol=security_protocol)
+ node: 
self.render("", node=node))
+"Creating connectors")
+        self._start_connector("")
+        self._start_connector("")
+        # Generating data on the source node should generate new records and 
create new output on the sink node. Timeouts
+        # here need to be more generous than they are for standalone mode 
because a) it takes longer to write configs,
+        # do rebalancing of the group, etc, and b) without explicit leave 
group support, rebalancing takes awhile
+        for node in
+            node.account.ssh("echo -e -n " + repr(self.FIRST_INPUTS) + " >> " 
+ self.INPUT_FILE)
+        wait_until(lambda: self._validate_file_output(self.FIRST_INPUT_LIST), 
timeout_sec=70, err_msg="Data added to input file was not seen in the output 
file in a reasonable amount of time.")
     def _validate_file_output(self, input):
         input_set = set(input)
         # Output needs to be collected from all nodes because we can't be sure 
where the tasks will be scheduled.
