kafka-947; isr-expiration-thread may block LeaderAndIsr request for a relatively long period; patched by Jun Rao; reviewed by Joel Koshy
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cc6027b3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cc6027b3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cc6027b3

Branch: refs/heads/trunk
Commit: cc6027b3279ac5cd8908d01d35511c93716fbe60
Parents: 7b43f01
Author: Jun Rao <jun...@gmail.com>
Authored: Thu Jun 20 13:22:56 2013 -0700
Committer: Jun Rao <jun...@gmail.com>
Committed: Thu Jun 20 13:22:56 2013 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/server/ReplicaManager.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/cc6027b3/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 9d41e82..d885ba1 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -282,9 +282,11 @@ class ReplicaManager(val config: KafkaConfig,
 
   private def maybeShrinkIsr(): Unit = {
     trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR")
+    var curLeaderPartitions: List[Partition] = null
     leaderPartitionsLock synchronized {
-      leaderPartitions.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs, config.replicaLagMaxMessages))
+      curLeaderPartitions = leaderPartitions.toList
     }
+    curLeaderPartitions.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs, config.replicaLagMaxMessages))
   }
 
   def recordFollowerPosition(topic: String, partitionId: Int, replicaId: Int, offset: Long) = {