Repository: samza Updated Branches: refs/heads/master 2a531b0bb -> 57aae364b
SAMZA-920: BrokerProxy.abdicateAll can get stuck on adding and removing the same partitions infinitely Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/57aae364 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/57aae364 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/57aae364 Branch: refs/heads/master Commit: 57aae364bb919afcf25164e0204d3a776c779411 Parents: 2a531b0 Author: Ivan Simoneko <[email protected]> Authored: Sat Apr 9 10:11:02 2016 -0700 Committer: Yi Pan (Data Infrastructure) <[email protected]> Committed: Sat Apr 9 10:11:02 2016 -0700 ---------------------------------------------------------------------- .../main/scala/org/apache/samza/system/kafka/BrokerProxy.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/57aae364/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala index 9aa9818..cbb8881 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala @@ -214,7 +214,8 @@ class BrokerProxy( * TopicAndPartition. */ def abdicateAll { - nextOffsets.keySet.foreach(abdicate(_)) + val immutableNextOffsetsCopy = nextOffsets.toMap + immutableNextOffsetsCopy.keySet.foreach(abdicate(_)) } def handleErrors(errorResponses: mutable.Set[Entry[TopicAndPartition, FetchResponsePartitionData]], response:FetchResponse) = {
