Updated Branches: refs/heads/master ac11fd92d -> 49134a47b
SAMZA-83: Discard rather than throw exception for Kafka topics with non-existent partitions

Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/49134a47
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/49134a47
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/49134a47

Branch: refs/heads/master
Commit: 49134a47bc6916c1ab705ac21b8a97d11d211c8c
Parents: ac11fd9
Author: Jakob Glen Homan <[email protected]>
Authored: Mon Nov 11 11:03:26 2013 -0800
Committer: Jakob Glen Homan <[email protected]>
Committed: Mon Nov 11 11:03:26 2013 -0800

----------------------------------------------------------------------
 .../org/apache/samza/system/kafka/KafkaSystemConsumer.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/49134a47/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
index b8e17ce..9b83259 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -38,6 +38,7 @@ import org.apache.samza.util.BlockingEnvelopeMap
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.system.IncomingMessageEnvelope
 import java.nio.charset.Charset
+import kafka.api.PartitionMetadata
 
 object KafkaSystemConsumer {
   def toTopicAndPartition(systemStreamPartition: SystemStreamPartition) = {
@@ -117,8 +118,7 @@ private[kafka] class KafkaSystemConsumer(
     val brokerOption = partitionMetadata(topicAndPartition.topic)
       .partitionsMetadata
       .find(_.partitionId == topicAndPartition.partition)
-      .getOrElse(toss("Can't find leader for %s" format topicAndPartition))
-      .leader
+      .flatMap(_.leader)
 
     brokerOption match {
       case Some(broker) =>
@@ -127,7 +127,7 @@ private[kafka] class KafkaSystemConsumer(
         })
 
         brokerProxy.addTopicPartition(topicAndPartition, lastOffset)
-      case _ => warn("Broker for %s not defined! " format topicAndPartition)
+      case _ => warn("No such topic-partition: %s, dropping." format topicAndPartition)
     }
   }
