Repository: kafka
Updated Branches:
  refs/heads/0.11.0 469778a5a -> eb2aa184a


KAFKA-5329; Fix order of replica list in metadata cache

Author: Ismael Juma <ism...@juma.me.uk>

Reviewers: Colin P. Mccabe <cmcc...@confluent.io>, Jun Rao <jun...@gmail.com>

Closes #3257 from ijuma/kafka-5329-fix-order-of-replica-list-in-metadata-cache

(cherry picked from commit 12612e82971b8dc85941c771b7fe87ff95a07ab5)
Signed-off-by: Ismael Juma <ism...@juma.me.uk>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/eb2aa184
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/eb2aa184
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/eb2aa184

Branch: refs/heads/0.11.0
Commit: eb2aa184a07abeec97fcda96d86d44c47ab35557
Parents: 469778a
Author: Ismael Juma <ism...@juma.me.uk>
Authored: Thu Jun 8 03:26:43 2017 +0100
Committer: Ismael Juma <ism...@juma.me.uk>
Committed: Thu Jun 8 03:27:45 2017 +0100

----------------------------------------------------------------------
 .../common/requests/LeaderAndIsrRequest.java    |  2 +-
 .../kafka/common/requests/MetadataResponse.java | 10 +++
 .../kafka/common/requests/PartitionState.java   | 12 ++--
 .../common/requests/UpdateMetadataRequest.java  |  2 +-
 .../common/requests/RequestResponseTest.java    | 12 ++--
 .../src/main/scala/kafka/api/LeaderAndIsr.scala |  2 +-
 .../controller/ControllerChannelManager.scala   |  4 +-
 .../kafka/api/AdminClientIntegrationTest.scala  | 44 +++++++++++-
 .../kafka/api/AuthorizerIntegrationTest.scala   |  4 +-
 .../unit/kafka/server/LeaderElectionTest.scala  |  2 +-
 .../unit/kafka/server/MetadataCacheTest.scala   | 72 +++++++++++---------
 .../unit/kafka/server/MetadataRequestTest.scala | 32 ++++++++-
 .../unit/kafka/server/ReplicaManagerTest.scala  | 15 ++--
 .../unit/kafka/server/RequestQuotaTest.scala    |  4 +-
 14 files changed, 148 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/eb2aa184/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
