Repository: kafka Updated Branches: refs/heads/trunk 7bb551b4a -> dd9f43140
MINOR: Fix bug in `waitUntilLeaderIsElectedOrChanged` and simplify result type. Also disable a couple of tests that were passing incorrectly until KAFKA-3096 is fixed. The bug was for the following case: `leader.isDefined && oldLeaderOpt.isEmpty && newLeaderOpt.isDefined && newLeaderOpt.get != leader.get`. We would consider it a successful election even though the new leader was not the expected leader. I also changed the result type as we never return `None` (we throw an exception instead). Author: Ismael Juma <[email protected]> Reviewers: Jun Rao <[email protected]> Closes #3031 from ijuma/fix-wait-until-leader-is-elected Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dd9f4314 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dd9f4314 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dd9f4314 Branch: refs/heads/trunk Commit: dd9f43140d4a898576d86f6d5b7be1a021de1040 Parents: 7bb551b Author: Ismael Juma <[email protected]> Authored: Mon May 15 16:08:06 2017 -0700 Committer: Jun Rao <[email protected]> Committed: Mon May 15 16:08:06 2017 -0700 ---------------------------------------------------------------------- .../integration/kafka/api/BaseQuotaTest.scala | 5 +- .../kafka/api/ProducerBounceTest.scala | 4 +- .../unit/kafka/admin/AddPartitionsTest.scala | 16 ++--- .../test/scala/unit/kafka/admin/AdminTest.scala | 4 +- .../unit/kafka/admin/DeleteTopicTest.scala | 6 +- .../integration/UncleanLeaderElectionTest.scala | 12 ++-- .../unit/kafka/server/FetchRequestTest.scala | 2 +- .../unit/kafka/server/LeaderElectionTest.scala | 26 ++++---- .../unit/kafka/server/LogRecoveryTest.scala | 14 ++-- .../unit/kafka/server/ProduceRequestTest.scala | 2 +- .../test/scala/unit/kafka/utils/TestUtils.scala | 69 +++++++++++--------- 11 files changed, 80 insertions(+), 80 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/kafka/blob/dd9f4314/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala index 8d879a2..918bb55 100644 --- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala @@ -75,9 +75,8 @@ abstract class BaseQuotaTest extends IntegrationTestHarness { val numPartitions = 1 val leaders = TestUtils.createTopic(zkUtils, topic1, numPartitions, serverCount, servers) - leaderNode = if (leaders(0).get == servers.head.config.brokerId) servers.head else servers(1) - followerNode = if (leaders(0).get != servers.head.config.brokerId) servers.head else servers(1) - assertTrue("Leader of all partitions of the topic should exist", leaders.values.forall(leader => leader.isDefined)) + leaderNode = if (leaders(0) == servers.head.config.brokerId) servers.head else servers(1) + followerNode = if (leaders(0) != servers.head.config.brokerId) servers.head else servers(1) } @Test http://git-wip-us.apache.org/repos/asf/kafka/blob/dd9f4314/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala index 7b65c4f..5fead18 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala @@ -67,9 +67,7 @@ class ProducerBounceTest extends KafkaServerTestHarness { val numPartitions = 3 val topicConfig = new Properties() topicConfig.put(KafkaConfig.MinInSyncReplicasProp, 2.toString) - val leaders = TestUtils.createTopic(zkUtils, topic1, numPartitions, numServers, 
servers, topicConfig) - - assertTrue("Leader of all partitions of the topic should exist", leaders.values.forall(leader => leader.isDefined)) + TestUtils.createTopic(zkUtils, topic1, numPartitions, numServers, servers, topicConfig) val scheduler = new ProducerScheduler() scheduler.start http://git-wip-us.apache.org/repos/asf/kafka/blob/dd9f4314/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala index e9c5ac5..d08552e 100755 --- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala @@ -87,12 +87,12 @@ class AddPartitionsTest extends ZooKeeperTestHarness { def testIncrementPartitions { AdminUtils.addPartitions(zkUtils, topic1, 3) // wait until leader is elected - var leader1 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic1, 1) - var leader2 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic1, 2) + val leader1 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic1, 1) + val leader2 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic1, 2) val leader1FromZk = zkUtils.getLeaderForPartition(topic1, 1).get val leader2FromZk = zkUtils.getLeaderForPartition(topic1, 2).get - assertEquals(leader1.get, leader1FromZk) - assertEquals(leader2.get, leader2FromZk) + assertEquals(leader1, leader1FromZk) + assertEquals(leader2, leader2FromZk) // read metadata from a broker and verify the new topic partitions exist TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 1) @@ -114,12 +114,12 @@ class AddPartitionsTest extends ZooKeeperTestHarness { def testManualAssignmentOfReplicas { AdminUtils.addPartitions(zkUtils, topic2, 3, "1:2,0:1,2:3") // wait until leader is elected - var leader1 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic2, 1) - var leader2 = 
waitUntilLeaderIsElectedOrChanged(zkUtils, topic2, 2) + val leader1 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic2, 1) + val leader2 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic2, 2) val leader1FromZk = zkUtils.getLeaderForPartition(topic2, 1).get val leader2FromZk = zkUtils.getLeaderForPartition(topic2, 2).get - assertEquals(leader1.get, leader1FromZk) - assertEquals(leader2.get, leader2FromZk) + assertEquals(leader1, leader1FromZk) + assertEquals(leader2, leader2FromZk) // read metadata from a broker and verify the new topic partitions exist TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 1) http://git-wip-us.apache.org/repos/asf/kafka/blob/dd9f4314/core/src/test/scala/unit/kafka/admin/AdminTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index f8c65eb..377501c 100755 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -348,11 +348,11 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest { AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment) servers = serverConfigs.reverseMap(s => TestUtils.createServer(s)) // broker 2 should be the leader since it was started first - val currentLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partition, oldLeaderOpt = None).get + val currentLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partition, oldLeaderOpt = None) // trigger preferred replica election val preferredReplicaElection = new PreferredReplicaLeaderElectionCommand(zkUtils, Set(TopicAndPartition(topic, partition))) preferredReplicaElection.moveLeaderToPreferredReplica() - val newLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partition, oldLeaderOpt = Some(currentLeader)).get + val newLeader = 
TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partition, oldLeaderOpt = Some(currentLeader)) assertEquals("Preferred replica election failed", preferredReplica, newLeader) } http://git-wip-us.apache.org/repos/asf/kafka/blob/dd9f4314/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala index d9ab85e..15018f5 100644 --- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala @@ -193,8 +193,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness { // re-create topic on same replicas AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment) // wait until leader is elected - val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0, 1000) - assertTrue("New leader should be elected after re-creating topic test", leaderIdOpt.isDefined) + TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0, 1000) // check if all replica logs are created TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicPartition).isDefined), "Replicas for topic test not created.") @@ -220,8 +219,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness { // test the topic path exists assertTrue("Topic test mistakenly deleted", zkUtils.pathExists(getTopicPath(topic))) // topic test should have a leader - val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0, 1000) - assertTrue("Leader should exist for topic test", leaderIdOpt.isDefined) + TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0, 1000) } @Test http://git-wip-us.apache.org/repos/asf/kafka/blob/dd9f4314/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala 
---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala index 2597e81..25ed480 100755 --- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala @@ -18,7 +18,7 @@ package kafka.integration import org.apache.kafka.common.config.ConfigException -import org.junit.{Test, After, Before} +import org.junit.{After, Before, Ignore, Test} import scala.util.Random import org.apache.log4j.{Level, Logger} @@ -115,6 +115,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { } @Test + @Ignore // Should be re-enabled after KAFKA-3096 is fixed def testUncleanLeaderElectionDisabled { // unclean leader election is disabled by default startBrokers(Seq(configProps1, configProps2)) @@ -142,6 +143,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { } @Test + @Ignore // Should be re-enabled after KAFKA-3096 is fixed def testCleanLeaderElectionDisabledByTopicOverride { // enable unclean leader election globally, but disable for our specific test topic configProps1.put("unclean.leader.election.enable", "true") @@ -172,9 +174,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { def verifyUncleanLeaderElectionEnabled { // wait until leader is elected - val leaderIdOpt = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId) - assertTrue("Leader should get elected", leaderIdOpt.isDefined) - val leaderId = leaderIdOpt.get + val leaderId = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId) debug("Leader for " + topic + " is elected to be: %s".format(leaderId)) assertTrue("Leader id is set to expected value for topic: " + topic, leaderId == brokerId1 || leaderId == brokerId2) @@ -207,9 +207,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness 
{ def verifyUncleanLeaderElectionDisabled { // wait until leader is elected - val leaderIdOpt = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId) - assertTrue("Leader should get elected", leaderIdOpt.isDefined) - val leaderId = leaderIdOpt.get + val leaderId = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId) debug("Leader for " + topic + " is elected to be: %s".format(leaderId)) assertTrue("Leader id is set to expected value for topic: " + topic, leaderId == brokerId1 || leaderId == brokerId2) http://git-wip-us.apache.org/repos/asf/kafka/blob/dd9f4314/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala index 73e48af..48b3945 100644 --- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala @@ -214,7 +214,7 @@ class FetchRequestTest extends BaseRequestTest { topics.flatMap { topic => val partitionToLeader = createTopic(zkUtils, topic, numPartitions = numPartitions, replicationFactor = 2, servers = servers, topicConfig = topicConfig) - partitionToLeader.map { case (partition, leader) => new TopicPartition(topic, partition) -> leader.get } + partitionToLeader.map { case (partition, leader) => new TopicPartition(topic, partition) -> leader } }.toMap } http://git-wip-us.apache.org/repos/asf/kafka/blob/dd9f4314/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala index aa243be..37e0966 100755 --- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala @@ -75,22 
+75,21 @@ class LeaderElectionTest extends ZooKeeperTestHarness { val leaderEpoch1 = zkUtils.getEpochForPartition(topic, partitionId) debug("leader Epoch: " + leaderEpoch1) - debug("Leader is elected to be: %s".format(leader1.getOrElse(-1))) - assertTrue("Leader should get elected", leader1.isDefined) + debug("Leader is elected to be: %s".format(leader1)) // NOTE: this is to avoid transient test failures - assertTrue("Leader could be broker 0 or broker 1", (leader1.getOrElse(-1) == 0) || (leader1.getOrElse(-1) == 1)) + assertTrue("Leader could be broker 0 or broker 1", leader1 == 0 || leader1 == 1) assertEquals("First epoch value should be 0", 0, leaderEpoch1) // kill the server hosting the preferred replica servers.last.shutdown() // check if leader moves to the other server val leader2 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, - oldLeaderOpt = if(leader1.get == 0) None else leader1) + oldLeaderOpt = if (leader1 == 0) None else Some(leader1)) val leaderEpoch2 = zkUtils.getEpochForPartition(topic, partitionId) - debug("Leader is elected to be: %s".format(leader1.getOrElse(-1))) + debug("Leader is elected to be: %s".format(leader1)) debug("leader Epoch: " + leaderEpoch2) - assertEquals("Leader must move to broker 0", 0, leader2.getOrElse(-1)) - if(leader1.get == leader2.get) + assertEquals("Leader must move to broker 0", 0, leader2) + if (leader1 == leader2) assertEquals("Second epoch value should be " + leaderEpoch1+1, leaderEpoch1+1, leaderEpoch2) else assertEquals("Second epoch value should be %d".format(leaderEpoch1+1) , leaderEpoch1+1, leaderEpoch2) @@ -99,12 +98,12 @@ class LeaderElectionTest extends ZooKeeperTestHarness { servers.head.shutdown() Thread.sleep(zookeeper.tickTime) val leader3 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, - oldLeaderOpt = if(leader2.get == 1) None else leader2) + oldLeaderOpt = if (leader2 == 1) None else Some(leader2)) val leaderEpoch3 = zkUtils.getEpochForPartition(topic, partitionId) 
debug("leader Epoch: " + leaderEpoch3) - debug("Leader is elected to be: %s".format(leader3.getOrElse(-1))) - assertEquals("Leader must return to 1", 1, leader3.getOrElse(-1)) - if(leader2.get == leader3.get) + debug("Leader is elected to be: %s".format(leader3)) + assertEquals("Leader must return to 1", 1, leader3) + if (leader2 == leader3) assertEquals("Second epoch value should be " + leaderEpoch2, leaderEpoch2, leaderEpoch3) else assertEquals("Second epoch value should be %d".format(leaderEpoch2+1) , leaderEpoch2+1, leaderEpoch3) @@ -121,10 +120,9 @@ class LeaderElectionTest extends ZooKeeperTestHarness { val leaderEpoch1 = zkUtils.getEpochForPartition(topic, partitionId) debug("leader Epoch: " + leaderEpoch1) - debug("Leader is elected to be: %s".format(leader1.getOrElse(-1))) - assertTrue("Leader should get elected", leader1.isDefined) + debug("Leader is elected to be: %s".format(leader1)) // NOTE: this is to avoid transient test failures - assertTrue("Leader could be broker 0 or broker 1", (leader1.getOrElse(-1) == 0) || (leader1.getOrElse(-1) == 1)) + assertTrue("Leader could be broker 0 or broker 1", leader1 == 0 || leader1 == 1) assertEquals("First epoch value should be 0", 0, leaderEpoch1) // start another controller http://git-wip-us.apache.org/repos/asf/kafka/blob/dd9f4314/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala index 0ecc3c7..a5f0dba 100755 --- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala @@ -130,8 +130,8 @@ class LogRecoveryTest extends ZooKeeperTestHarness { assertEquals(hw, hwFile1.read.getOrElse(topicPartition, 0L)) // check if leader moves to the other server - leader = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, 
oldLeaderOpt = leader) - assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1)) + leader = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, oldLeaderOpt = Some(leader)) + assertEquals("Leader must move to broker 1", 1, leader) // bring the preferred replica back server1.startup() @@ -140,7 +140,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness { leader = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId) assertTrue("Leader must remain on broker 1, in case of zookeeper session expiration it can move to broker 0", - leader.isDefined && (leader.get == 0 || leader.get == 1)) + leader == 0 || leader == 1) assertEquals(hw, hwFile1.read.getOrElse(topicPartition, 0L)) // since server 2 was never shut down, the hw value of 30 is probably not checkpointed to disk yet @@ -149,9 +149,9 @@ class LogRecoveryTest extends ZooKeeperTestHarness { server2.startup() updateProducer() - leader = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, oldLeaderOpt = leader) + leader = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, oldLeaderOpt = Some(leader)) assertTrue("Leader must remain on broker 0, in case of zookeeper session expiration it can move to broker 1", - leader.isDefined && (leader.get == 0 || leader.get == 1)) + leader == 0 || leader == 1) sendMessages(1) hw += 1 @@ -202,8 +202,8 @@ class LogRecoveryTest extends ZooKeeperTestHarness { server2.startup() updateProducer() // check if leader moves to the other server - leader = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, oldLeaderOpt = leader) - assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1)) + leader = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, oldLeaderOpt = Some(leader)) + assertEquals("Leader must move to broker 1", 1, leader) assertEquals(hw, hwFile1.read.getOrElse(topicPartition, 0L)) 
http://git-wip-us.apache.org/repos/asf/kafka/blob/dd9f4314/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala index 2f16719..189f57c 100644 --- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala @@ -63,7 +63,7 @@ class ProduceRequestTest extends BaseRequestTest { private def createTopicAndFindPartitionWithLeader(topic: String): (Int, Int) = { val partitionToLeader = TestUtils.createTopic(zkUtils, topic, 3, 2, servers) partitionToLeader.collectFirst { - case (partition, Some(leader)) if leader != -1 => (partition, leader) + case (partition, leader) if leader != -1 => (partition, leader) }.getOrElse(fail(s"No leader elected for topic $topic")) } http://git-wip-us.apache.org/repos/asf/kafka/blob/dd9f4314/core/src/test/scala/unit/kafka/utils/TestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 012fdfd..6bee18d 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -258,7 +258,7 @@ object TestUtils extends Logging { numPartitions: Int = 1, replicationFactor: Int = 1, servers: Seq[KafkaServer], - topicConfig: Properties = new Properties) : scala.collection.immutable.Map[Int, Option[Int]] = { + topicConfig: Properties = new Properties): scala.collection.immutable.Map[Int, Int] = { // create topic AdminUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, topicConfig) // wait until the update metadata request for new topic reaches all servers @@ -274,7 +274,7 @@ object TestUtils extends Logging { * Return the leader for each 
partition. */ def createTopic(zkUtils: ZkUtils, topic: String, partitionReplicaAssignment: collection.Map[Int, Seq[Int]], - servers: Seq[KafkaServer]) : scala.collection.immutable.Map[Int, Option[Int]] = { + servers: Seq[KafkaServer]): scala.collection.immutable.Map[Int, Int] = { // create topic AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, partitionReplicaAssignment) // wait until the update metadata request for new topic reaches all servers @@ -744,46 +744,55 @@ object TestUtils extends Logging { * If oldLeaderOpt is defined, it waits until the new leader is different from the old leader. * If newLeaderOpt is defined, it waits until the new leader becomes the expected new leader. * - * @return The new leader or assertion failure if timeout is reached. + * @return The new leader (note that negative values are used to indicate conditions like NoLeader and + * LeaderDuringDelete). + * @throws AssertionError if the expected condition is not true within the timeout. 
*/ - def waitUntilLeaderIsElectedOrChanged(zkUtils: ZkUtils, topic: String, partition: Int, - timeoutMs: Long = 30000, - oldLeaderOpt: Option[Int] = None, newLeaderOpt: Option[Int] = None): Option[Int] = { + def waitUntilLeaderIsElectedOrChanged(zkUtils: ZkUtils, topic: String, partition: Int, timeoutMs: Long = 30000L, + oldLeaderOpt: Option[Int] = None, newLeaderOpt: Option[Int] = None): Int = { require(!(oldLeaderOpt.isDefined && newLeaderOpt.isDefined), "Can't define both the old and the new leader") val startTime = System.currentTimeMillis() - var isLeaderElectedOrChanged = false + val topicPartition = new TopicPartition(topic, partition) - trace("Waiting for leader to be elected or changed for partition [%s,%d], older leader is %s, new leader is %s" - .format(topic, partition, oldLeaderOpt, newLeaderOpt)) + trace(s"Waiting for leader to be elected or changed for partition $topicPartition, old leader is $oldLeaderOpt, " + + s"new leader is $newLeaderOpt") var leader: Option[Int] = None - while (!isLeaderElectedOrChanged && System.currentTimeMillis() < startTime + timeoutMs) { + var electedOrChangedLeader: Option[Int] = None + while (electedOrChangedLeader.isEmpty && System.currentTimeMillis() < startTime + timeoutMs) { // check if leader is elected leader = zkUtils.getLeaderForPartition(topic, partition) leader match { - case Some(l) => - if (newLeaderOpt.isDefined && newLeaderOpt.get == l) { - trace("Expected new leader %d is elected for partition [%s,%d]".format(l, topic, partition)) - isLeaderElectedOrChanged = true - } else if (oldLeaderOpt.isDefined && oldLeaderOpt.get != l) { - trace("Leader for partition [%s,%d] is changed from %d to %d".format(topic, partition, oldLeaderOpt.get, l)) - isLeaderElectedOrChanged = true - } else if (oldLeaderOpt.isEmpty) { - trace("Leader %d is elected for partition [%s,%d]".format(l, topic, partition)) - isLeaderElectedOrChanged = true - } else { - trace("Current leader for partition [%s,%d] is %d".format(topic, partition, 
l)) - } + case Some(l) => (newLeaderOpt, oldLeaderOpt) match { + case (Some(newLeader), _) if newLeader == l => + trace(s"Expected new leader $l is elected for partition $topicPartition") + electedOrChangedLeader = leader + case (_, Some(oldLeader)) if oldLeader != l => + trace(s"Leader for partition $topicPartition is changed from $oldLeader to $l") + electedOrChangedLeader = leader + case (None, None) => + trace(s"Leader $l is elected for partition $topicPartition") + electedOrChangedLeader = leader + case _ => + trace(s"Current leader for partition $topicPartition is $l") + } case None => - trace("Leader for partition [%s,%d] is not elected yet".format(topic, partition)) + trace(s"Leader for partition $topicPartition is not elected yet") } - Thread.sleep(timeoutMs.min(100L)) + Thread.sleep(math.min(timeoutMs, 100L)) + } + electedOrChangedLeader.getOrElse { + val errorMessage = (newLeaderOpt, oldLeaderOpt) match { + case (Some(newLeader), _) => + s"Timing out after $timeoutMs ms since expected new leader $newLeader was not elected for partition $topicPartition, leader is $leader" + case (_, Some(oldLeader)) => + s"Timing out after $timeoutMs ms since a new leader that is different from $oldLeader was not elected for partition $topicPartition, " + + s"leader is $leader" + case _ => + s"Timing out after $timeoutMs ms since a leader was not elected for partition $topicPartition" + } + fail(errorMessage) } - if (!isLeaderElectedOrChanged) - fail("Timing out after %d ms since leader is not elected or changed for partition [%s,%d]" - .format(timeoutMs, topic, partition)) - - leader } /**
