kafka-899; LeaderNotAvailableException is thrown the first time a new message
for a partition is processed; patched by Jun Rao; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1caae2c2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1caae2c2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1caae2c2

Branch: refs/heads/trunk
Commit: 1caae2c2a10856ea2a31c8b82f1fae5b107a2a07
Parents: d93cbc6
Author: Jun Rao <jun...@gmail.com>
Authored: Thu May 30 22:16:47 2013 -0700
Committer: Jun Rao <jun...@gmail.com>
Committed: Thu May 30 22:16:47 2013 -0700

----------------------------------------------------------------------
 .../kafka/common/UnknownTopicException.scala    | 25 --------------
 .../producer/async/DefaultEventHandler.scala    | 36 ++++++++++++--------
 .../producer/async/ProducerSendThread.scala     |  2 +-
 3 files changed, 22 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1caae2c2/core/src/main/scala/kafka/common/UnknownTopicException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/UnknownTopicException.scala b/core/src/main/scala/kafka/common/UnknownTopicException.scala
deleted file mode 100644
index 710d3bf..0000000
--- a/core/src/main/scala/kafka/common/UnknownTopicException.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.common
-
-/**
- * Thrown when a request is made for a topic, that hasn't been created in a Kafka cluster
- */
-class UnknownTopicException(message: String) extends RuntimeException(message) {
-  def this() = this(null)
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/1caae2c2/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 1a74951..a00a0df 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -149,7 +149,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
     try {
       for (message <- messages) {
         val topicPartitionsList = getPartitionListForTopic(message)
-        val partitionIndex = getPartition(message.key, topicPartitionsList)
+        val partitionIndex = getPartition(message.topic, message.key, topicPartitionsList)
         val brokerPartition = topicPartitionsList(partitionIndex)
 
         // postpone the failure until the send operation, so that requests for other brokers are handled correctly
@@ -177,9 +177,9 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
       }
       Some(ret)
     }catch {    // Swallow recoverable exceptions and return None so that they can be retried.
-      case ute: UnknownTopicException => warn("Failed to collate messages by topic,partition due to", ute); None
-      case lnae: LeaderNotAvailableException => warn("Failed to collate messages by topic,partition due to", lnae); None
-      case oe => error("Failed to collate messages by topic, partition due to", oe); None
+      case ute: UnknownTopicOrPartitionException => warn("Failed to collate messages by topic,partition due to: " + ute.getMessage); None
+      case lnae: LeaderNotAvailableException => warn("Failed to collate messages by topic,partition due to: " + lnae.getMessage); None
+      case oe => error("Failed to collate messages by topic, partition due to: " + oe.getMessage); None
     }
   }
 
@@ -200,25 +200,24 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
    * @param topicPartitionList the list of available partitions
    * @return the partition id
    */
-  private def getPartition(key: K, topicPartitionList: Seq[PartitionAndLeader]): Int = {
+  private def getPartition(topic: String, key: K, topicPartitionList: Seq[PartitionAndLeader]): Int = {
     val numPartitions = topicPartitionList.size
     if(numPartitions <= 0)
-      throw new UnknownTopicOrPartitionException("Invalid number of partitions: " + numPartitions +
-        "\n Valid values are > 0")
+      throw new UnknownTopicOrPartitionException("Topic " + topic + " doesn't exist")
     val partition =
       if(key == null) {
         // If the key is null, we don't really need a partitioner so we just send to the next
         // available partition
         val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
         if (availablePartitions.isEmpty)
-          throw new LeaderNotAvailableException("No leader for any partition")
+          throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
         val index = Utils.abs(partitionCounter.getAndIncrement()) % availablePartitions.size
         availablePartitions(index).partitionId
       } else
         partitioner.partition(key, numPartitions)
     if(partition < 0 || partition >= numPartitions)
-      throw new UnknownTopicOrPartitionException("Invalid partition id : " + partition +
-        "\n Valid values are in the range inclusive [0, " + (numPartitions-1) + "]")
+      throw new UnknownTopicOrPartitionException("Invalid partition id: " + partition + " for topic " + topic +
+        "; Valid values are in the inclusive range of [0, " + (numPartitions-1) + "]")
     partition
   }
 
@@ -253,11 +252,18 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
             successfullySentData.foreach(m => messagesPerTopic(m._1).foreach(message =>
               trace("Successfully sent message: %s".format(Utils.readString(message.message.payload)))))
           }
-          failedTopicPartitions = response.status.filter(_._2.error != ErrorMapping.NoError).toSeq
-            .map(partitionStatus => partitionStatus._1)
-          if(failedTopicPartitions.size > 0)
-            error("Produce request with correlation id %d failed due to response %s. List of failed topic partitions is %s"
-              .format(currentCorrelationId, response.toString, failedTopicPartitions.mkString(",")))
+          val failedPartitionsAndStatus = response.status.filter(_._2.error != ErrorMapping.NoError).toSeq
+          failedTopicPartitions = failedPartitionsAndStatus.map(partitionStatus => partitionStatus._1)
+          if(failedTopicPartitions.size > 0) {
+            val errorString = failedPartitionsAndStatus
+              .sortWith((p1, p2) => p1._1.topic.compareTo(p2._1.topic) < 0 ||
+                                    (p1._1.topic.compareTo(p2._1.topic) == 0 && p1._1.partition < p2._1.partition))
+              .map{
+                case(topicAndPartition, status) =>
+                  topicAndPartition.toString + ": " + ErrorMapping.exceptionFor(status.error).getClass.getName
+              }.mkString(",")
+            warn("Produce request with correlation id %d failed due to %s".format(currentCorrelationId, errorString))
+          }
           failedTopicPartitions
         } else
           Seq.empty[TopicAndPartition]

http://git-wip-us.apache.org/repos/asf/kafka/blob/1caae2c2/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
index 090400d..2b41a49 100644
--- a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
+++ b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
@@ -50,7 +50,7 @@ class ProducerSendThread[K,V](val threadName: String,
   }
 
   def shutdown = {
-    info("Beging shutting down ProducerSendThread")
+    info("Begin shutting down ProducerSendThread")
     queue.put(shutdownCommand)
     shutdownLatch.await
     info("Shutdown ProducerSendThread complete")

Reply via email to