index 36426c2..1fdb4a2 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
@@ -118,7 +118,7 @@ public class LeaderAndIsrRequest extends AbstractRequest {
             int zkVersion = partitionStateData.getInt(ZK_VERSION_KEY_NAME);
 
             Object[] replicasArray = 
partitionStateData.getArray(REPLICAS_KEY_NAME);
-            Set<Integer> replicas = new HashSet<>(replicasArray.length);
+            List<Integer> replicas = new ArrayList<>(replicasArray.length);
             for (Object r : replicasArray)
                 replicas.add((Integer) r);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/eb2aa184/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java 
b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index 74e058b..b798764 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -23,6 +23,7 @@ import 
org.apache.kafka.common.errors.InvalidMetadataException;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.Utils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -372,6 +373,15 @@ public class MetadataResponse extends AbstractResponse {
             return isr;
         }
 
+        @Override
+        public String toString() {
+            return "(type=PartitionMetadata," +
+                    ", error=" + error +
+                    ", partition=" + partition +
+                    ", leader=" + leader +
+                    ", replicas=" + Utils.join(replicas, ",") +
+                    ", isr=" + Utils.join(isr, ",") + ')';
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/eb2aa184/clients/src/main/java/org/apache/kafka/common/requests/PartitionState.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/PartitionState.java 
b/clients/src/main/java/org/apache/kafka/common/requests/PartitionState.java
index 035b330..394a60f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/PartitionState.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/PartitionState.java
@@ -16,9 +16,9 @@
  */
 package org.apache.kafka.common.requests;
 
-import java.util.Arrays;
+import org.apache.kafka.common.utils.Utils;
+
 import java.util.List;
-import java.util.Set;
 
 public class PartitionState {
     public final int controllerEpoch;
@@ -26,9 +26,9 @@ public class PartitionState {
     public final int leaderEpoch;
     public final List<Integer> isr;
     public final int zkVersion;
-    public final Set<Integer> replicas;
+    public final List<Integer> replicas;
 
-    public PartitionState(int controllerEpoch, int leader, int leaderEpoch, 
List<Integer> isr, int zkVersion, Set<Integer> replicas) {
+    public PartitionState(int controllerEpoch, int leader, int leaderEpoch, 
List<Integer> isr, int zkVersion, List<Integer> replicas) {
         this.controllerEpoch = controllerEpoch;
         this.leader = leader;
         this.leaderEpoch = leaderEpoch;
@@ -42,8 +42,8 @@ public class PartitionState {
         return "PartitionState(controllerEpoch=" + controllerEpoch +
                 ", leader=" + leader +
                 ", leaderEpoch=" + leaderEpoch +
-                ", isr=" + Arrays.toString(isr.toArray()) +
+                ", isr=" + Utils.join(isr, ",") +
                 ", zkVersion=" + zkVersion +
-                ", replicas=" + Arrays.toString(replicas.toArray()) + ")";
+                ", replicas=" + Utils.join(replicas, ",") + ")";
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/eb2aa184/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
index be2eaab..8f9b592 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
@@ -174,7 +174,7 @@ public class UpdateMetadataRequest extends AbstractRequest {
             int zkVersion = partitionStateData.getInt(ZK_VERSION_KEY_NAME);
 
             Object[] replicasArray = 
partitionStateData.getArray(REPLICAS_KEY_NAME);
-            Set<Integer> replicas = new HashSet<>(replicasArray.length);
+            List<Integer> replicas = new ArrayList<>(replicasArray.length);
             for (Object r : replicasArray)
                 replicas.add((Integer) r);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/eb2aa184/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index a05b680..327f228 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -798,11 +798,11 @@ public class RequestResponseTest {
         List<Integer> isr = asList(1, 2);
         List<Integer> replicas = asList(1, 2, 3, 4);
         partitionStates.put(new TopicPartition("topic5", 105),
-                new PartitionState(0, 2, 1, new ArrayList<>(isr), 2, new 
HashSet<>(replicas)));
+                new PartitionState(0, 2, 1, new ArrayList<>(isr), 2, 
replicas));
         partitionStates.put(new TopicPartition("topic5", 1),
-                new PartitionState(1, 1, 1, new ArrayList<>(isr), 2, new 
HashSet<>(replicas)));
+                new PartitionState(1, 1, 1, new ArrayList<>(isr), 2, 
replicas));
         partitionStates.put(new TopicPartition("topic20", 1),
-                new PartitionState(1, 0, 1, new ArrayList<>(isr), 2, new 
HashSet<>(replicas)));
+                new PartitionState(1, 0, 1, new ArrayList<>(isr), 2, 
replicas));
 
         Set<Node> leaders = Utils.mkSet(
                 new Node(0, "test0", 1223),
@@ -823,11 +823,11 @@ public class RequestResponseTest {
         List<Integer> isr = asList(1, 2);
         List<Integer> replicas = asList(1, 2, 3, 4);
         partitionStates.put(new TopicPartition("topic5", 105),
-                new PartitionState(0, 2, 1, new ArrayList<>(isr), 2, new 
HashSet<>(replicas)));
+                new PartitionState(0, 2, 1, new ArrayList<>(isr), 2, 
replicas));
         partitionStates.put(new TopicPartition("topic5", 1),
-                new PartitionState(1, 1, 1, new ArrayList<>(isr), 2, new 
HashSet<>(replicas)));
+                new PartitionState(1, 1, 1, new ArrayList<>(isr), 2, 
replicas));
         partitionStates.put(new TopicPartition("topic20", 1),
-                new PartitionState(1, 0, 1, new ArrayList<>(isr), 2, new 
HashSet<>(replicas)));
+                new PartitionState(1, 0, 1, new ArrayList<>(isr), 2, 
replicas));
 
         SecurityProtocol plaintext = SecurityProtocol.PLAINTEXT;
         List<UpdateMetadataRequest.EndPoint> endPoints1 = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/eb2aa184/core/src/main/scala/kafka/api/LeaderAndIsr.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsr.scala 
b/core/src/main/scala/kafka/api/LeaderAndIsr.scala
index 8c3b7e5..474d7a0 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsr.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsr.scala
@@ -50,7 +50,7 @@ case class LeaderAndIsr(leader: Int,
   }
 }
 
-case class PartitionStateInfo(leaderIsrAndControllerEpoch: 
LeaderIsrAndControllerEpoch, allReplicas: Set[Int]) {
+case class PartitionStateInfo(leaderIsrAndControllerEpoch: 
LeaderIsrAndControllerEpoch, allReplicas: Seq[Int]) {
 
   override def toString: String = {
     val partitionStateInfo = new StringBuilder

http://git-wip-us.apache.org/repos/asf/kafka/blob/eb2aa184/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala 
b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index e5d12e8..758ff88 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -316,7 +316,7 @@ class ControllerBrokerRequestBatch(controller: 
KafkaController) extends  Logging
 
     brokerIds.filter(_ >= 0).foreach { brokerId =>
       val result = leaderAndIsrRequestMap.getOrElseUpdate(brokerId, 
mutable.Map.empty)
-      result.put(topicPartition, 
PartitionStateInfo(leaderIsrAndControllerEpoch, replicas.toSet))
+      result.put(topicPartition, 
PartitionStateInfo(leaderIsrAndControllerEpoch, replicas))
     }
 
     
addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq,
@@ -345,7 +345,7 @@ class ControllerBrokerRequestBatch(controller: 
KafkaController) extends  Logging
       val leaderIsrAndControllerEpochOpt = 
controllerContext.partitionLeadershipInfo.get(partition)
       leaderIsrAndControllerEpochOpt match {
         case Some(l @ LeaderIsrAndControllerEpoch(leaderAndIsr, 
controllerEpoch)) =>
-          val replicas = 
controllerContext.partitionReplicaAssignment(partition).toSet
+          val replicas = 
controllerContext.partitionReplicaAssignment(partition)
 
           val leaderIsrAndControllerEpoch = if (beingDeleted) {
             val leaderDuringDelete = 
LeaderAndIsr.duringDelete(leaderAndIsr.isr)

http://git-wip-us.apache.org/repos/asf/kafka/blob/eb2aa184/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index cd4ecc9..e91c171 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -123,7 +123,10 @@ class AdminClientIntegrationTest extends 
KafkaServerTestHarness with Logging {
   def testCreateDeleteTopics(): Unit = {
     client = AdminClient.create(createConfig())
     val topics = Seq("mytopic", "mytopic2")
-    val newTopics = topics.map(new NewTopic(_, 1, 1))
+    val newTopics = Seq(
+      new NewTopic("mytopic", Map((0: Integer) -> Seq[Integer](1, 2).asJava, 
(1: Integer) -> Seq[Integer](2, 0).asJava).asJava),
+      new NewTopic("mytopic2", 3, 3)
+    )
     client.createTopics(newTopics.asJava, new 
CreateTopicsOptions().validateOnly(true)).all.get()
     waitForTopics(client, List(), topics)
 
@@ -135,8 +138,43 @@ class AdminClientIntegrationTest extends 
KafkaServerTestHarness with Logging {
     assertFutureExceptionTypeEquals(results.get("mytopic"), 
classOf[TopicExistsException])
     assertTrue(results.containsKey("mytopic2"))
     assertFutureExceptionTypeEquals(results.get("mytopic2"), 
classOf[TopicExistsException])
-    val topicsFromDescribe = 
client.describeTopics(topics.asJava).all.get().asScala.keys
-    assertEquals(topics.toSet, topicsFromDescribe)
+
+    val topicToDescription = client.describeTopics(topics.asJava).all.get()
+    assertEquals(topics.toSet, topicToDescription.keySet.asScala)
+
+    val topic0 = topicToDescription.get("mytopic")
+    assertEquals(false, topic0.internal)
+    assertEquals("mytopic", topic0.name)
+    assertEquals(2, topic0.partitions.size)
+    val topic0Partition0 = topic0.partitions.get(0)
+    assertEquals(1, topic0Partition0.leader.id)
+    assertEquals(0, topic0Partition0.partition)
+    assertEquals(Seq(1, 2), topic0Partition0.isr.asScala.map(_.id))
+    assertEquals(Seq(1, 2), topic0Partition0.replicas.asScala.map(_.id))
+    val topic0Partition1 = topic0.partitions.get(1)
+    assertEquals(2, topic0Partition1.leader.id)
+    assertEquals(1, topic0Partition1.partition)
+    assertEquals(Seq(2, 0), topic0Partition1.isr.asScala.map(_.id))
+    assertEquals(Seq(2, 0), topic0Partition1.replicas.asScala.map(_.id))
+
+    val topic1 = topicToDescription.get("mytopic2")
+    assertEquals(false, topic1.internal)
+    assertEquals("mytopic2", topic1.name)
+    assertEquals(3, topic1.partitions.size)
+    for (partitionId <- 0 until 3) {
+      val partition = topic1.partitions.get(partitionId)
+      assertEquals(partitionId, partition.partition)
+      assertEquals(3, partition.replicas.size)
+      partition.replicas.asScala.foreach { replica =>
+        assertTrue(replica.id >= 0)
+        assertTrue(replica.id < brokerCount)
+      }
+      assertEquals("No duplicate replica ids", partition.replicas.size, 
partition.replicas.asScala.map(_.id).distinct.size)
+
+      assertEquals(3, partition.isr.size)
+      assertEquals(partition.replicas, partition.isr)
+      assertTrue(partition.replicas.contains(partition.leader))
+    }
 
     client.deleteTopics(topics.asJava).all.get()
     waitForTopics(client, List(), topics)

http://git-wip-us.apache.org/repos/asf/kafka/blob/eb2aa184/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index dce5da2..22699a9 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -258,7 +258,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   }
 
   private def createUpdateMetadataRequest = {
-    val partitionState = Map(tp -> new PartitionState(Int.MaxValue, brokerId, 
Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava
+    val partitionState = Map(tp -> new PartitionState(Int.MaxValue, brokerId, 
Int.MaxValue, List(brokerId).asJava, 2, Seq(brokerId).asJava)).asJava
     val securityProtocol = SecurityProtocol.PLAINTEXT
     val brokers = Set(new requests.UpdateMetadataRequest.Broker(brokerId,
       Seq(new requests.UpdateMetadataRequest.EndPoint("localhost", 0, 
securityProtocol,
@@ -294,7 +294,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   private def createLeaderAndIsrRequest = {
     new requests.LeaderAndIsrRequest.Builder(brokerId, Int.MaxValue,
-      Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, 
List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava,
+      Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, 
List(brokerId).asJava, 2, Seq(brokerId).asJava)).asJava,
       Set(new Node(brokerId, "localhost", 0)).asJava).build()
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/eb2aa184/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala 
b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 37e0966..4ebb17b 100755
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -145,7 +145,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
       val partitionStates = Map(
         new TopicPartition(topic, partitionId) -> new PartitionState(2, 
brokerId2, LeaderAndIsr.initialLeaderEpoch,
           Seq(brokerId1, brokerId2).map(Integer.valueOf).asJava, 
LeaderAndIsr.initialZKVersion,
-          Set(0, 1).map(Integer.valueOf).asJava)
+          Seq(0, 1).map(Integer.valueOf).asJava)
       )
       val requestBuilder = new LeaderAndIsrRequest.Builder(
           controllerId, staleControllerEpoch, partitionStates.asJava, 
nodes.toSet.asJava)

http://git-wip-us.apache.org/repos/asf/kafka/blob/eb2aa184/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala 
b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index 57c1846..d9fe995 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -32,8 +32,6 @@ import scala.collection.JavaConverters._
 
 class MetadataCacheTest {
 
-  private def asSet[T](elems: T*): util.Set[T] = new util.HashSet(elems.asJava)
-
   @Test
   def getTopicMetadataNonExistingTopics() {
     val topic = "topic"
@@ -44,7 +42,9 @@ class MetadataCacheTest {
 
   @Test
   def getTopicMetadata() {
-    val topic = "topic"
+    val topic0 = "topic-0"
+    val topic1 = "topic-1"
+
 
     val cache = new MetadataCache(1)
 
@@ -60,14 +60,14 @@ class MetadataCacheTest {
       )
     }
 
-    val brokers = (0 to 2).map { brokerId =>
+    val brokers = (0 to 4).map { brokerId =>
       new Broker(brokerId, endPoints(brokerId).asJava, "rack1")
     }.toSet
 
     val partitionStates = Map(
-      new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, 0, 
0, asList(0), zkVersion, asSet(0)),
-      new TopicPartition(topic, 1) -> new PartitionState(controllerEpoch, 1, 
1, asList(1), zkVersion, asSet(1)),
-      new TopicPartition(topic, 2) -> new PartitionState(controllerEpoch, 2, 
2, asList(2), zkVersion, asSet(2)))
+      new TopicPartition(topic0, 0) -> new PartitionState(controllerEpoch, 0, 
0, asList(0, 1, 3), zkVersion, asList(0, 1, 3)),
+      new TopicPartition(topic0, 1) -> new PartitionState(controllerEpoch, 1, 
1, asList(1, 0), zkVersion, asList(1, 2, 0, 4)),
+      new TopicPartition(topic1, 0) -> new PartitionState(controllerEpoch, 2, 
2, asList(2, 1), zkVersion, asList(2, 1, 3)))
 
     val version = ApiKeys.UPDATE_METADATA_KEY.latestVersion
     val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 
controllerId, controllerEpoch,
@@ -76,29 +76,35 @@ class MetadataCacheTest {
 
     for (securityProtocol <- Seq(SecurityProtocol.PLAINTEXT, 
SecurityProtocol.SSL)) {
       val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
-      val topicMetadatas = cache.getTopicMetadata(Set(topic), listenerName)
-      assertEquals(1, topicMetadatas.size)
-
-      val topicMetadata = topicMetadatas.head
-      assertEquals(Errors.NONE, topicMetadata.error)
-      assertEquals(topic, topicMetadata.topic)
-
-      val partitionMetadatas = 
topicMetadata.partitionMetadata.asScala.sortBy(_.partition)
-      assertEquals(3, partitionMetadatas.size)
-
-      for (i <- 0 to 2) {
-        val partitionMetadata = partitionMetadatas(i)
-        assertEquals(Errors.NONE, partitionMetadata.error)
-        assertEquals(i, partitionMetadata.partition)
-        val leader = partitionMetadata.leader
-        assertEquals(i, leader.id)
-        val endPoint = 
endPoints(partitionMetadata.leader.id).find(_.listenerName == listenerName).get
-        assertEquals(endPoint.host, leader.host)
-        assertEquals(endPoint.port, leader.port)
-        assertEquals(List(i), partitionMetadata.isr.asScala.map(_.id))
-        assertEquals(List(i), partitionMetadata.replicas.asScala.map(_.id))
+
+      def checkTopicMetadata(topic: String): Unit = {
+        val topicMetadatas = cache.getTopicMetadata(Set(topic), listenerName)
+        assertEquals(1, topicMetadatas.size)
+
+        val topicMetadata = topicMetadatas.head
+        assertEquals(Errors.NONE, topicMetadata.error)
+        assertEquals(topic, topicMetadata.topic)
+
+        val topicPartitionStates = partitionStates.filter { case (tp, _) => 
tp.topic ==  topic }
+        val partitionMetadatas = 
topicMetadata.partitionMetadata.asScala.sortBy(_.partition)
+        assertEquals(s"Unexpected partition count for topic $topic", 
topicPartitionStates.size, partitionMetadatas.size)
+
+        partitionMetadatas.zipWithIndex.foreach { case (partitionMetadata, 
partitionId) =>
+          assertEquals(Errors.NONE, partitionMetadata.error)
+          assertEquals(partitionId, partitionMetadata.partition)
+          val leader = partitionMetadata.leader
+          val partitionState = topicPartitionStates(new TopicPartition(topic, 
partitionId))
+          assertEquals(partitionState.leader, leader.id)
+          assertEquals(partitionState.isr, 
partitionMetadata.isr.asScala.map(_.id).asJava)
+          assertEquals(partitionState.replicas, 
partitionMetadata.replicas.asScala.map(_.id).asJava)
+          val endPoint = 
endPoints(partitionMetadata.leader.id).find(_.listenerName == listenerName).get
+          assertEquals(endPoint.host, leader.host)
+          assertEquals(endPoint.port, leader.port)
+        }
       }
 
+      checkTopicMetadata(topic0)
+      checkTopicMetadata(topic1)
     }
 
   }
@@ -119,7 +125,7 @@ class MetadataCacheTest {
     val leader = 1
     val leaderEpoch = 1
     val partitionStates = Map(
-      new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, 
leader, leaderEpoch, asList(0), zkVersion, asSet(0)))
+      new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, 
leader, leaderEpoch, asList(0), zkVersion, asList(0)))
 
     val version = ApiKeys.UPDATE_METADATA_KEY.latestVersion
     val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 
controllerId, controllerEpoch,
@@ -159,7 +165,7 @@ class MetadataCacheTest {
     // replica 1 is not available
     val leader = 0
     val leaderEpoch = 0
-    val replicas = asSet[Integer](0, 1)
+    val replicas = asList[Integer](0, 1)
     val isr = asList[Integer](0)
 
     val partitionStates = Map(
@@ -219,7 +225,7 @@ class MetadataCacheTest {
     // replica 1 is not available
     val leader = 0
     val leaderEpoch = 0
-    val replicas = asSet[Integer](0)
+    val replicas = asList[Integer](0)
     val isr = asList[Integer](0, 1)
 
     val partitionStates = Map(
@@ -273,7 +279,7 @@ class MetadataCacheTest {
     val controllerEpoch = 1
     val leader = 0
     val leaderEpoch = 0
-    val replicas = asSet[Integer](0)
+    val replicas = asList[Integer](0)
     val isr = asList[Integer](0, 1)
     val partitionStates = Map(
       new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, 
leader, leaderEpoch, isr, 3, replicas))
@@ -306,7 +312,7 @@ class MetadataCacheTest {
       val controllerEpoch = 1
       val leader = 0
       val leaderEpoch = 0
-      val replicas = asSet[Integer](0)
+      val replicas = asList[Integer](0)
       val isr = asList[Integer](0, 1)
       val partitionStates = Map(
         new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, 
leader, leaderEpoch, isr, 3, replicas))

http://git-wip-us.apache.org/repos/asf/kafka/blob/eb2aa184/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
index 177a9ee..877d156 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
@@ -19,6 +19,7 @@ package kafka.server
 
 import java.util.Properties
 
+import kafka.network.SocketServer
 import kafka.utils.TestUtils
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -160,6 +161,33 @@ class MetadataRequestTest extends BaseRequestTest {
     assertEquals("V1 Response should have 2 (all) topics", 2, 
metadataResponseV1.topicMetadata.size())
   }
 
+  /**
+    * Preferred replica should be the first item in the replicas list
+    */
+  @Test
+  def testPreferredReplica(): Unit = {
+    val replicaAssignment = Map(0 -> Seq(1, 2, 0), 1 -> Seq(2, 0, 1))
+    TestUtils.createTopic(zkUtils, "t1", replicaAssignment, servers)
+    // Call controller and one different broker to ensure that metadata 
propagation works correctly
+    val responses = Seq(
+      sendMetadataRequest(new MetadataRequest.Builder(Seq("t1").asJava, 
true).build(), Some(controllerSocketServer)),
+      sendMetadataRequest(new MetadataRequest.Builder(Seq("t1").asJava, 
true).build(), Some(notControllerSocketServer))
+    )
+    responses.foreach { response =>
+      assertEquals(1, response.topicMetadata.size)
+      val topicMetadata = response.topicMetadata.iterator.next()
+      assertEquals(Errors.NONE, topicMetadata.error)
+      assertEquals("t1", topicMetadata.topic)
+      assertEquals(Set(0, 1), 
topicMetadata.partitionMetadata.asScala.map(_.partition).toSet)
+      topicMetadata.partitionMetadata.asScala.foreach { partitionMetadata =>
+        val assignment = replicaAssignment(partitionMetadata.partition)
+        assertEquals(assignment, partitionMetadata.replicas.asScala.map(_.id))
+        assertEquals(assignment, partitionMetadata.isr.asScala.map(_.id))
+        assertEquals(assignment.head, partitionMetadata.leader.id)
+      }
+    }
+  }
+
   @Test
   def testReplicaDownResponse() {
     val replicaDownTopic = "replicaDown"
@@ -207,8 +235,8 @@ class MetadataRequestTest extends BaseRequestTest {
     assertEquals(s"Response should have $replicaCount replicas", replicaCount, 
v1PartitionMetadata.replicas.size)
   }
 
-  private def sendMetadataRequest(request: MetadataRequest): MetadataResponse 
= {
-    val response = connectAndSend(request, ApiKeys.METADATA)
+  private def sendMetadataRequest(request: MetadataRequest, destination: 
Option[SocketServer] = None): MetadataResponse = {
+    val response = connectAndSend(request, ApiKeys.METADATA, destination = 
destination.getOrElse(anySocketServer))
     MetadataResponse.parse(response, request.version)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/eb2aa184/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index e078e12..5794854 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -141,13 +141,12 @@ class ReplicaManagerTest {
 
     try {
       val brokerList = Seq[Integer](0, 1).asJava
-      val brokerSet = Set[Integer](0, 1).asJava
 
       val partition = rm.getOrCreatePartition(new TopicPartition(topic, 0))
       partition.getOrCreateReplica(0)
       // Make this replica the leader.
       val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(0, 0,
-        collection.immutable.Map(new TopicPartition(topic, 0) -> new 
PartitionState(0, 0, 0, brokerList, 0, brokerSet)).asJava,
+        collection.immutable.Map(new TopicPartition(topic, 0) -> new 
PartitionState(0, 0, 0, brokerList, 0, brokerList)).asJava,
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
       rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
       rm.getLeaderReplicaIfLocal(new TopicPartition(topic, 0))
@@ -164,7 +163,7 @@ class ReplicaManagerTest {
 
       // Make this replica the follower
       val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(0, 0,
-        collection.immutable.Map(new TopicPartition(topic, 0) -> new 
PartitionState(0, 1, 1, brokerList, 0, brokerSet)).asJava,
+        collection.immutable.Map(new TopicPartition(topic, 0) -> new 
PartitionState(0, 1, 1, brokerList, 0, brokerList)).asJava,
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
       rm.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ())
 
@@ -182,14 +181,13 @@ class ReplicaManagerTest {
 
     try {
       val brokerList = Seq[Integer](0, 1).asJava
-      val brokerSet = Set[Integer](0, 1).asJava
 
       val partition = replicaManager.getOrCreatePartition(new 
TopicPartition(topic, 0))
       partition.getOrCreateReplica(0)
 
       // Make this replica the leader.
       val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(0, 0,
-        collection.immutable.Map(new TopicPartition(topic, 0) -> new 
PartitionState(0, 0, 0, brokerList, 0, brokerSet)).asJava,
+        collection.immutable.Map(new TopicPartition(topic, 0) -> new 
PartitionState(0, 0, 0, brokerList, 0, brokerList)).asJava,
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
       replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => 
())
       replicaManager.getLeaderReplicaIfLocal(new TopicPartition(topic, 0))
@@ -280,7 +278,7 @@ class ReplicaManagerTest {
 
       // Make this replica the leader.
       val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(0, 0,
-        collection.immutable.Map(new TopicPartition(topic, 0) -> new 
PartitionState(0, 0, 0, brokerList, 0, brokerSet)).asJava,
+        collection.immutable.Map(new TopicPartition(topic, 0) -> new 
PartitionState(0, 0, 0, brokerList, 0, brokerList)).asJava,
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
       replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => 
())
       replicaManager.getLeaderReplicaIfLocal(new TopicPartition(topic, 0))
@@ -351,15 +349,14 @@ class ReplicaManagerTest {
       metadataCache, Option(this.getClass.getName))
 
     try {
-      val brokerList: java.util.List[Integer] = Seq[Integer](0, 1, 2).asJava
-      val brokerSet: java.util.Set[Integer] = Set[Integer](0, 1, 2).asJava
+      val brokerList = Seq[Integer](0, 1, 2).asJava
 
       val partition = rm.getOrCreatePartition(new TopicPartition(topic, 0))
       partition.getOrCreateReplica(0)
 
       // Make this replica the leader.
       val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(0, 0,
-        collection.immutable.Map(new TopicPartition(topic, 0) -> new 
PartitionState(0, 0, 0, brokerList, 0, brokerSet)).asJava,
+        collection.immutable.Map(new TopicPartition(topic, 0) -> new 
PartitionState(0, 0, 0, brokerList, 0, brokerList)).asJava,
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1), new Node(2, 
"host2", 2)).asJava).build()
       rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
       rm.getLeaderReplicaIfLocal(new TopicPartition(topic, 0))

http://git-wip-us.apache.org/repos/asf/kafka/blob/eb2aa184/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala 
b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 3b0e93c..7c171b0 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -181,14 +181,14 @@ class RequestQuotaTest extends BaseRequestTest {
 
         case ApiKeys.LEADER_AND_ISR =>
           new LeaderAndIsrRequest.Builder(brokerId, Int.MaxValue,
-            Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, 
List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava,
+            Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, 
List(brokerId).asJava, 2, Seq(brokerId).asJava)).asJava,
             Set(new Node(brokerId, "localhost", 0)).asJava)
 
         case ApiKeys.STOP_REPLICA =>
           new StopReplicaRequest.Builder(brokerId, Int.MaxValue, true, 
Set(tp).asJava)
 
         case ApiKeys.UPDATE_METADATA_KEY =>
-          val partitionState = Map(tp -> new PartitionState(Int.MaxValue, 
brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava
+          val partitionState = Map(tp -> new PartitionState(Int.MaxValue, 
brokerId, Int.MaxValue, List(brokerId).asJava, 2, Seq(brokerId).asJava)).asJava
           val securityProtocol = SecurityProtocol.PLAINTEXT
           val brokers = Set(new UpdateMetadataRequest.Broker(brokerId,
             Seq(new UpdateMetadataRequest.EndPoint("localhost", 0, 
securityProtocol,

Reply via email to