This is an automated email from the ASF dual-hosted git repository.

damianguy pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fc19c3e  KAFKA-6577: Fix Connect system tests and add debug messages
fc19c3e is described below

commit fc19c3e6f243a8d1b3e27cdc912dc092bbd342e0
Author: Randall Hauch <rha...@gmail.com>
AuthorDate: Thu Feb 22 09:39:59 2018 +0000

    KAFKA-6577: Fix Connect system tests and add debug messages
    
    **NOTE: This should be backported to the `1.1` branch, and is currently a 
blocker for 1.1.**
    
    The `connect_test.py::ConnectStandaloneFileTest.test_file_source_and_sink` 
system test is failing with the SASL configuration without a sufficient 
explanation. During the test, the Connect worker fails to start, but the 
Connect log contains no useful information. There are actually several things 
compounding to cause the failure and make it difficult to understand the 
problem.
    
    First, the 
`tests/kafkatest/tests/connect/templates/connect-standalone.properties` is only 
adding in the broker's security configuration with the `producer.` and 
`consumer.` prefixes, but is not adding them with no prefix. The worker uses 
the AdminClient to connect to the broker to get the Kafka cluster ID and to 
manage the three internal topics, and the AdminClient is configured via 
top-level properties. Because the SASL test requires the clients all connect 
using SASL, the lack of b [...]
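    
    As a rough illustration of this first problem (not part of the commit), here is a minimal Python sketch. The `render_props` helper and the SASL values are hypothetical stand-ins for what the template's `client_config().props(prefix)` calls render; they are not the kafkatest implementation.

```python
def render_props(security_props, prefix=""):
    """Render key=value lines, optionally prefixing every key."""
    return "\n".join(f"{prefix}{k}={v}" for k, v in security_props.items())


def top_level_keys(rendered):
    """Keys that a client configured from top-level properties would see."""
    return {
        line.split("=", 1)[0]
        for line in rendered.splitlines()
        if not line.startswith(("producer.", "consumer."))
    }


# Hypothetical security settings, for illustration only.
security = {"security.protocol": "SASL_PLAINTEXT", "sasl.mechanism": "GSSAPI"}

# Before the fix: only the prefixed copies are rendered.
before = "\n".join([render_props(security, "producer."),
                    render_props(security, "consumer.")])

# After the fix: the unprefixed (top-level) copy is rendered as well.
after = "\n".join([render_props(security), before])

print(sorted(top_level_keys(before)))
print(sorted(top_level_keys(after)))
```

    In this sketch the rendering before the fix yields no top-level security keys, which is the situation the AdminClient ran into.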
    
    Second, the default `request.timeout.ms` for the AdminClient (and the other 
clients) is 120 seconds, so the AdminClient was retrying for 120 seconds before 
it would give up and throw an error. However, the test was only waiting for 60 
seconds before determining that the service failed to start. This can be 
corrected by setting `request.timeout.ms=30000` in the Connect distributed and 
standalone worker configurations.
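    
    The timing mismatch can be sketched with simple arithmetic. This is a simplified Python model, assuming the request timeout alone bounds how long the client retries; the function name is hypothetical, and the 30000 ms value is the one added to the test templates in this commit.

```python
def worker_fails_within_wait(request_timeout_ms, test_wait_s):
    """True if the client gives up (and reports its error) before the test stops waiting."""
    return request_timeout_ms / 1000.0 < test_wait_s


DEFAULT_REQUEST_TIMEOUT_MS = 120000   # default cited in the commit message
TEMPLATE_REQUEST_TIMEOUT_MS = 30000   # value set in the test templates
TEST_WAIT_S = 60                      # startup wait cited in the commit message

print(worker_fails_within_wait(DEFAULT_REQUEST_TIMEOUT_MS, TEST_WAIT_S))
print(worker_fails_within_wait(TEMPLATE_REQUEST_TIMEOUT_MS, TEST_WAIT_S))
```

    Under the default, the test stops waiting before the client reports its error; with the shorter timeout, the error surfaces first.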
    
    Third, the Connect workers were recently changed to look up the Kafka 
cluster ID before starting the herder. This is unlike the older uses of the 
AdminClient to find and manage the internal topics, where failure to connect 
was not necessarily logged correctly but nevertheless still skipped over, 
relying upon broker auto-topic creation to create the internal topics. (This 
may be why the test did not fail prior to the recent change to always require a 
successful AdminClient connection. [...]
    
    The `ConnectStandaloneFileTest.test_file_source_and_sink` system tests were 
run locally prior to this fix, and they failed just as the nightly runs did. Once 
these fixes were made, the locally run system tests passed.
    
    Author: Randall Hauch <rha...@gmail.com>
    
    Reviewers: Konstantine Karantasis <konstant...@confluent.io>, Ewen 
Cheslack-Postava <m...@ewencp.org>
    
    Closes #4610 from rhauch/kafka-6577-trunk
---
 .../main/java/org/apache/kafka/connect/cli/ConnectDistributed.java   | 1 +
 .../main/java/org/apache/kafka/connect/cli/ConnectStandalone.java    | 1 +
 .../org/apache/kafka/connect/storage/KafkaConfigBackingStore.java    | 1 +
 .../org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java    | 1 +
 .../org/apache/kafka/connect/storage/KafkaStatusBackingStore.java    | 1 +
 .../src/main/java/org/apache/kafka/connect/util/ConnectUtils.java    | 5 ++++-
 tests/kafkatest/tests/connect/connect_test.py                        | 2 +-
 .../kafkatest/tests/connect/templates/connect-distributed.properties | 3 +++
 .../kafkatest/tests/connect/templates/connect-standalone.properties  | 4 ++++
 9 files changed, 17 insertions(+), 2 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index 4afa47d..3b7ec87 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -74,6 +74,7 @@ public class ConnectDistributed {
         DistributedConfig config = new DistributedConfig(workerProps);
 
         String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
+        log.debug("Kafka cluster ID: {}", kafkaClusterId);
 
         RestServer rest = new RestServer(config);
         URI advertisedUrl = rest.advertisedUrl();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
index 1769905..413cb46 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
@@ -78,6 +78,7 @@ public class ConnectStandalone {
         StandaloneConfig config = new StandaloneConfig(workerProps);
 
         String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
+        log.debug("Kafka cluster ID: {}", kafkaClusterId);
 
         RestServer rest = new RestServer(config);
         URI advertisedUrl = rest.advertisedUrl();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index b34e483..e51b365 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -432,6 +432,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
         Runnable createTopics = new Runnable() {
             @Override
             public void run() {
+                log.debug("Creating admin client to manage Connect internal config topic");
                 try (TopicAdmin admin = new TopicAdmin(adminProps)) {
                     admin.createTopics(topicDescription);
                 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
index f29f3c2..fb8ad97 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
@@ -94,6 +94,7 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
         Runnable createTopics = new Runnable() {
             @Override
             public void run() {
+                log.debug("Creating admin client to manage Connect internal offset topic");
                 try (TopicAdmin admin = new TopicAdmin(adminProps)) {
                     admin.createTopics(topicDescription);
                 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
index 8ca21eb..6710808 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
@@ -157,6 +157,7 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
         Runnable createTopics = new Runnable() {
             @Override
             public void run() {
+                log.debug("Creating admin client to manage Connect internal status topic");
                 try (TopicAdmin admin = new TopicAdmin(adminProps)) {
                     admin.createTopics(topicDescription);
                 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
index 1945204..9f30236 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
@@ -40,6 +40,7 @@ public final class ConnectUtils {
     }
 
     public static String lookupKafkaClusterId(WorkerConfig config) {
+        log.info("Creating Kafka admin client");
         try (AdminClient adminClient = AdminClient.create(config.originals())) {
             return lookupKafkaClusterId(adminClient);
         }
@@ -53,13 +54,15 @@ public final class ConnectUtils {
                 log.info("Kafka cluster version is too old to return cluster ID");
                 return null;
             }
+            log.debug("Fetching Kafka cluster ID");
             String kafkaClusterId = clusterIdFuture.get();
             log.info("Kafka cluster ID: {}", kafkaClusterId);
             return kafkaClusterId;
         } catch (InterruptedException e) {
             throw new ConnectException("Unexpectedly interrupted when looking up Kafka cluster info", e);
         } catch (ExecutionException e) {
-            throw new ConnectException("Failed to connect to and describe Kafka cluster", e);
+            throw new ConnectException("Failed to connect to and describe Kafka cluster. "
+                                       + "Check worker's broker connection and security properties.", e);
         }
     }
 }
diff --git a/tests/kafkatest/tests/connect/connect_test.py b/tests/kafkatest/tests/connect/connect_test.py
index 9436119..3753876 100644
--- a/tests/kafkatest/tests/connect/connect_test.py
+++ b/tests/kafkatest/tests/connect/connect_test.py
@@ -91,7 +91,7 @@ class ConnectStandaloneFileTest(Test):
         self.source = ConnectStandaloneService(self.test_context, self.kafka, [self.INPUT_FILE, self.OFFSETS_FILE])
         self.sink = ConnectStandaloneService(self.test_context, self.kafka, [self.OUTPUT_FILE, self.OFFSETS_FILE])
         self.consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.TOPIC,
-                                                  consumer_timeout_ms=1000)
+                                                  consumer_timeout_ms=10000)
 
         self.zk.start()
         self.kafka.start()
diff --git a/tests/kafkatest/tests/connect/templates/connect-distributed.properties b/tests/kafkatest/tests/connect/templates/connect-distributed.properties
index 6660e6c..a1d3de2 100644
--- a/tests/kafkatest/tests/connect/templates/connect-distributed.properties
+++ b/tests/kafkatest/tests/connect/templates/connect-distributed.properties
@@ -52,3 +52,6 @@ rest.advertised.host.name = {{ node.account.hostname }}
 # Reduce session timeouts so tests that kill workers don't need to wait as long to recover
 session.timeout.ms=10000
 consumer.session.timeout.ms=10000
+
+# Reduce the admin client request timeouts so that we don't wait the default 120 sec before failing to connect the admin client
+request.timeout.ms=30000
diff --git a/tests/kafkatest/tests/connect/templates/connect-standalone.properties b/tests/kafkatest/tests/connect/templates/connect-standalone.properties
index 09c6487..5f079f7 100644
--- a/tests/kafkatest/tests/connect/templates/connect-standalone.properties
+++ b/tests/kafkatest/tests/connect/templates/connect-standalone.properties
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 bootstrap.servers={{ kafka.bootstrap_servers(kafka.security_config.security_protocol) }}
+{{ kafka.security_config.client_config().props() }}
 {{ kafka.security_config.client_config().props("producer.") }}
 {{ kafka.security_config.client_config().props("consumer.") }}
 
@@ -32,3 +33,6 @@ internal.key.converter.schemas.enable=false
 internal.value.converter.schemas.enable=false
 
 offset.storage.file.filename={{ OFFSETS_FILE }}
+
+# Reduce the admin client request timeouts so that we don't wait the default 120 sec before failing to connect the admin client
+request.timeout.ms=30000

-- 
To stop receiving notification emails like this one, please contact
damian...@apache.org.
