This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new 1be8fb9 KAFKA-8962; Use least loaded node for
AdminClient#describeTopics (#7421)
1be8fb9 is described below
commit 1be8fb9e3781533dc2c0d269979ef6a18aa56a6b
Author: Dhruvil Shah <[email protected]>
AuthorDate: Fri Oct 18 02:08:35 2019 -0400
KAFKA-8962; Use least loaded node for AdminClient#describeTopics (#7421)
Allow routing of `AdminClient#describeTopics` to any broker in the cluster
than just the controller, so that we don't create a hotspot for this API call.
`AdminClient#describeTopics` uses the broker's metadata cache which is
asynchronously maintained, so routing to brokers other than the controller is
not expected to have a significant difference in terms of metadata consistency;
all metadata requests are eventually consistent.
This patch also fixes a few flaky test failures.
Reviewers: Ismael Juma <[email protected]>, José Armando García Sancio
<[email protected]>, Jason Gustafson <[email protected]>
---
.../kafka/clients/admin/KafkaAdminClient.java | 2 +-
.../scala/kafka/controller/KafkaController.scala | 2 +-
.../kafka/api/AdminClientIntegrationTest.scala | 153 ++++++++++++---------
.../api/SaslSslAdminClientIntegrationTest.scala | 25 +++-
.../kafka/admin/LeaderElectionCommandTest.scala | 26 ++--
.../test/scala/unit/kafka/utils/TestUtils.scala | 36 ++---
6 files changed, 142 insertions(+), 102 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index d4958f2..0850ced 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -1537,7 +1537,7 @@ public class KafkaAdminClient extends AdminClient {
}
final long now = time.milliseconds();
Call call = new Call("describeTopics", calcDeadlineMs(now,
options.timeoutMs()),
- new ControllerNodeProvider()) {
+ new LeastLoadedNodeProvider()) {
private boolean supportsDisablingTopicCreation = true;
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala
b/core/src/main/scala/kafka/controller/KafkaController.scala
index 91c4534..0dd2ec7 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -790,7 +790,7 @@ class KafkaController(val config: KafkaConfig,
electionType: ElectionType,
electionTrigger: ElectionTrigger
): Map[TopicPartition, Either[Throwable, LeaderAndIsr]] = {
- info(s"Starting replica leader election ($electionType) for partitions
${partitions.mkString(",")} triggerd by $electionTrigger")
+ info(s"Starting replica leader election ($electionType) for partitions
${partitions.mkString(",")} triggered by $electionTrigger")
try {
val strategy = electionType match {
case ElectionType.PREFERRED =>
PreferredReplicaPartitionLeaderElectionStrategy
diff --git
a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index f2e7af6..6ff4f0e 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -37,10 +37,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.common.ConsumerGroupState
-import org.apache.kafka.common.ElectionType
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.TopicPartitionReplica
+import org.apache.kafka.common.{ConsumerGroupState, ElectionType,
TopicPartition, TopicPartitionInfo, TopicPartitionReplica}
import org.apache.kafka.common.acl._
import org.apache.kafka.common.config.{ConfigResource, LogLevelConfig}
import org.apache.kafka.common.errors._
@@ -296,15 +293,14 @@ class AdminClientIntegrationTest extends
IntegrationTestHarness with Logging {
waitForTopics(client, expectedPresent = Seq(topic), expectedMissing =
List())
// without includeAuthorizedOperations flag
- var topicResult = client.describeTopics(Seq(topic).asJava).values
- assertEquals(Set().asJava,
topicResult.get(topic).get().authorizedOperations())
+ var topicResult = getTopicMetadata(client, topic)
+ assertEquals(Set().asJava, topicResult.authorizedOperations)
//with includeAuthorizedOperations flag
- topicResult = client.describeTopics(Seq(topic).asJava,
- new DescribeTopicsOptions().includeAuthorizedOperations(true)).values
+ topicResult = getTopicMetadata(client, topic, new
DescribeTopicsOptions().includeAuthorizedOperations(true))
expectedOperations = Topic.supportedOperations
.map(operation => operation.toJava).asJava
- assertEquals(expectedOperations,
topicResult.get(topic).get().authorizedOperations())
+ assertEquals(expectedOperations, topicResult.authorizedOperations)
}
def configuredClusterPermissions() : Set[AclOperation] = {
@@ -559,17 +555,19 @@ class AdminClientIntegrationTest extends
IntegrationTestHarness with Logging {
createTopic(topic2, numPartitions = 1, replicationFactor = 2)
// assert that both the topics have 1 partition
- assertEquals(1,
client.describeTopics(Set(topic1).asJava).values.get(topic1).get.partitions.size)
- assertEquals(1,
client.describeTopics(Set(topic2).asJava).values.get(topic2).get.partitions.size)
+ val topic1_metadata = getTopicMetadata(client, topic1)
+ val topic2_metadata = getTopicMetadata(client, topic2)
+ assertEquals(1, topic1_metadata.partitions.size)
+ assertEquals(1, topic2_metadata.partitions.size)
val validateOnly = new CreatePartitionsOptions().validateOnly(true)
val actuallyDoIt = new CreatePartitionsOptions().validateOnly(false)
- def partitions(topic: String) =
- client.describeTopics(Set(topic).asJava).values.get(topic).get.partitions
+ def partitions(topic: String, expectedNumPartitionsOpt: Option[Int] =
None): util.List[TopicPartitionInfo] = {
+ getTopicMetadata(client, topic, expectedNumPartitionsOpt =
expectedNumPartitionsOpt).partitions
+ }
- def numPartitions(topic: String) =
- partitions(topic).size
+ def numPartitions(topic: String): Int = partitions(topic).size
// validateOnly: try creating a new partition (no assignments), to bring
the total to 3 partitions
var alterResult = client.createPartitions(Map(topic1 ->
@@ -581,7 +579,7 @@ class AdminClientIntegrationTest extends
IntegrationTestHarness with Logging {
alterResult = client.createPartitions(Map(topic1 ->
NewPartitions.increaseTo(3)).asJava, actuallyDoIt)
altered = alterResult.values.get(topic1).get
- assertEquals(3, numPartitions(topic1))
+ TestUtils.waitUntilTrue(() => numPartitions(topic1) == 3, "Timed out
waiting for new partitions to appear")
// validateOnly: now try creating a new partition (with assignments), to
bring the total to 3 partitions
val newPartition2Assignments = asList[util.List[Integer]](asList(0, 1),
asList(1, 2))
@@ -594,7 +592,7 @@ class AdminClientIntegrationTest extends
IntegrationTestHarness with Logging {
alterResult = client.createPartitions(Map(topic2 ->
NewPartitions.increaseTo(3, newPartition2Assignments)).asJava,
actuallyDoIt)
altered = alterResult.values.get(topic2).get
- val actualPartitions2 = partitions(topic2)
+ val actualPartitions2 = partitions(topic2, expectedNumPartitionsOpt =
Some(3))
assertEquals(3, actualPartitions2.size)
assertEquals(Seq(0, 1),
actualPartitions2.get(1).replicas.asScala.map(_.id).toList)
assertEquals(Seq(1, 2),
actualPartitions2.get(2).replicas.asScala.map(_.id).toList)
@@ -782,7 +780,7 @@ class AdminClientIntegrationTest extends
IntegrationTestHarness with Logging {
topic2 -> NewPartitions.increaseTo(2)).asJava, actuallyDoIt)
// assert that the topic1 now has 4 partitions
altered = alterResult.values.get(topic1).get
- assertEquals(4, numPartitions(topic1))
+ TestUtils.waitUntilTrue(() => numPartitions(topic1) == 4, "Timed out
waiting for new partitions to appear")
try {
altered = alterResult.values.get(topic2).get
} catch {
@@ -1452,15 +1450,17 @@ class AdminClientIntegrationTest extends
IntegrationTestHarness with Logging {
val partition2 = new TopicPartition("elect-preferred-leaders-topic-2", 0)
TestUtils.createTopic(zkClient, partition2.topic, Map[Int,
Seq[Int]](partition2.partition -> prefer0), servers)
- def preferredLeader(topicPartition: TopicPartition) =
-
client.describeTopics(asList(topicPartition.topic)).values.get(topicPartition.topic).
- get.partitions.get(topicPartition.partition).replicas.get(0).id
+ def preferredLeader(topicPartition: TopicPartition): Int = {
+ val partitionMetadata = getTopicMetadata(client,
topicPartition.topic).partitions.get(topicPartition.partition)
+ val preferredLeaderMetadata = partitionMetadata.replicas.get(0)
+ preferredLeaderMetadata.id
+ }
/** Changes the <i>preferred</i> leader without changing the
<i>current</i> leader. */
def changePreferredLeader(newAssignment: Seq[Int]) = {
val preferred = newAssignment.head
- val prior1 = TestUtils.currentLeader(client, partition1).get
- val prior2 = TestUtils.currentLeader(client, partition2).get
+ val prior1 = zkClient.getLeaderForPartition(partition1).get
+ val prior2 = zkClient.getLeaderForPartition(partition2).get
var m = Map.empty[TopicPartition, Seq[Int]]
@@ -1475,26 +1475,26 @@ class AdminClientIntegrationTest extends
IntegrationTestHarness with Logging {
s"Expected preferred leader to become $preferred, but is
${preferredLeader(partition1)} and ${preferredLeader(partition2)}",
10000)
// Check the leader hasn't moved
- assertEquals(Some(prior1), TestUtils.currentLeader(client, partition1))
- assertEquals(Some(prior2), TestUtils.currentLeader(client, partition2))
+ TestUtils.assertLeader(client, partition1, prior1)
+ TestUtils.assertLeader(client, partition2, prior2)
}
// Check current leaders are 0
- assertEquals(Some(0), TestUtils.currentLeader(client, partition1))
- assertEquals(Some(0), TestUtils.currentLeader(client, partition2))
+ TestUtils.assertLeader(client, partition1, 0)
+ TestUtils.assertLeader(client, partition2, 0)
// Noop election
var electResult = client.electLeaders(ElectionType.PREFERRED,
Set(partition1).asJava)
var exception = electResult.partitions.get.get(partition1).get
assertEquals(classOf[ElectionNotNeededException], exception.getClass)
assertEquals("Leader election not needed for topic partition",
exception.getMessage)
- assertEquals(Some(0), TestUtils.currentLeader(client, partition1))
+ TestUtils.assertLeader(client, partition1, 0)
// Noop election with null partitions
electResult = client.electLeaders(ElectionType.PREFERRED, null)
assertTrue(electResult.partitions.get.isEmpty)
- assertEquals(Some(0), TestUtils.currentLeader(client, partition1))
- assertEquals(Some(0), TestUtils.currentLeader(client, partition2))
+ TestUtils.assertLeader(client, partition1, 0)
+ TestUtils.assertLeader(client, partition2, 0)
// Now change the preferred leader to 1
changePreferredLeader(prefer1)
@@ -1503,17 +1503,17 @@ class AdminClientIntegrationTest extends
IntegrationTestHarness with Logging {
electResult = client.electLeaders(ElectionType.PREFERRED,
Set(partition1).asJava)
assertEquals(Set(partition1).asJava, electResult.partitions.get.keySet)
assertFalse(electResult.partitions.get.get(partition1).isPresent)
- TestUtils.waitForLeaderToBecome(client, partition1, Some(1))
+ TestUtils.assertLeader(client, partition1, 1)
// topic 2 unchanged
assertFalse(electResult.partitions.get.containsKey(partition2))
- assertEquals(Some(0), TestUtils.currentLeader(client, partition2))
+ TestUtils.assertLeader(client, partition2, 0)
// meaningful election with null partitions
electResult = client.electLeaders(ElectionType.PREFERRED, null)
assertEquals(Set(partition2), electResult.partitions.get.keySet.asScala)
assertFalse(electResult.partitions.get.get(partition2).isPresent)
- TestUtils.waitForLeaderToBecome(client, partition2, Some(1))
+ TestUtils.assertLeader(client, partition2, 1)
// unknown topic
val unknownPartition = new TopicPartition("topic-does-not-exist", 0)
@@ -1522,8 +1522,8 @@ class AdminClientIntegrationTest extends
IntegrationTestHarness with Logging {
exception = electResult.partitions.get.get(unknownPartition).get
assertEquals(classOf[UnknownTopicOrPartitionException], exception.getClass)
assertEquals("The partition does not exist.", exception.getMessage)
- assertEquals(Some(1), TestUtils.currentLeader(client, partition1))
- assertEquals(Some(1), TestUtils.currentLeader(client, partition2))
+ TestUtils.assertLeader(client, partition1, 1)
+ TestUtils.assertLeader(client, partition2, 1)
// Now change the preferred leader to 2
changePreferredLeader(prefer2)
@@ -1531,8 +1531,8 @@ class AdminClientIntegrationTest extends
IntegrationTestHarness with Logging {
// mixed results
electResult = client.electLeaders(ElectionType.PREFERRED,
Set(unknownPartition, partition1).asJava)
assertEquals(Set(unknownPartition, partition1).asJava,
electResult.partitions.get.keySet)
- TestUtils.waitForLeaderToBecome(client, partition1, Some(2))
- assertEquals(Some(1), TestUtils.currentLeader(client, partition2))
+ TestUtils.assertLeader(client, partition1, 2)
+ TestUtils.assertLeader(client, partition2, 1)
exception = electResult.partitions.get.get(unknownPartition).get
assertEquals(classOf[UnknownTopicOrPartitionException], exception.getClass)
assertEquals("The partition does not exist.", exception.getMessage)
@@ -1541,7 +1541,7 @@ class AdminClientIntegrationTest extends
IntegrationTestHarness with Logging {
electResult = client.electLeaders(ElectionType.PREFERRED,
Set(partition2).asJava)
assertEquals(Set(partition2).asJava, electResult.partitions.get.keySet)
assertFalse(electResult.partitions.get.get(partition2).isPresent)
- TestUtils.waitForLeaderToBecome(client, partition2, Some(2))
+ TestUtils.assertLeader(client, partition2, 2)
// Now change the preferred leader to 1
changePreferredLeader(prefer1)
@@ -1557,7 +1557,7 @@ class AdminClientIntegrationTest extends
IntegrationTestHarness with Logging {
assertEquals(classOf[PreferredLeaderNotAvailableException],
exception.getClass)
assertTrue(s"Wrong message ${exception.getMessage}",
exception.getMessage.contains(
"Failed to elect leader for partition elect-preferred-leaders-topic-1-0
under strategy PreferredReplicaPartitionLeaderElectionStrategy"))
- assertEquals(Some(2), TestUtils.currentLeader(client, partition1))
+ TestUtils.assertLeader(client, partition1, 2)
// preferred leader unavailable with null argument
electResult = client.electLeaders(ElectionType.PREFERRED, null,
shortTimeout)
@@ -1572,8 +1572,8 @@ class AdminClientIntegrationTest extends
IntegrationTestHarness with Logging {
assertTrue(s"Wrong message ${exception.getMessage}",
exception.getMessage.contains(
"Failed to elect leader for partition elect-preferred-leaders-topic-2-0
under strategy PreferredReplicaPartitionLeaderElectionStrategy"))
- assertEquals(Some(2), TestUtils.currentLeader(client, partition1))
- assertEquals(Some(2), TestUtils.currentLeader(client, partition2))
+ TestUtils.assertLeader(client, partition1, 2)
+ TestUtils.assertLeader(client, partition2, 2)
}
@Test
@@ -1588,17 +1588,17 @@ class AdminClientIntegrationTest extends
IntegrationTestHarness with Logging {
val partition1 = new TopicPartition("unclean-test-topic-1", 0)
TestUtils.createTopic(zkClient, partition1.topic, Map[Int,
Seq[Int]](partition1.partition -> assignment1), servers)
- TestUtils.waitForLeaderToBecome(client, partition1, Option(broker1))
+ TestUtils.assertLeader(client, partition1, broker1)
servers(broker2).shutdown()
TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
servers(broker1).shutdown()
- TestUtils.waitForLeaderToBecome(client, partition1, None)
+ TestUtils.assertNoLeader(client, partition1)
servers(broker2).startup()
val electResult = client.electLeaders(ElectionType.UNCLEAN,
Set(partition1).asJava)
assertFalse(electResult.partitions.get.get(partition1).isPresent)
- assertEquals(Option(broker2), TestUtils.currentLeader(client, partition1))
+ TestUtils.assertLeader(client, partition1, broker2)
}
@Test
@@ -1622,21 +1622,21 @@ class AdminClientIntegrationTest extends
IntegrationTestHarness with Logging {
servers
)
- TestUtils.waitForLeaderToBecome(client, partition1, Option(broker1))
- TestUtils.waitForLeaderToBecome(client, partition2, Option(broker1))
+ TestUtils.assertLeader(client, partition1, broker1)
+ TestUtils.assertLeader(client, partition2, broker1)
servers(broker2).shutdown()
TestUtils.waitForBrokersOutOfIsr(client, Set(partition1, partition2),
Set(broker2))
servers(broker1).shutdown()
- TestUtils.waitForLeaderToBecome(client, partition1, None)
- TestUtils.waitForLeaderToBecome(client, partition2, None)
+ TestUtils.assertNoLeader(client, partition1)
+ TestUtils.assertNoLeader(client, partition2)
servers(broker2).startup()
val electResult = client.electLeaders(ElectionType.UNCLEAN,
Set(partition1, partition2).asJava)
assertFalse(electResult.partitions.get.get(partition1).isPresent)
assertFalse(electResult.partitions.get.get(partition2).isPresent)
- assertEquals(Option(broker2), TestUtils.currentLeader(client, partition1))
- assertEquals(Option(broker2), TestUtils.currentLeader(client, partition2))
+ TestUtils.assertLeader(client, partition1, broker2)
+ TestUtils.assertLeader(client, partition2, broker2)
}
@Test
@@ -1661,21 +1661,21 @@ class AdminClientIntegrationTest extends
IntegrationTestHarness with Logging {
servers
)
- TestUtils.waitForLeaderToBecome(client, partition1, Option(broker1))
- TestUtils.waitForLeaderToBecome(client, partition2, Option(broker1))
+ TestUtils.assertLeader(client, partition1, broker1)
+ TestUtils.assertLeader(client, partition2, broker1)
servers(broker2).shutdown()
TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
servers(broker1).shutdown()
- TestUtils.waitForLeaderToBecome(client, partition1, None)
- TestUtils.waitForLeaderToBecome(client, partition2, Some(broker3))
+ TestUtils.assertNoLeader(client, partition1)
+ TestUtils.assertLeader(client, partition2, broker3)
servers(broker2).startup()
val electResult = client.electLeaders(ElectionType.UNCLEAN, null)
assertFalse(electResult.partitions.get.get(partition1).isPresent)
assertFalse(electResult.partitions.get.containsKey(partition2))
- assertEquals(Option(broker2), TestUtils.currentLeader(client, partition1))
- assertEquals(Option(broker3), TestUtils.currentLeader(client, partition2))
+ TestUtils.assertLeader(client, partition1, broker2)
+ TestUtils.assertLeader(client, partition2, broker3)
}
@Test
@@ -1698,7 +1698,7 @@ class AdminClientIntegrationTest extends
IntegrationTestHarness with Logging {
servers
)
- TestUtils.waitForLeaderToBecome(client, new TopicPartition(topic, 0),
Option(broker1))
+ TestUtils.assertLeader(client, new TopicPartition(topic, 0), broker1)
val electResult = client.electLeaders(ElectionType.UNCLEAN,
Set(unknownPartition, unknownTopic).asJava)
assertTrue(electResult.partitions.get.get(unknownPartition).get.isInstanceOf[UnknownTopicOrPartitionException])
@@ -1724,12 +1724,12 @@ class AdminClientIntegrationTest extends
IntegrationTestHarness with Logging {
servers
)
- TestUtils.waitForLeaderToBecome(client, partition1, Option(broker1))
+ TestUtils.assertLeader(client, partition1, broker1)
servers(broker2).shutdown()
TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
servers(broker1).shutdown()
- TestUtils.waitForLeaderToBecome(client, partition1, None)
+ TestUtils.assertNoLeader(client, partition1)
val electResult = client.electLeaders(ElectionType.UNCLEAN,
Set(partition1).asJava)
assertTrue(electResult.partitions.get.get(partition1).get.isInstanceOf[EligibleLeadersNotAvailableException])
@@ -1754,10 +1754,10 @@ class AdminClientIntegrationTest extends
IntegrationTestHarness with Logging {
servers
)
- TestUtils.waitForLeaderToBecome(client, partition1, Option(broker1))
+ TestUtils.assertLeader(client, partition1, broker1)
servers(broker1).shutdown()
- TestUtils.waitForLeaderToBecome(client, partition1, Some(broker2))
+ TestUtils.assertLeader(client, partition1, broker2)
servers(broker1).startup()
val electResult = client.electLeaders(ElectionType.UNCLEAN,
Set(partition1).asJava)
@@ -1786,21 +1786,21 @@ class AdminClientIntegrationTest extends
IntegrationTestHarness with Logging {
servers
)
- TestUtils.waitForLeaderToBecome(client, partition1, Option(broker1))
- TestUtils.waitForLeaderToBecome(client, partition2, Option(broker1))
+ TestUtils.assertLeader(client, partition1, broker1)
+ TestUtils.assertLeader(client, partition2, broker1)
servers(broker2).shutdown()
TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
servers(broker1).shutdown()
- TestUtils.waitForLeaderToBecome(client, partition1, None)
- TestUtils.waitForLeaderToBecome(client, partition2, Some(broker3))
+ TestUtils.assertNoLeader(client, partition1)
+ TestUtils.assertLeader(client, partition2, broker3)
servers(broker2).startup()
val electResult = client.electLeaders(ElectionType.UNCLEAN,
Set(partition1, partition2).asJava)
assertFalse(electResult.partitions.get.get(partition1).isPresent)
assertTrue(electResult.partitions.get.get(partition2).get.isInstanceOf[ElectionNotNeededException])
- assertEquals(Option(broker2), TestUtils.currentLeader(client, partition1))
- assertEquals(Option(broker3), TestUtils.currentLeader(client, partition2))
+ TestUtils.assertLeader(client, partition1, broker2)
+ TestUtils.assertLeader(client, partition2, broker3)
}
@Test
@@ -2428,4 +2428,23 @@ object AdminClientIntegrationTest {
assertEquals(Defaults.CompressionType.toString,
configs.get(brokerResource).get(KafkaConfig.CompressionTypeProp).value)
}
+
+ private def getTopicMetadata(client: Admin,
+ topic: String,
+ describeOptions: DescribeTopicsOptions = new
DescribeTopicsOptions,
+ expectedNumPartitionsOpt: Option[Int] = None):
TopicDescription = {
+ var result: TopicDescription = null
+
+ TestUtils.waitUntilTrue(() => {
+ val topicResult = client.describeTopics(Set(topic).asJava,
describeOptions).values.get(topic)
+ try {
+ result = topicResult.get
+ expectedNumPartitionsOpt.map(_ ==
result.partitions.size).getOrElse(true)
+ } catch {
+ case e: ExecutionException if
e.getCause.isInstanceOf[UnknownTopicOrPartitionException] => false // metadata
may not have propagated yet, so retry
+ }
+ }, s"Timed out waiting for metadata for $topic")
+
+ result
+ }
}
diff --git
a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
index 2a8131e..fd4b29f 100644
---
a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
@@ -24,7 +24,7 @@ import kafka.utils.TestUtils._
import org.apache.kafka.clients.admin._
import org.apache.kafka.common.acl._
import org.apache.kafka.common.config.ConfigResource
-import org.apache.kafka.common.errors.{ClusterAuthorizationException,
InvalidRequestException, TopicAuthorizationException}
+import org.apache.kafka.common.errors.{ClusterAuthorizationException,
InvalidRequestException, TopicAuthorizationException,
UnknownTopicOrPartitionException}
import org.apache.kafka.common.resource.{PatternType, ResourcePattern,
ResourcePatternFilter, ResourceType}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.junit.Assert.{assertEquals, assertTrue}
@@ -33,6 +33,7 @@ import org.junit.{After, Assert, Before, Test}
import scala.collection.JavaConverters._
import scala.collection.Seq
import scala.compat.java8.OptionConverters._
+import scala.concurrent.ExecutionException
import scala.util.{Failure, Success, Try}
class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest
with SaslSetup {
@@ -448,9 +449,8 @@ class SaslSslAdminClientIntegrationTest extends
AdminClientIntegrationTest with
validateMetadataAndConfigs(createResult)
val createResponseConfig =
createResult.config(topic1).get().entries.asScala
- val topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topic1)
- val describeResponseConfig =
client.describeConfigs(List(topicResource).asJava).values.get(topicResource).get().entries.asScala
- assertEquals(describeResponseConfig.size, createResponseConfig.size)
+ val describeResponseConfig = describeConfigs(topic1)
+ assertEquals(describeResponseConfig.map(_.name).toSet,
createResponseConfig.map(_.name).toSet)
describeResponseConfig.foreach { describeEntry =>
val name = describeEntry.name
val createEntry = createResponseConfig.find(_.name == name).get
@@ -461,6 +461,23 @@ class SaslSslAdminClientIntegrationTest extends
AdminClientIntegrationTest with
}
}
+ private def describeConfigs(topic: String): Iterable[ConfigEntry] = {
+ val topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topic)
+ var configEntries: Iterable[ConfigEntry] = null
+
+ TestUtils.waitUntilTrue(() => {
+ try {
+ val topicResponse =
client.describeConfigs(List(topicResource).asJava).all.get.get(topicResource)
+ configEntries = topicResponse.entries.asScala
+ true
+ } catch {
+ case e: ExecutionException if
e.getCause.isInstanceOf[UnknownTopicOrPartitionException] => false
+ }
+ }, "Timed out waiting for describeConfigs")
+
+ configEntries
+ }
+
private def waitForDescribeAcls(client: Admin, filter: AclBindingFilter,
acls: Set[AclBinding]): Unit = {
var lastResults: util.Collection[AclBinding] = null
TestUtils.waitUntilTrue(() => {
diff --git
a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala
b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala
index c30011f..45fb7b9 100644
--- a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala
@@ -80,12 +80,12 @@ final class LeaderElectionCommandTest extends
ZooKeeperTestHarness {
val topicPartition = new TopicPartition(topic, partition)
- TestUtils.waitForLeaderToBecome(client, topicPartition, Option(broker2))
+ TestUtils.assertLeader(client, topicPartition, broker2)
servers(broker3).shutdown()
TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition),
Set(broker3))
servers(broker2).shutdown()
- TestUtils.waitForLeaderToBecome(client, topicPartition, None)
+ TestUtils.assertNoLeader(client, topicPartition)
servers(broker3).startup()
LeaderElectionCommand.main(
@@ -96,7 +96,7 @@ final class LeaderElectionCommandTest extends
ZooKeeperTestHarness {
)
)
- assertEquals(Option(broker3), TestUtils.currentLeader(client,
topicPartition))
+ TestUtils.assertLeader(client, topicPartition, broker3)
}
}
@@ -111,12 +111,12 @@ final class LeaderElectionCommandTest extends
ZooKeeperTestHarness {
val topicPartition = new TopicPartition(topic, partition)
- TestUtils.waitForLeaderToBecome(client, topicPartition, Option(broker2))
+ TestUtils.assertLeader(client, topicPartition, broker2)
servers(broker3).shutdown()
TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition),
Set(broker3))
servers(broker2).shutdown()
- TestUtils.waitForLeaderToBecome(client, topicPartition, None)
+ TestUtils.assertNoLeader(client, topicPartition)
servers(broker3).startup()
LeaderElectionCommand.main(
@@ -128,7 +128,7 @@ final class LeaderElectionCommandTest extends
ZooKeeperTestHarness {
)
)
- assertEquals(Option(broker3), TestUtils.currentLeader(client,
topicPartition))
+ TestUtils.assertLeader(client, topicPartition, broker3)
}
}
@@ -143,12 +143,12 @@ final class LeaderElectionCommandTest extends
ZooKeeperTestHarness {
val topicPartition = new TopicPartition(topic, partition)
- TestUtils.waitForLeaderToBecome(client, topicPartition, Option(broker2))
+ TestUtils.assertLeader(client, topicPartition, broker2)
servers(broker3).shutdown()
TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition),
Set(broker3))
servers(broker2).shutdown()
- TestUtils.waitForLeaderToBecome(client, topicPartition, None)
+ TestUtils.assertNoLeader(client, topicPartition)
servers(broker3).startup()
val topicPartitionPath = tempTopicPartitionFile(Set(topicPartition))
@@ -161,7 +161,7 @@ final class LeaderElectionCommandTest extends
ZooKeeperTestHarness {
)
)
- assertEquals(Option(broker3), TestUtils.currentLeader(client,
topicPartition))
+ TestUtils.assertLeader(client, topicPartition, broker3)
}
}
@@ -176,10 +176,10 @@ final class LeaderElectionCommandTest extends
ZooKeeperTestHarness {
val topicPartition = new TopicPartition(topic, partition)
- TestUtils.waitForLeaderToBecome(client, topicPartition, Option(broker2))
+ TestUtils.assertLeader(client, topicPartition, broker2)
servers(broker2).shutdown()
- TestUtils.waitForLeaderToBecome(client, topicPartition, Some(broker3))
+ TestUtils.assertLeader(client, topicPartition, broker3)
servers(broker2).startup()
TestUtils.waitForBrokersInIsr(client, topicPartition, Set(broker2))
@@ -191,7 +191,7 @@ final class LeaderElectionCommandTest extends
ZooKeeperTestHarness {
)
)
- assertEquals(Option(broker2), TestUtils.currentLeader(client,
topicPartition))
+ TestUtils.assertLeader(client, topicPartition, broker2)
}
}
@@ -273,7 +273,7 @@ final class LeaderElectionCommandTest extends
ZooKeeperTestHarness {
LeaderElectionCommand.main(
Array(
"--bootstrap-server", bootstrapServers(servers),
- "--election-type", "preferrred"
+ "--election-type", "preferred"
)
)
fail()
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 6df853d..2e8afe3 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -28,8 +28,8 @@ import java.util.Arrays
import java.util.Collections
import java.util.Properties
import java.util.concurrent.{Callable, ExecutionException, Executors, TimeUnit}
-import javax.net.ssl.X509TrustManager
+import javax.net.ssl.X509TrustManager
import kafka.api._
import kafka.cluster.{Broker, EndPoint}
import kafka.log._
@@ -49,6 +49,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer,
ProducerConfig, Produce
import org.apache.kafka.common.acl.{AccessControlEntry,
AccessControlEntryFilter, AclBindingFilter}
import org.apache.kafka.common.{KafkaFuture, TopicPartition}
import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
import org.apache.kafka.common.header.Header
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.network.{ListenerName, Mode}
@@ -1488,24 +1489,27 @@ object TestUtils extends Logging {
adminClient.alterConfigs(configs)
}
- def currentLeader(client: Admin, topicPartition: TopicPartition):
Option[Int] = {
- Option(
- client
- .describeTopics(Arrays.asList(topicPartition.topic))
- .all
- .get
- .get(topicPartition.topic)
- .partitions
- .get(topicPartition.partition)
- .leader
- ).map(_.id)
+ def assertLeader(client: Admin, topicPartition: TopicPartition,
expectedLeader: Int): Unit = {
+ waitForLeaderToBecome(client, topicPartition, Some(expectedLeader))
+ }
+
+ def assertNoLeader(client: Admin, topicPartition: TopicPartition): Unit = {
+ waitForLeaderToBecome(client, topicPartition, None)
}
def waitForLeaderToBecome(client: Admin, topicPartition: TopicPartition,
leader: Option[Int]): Unit = {
- TestUtils.waitUntilTrue(
- () => currentLeader(client, topicPartition) == leader,
- s"Expected leader to become $leader", 10000
- )
+ val topic = topicPartition.topic
+ val partition = topicPartition.partition
+
+ TestUtils.waitUntilTrue(() => {
+ try {
+ val topicResult =
client.describeTopics(Arrays.asList(topic)).all.get.get(topic)
+ val partitionResult = topicResult.partitions.get(partition)
+ Option(partitionResult.leader).map(_.id) == leader
+ } catch {
+ case e: ExecutionException if
e.getCause.isInstanceOf[UnknownTopicOrPartitionException] => false
+ }
+ }, "Timed out waiting for leader metadata")
}
def waitForBrokersOutOfIsr(client: Admin, partition: Set[TopicPartition],
brokerIds: Set[Int]): Unit = {