Repository: kafka Updated Branches: refs/heads/trunk 9e5d481c7 -> bfbd3acbf
KAFKA-2056; Fix transient testRangePartitionAssignor failure; reviewed by Guozhang Wang Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bfbd3acb Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bfbd3acb Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bfbd3acb Branch: refs/heads/trunk Commit: bfbd3acbf71fa1b913de154f7ffa12aead28a2d2 Parents: 9e5d481 Author: Fangmin Lv <[email protected]> Authored: Wed Apr 15 11:16:58 2015 -0700 Committer: Guozhang Wang <[email protected]> Committed: Wed Apr 15 11:16:58 2015 -0700 ---------------------------------------------------------------------- core/src/main/scala/kafka/consumer/PartitionAssignor.scala | 6 ++++++ .../main/scala/kafka/consumer/ZookeeperConsumerConnector.scala | 4 +--- 2 files changed, 7 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/bfbd3acb/core/src/main/scala/kafka/consumer/PartitionAssignor.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala index 4afda8b..849284a 100755 --- a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala +++ b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala @@ -112,6 +112,9 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging { assignmentForConsumer += (topicPartition -> threadId) }) } + + // assign Map.empty for the consumers which are not associated with topic partitions + ctx.consumers.foreach(consumerId => partitionAssignment.getAndMaybePut(consumerId)) partitionAssignment } } @@ -164,6 +167,9 @@ class RangeAssignor() extends PartitionAssignor with Logging { } } } + + // assign Map.empty for the consumers which are not associated with topic partitions + ctx.consumers.foreach(consumerId => partitionAssignment.getAndMaybePut(consumerId)) partitionAssignment } } http://git-wip-us.apache.org/repos/asf/kafka/blob/bfbd3acb/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index e250b94..aa8d940 100755 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -684,9 +684,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, releasePartitionOwnership(topicRegistry) val assignmentContext = new AssignmentContext(group, consumerIdString, config.excludeInternalTopics, zkClient) val globalPartitionAssignment = partitionAssignor.assign(assignmentContext) - val partitionAssignment = Option(globalPartitionAssignment.get(assignmentContext.consumerId)).getOrElse( - mutable.HashMap.empty[TopicAndPartition, ConsumerThreadId] - ) + val partitionAssignment = globalPartitionAssignment.get(assignmentContext.consumerId) val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]( valueFactory = Some((topic: String) => new Pool[Int, PartitionTopicInfo]))
