This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 61ece48a862 MINOR: stabilize
LeaderElectionTest#testLeaderElectionAndEpoch (#13290)
61ece48a862 is described below
commit 61ece48a86263d8c4275effecd13e6f70f928c07
Author: Chia-Ping Tsai <[email protected]>
AuthorDate: Thu Feb 23 19:13:09 2023 +0800
MINOR: stabilize LeaderElectionTest#testLeaderElectionAndEpoch (#13290)
Reviewers: Luke Chen <[email protected]>
---
core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala | 3 ++-
core/src/test/scala/unit/kafka/utils/TestUtils.scala | 4 +++-
2 files changed, 5 insertions(+), 2 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 259ea91df87..6e7ca0747c9 100755
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -86,7 +86,8 @@ class LeaderElectionTest extends QuorumTestHarness {
// kill the server hosting the preferred replica/initial leader
servers.head.shutdown()
// check if leader moves to the other server
- val leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic,
partitionId, oldLeaderOpt = Some(leader1))
+ val leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic,
partitionId,
+ oldLeaderOpt = Some(leader1), ignoreNoLeader = true)
val leaderEpoch2 = zkClient.getEpochForPartition(new TopicPartition(topic,
partitionId)).get
assertEquals(1, leader2, "Leader must move to broker 1")
// new leaderEpoch will be leaderEpoch1+2, one increment during
ReplicaStateMachine.startup()-> handleStateChanges
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 2c9c4ae6690..2a71f86a453 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -900,10 +900,12 @@ object TestUtils extends Logging {
partition: Int,
timeoutMs: Long = 30000L,
oldLeaderOpt: Option[Int] = None,
- newLeaderOpt: Option[Int] = None
+ newLeaderOpt: Option[Int] = None,
+ ignoreNoLeader: Boolean = false
): Int = {
def getPartitionLeader(topic: String, partition: Int): Option[Int] = {
zkClient.getLeaderForPartition(new TopicPartition(topic, partition))
+ .filter(p => !ignoreNoLeader || p != LeaderAndIsr.NoLeader)
}
doWaitUntilLeaderIsElectedOrChanged(getPartitionLeader, topic, partition,
timeoutMs, oldLeaderOpt, newLeaderOpt)
}