Repository: kafka Updated Branches: refs/heads/trunk 2fc91afba -> 12612e829
KAFKA-5329; Fix order of replica list in metadata cache Author: Ismael Juma <ism...@juma.me.uk> Reviewers: Colin P. Mccabe <cmcc...@confluent.io>, Jun Rao <jun...@gmail.com> Closes #3257 from ijuma/kafka-5329-fix-order-of-replica-list-in-metadata-cache Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/12612e82 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/12612e82 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/12612e82 Branch: refs/heads/trunk Commit: 12612e82971b8dc85941c771b7fe87ff95a07ab5 Parents: 2fc91af Author: Ismael Juma <ism...@juma.me.uk> Authored: Thu Jun 8 03:26:43 2017 +0100 Committer: Ismael Juma <ism...@juma.me.uk> Committed: Thu Jun 8 03:26:54 2017 +0100 ---------------------------------------------------------------------- .../common/requests/LeaderAndIsrRequest.java | 2 +- .../kafka/common/requests/MetadataResponse.java | 10 +++ .../kafka/common/requests/PartitionState.java | 12 ++-- .../common/requests/UpdateMetadataRequest.java | 2 +- .../common/requests/RequestResponseTest.java | 12 ++-- .../src/main/scala/kafka/api/LeaderAndIsr.scala | 2 +- .../controller/ControllerChannelManager.scala | 4 +- .../kafka/api/AdminClientIntegrationTest.scala | 44 +++++++++++- .../kafka/api/AuthorizerIntegrationTest.scala | 4 +- .../unit/kafka/server/LeaderElectionTest.scala | 2 +- .../unit/kafka/server/MetadataCacheTest.scala | 72 +++++++++++--------- .../unit/kafka/server/MetadataRequestTest.scala | 32 ++++++++- .../unit/kafka/server/ReplicaManagerTest.scala | 15 ++-- .../unit/kafka/server/RequestQuotaTest.scala | 4 +- 14 files changed, 148 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/12612e82/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java index 36426c2..1fdb4a2 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java @@ -118,7 +118,7 @@ public class LeaderAndIsrRequest extends AbstractRequest { int zkVersion = partitionStateData.getInt(ZK_VERSION_KEY_NAME); Object[] replicasArray = partitionStateData.getArray(REPLICAS_KEY_NAME); - Set<Integer> replicas = new HashSet<>(replicasArray.length); + List<Integer> replicas = new ArrayList<>(replicasArray.length); for (Object r : replicasArray) replicas.add((Integer) r); http://git-wip-us.apache.org/repos/asf/kafka/blob/12612e82/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java index 74e058b..b798764 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.errors.InvalidMetadataException; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.utils.Utils; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -372,6 +373,15 @@ public class MetadataResponse extends AbstractResponse { return isr; } + @Override + public String toString() { + return "(type=PartitionMetadata," + + ", error=" + error + + ", partition=" + partition + + ", leader=" + leader + + ", replicas=" + Utils.join(replicas, ",") + + ", isr=" + Utils.join(isr, ",") + ')'; + } } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/12612e82/clients/src/main/java/org/apache/kafka/common/requests/PartitionState.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/PartitionState.java b/clients/src/main/java/org/apache/kafka/common/requests/PartitionState.java index 035b330..394a60f 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/PartitionState.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/PartitionState.java @@ -16,9 +16,9 @@ */ package org.apache.kafka.common.requests; -import java.util.Arrays; +import org.apache.kafka.common.utils.Utils; + import java.util.List; -import java.util.Set; public class PartitionState { public final int controllerEpoch; @@ -26,9 +26,9 @@ public class PartitionState { public final int leaderEpoch; public final List<Integer> isr; public final int zkVersion; - public final Set<Integer> replicas; + public final List<Integer> replicas; - public PartitionState(int controllerEpoch, int leader, int leaderEpoch, List<Integer> isr, int zkVersion, Set<Integer> replicas) { + public PartitionState(int controllerEpoch, int leader, int leaderEpoch, List<Integer> isr, int zkVersion, List<Integer> replicas) { this.controllerEpoch = controllerEpoch; this.leader = leader; this.leaderEpoch = leaderEpoch; @@ -42,8 +42,8 @@ public class PartitionState { return "PartitionState(controllerEpoch=" + controllerEpoch + ", leader=" + leader + ", leaderEpoch=" + leaderEpoch + - ", isr=" + Arrays.toString(isr.toArray()) + + ", isr=" + Utils.join(isr, ",") + ", zkVersion=" + zkVersion + - ", replicas=" + Arrays.toString(replicas.toArray()) + ")"; + ", replicas=" + Utils.join(replicas, ",") + ")"; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/12612e82/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java index be2eaab..8f9b592 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java @@ -174,7 +174,7 @@ public class UpdateMetadataRequest extends AbstractRequest { int zkVersion = partitionStateData.getInt(ZK_VERSION_KEY_NAME); Object[] replicasArray = partitionStateData.getArray(REPLICAS_KEY_NAME); - Set<Integer> replicas = new HashSet<>(replicasArray.length); + List<Integer> replicas = new ArrayList<>(replicasArray.length); for (Object r : replicasArray) replicas.add((Integer) r); http://git-wip-us.apache.org/repos/asf/kafka/blob/12612e82/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index a05b680..327f228 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -798,11 +798,11 @@ public class RequestResponseTest { List<Integer> isr = asList(1, 2); List<Integer> replicas = asList(1, 2, 3, 4); partitionStates.put(new TopicPartition("topic5", 105), - new PartitionState(0, 2, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas))); + new PartitionState(0, 2, 1, new ArrayList<>(isr), 2, replicas)); partitionStates.put(new TopicPartition("topic5", 1), - new PartitionState(1, 1, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas))); + new PartitionState(1, 1, 1, new ArrayList<>(isr), 2, replicas)); partitionStates.put(new TopicPartition("topic20", 1), - new PartitionState(1, 0, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas))); + new PartitionState(1, 0, 1, new ArrayList<>(isr), 2, replicas)); Set<Node> leaders = Utils.mkSet( new Node(0, "test0", 1223), @@ -823,11 +823,11 @@ public class RequestResponseTest { List<Integer> isr = asList(1, 2); List<Integer> replicas = asList(1, 2, 3, 4); partitionStates.put(new TopicPartition("topic5", 105), - new PartitionState(0, 2, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas))); + new PartitionState(0, 2, 1, new ArrayList<>(isr), 2, replicas)); partitionStates.put(new TopicPartition("topic5", 1), - new PartitionState(1, 1, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas))); + new PartitionState(1, 1, 1, new ArrayList<>(isr), 2, replicas)); partitionStates.put(new TopicPartition("topic20", 1), - new PartitionState(1, 0, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas))); + new PartitionState(1, 0, 1, new ArrayList<>(isr), 2, replicas)); SecurityProtocol plaintext = SecurityProtocol.PLAINTEXT; List<UpdateMetadataRequest.EndPoint> endPoints1 = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/kafka/blob/12612e82/core/src/main/scala/kafka/api/LeaderAndIsr.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/LeaderAndIsr.scala b/core/src/main/scala/kafka/api/LeaderAndIsr.scala index 8c3b7e5..474d7a0 100644 --- a/core/src/main/scala/kafka/api/LeaderAndIsr.scala +++ b/core/src/main/scala/kafka/api/LeaderAndIsr.scala @@ -50,7 +50,7 @@ case class LeaderAndIsr(leader: Int, } } -case class PartitionStateInfo(leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, allReplicas: Set[Int]) { +case class PartitionStateInfo(leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, allReplicas: Seq[Int]) { override def toString: String = { val partitionStateInfo = new StringBuilder http://git-wip-us.apache.org/repos/asf/kafka/blob/12612e82/core/src/main/scala/kafka/controller/ControllerChannelManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index e5d12e8..758ff88 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -316,7 +316,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging brokerIds.filter(_ >= 0).foreach { brokerId => val result = leaderAndIsrRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty) - result.put(topicPartition, PartitionStateInfo(leaderIsrAndControllerEpoch, replicas.toSet)) + result.put(topicPartition, PartitionStateInfo(leaderIsrAndControllerEpoch, replicas)) } addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq, @@ -345,7 +345,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging val leaderIsrAndControllerEpochOpt = controllerContext.partitionLeadershipInfo.get(partition) leaderIsrAndControllerEpochOpt match { case Some(l @ LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch)) => - val replicas = controllerContext.partitionReplicaAssignment(partition).toSet + val replicas = controllerContext.partitionReplicaAssignment(partition) val leaderIsrAndControllerEpoch = if (beingDeleted) { val leaderDuringDelete = LeaderAndIsr.duringDelete(leaderAndIsr.isr) http://git-wip-us.apache.org/repos/asf/kafka/blob/12612e82/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala index cd4ecc9..e91c171 100644 --- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala @@ -123,7 +123,10 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging { def testCreateDeleteTopics(): Unit = { client = AdminClient.create(createConfig()) val topics = Seq("mytopic", "mytopic2") - val newTopics = topics.map(new NewTopic(_, 1, 1)) + val newTopics = Seq( + new NewTopic("mytopic", Map((0: Integer) -> Seq[Integer](1, 2).asJava, (1: Integer) -> Seq[Integer](2, 0).asJava).asJava), + new NewTopic("mytopic2", 3, 3) + ) client.createTopics(newTopics.asJava, new CreateTopicsOptions().validateOnly(true)).all.get() waitForTopics(client, List(), topics) @@ -135,8 +138,43 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging { assertFutureExceptionTypeEquals(results.get("mytopic"), classOf[TopicExistsException]) assertTrue(results.containsKey("mytopic2")) assertFutureExceptionTypeEquals(results.get("mytopic2"), classOf[TopicExistsException]) - val topicsFromDescribe = client.describeTopics(topics.asJava).all.get().asScala.keys - assertEquals(topics.toSet, topicsFromDescribe) + + val topicToDescription = client.describeTopics(topics.asJava).all.get() + assertEquals(topics.toSet, topicToDescription.keySet.asScala) + + val topic0 = topicToDescription.get("mytopic") + assertEquals(false, topic0.internal) + assertEquals("mytopic", topic0.name) + assertEquals(2, topic0.partitions.size) + val topic0Partition0 = topic0.partitions.get(0) + assertEquals(1, topic0Partition0.leader.id) + assertEquals(0, topic0Partition0.partition) + assertEquals(Seq(1, 2), topic0Partition0.isr.asScala.map(_.id)) + assertEquals(Seq(1, 2), topic0Partition0.replicas.asScala.map(_.id)) + val topic0Partition1 = topic0.partitions.get(1) + assertEquals(2, topic0Partition1.leader.id) + assertEquals(1, topic0Partition1.partition) + assertEquals(Seq(2, 0), topic0Partition1.isr.asScala.map(_.id)) + assertEquals(Seq(2, 0), topic0Partition1.replicas.asScala.map(_.id)) + + val topic1 = topicToDescription.get("mytopic2") + assertEquals(false, topic1.internal) + assertEquals("mytopic2", topic1.name) + assertEquals(3, topic1.partitions.size) + for (partitionId <- 0 until 3) { + val partition = topic1.partitions.get(partitionId) + assertEquals(partitionId, partition.partition) + assertEquals(3, partition.replicas.size) + partition.replicas.asScala.foreach { replica => + assertTrue(replica.id >= 0) + assertTrue(replica.id < brokerCount) + } + assertEquals("No duplicate replica ids", partition.replicas.size, partition.replicas.asScala.map(_.id).distinct.size) + + assertEquals(3, partition.isr.size) + assertEquals(partition.replicas, partition.isr) + assertTrue(partition.replicas.contains(partition.leader)) + } client.deleteTopics(topics.asJava).all.get() waitForTopics(client, List(), topics) http://git-wip-us.apache.org/repos/asf/kafka/blob/12612e82/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index dce5da2..22699a9 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -258,7 +258,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { } private def createUpdateMetadataRequest = { - val partitionState = Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava + val partitionState = Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Seq(brokerId).asJava)).asJava val securityProtocol = SecurityProtocol.PLAINTEXT val brokers = Set(new requests.UpdateMetadataRequest.Broker(brokerId, Seq(new requests.UpdateMetadataRequest.EndPoint("localhost", 0, securityProtocol, @@ -294,7 +294,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { private def createLeaderAndIsrRequest = { new requests.LeaderAndIsrRequest.Builder(brokerId, Int.MaxValue, - Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava, + Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Seq(brokerId).asJava)).asJava, Set(new Node(brokerId, "localhost", 0)).asJava).build() } http://git-wip-us.apache.org/repos/asf/kafka/blob/12612e82/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala index 37e0966..4ebb17b 100755 --- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala @@ -145,7 +145,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness { val partitionStates = Map( new TopicPartition(topic, partitionId) -> new PartitionState(2, brokerId2, LeaderAndIsr.initialLeaderEpoch, Seq(brokerId1, brokerId2).map(Integer.valueOf).asJava, LeaderAndIsr.initialZKVersion, - Set(0, 1).map(Integer.valueOf).asJava) + Seq(0, 1).map(Integer.valueOf).asJava) ) val requestBuilder = new LeaderAndIsrRequest.Builder( controllerId, staleControllerEpoch, partitionStates.asJava, nodes.toSet.asJava) http://git-wip-us.apache.org/repos/asf/kafka/blob/12612e82/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala index 57c1846..d9fe995 100644 --- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala +++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala @@ -32,8 +32,6 @@ import scala.collection.JavaConverters._ class MetadataCacheTest { - private def asSet[T](elems: T*): util.Set[T] = new util.HashSet(elems.asJava) - @Test def getTopicMetadataNonExistingTopics() { val topic = "topic" @@ -44,7 +42,9 @@ class MetadataCacheTest { @Test def getTopicMetadata() { - val topic = "topic" + val topic0 = "topic-0" + val topic1 = "topic-1" + val cache = new MetadataCache(1) @@ -60,14 +60,14 @@ class MetadataCacheTest { ) } - val brokers = (0 to 2).map { brokerId => + val brokers = (0 to 4).map { brokerId => new Broker(brokerId, endPoints(brokerId).asJava, "rack1") }.toSet val partitionStates = Map( - new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, 0, 0, asList(0), zkVersion, asSet(0)), - new TopicPartition(topic, 1) -> new PartitionState(controllerEpoch, 1, 1, asList(1), zkVersion, asSet(1)), - new TopicPartition(topic, 2) -> new PartitionState(controllerEpoch, 2, 2, asList(2), zkVersion, asSet(2))) + new TopicPartition(topic0, 0) -> new PartitionState(controllerEpoch, 0, 0, asList(0, 1, 3), zkVersion, asList(0, 1, 3)), + new TopicPartition(topic0, 1) -> new PartitionState(controllerEpoch, 1, 1, asList(1, 0), zkVersion, asList(1, 2, 0, 4)), + new TopicPartition(topic1, 0) -> new PartitionState(controllerEpoch, 2, 2, asList(2, 1), zkVersion, asList(2, 1, 3))) val version = ApiKeys.UPDATE_METADATA_KEY.latestVersion val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, controllerId, controllerEpoch, @@ -76,29 +76,35 @@ class MetadataCacheTest { for (securityProtocol <- Seq(SecurityProtocol.PLAINTEXT, SecurityProtocol.SSL)) { val listenerName = ListenerName.forSecurityProtocol(securityProtocol) - val topicMetadatas = cache.getTopicMetadata(Set(topic), listenerName) - assertEquals(1, topicMetadatas.size) - - val topicMetadata = topicMetadatas.head - assertEquals(Errors.NONE, topicMetadata.error) - assertEquals(topic, topicMetadata.topic) - - val partitionMetadatas = topicMetadata.partitionMetadata.asScala.sortBy(_.partition) - assertEquals(3, partitionMetadatas.size) - - for (i <- 0 to 2) { - val partitionMetadata = partitionMetadatas(i) - assertEquals(Errors.NONE, partitionMetadata.error) - assertEquals(i, partitionMetadata.partition) - val leader = partitionMetadata.leader - assertEquals(i, leader.id) - val endPoint = endPoints(partitionMetadata.leader.id).find(_.listenerName == listenerName).get - assertEquals(endPoint.host, leader.host) - assertEquals(endPoint.port, leader.port) - assertEquals(List(i), partitionMetadata.isr.asScala.map(_.id)) - assertEquals(List(i), partitionMetadata.replicas.asScala.map(_.id)) + + def checkTopicMetadata(topic: String): Unit = { + val topicMetadatas = cache.getTopicMetadata(Set(topic), listenerName) + assertEquals(1, topicMetadatas.size) + + val topicMetadata = topicMetadatas.head + assertEquals(Errors.NONE, topicMetadata.error) + assertEquals(topic, topicMetadata.topic) + + val topicPartitionStates = partitionStates.filter { case (tp, _) => tp.topic == topic } + val partitionMetadatas = topicMetadata.partitionMetadata.asScala.sortBy(_.partition) + assertEquals(s"Unexpected partition count for topic $topic", topicPartitionStates.size, partitionMetadatas.size) + + partitionMetadatas.zipWithIndex.foreach { case (partitionMetadata, partitionId) => + assertEquals(Errors.NONE, partitionMetadata.error) + assertEquals(partitionId, partitionMetadata.partition) + val leader = partitionMetadata.leader + val partitionState = topicPartitionStates(new TopicPartition(topic, partitionId)) + assertEquals(partitionState.leader, leader.id) + assertEquals(partitionState.isr, partitionMetadata.isr.asScala.map(_.id).asJava) + assertEquals(partitionState.replicas, partitionMetadata.replicas.asScala.map(_.id).asJava) + val endPoint = endPoints(partitionMetadata.leader.id).find(_.listenerName == listenerName).get + assertEquals(endPoint.host, leader.host) + assertEquals(endPoint.port, leader.port) + } } + checkTopicMetadata(topic0) + checkTopicMetadata(topic1) } } @@ -119,7 +125,7 @@ class MetadataCacheTest { val leader = 1 val leaderEpoch = 1 val partitionStates = Map( - new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, leader, leaderEpoch, asList(0), zkVersion, asSet(0))) + new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, leader, leaderEpoch, asList(0), zkVersion, asList(0))) val version = ApiKeys.UPDATE_METADATA_KEY.latestVersion val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, controllerId, controllerEpoch, @@ -159,7 +165,7 @@ class MetadataCacheTest { // replica 1 is not available val leader = 0 val leaderEpoch = 0 - val replicas = asSet[Integer](0, 1) + val replicas = asList[Integer](0, 1) val isr = asList[Integer](0) val partitionStates = Map( @@ -219,7 +225,7 @@ class MetadataCacheTest { // replica 1 is not available val leader = 0 val leaderEpoch = 0 - val replicas = asSet[Integer](0) + val replicas = asList[Integer](0) val isr = asList[Integer](0, 1) val partitionStates = Map( @@ -273,7 +279,7 @@ class MetadataCacheTest { val controllerEpoch = 1 val leader = 0 val leaderEpoch = 0 - val replicas = asSet[Integer](0) + val replicas = asList[Integer](0) val isr = asList[Integer](0, 1) val partitionStates = Map( new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, leader, leaderEpoch, isr, 3, replicas)) @@ -306,7 +312,7 @@ class MetadataCacheTest { val controllerEpoch = 1 val leader = 0 val leaderEpoch = 0 - val replicas = asSet[Integer](0) + val replicas = asList[Integer](0) val isr = asList[Integer](0, 1) val partitionStates = Map( new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, leader, leaderEpoch, isr, 3, replicas)) http://git-wip-us.apache.org/repos/asf/kafka/blob/12612e82/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala index 177a9ee..877d156 100644 --- a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala @@ -19,6 +19,7 @@ package kafka.server import java.util.Properties +import kafka.network.SocketServer import kafka.utils.TestUtils import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.protocol.{ApiKeys, Errors} @@ -160,6 +161,33 @@ class MetadataRequestTest extends BaseRequestTest { assertEquals("V1 Response should have 2 (all) topics", 2, metadataResponseV1.topicMetadata.size()) } + /** + * Preferred replica should be the first item in the replicas list + */ + @Test + def testPreferredReplica(): Unit = { + val replicaAssignment = Map(0 -> Seq(1, 2, 0), 1 -> Seq(2, 0, 1)) + TestUtils.createTopic(zkUtils, "t1", replicaAssignment, servers) + // Call controller and one different broker to ensure that metadata propagation works correctly + val responses = Seq( + sendMetadataRequest(new MetadataRequest.Builder(Seq("t1").asJava, true).build(), Some(controllerSocketServer)), + sendMetadataRequest(new MetadataRequest.Builder(Seq("t1").asJava, true).build(), Some(notControllerSocketServer)) + ) + responses.foreach { response => + assertEquals(1, response.topicMetadata.size) + val topicMetadata = response.topicMetadata.iterator.next() + assertEquals(Errors.NONE, topicMetadata.error) + assertEquals("t1", topicMetadata.topic) + assertEquals(Set(0, 1), topicMetadata.partitionMetadata.asScala.map(_.partition).toSet) + topicMetadata.partitionMetadata.asScala.foreach { partitionMetadata => + val assignment = replicaAssignment(partitionMetadata.partition) + assertEquals(assignment, partitionMetadata.replicas.asScala.map(_.id)) + assertEquals(assignment, partitionMetadata.isr.asScala.map(_.id)) + assertEquals(assignment.head, partitionMetadata.leader.id) + } + } + } + @Test def testReplicaDownResponse() { val replicaDownTopic = "replicaDown" @@ -207,8 +235,8 @@ class MetadataRequestTest extends BaseRequestTest { assertEquals(s"Response should have $replicaCount replicas", replicaCount, v1PartitionMetadata.replicas.size) } - private def sendMetadataRequest(request: MetadataRequest): MetadataResponse = { - val response = connectAndSend(request, ApiKeys.METADATA) + private def sendMetadataRequest(request: MetadataRequest, destination: Option[SocketServer] = None): MetadataResponse = { + val response = connectAndSend(request, ApiKeys.METADATA, destination = destination.getOrElse(anySocketServer)) MetadataResponse.parse(response, request.version) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/12612e82/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index e078e12..5794854 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -141,13 +141,12 @@ class ReplicaManagerTest { try { val brokerList = Seq[Integer](0, 1).asJava - val brokerSet = Set[Integer](0, 1).asJava val partition = rm.getOrCreatePartition(new TopicPartition(topic, 0)) partition.getOrCreateReplica(0) // Make this replica the leader. val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(0, 0, - collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 0, 0, brokerList, 0, brokerSet)).asJava, + collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 0, 0, brokerList, 0, brokerList)).asJava, Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ()) rm.getLeaderReplicaIfLocal(new TopicPartition(topic, 0)) @@ -164,7 +163,7 @@ class ReplicaManagerTest { // Make this replica the follower val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(0, 0, - collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 1, 1, brokerList, 0, brokerSet)).asJava, + collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 1, 1, brokerList, 0, brokerList)).asJava, Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() rm.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ()) @@ -182,14 +181,13 @@ class ReplicaManagerTest { try { val brokerList = Seq[Integer](0, 1).asJava - val brokerSet = Set[Integer](0, 1).asJava val partition = replicaManager.getOrCreatePartition(new TopicPartition(topic, 0)) partition.getOrCreateReplica(0) // Make this replica the leader. val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(0, 0, - collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 0, 0, brokerList, 0, brokerSet)).asJava, + collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 0, 0, brokerList, 0, brokerList)).asJava, Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ()) replicaManager.getLeaderReplicaIfLocal(new TopicPartition(topic, 0)) @@ -280,7 +278,7 @@ class ReplicaManagerTest { // Make this replica the leader. val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(0, 0, - collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 0, 0, brokerList, 0, brokerSet)).asJava, + collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 0, 0, brokerList, 0, brokerList)).asJava, Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ()) replicaManager.getLeaderReplicaIfLocal(new TopicPartition(topic, 0)) @@ -351,15 +349,14 @@ class ReplicaManagerTest { metadataCache, Option(this.getClass.getName)) try { - val brokerList: java.util.List[Integer] = Seq[Integer](0, 1, 2).asJava - val brokerSet: java.util.Set[Integer] = Set[Integer](0, 1, 2).asJava + val brokerList = Seq[Integer](0, 1, 2).asJava val partition = rm.getOrCreatePartition(new TopicPartition(topic, 0)) partition.getOrCreateReplica(0) // Make this replica the leader. val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(0, 0, - collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 0, 0, brokerList, 0, brokerSet)).asJava, + collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 0, 0, brokerList, 0, brokerList)).asJava, Set(new Node(0, "host1", 0), new Node(1, "host2", 1), new Node(2, "host2", 2)).asJava).build() rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ()) rm.getLeaderReplicaIfLocal(new TopicPartition(topic, 0)) http://git-wip-us.apache.org/repos/asf/kafka/blob/12612e82/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index 3b0e93c..7c171b0 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -181,14 +181,14 @@ class RequestQuotaTest extends BaseRequestTest { case ApiKeys.LEADER_AND_ISR => new LeaderAndIsrRequest.Builder(brokerId, Int.MaxValue, - Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava, + Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Seq(brokerId).asJava)).asJava, Set(new Node(brokerId, "localhost", 0)).asJava) case ApiKeys.STOP_REPLICA => new StopReplicaRequest.Builder(brokerId, Int.MaxValue, true, Set(tp).asJava) case ApiKeys.UPDATE_METADATA_KEY => - val partitionState = Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava + val partitionState = Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Seq(brokerId).asJava)).asJava val securityProtocol = SecurityProtocol.PLAINTEXT val brokers = Set(new UpdateMetadataRequest.Broker(brokerId, Seq(new UpdateMetadataRequest.EndPoint("localhost", 0, securityProtocol,