kafka-959; DefaultEventHandler can send more produce requests than necessary; patched by Guozhang Wang; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/76d39052 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/76d39052 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/76d39052 Branch: refs/heads/trunk Commit: 76d39052f717cb6d9fdac6a516df298036b5ffff Parents: bc5620c Author: Guozhang Wang <[email protected]> Authored: Thu Jul 25 10:10:35 2013 -0700 Committer: Jun Rao <[email protected]> Committed: Thu Jul 25 10:10:35 2013 -0700 ---------------------------------------------------------------------- .../producer/async/DefaultEventHandler.scala | 27 +++++++++++++++----- 1 file changed, 20 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/76d39052/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala index a00a0df..f71a242 100644 --- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala +++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala @@ -45,12 +45,14 @@ class DefaultEventHandler[K,V](config: ProducerConfig, private val topicMetadataRefreshInterval = config.topicMetadataRefreshIntervalMs private var lastTopicMetadataRefreshTime = 0L private val topicMetadataToRefresh = Set.empty[String] + private val sendPartitionPerTopicCache = HashMap.empty[String, Int] private val producerStats = ProducerStatsRegistry.getProducerStats(config.clientId) private val producerTopicStats = ProducerTopicStatsRegistry.getProducerTopicStats(config.clientId) def handle(events: Seq[KeyedMessage[K,V]]) { lock synchronized { + sendPartitionPerTopicCache.clear() val serializedData = serialize(events) serializedData.foreach { keyed => @@ -206,18 
+208,29 @@ class DefaultEventHandler[K,V](config: ProducerConfig, throw new UnknownTopicOrPartitionException("Topic " + topic + " doesn't exist") val partition = if(key == null) { - // If the key is null, we don't really need a partitioner so we just send to the next - // available partition - val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined) - if (availablePartitions.isEmpty) - throw new LeaderNotAvailableException("No leader for any partition in topic " + topic) - val index = Utils.abs(partitionCounter.getAndIncrement()) % availablePartitions.size - availablePartitions(index).partitionId + // If the key is null, we don't really need a partitioner + // So we look up in the send partition cache for the topic to decide the target partition + val id = sendPartitionPerTopicCache.get(topic) + id match { + case Some(partitionId) => + // directly return the partitionId without checking availability of the leader, + // since we want to postpone the failure until the send operation anyways + partitionId + case None => + val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined) + if (availablePartitions.isEmpty) + throw new LeaderNotAvailableException("No leader for any partition in topic " + topic) + val index = Utils.abs(partitionCounter.getAndIncrement()) % availablePartitions.size + val partitionId = availablePartitions(index).partitionId + sendPartitionPerTopicCache.put(topic, partitionId) + partitionId + } } else partitioner.partition(key, numPartitions) if(partition < 0 || partition >= numPartitions) throw new UnknownTopicOrPartitionException("Invalid partition id: " + partition + " for topic " + topic + "; Valid values are in the inclusive range of [0, " + (numPartitions-1) + "]") + trace("Assigning message of topic %s and key %s to a selected partition %d".format(topic, if (key == null) "[none]" else key.toString, partition)) partition }
