Repository: kafka Updated Branches: refs/heads/trunk c07d01722 -> 8ef804dc1
KAFKA-3414; Return of MetadataCache.getAliveBrokers should not be mutated by cache updates `Map.values` returns `DefaultValuesIterable` where the default implementation of `toSeq` is (sadly) `toStream`. `Stream` is a lazy collection and it can reflect changes to the underlying map before it's `forced`. I verified that the test failed before my change. Author: Ismael Juma <[email protected]> Reviewers: Gwen Shapira Closes #1088 from ijuma/kafka-3414-get-alive-brokers-no-mutation Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8ef804dc Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8ef804dc Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8ef804dc Branch: refs/heads/trunk Commit: 8ef804dc194bb562b6dbe48855e81965cacd1114 Parents: c07d017 Author: Ismael Juma <[email protected]> Authored: Thu Mar 17 13:56:27 2016 -0700 Committer: Gwen Shapira <[email protected]> Committed: Thu Mar 17 13:56:27 2016 -0700 ---------------------------------------------------------------------- .../main/scala/kafka/server/MetadataCache.scala | 2 +- .../unit/kafka/server/MetadataCacheTest.scala | 28 ++++++++++++++++++++ 2 files changed, 29 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/8ef804dc/core/src/main/scala/kafka/server/MetadataCache.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala index b23ecbe..4b68f70 100755 --- a/core/src/main/scala/kafka/server/MetadataCache.scala +++ b/core/src/main/scala/kafka/server/MetadataCache.scala @@ -132,7 +132,7 @@ private[server] class MetadataCache(brokerId: Int) extends Logging { def getAliveBrokers: Seq[Broker] = { inReadLock(partitionMetadataLock) { - aliveBrokers.values.toSeq + aliveBrokers.values.toBuffer } } http://git-wip-us.apache.org/repos/asf/kafka/blob/8ef804dc/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala index dcc310f..017faea 100644 --- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala +++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala @@ -238,4 +238,32 @@ class MetadataCacheTest { } + @Test + def getAliveBrokersShouldNotBeMutatedByUpdateCache() { + val topic = "topic" + val cache = new MetadataCache(1) + + def updateCache(brokerIds: Set[Int]) { + val brokers = brokerIds.map { brokerId => + new Broker(brokerId, Map(SecurityProtocol.PLAINTEXT -> new EndPoint("foo", 9092)).asJava, "") + } + val controllerEpoch = 1 + val leader = 0 + val leaderEpoch = 0 + val replicas = asSet[Integer](0) + val isr = asList[Integer](0, 1) + val partitionStates = Map( + new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, leader, leaderEpoch, isr, 3, replicas)) + val updateMetadataRequest = new UpdateMetadataRequest(2, controllerEpoch, partitionStates.asJava, brokers.asJava) + cache.updateCache(15, updateMetadataRequest) + } + + val initialBrokerIds = (0 to 2).toSet + updateCache(initialBrokerIds) + val aliveBrokersFromCache = cache.getAliveBrokers + // This should not change `aliveBrokersFromCache` + updateCache((0 to 3).toSet) + assertEquals(initialBrokerIds, aliveBrokersFromCache.map(_.id).toSet) + } + }
