This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 44906bdcdf KAFKA-8785: fix request timeout by waiting for metadata
cache up-to-date (#11681)
44906bdcdf is described below
commit 44906bdcdfbb09516e28f207ed0902d07f174cfd
Author: Luke Chen <[email protected]>
AuthorDate: Tue Apr 19 14:13:21 2022 +0800
KAFKA-8785: fix request timeout by waiting for metadata cache up-to-date
(#11681)
The reason why this test is flaky is that we have a race condition at the
beginning of the test, when brokers are starting up, and the adminClient is
requesting the brokers' metadata. If the adminClient gets only partial
metadata, the test will fail, because in these tests, brokers will be shut
down to test leader election.
Fix this issue by explicitly waiting for the metadata cache to become up-to-date
in waitForReadyBrokers, and create the admin client only after waitForReadyBrokers returns.
Reviewers: Jason Gustafson <[email protected]>, David Jacot
<[email protected]>, dengziming <[email protected]>
---
.../kafka/test/junit/RaftClusterInvocationContext.java | 2 +-
core/src/test/java/kafka/testkit/KafkaClusterTestKit.java | 15 ++++++++++++---
.../unit/kafka/admin/LeaderElectionCommandTest.scala | 15 ++++++---------
3 files changed, 19 insertions(+), 13 deletions(-)
diff --git
a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
index b34f286720..711143785c 100644
--- a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
@@ -185,7 +185,7 @@ public class RaftClusterInvocationContext implements
TestTemplateInvocationConte
@Override
public Admin createAdminClient(Properties configOverrides) {
- Admin admin =
Admin.create(clusterReference.get().clientProperties());
+ Admin admin =
Admin.create(clusterReference.get().clientProperties(configOverrides));
admins.add(admin);
return admin;
}
diff --git a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
index 2263b09116..1372006f19 100644
--- a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
+++ b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
@@ -372,6 +372,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
/**
* Wait for a controller to mark all the brokers as ready (registered and
unfenced).
+ * And also wait for the metadata cache up-to-date in each broker server.
*/
public void waitForReadyBrokers() throws ExecutionException,
InterruptedException {
// We can choose any controller, not just the active controller.
@@ -379,6 +380,11 @@ public class KafkaClusterTestKit implements AutoCloseable {
ControllerServer controllerServer =
controllers.values().iterator().next();
Controller controller = controllerServer.controller();
controller.waitForReadyBrokers(brokers.size()).get();
+
+ // make sure metadata cache in each broker server is up-to-date
+ TestUtils.waitForCondition(() ->
+ brokers().values().stream().allMatch(brokerServer ->
brokerServer.metadataCache().getAliveBrokers().size() == brokers.size()),
+ "Failed to wait for publisher to publish the metadata update to
each broker.");
}
public Properties controllerClientProperties() throws ExecutionException,
InterruptedException {
@@ -403,7 +409,10 @@ public class KafkaClusterTestKit implements AutoCloseable {
}
public Properties clientProperties() {
- Properties properties = new Properties();
+ return clientProperties(new Properties());
+ }
+
+ public Properties clientProperties(Properties configOverrides) {
if (!brokers.isEmpty()) {
StringBuilder bld = new StringBuilder();
String prefix = "";
@@ -420,9 +429,9 @@ public class KafkaClusterTestKit implements AutoCloseable {
bld.append(prefix).append("localhost:").append(port);
prefix = ",";
}
-
properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
bld.toString());
+
configOverrides.putIfAbsent(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
bld.toString());
}
- return properties;
+ return configOverrides;
}
public Map<Integer, ControllerServer> controllers() {
diff --git
a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala
b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala
index e9efdabe8e..785054901d 100644
--- a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala
@@ -32,7 +32,7 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.extension.ExtendWith
-import org.junit.jupiter.api.{BeforeEach, Disabled, Tag}
+import org.junit.jupiter.api.{BeforeEach, Tag}
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@ClusterTestDefaults(clusterType = Type.BOTH, brokers = 3)
@@ -56,12 +56,12 @@ final class LeaderElectionCommandTest(cluster:
ClusterInstance) {
@ClusterTest
def testAllTopicPartition(): Unit = {
- val client = cluster.createAdminClient()
val topic = "unclean-topic"
val partition = 0
val assignment = Seq(broker2, broker3)
cluster.waitForReadyBrokers()
+ val client = cluster.createAdminClient()
createTopic(client, topic, Map(partition -> assignment))
val topicPartition = new TopicPartition(topic, partition)
@@ -86,14 +86,13 @@ final class LeaderElectionCommandTest(cluster:
ClusterInstance) {
}
@ClusterTest
- @Disabled // TODO: re-enable until we fixed KAFKA-8541
def testTopicPartition(): Unit = {
- val client = cluster.createAdminClient()
val topic = "unclean-topic"
val partition = 0
val assignment = Seq(broker2, broker3)
cluster.waitForReadyBrokers()
+ val client = cluster.createAdminClient()
createTopic(client, topic, Map(partition -> assignment))
val topicPartition = new TopicPartition(topic, partition)
@@ -120,14 +119,13 @@ final class LeaderElectionCommandTest(cluster:
ClusterInstance) {
}
@ClusterTest
- @Disabled // TODO: re-enable until we fixed KAFKA-8785
def testPathToJsonFile(): Unit = {
- val client = cluster.createAdminClient()
val topic = "unclean-topic"
val partition = 0
val assignment = Seq(broker2, broker3)
cluster.waitForReadyBrokers()
+ val client = cluster.createAdminClient()
createTopic(client, topic, Map(partition -> assignment))
val topicPartition = new TopicPartition(topic, partition)
@@ -155,14 +153,13 @@ final class LeaderElectionCommandTest(cluster:
ClusterInstance) {
}
@ClusterTest
- @Disabled // TODO: re-enable after KAFKA-13737 is fixed
def testPreferredReplicaElection(): Unit = {
- val client = cluster.createAdminClient()
val topic = "preferred-topic"
val partition = 0
val assignment = Seq(broker2, broker3)
cluster.waitForReadyBrokers()
+ val client = cluster.createAdminClient()
createTopic(client, topic, Map(partition -> assignment))
val topicPartition = new TopicPartition(topic, partition)
@@ -200,7 +197,6 @@ final class LeaderElectionCommandTest(cluster:
ClusterInstance) {
@ClusterTest
def testElectionResultOutput(): Unit = {
- val client = cluster.createAdminClient()
val topic = "non-preferred-topic"
val partition0 = 0
val partition1 = 1
@@ -208,6 +204,7 @@ final class LeaderElectionCommandTest(cluster:
ClusterInstance) {
val assignment1 = Seq(broker3, broker2)
cluster.waitForReadyBrokers()
+ val client = cluster.createAdminClient()
createTopic(client, topic, Map(
partition0 -> assignment0,
partition1 -> assignment1