kafka-899; LeaderNotAvailableException the first time a new message for a partition is processed; patched by Jun Rao; reviewed by Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1caae2c2 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1caae2c2 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1caae2c2 Branch: refs/heads/trunk Commit: 1caae2c2a10856ea2a31c8b82f1fae5b107a2a07 Parents: d93cbc6 Author: Jun Rao <jun...@gmail.com> Authored: Thu May 30 22:16:47 2013 -0700 Committer: Jun Rao <jun...@gmail.com> Committed: Thu May 30 22:16:47 2013 -0700 ---------------------------------------------------------------------- .../kafka/common/UnknownTopicException.scala | 25 -------------- .../producer/async/DefaultEventHandler.scala | 36 ++++++++++++-------- .../producer/async/ProducerSendThread.scala | 2 +- 3 files changed, 22 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/1caae2c2/core/src/main/scala/kafka/common/UnknownTopicException.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/common/UnknownTopicException.scala b/core/src/main/scala/kafka/common/UnknownTopicException.scala deleted file mode 100644 index 710d3bf..0000000 --- a/core/src/main/scala/kafka/common/UnknownTopicException.scala +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.common - -/** - * Thrown when a request is made for a topic, that hasn't been created in a Kafka cluster - */ -class UnknownTopicException(message: String) extends RuntimeException(message) { - def this() = this(null) -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/1caae2c2/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala index 1a74951..a00a0df 100644 --- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala +++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala @@ -149,7 +149,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig, try { for (message <- messages) { val topicPartitionsList = getPartitionListForTopic(message) - val partitionIndex = getPartition(message.key, topicPartitionsList) + val partitionIndex = getPartition(message.topic, message.key, topicPartitionsList) val brokerPartition = topicPartitionsList(partitionIndex) // postpone the failure until the send operation, so that requests for other brokers are handled correctly @@ -177,9 +177,9 @@ class DefaultEventHandler[K,V](config: ProducerConfig, } Some(ret) }catch { // Swallow recoverable exceptions and return None so that they can be retried. - case ute: UnknownTopicException => warn("Failed to collate messages by topic,partition due to", ute); None - case lnae: LeaderNotAvailableException => warn("Failed to collate messages by topic,partition due to", lnae); None - case oe => error("Failed to collate messages by topic, partition due to", oe); None + case ute: UnknownTopicOrPartitionException => warn("Failed to collate messages by topic,partition due to: " + ute.getMessage); None + case lnae: LeaderNotAvailableException => warn("Failed to collate messages by topic,partition due to: " + lnae.getMessage); None + case oe => error("Failed to collate messages by topic, partition due to: " + oe.getMessage); None } } @@ -200,25 +200,24 @@ class DefaultEventHandler[K,V](config: ProducerConfig, * @param topicPartitionList the list of available partitions * @return the partition id */ - private def getPartition(key: K, topicPartitionList: Seq[PartitionAndLeader]): Int = { + private def getPartition(topic: String, key: K, topicPartitionList: Seq[PartitionAndLeader]): Int = { val numPartitions = topicPartitionList.size if(numPartitions <= 0) - throw new UnknownTopicOrPartitionException("Invalid number of partitions: " + numPartitions + - "\n Valid values are > 0") + throw new UnknownTopicOrPartitionException("Topic " + topic + " doesn't exist") val partition = if(key == null) { // If the key is null, we don't really need a partitioner so we just send to the next // available partition val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined) if (availablePartitions.isEmpty) - throw new LeaderNotAvailableException("No leader for any partition") + throw new LeaderNotAvailableException("No leader for any partition in topic " + topic) val index = Utils.abs(partitionCounter.getAndIncrement()) % availablePartitions.size availablePartitions(index).partitionId } else partitioner.partition(key, numPartitions) if(partition < 0 || partition >= numPartitions) - throw new UnknownTopicOrPartitionException("Invalid partition id : " + partition + - "\n Valid values are in the range inclusive [0, " + (numPartitions-1) + "]") + throw new UnknownTopicOrPartitionException("Invalid partition id: " + partition + " for topic " + topic + + "; Valid values are in the inclusive range of [0, " + (numPartitions-1) + "]") partition } @@ -253,11 +252,18 @@ class DefaultEventHandler[K,V](config: ProducerConfig, successfullySentData.foreach(m => messagesPerTopic(m._1).foreach(message => trace("Successfully sent message: %s".format(Utils.readString(message.message.payload))))) } - failedTopicPartitions = response.status.filter(_._2.error != ErrorMapping.NoError).toSeq - .map(partitionStatus => partitionStatus._1) - if(failedTopicPartitions.size > 0) - error("Produce request with correlation id %d failed due to response %s. List of failed topic partitions is %s" - .format(currentCorrelationId, response.toString, failedTopicPartitions.mkString(","))) + val failedPartitionsAndStatus = response.status.filter(_._2.error != ErrorMapping.NoError).toSeq + failedTopicPartitions = failedPartitionsAndStatus.map(partitionStatus => partitionStatus._1) + if(failedTopicPartitions.size > 0) { + val errorString = failedPartitionsAndStatus + .sortWith((p1, p2) => p1._1.topic.compareTo(p2._1.topic) < 0 || + (p1._1.topic.compareTo(p2._1.topic) == 0 && p1._1.partition < p2._1.partition)) + .map{ + case(topicAndPartition, status) => + topicAndPartition.toString + ": " + ErrorMapping.exceptionFor(status.error).getClass.getName + }.mkString(",") + warn("Produce request with correlation id %d failed due to %s".format(currentCorrelationId, errorString)) + } failedTopicPartitions } else Seq.empty[TopicAndPartition] http://git-wip-us.apache.org/repos/asf/kafka/blob/1caae2c2/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala index 090400d..2b41a49 100644 --- a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala +++ b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala @@ -50,7 +50,7 @@ class ProducerSendThread[K,V](val threadName: String, } def shutdown = { - info("Beging shutting down ProducerSendThread") + info("Begin shutting down ProducerSendThread") queue.put(shutdownCommand) shutdownLatch.await info("Shutdown ProducerSendThread complete")