Repository: kafka
Updated Branches:
  refs/heads/trunk c07d01722 -> 8ef804dc1


KAFKA-3414; Return of MetadataCache.getAliveBrokers should not be mutated by 
cache updates

`Map.values` returns `DefaultValuesIterable` where the default implementation 
of `toSeq` is (sadly) `toStream`. `Stream` is a lazy collection and it can 
reflect changes to the underlying map before it's `forced`.

I verified that the test failed before my change.

Author: Ismael Juma <[email protected]>

Reviewers: Gwen Shapira

Closes #1088 from ijuma/kafka-3414-get-alive-brokers-no-mutation


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8ef804dc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8ef804dc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8ef804dc

Branch: refs/heads/trunk
Commit: 8ef804dc194bb562b6dbe48855e81965cacd1114
Parents: c07d017
Author: Ismael Juma <[email protected]>
Authored: Thu Mar 17 13:56:27 2016 -0700
Committer: Gwen Shapira <[email protected]>
Committed: Thu Mar 17 13:56:27 2016 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/server/MetadataCache.scala |  2 +-
 .../unit/kafka/server/MetadataCacheTest.scala   | 28 ++++++++++++++++++++
 2 files changed, 29 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8ef804dc/core/src/main/scala/kafka/server/MetadataCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala 
b/core/src/main/scala/kafka/server/MetadataCache.scala
index b23ecbe..4b68f70 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -132,7 +132,7 @@ private[server] class MetadataCache(brokerId: Int) extends 
Logging {
 
   def getAliveBrokers: Seq[Broker] = {
     inReadLock(partitionMetadataLock) {
-      aliveBrokers.values.toSeq
+      aliveBrokers.values.toBuffer
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8ef804dc/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala 
b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index dcc310f..017faea 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -238,4 +238,32 @@ class MetadataCacheTest {
 
   }
 
+  @Test
+  def getAliveBrokersShouldNotBeMutatedByUpdateCache() {
+    val topic = "topic"
+    val cache = new MetadataCache(1)
+
+    def updateCache(brokerIds: Set[Int]) {
+      val brokers = brokerIds.map { brokerId =>
+        new Broker(brokerId, Map(SecurityProtocol.PLAINTEXT -> new 
EndPoint("foo", 9092)).asJava, "")
+      }
+      val controllerEpoch = 1
+      val leader = 0
+      val leaderEpoch = 0
+      val replicas = asSet[Integer](0)
+      val isr = asList[Integer](0, 1)
+      val partitionStates = Map(
+        new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, 
leader, leaderEpoch, isr, 3, replicas))
+      val updateMetadataRequest = new UpdateMetadataRequest(2, 
controllerEpoch, partitionStates.asJava, brokers.asJava)
+      cache.updateCache(15, updateMetadataRequest)
+    }
+
+    val initialBrokerIds = (0 to 2).toSet
+    updateCache(initialBrokerIds)
+    val aliveBrokersFromCache = cache.getAliveBrokers
+    // This should not change `aliveBrokersFromCache`
+    updateCache((0 to 3).toSet)
+    assertEquals(initialBrokerIds, aliveBrokersFromCache.map(_.id).toSet)
+  }
+
 }

Reply via email to