This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3f5ad46 MINOR: optimize integration test shutdown (#8366)
3f5ad46 is described below
commit 3f5ad4640bd7cf373fddcbbfe5ac5596d95c82c0
Author: John Roesler <[email protected]>
AuthorDate: Fri Mar 27 10:50:43 2020 -0500
MINOR: optimize integration test shutdown (#8366)
* delete topics before tearing down multi-node clusters to avoid leader
elections during shutdown
* tear down all nodes concurrently instead of sequentially
Reviewers: Matthias J. Sax <[email protected]>
---
.../integration/utils/EmbeddedKafkaCluster.java | 21 ++++++++++++++++++++-
.../streams/integration/utils/KafkaEmbedded.java | 11 ++++++-----
2 files changed, 26 insertions(+), 6 deletions(-)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
index 4d53058..edabe24 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
@@ -21,6 +21,7 @@ import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.utils.MockTime;
import kafka.zk.EmbeddedZookeeper;
+import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.test.TestCondition;
@@ -39,6 +40,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
/**
* Runs an in-memory, "embedded" Kafka cluster with 1 ZooKeeper instance and
supplied number of Kafka brokers.
@@ -118,8 +120,25 @@ public class EmbeddedKafkaCluster extends ExternalResource
{
* Stop the Kafka cluster.
*/
private void stop() {
+ if (brokers.length > 1) {
+ // delete the topics first to avoid cascading leader elections
while shutting down the brokers
+ final Set<String> topics = getAllTopicsInCluster();
+ if (!topics.isEmpty()) {
+ try (final Admin adminClient = brokers[0].createAdminClient())
{
+ adminClient.deleteTopics(topics).all().get();
+ } catch (final InterruptedException e) {
+ log.warn("Got interrupted while deleting topics in
preparation for stopping embedded brokers", e);
+ throw new RuntimeException(e);
+ } catch (final ExecutionException | RuntimeException e) {
+ log.warn("Couldn't delete all topics before stopping
brokers", e);
+ }
+ }
+ }
+ for (final KafkaEmbedded broker : brokers) {
+ broker.stopAsync();
+ }
for (final KafkaEmbedded broker : brokers) {
- broker.stop();
+ broker.awaitStoppedAndPurge();
}
zookeeper.shutdown();
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
index 43438a5..78b1ee1 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
@@ -123,14 +123,15 @@ public class KafkaEmbedded {
return effectiveConfig.getProperty("zookeeper.connect",
DEFAULT_ZK_CONNECT);
}
- /**
- * Stop the broker.
- */
@SuppressWarnings("WeakerAccess")
- public void stop() {
+ public void stopAsync() {
log.debug("Shutting down embedded Kafka broker at {} (with ZK ensemble
at {}) ...",
- brokerList(), zookeeperConnect());
+ brokerList(), zookeeperConnect());
kafka.shutdown();
+ }
+
+ @SuppressWarnings("WeakerAccess")
+ public void awaitStoppedAndPurge() {
kafka.awaitShutdown();
log.debug("Removing log dir at {} ...", logDir);
try {