This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4e60ad7 MINOR: Improve error message in
MirrorConnectorsIntegrationBaseTest (#10268)
4e60ad7 is described below
commit 4e60ad72fb280926f1d98bcde187a399b577310d
Author: Luke Chen <[email protected]>
AuthorDate: Fri Mar 12 20:14:53 2021 +0800
MINOR: Improve error message in MirrorConnectorsIntegrationBaseTest (#10268)
Reviewers: Mickael Maison <[email protected]>
---
.../MirrorConnectorsIntegrationBaseTest.java | 22 +++++++++++-----------
.../util/clusters/EmbeddedConnectCluster.java | 13 +++++++++++++
2 files changed, 24 insertions(+), 11 deletions(-)
diff --git
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
index 7aae7d5..e6bdb96 100644
---
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
+++
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
@@ -231,8 +231,8 @@ public abstract class MirrorConnectorsIntegrationBaseTest {
MirrorClient backupClient = new
MirrorClient(mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS));
// make sure the topic is auto-created in the other cluster
- waitForTopicCreated(primary.kafka(), "backup.test-topic-1");
- waitForTopicCreated(backup.kafka(), "primary.test-topic-1");
+ waitForTopicCreated(primary, "backup.test-topic-1");
+ waitForTopicCreated(backup, "primary.test-topic-1");
assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT,
getTopicConfig(backup.kafka(), "primary.test-topic-1",
TopicConfig.CLEANUP_POLICY_CONFIG),
"topic config was not synced");
@@ -318,8 +318,8 @@ public abstract class MirrorConnectorsIntegrationBaseTest {
backup.kafka().createTopic("test-topic-3", NUM_PARTITIONS);
// make sure the topic is auto-created in the other cluster
- waitForTopicCreated(backup.kafka(), "primary.test-topic-2");
- waitForTopicCreated(primary.kafka(), "backup.test-topic-3");
+ waitForTopicCreated(backup, "primary.test-topic-2");
+ waitForTopicCreated(primary, "backup.test-topic-3");
// only produce messages to the first partition
produceMessages(primary, "test-topic-2", 1);
@@ -408,8 +408,8 @@ public abstract class MirrorConnectorsIntegrationBaseTest {
waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config,
PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
// make sure the topic is created in the other cluster
- waitForTopicCreated(primary.kafka(), "backup.test-topic-1");
- waitForTopicCreated(backup.kafka(), "primary.test-topic-1");
+ waitForTopicCreated(primary, "backup.test-topic-1");
+ waitForTopicCreated(backup, "primary.test-topic-1");
// create a consumer at backup cluster with same consumer group Id to
consume 1 topic
Consumer<byte[], byte[]> backupConsumer =
backup.kafka().createConsumerAndSubscribeTo(
consumerProps, "primary.test-topic-1");
@@ -427,7 +427,7 @@ public abstract class MirrorConnectorsIntegrationBaseTest {
// now create a new topic in primary cluster
primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS);
// make sure the topic is created in backup cluster
- waitForTopicCreated(backup.kafka(), "primary.test-topic-2");
+ waitForTopicCreated(backup, "primary.test-topic-2");
// produce some records to the new topic in primary cluster
produceMessages(primary, "test-topic-2");
@@ -468,17 +468,17 @@ public abstract class MirrorConnectorsIntegrationBaseTest
{
// flaky tests where the connector and tasks didn't start up in time
for the tests to be run
for (Class<? extends Connector> connector : connectorClasses) {
connectCluster.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connector.getSimpleName(),
1,
- "Connector " + connector.getSimpleName() + " tasks did not
start in time on cluster: " + connectCluster);
+ "Connector " + connector.getSimpleName() + " tasks did not
start in time on cluster: " + connectCluster.getName());
}
}
/*
* wait for the topic created on the cluster
*/
- private static void waitForTopicCreated(EmbeddedKafkaCluster cluster,
String topicName) throws InterruptedException {
- try (final Admin adminClient = cluster.createAdminClient()) {
+ private static void waitForTopicCreated(EmbeddedConnectCluster cluster,
String topicName) throws InterruptedException {
+ try (final Admin adminClient = cluster.kafka().createAdminClient()) {
waitForCondition(() ->
adminClient.listTopics().names().get().contains(topicName),
TOPIC_SYNC_DURATION_MS,
- "Topic: " + topicName + " didn't get created in the cluster"
+ "Topic: " + topicName + " didn't get created on cluster: " +
cluster.getName()
);
}
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
index a1e36c9..b7111d0 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
@@ -261,6 +261,19 @@ public class EmbeddedConnectCluster {
}
}
+ @Override
+ public String toString() {
+ return String.format("EmbeddedConnectCluster(name= %s, numBrokers= %d,
numInitialWorkers= %d, workerProps= %s)",
+ connectClusterName,
+ numBrokers,
+ numInitialWorkers,
+ workerProps);
+ }
+
+ public String getName() {
+ return connectClusterName;
+ }
+
/**
* Get the workers that are up and running.
*