Updated Branches:
  refs/heads/master ac11fd92d -> 49134a47b

SAMZA-83: Discard rather than throw exception for Kafka topics with
non-existent partitions


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/49134a47
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/49134a47
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/49134a47

Branch: refs/heads/master
Commit: 49134a47bc6916c1ab705ac21b8a97d11d211c8c
Parents: ac11fd9
Author: Jakob Glen Homan <[email protected]>
Authored: Mon Nov 11 11:03:26 2013 -0800
Committer: Jakob Glen Homan <[email protected]>
Committed: Mon Nov 11 11:03:26 2013 -0800

----------------------------------------------------------------------
 .../org/apache/samza/system/kafka/KafkaSystemConsumer.scala    | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/49134a47/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
index b8e17ce..9b83259 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -38,6 +38,7 @@ import org.apache.samza.util.BlockingEnvelopeMap
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.system.IncomingMessageEnvelope
 import java.nio.charset.Charset
+import kafka.api.PartitionMetadata
 
 object KafkaSystemConsumer {
   def toTopicAndPartition(systemStreamPartition: SystemStreamPartition) = {
@@ -117,8 +118,7 @@ private[kafka] class KafkaSystemConsumer(
             val brokerOption = partitionMetadata(topicAndPartition.topic)
               .partitionsMetadata
               .find(_.partitionId == topicAndPartition.partition)
-              .getOrElse(toss("Can't find leader for %s" format topicAndPartition))
-              .leader
+              .flatMap(_.leader)
 
             brokerOption match {
               case Some(broker) =>
@@ -127,7 +127,7 @@ private[kafka] class KafkaSystemConsumer(
                 })
 
                 brokerProxy.addTopicPartition(topicAndPartition, lastOffset)
-              case _ => warn("Broker for %s not defined! " format topicAndPartition)
+              case _ => warn("No such topic-partition: %s, dropping." format topicAndPartition)
             }
         }
 

Reply via email to