This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4e60ad7  MINOR: Improve error message in 
MirrorConnectorsIntegrationBaseTest (#10268)
4e60ad7 is described below

commit 4e60ad72fb280926f1d98bcde187a399b577310d
Author: Luke Chen <[email protected]>
AuthorDate: Fri Mar 12 20:14:53 2021 +0800

    MINOR: Improve error message in MirrorConnectorsIntegrationBaseTest (#10268)
    
    
    Reviewers: Mickael Maison <[email protected]>
---
 .../MirrorConnectorsIntegrationBaseTest.java       | 22 +++++++++++-----------
 .../util/clusters/EmbeddedConnectCluster.java      | 13 +++++++++++++
 2 files changed, 24 insertions(+), 11 deletions(-)

diff --git 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
index 7aae7d5..e6bdb96 100644
--- 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
+++ 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
@@ -231,8 +231,8 @@ public abstract class MirrorConnectorsIntegrationBaseTest {
         MirrorClient backupClient = new 
MirrorClient(mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS));
 
         // make sure the topic is auto-created in the other cluster
-        waitForTopicCreated(primary.kafka(), "backup.test-topic-1");
-        waitForTopicCreated(backup.kafka(), "primary.test-topic-1");
+        waitForTopicCreated(primary, "backup.test-topic-1");
+        waitForTopicCreated(backup, "primary.test-topic-1");
         assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT, 
getTopicConfig(backup.kafka(), "primary.test-topic-1", 
TopicConfig.CLEANUP_POLICY_CONFIG),
                 "topic config was not synced");
         
@@ -318,8 +318,8 @@ public abstract class MirrorConnectorsIntegrationBaseTest {
         backup.kafka().createTopic("test-topic-3", NUM_PARTITIONS);
 
         // make sure the topic is auto-created in the other cluster
-        waitForTopicCreated(backup.kafka(), "primary.test-topic-2");
-        waitForTopicCreated(primary.kafka(), "backup.test-topic-3");
+        waitForTopicCreated(backup, "primary.test-topic-2");
+        waitForTopicCreated(primary, "backup.test-topic-3");
 
         // only produce messages to the first partition
         produceMessages(primary, "test-topic-2", 1);
@@ -408,8 +408,8 @@ public abstract class MirrorConnectorsIntegrationBaseTest {
         waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, 
PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
 
         // make sure the topic is created in the other cluster
-        waitForTopicCreated(primary.kafka(), "backup.test-topic-1");
-        waitForTopicCreated(backup.kafka(), "primary.test-topic-1");
+        waitForTopicCreated(primary, "backup.test-topic-1");
+        waitForTopicCreated(backup, "primary.test-topic-1");
         // create a consumer at backup cluster with same consumer group Id to 
consume 1 topic
         Consumer<byte[], byte[]> backupConsumer = 
backup.kafka().createConsumerAndSubscribeTo(
             consumerProps, "primary.test-topic-1");
@@ -427,7 +427,7 @@ public abstract class MirrorConnectorsIntegrationBaseTest {
         // now create a new topic in primary cluster
         primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS);
         // make sure the topic is created in backup cluster
-        waitForTopicCreated(backup.kafka(), "primary.test-topic-2");
+        waitForTopicCreated(backup, "primary.test-topic-2");
 
         // produce some records to the new topic in primary cluster
         produceMessages(primary, "test-topic-2");
@@ -468,17 +468,17 @@ public abstract class MirrorConnectorsIntegrationBaseTest 
{
         // flaky tests where the connector and tasks didn't start up in time 
for the tests to be run
         for (Class<? extends Connector> connector : connectorClasses) {
             
connectCluster.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connector.getSimpleName(),
 1,
-                    "Connector " + connector.getSimpleName() + " tasks did not 
start in time on cluster: " + connectCluster);
+                    "Connector " + connector.getSimpleName() + " tasks did not 
start in time on cluster: " + connectCluster.getName());
         }
     }
 
     /*
      * wait for the topic created on the cluster
      */
-    private static void waitForTopicCreated(EmbeddedKafkaCluster cluster, 
String topicName) throws InterruptedException {
-        try (final Admin adminClient = cluster.createAdminClient()) {
+    private static void waitForTopicCreated(EmbeddedConnectCluster cluster, 
String topicName) throws InterruptedException {
+        try (final Admin adminClient = cluster.kafka().createAdminClient()) {
             waitForCondition(() -> 
adminClient.listTopics().names().get().contains(topicName), 
TOPIC_SYNC_DURATION_MS,
-                "Topic: " + topicName + " didn't get created in the cluster"
+                "Topic: " + topicName + " didn't get created on cluster: " + 
cluster.getName()
             );
         }
     }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
index a1e36c9..b7111d0 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
@@ -261,6 +261,19 @@ public class EmbeddedConnectCluster {
         }
     }
 
+    @Override
+    public String toString() {
+        return String.format("EmbeddedConnectCluster(name= %s, numBrokers= %d, 
numInitialWorkers= %d, workerProps= %s)",
+            connectClusterName,
+            numBrokers,
+            numInitialWorkers,
+            workerProps);
+    }
+
+    public String getName() {
+        return connectClusterName;
+    }
+
     /**
      * Get the workers that are up and running.
      *

Reply via email to