Updated Branches:
  refs/heads/master 527113b40 -> e89978461

SAMZA-87: BrokerProxy doesn't properly handle offset out of range exceptions.


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/e8997846
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/e8997846
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/e8997846

Branch: refs/heads/master
Commit: e899784618e45a8b304b7277003dba1003c01e74
Parents: 527113b
Author: Jakob Homan <[email protected]>
Authored: Mon Dec 2 13:06:38 2013 -0800
Committer: Jakob Homan <[email protected]>
Committed: Mon Dec 2 13:06:38 2013 -0800

----------------------------------------------------------------------
 build.gradle                                    |   4 +-
 .../apache/samza/system/kafka/BrokerProxy.scala | 154 +++++++++++--------
 .../samza/system/kafka/DefaultFetch.scala       |  48 ------
 .../apache/samza/system/kafka/GetOffset.scala   |  45 ++++--
 .../system/kafka/KafkaSystemConsumer.scala      |  75 +++++----
 .../samza/system/kafka/TestBrokerProxy.scala    | 101 ++++++++++--
 6 files changed, 247 insertions(+), 180 deletions(-)
----------------------------------------------------------------------
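
In short, the patch replaces the old all-or-nothing error handling in BrokerProxy.fetchMessages with a three-way split: not-leader errors make the proxy abdicate the topic-partition, offset-out-of-range errors trigger a fresh offset lookup, and anything else is rethrown to force a reconnect. A condensed sketch of that strategy follows, using simplified stand-in types (TopicAndPartition, FetchError, the error-code constants) rather than the real kafka.* classes used in the diff below:

    object OffsetRecoverySketch {
      // Stand-ins for kafka.common.TopicAndPartition and the ErrorMapping codes referenced in the diff.
      case class TopicAndPartition(topic: String, partition: Int)
      case class FetchError(tp: TopicAndPartition, code: Short)

      val NotLeaderForPartitionCode: Short = 6
      val OffsetOutOfRangeCode: Short = 1

      def handleErrors(errors: Seq[FetchError],
                       abdicate: TopicAndPartition => Unit,          // hand the partition back for leader re-discovery
                       getFreshOffset: TopicAndPartition => Long,    // ask the broker for a valid offset
                       replaceOffset: (TopicAndPartition, Long) => Unit): Unit = {
        val (notLeaders, others) = errors.partition(_.code == NotLeaderForPartitionCode)
        val (outOfRange, fatal) = others.partition(_.code == OffsetOutOfRangeCode)

        // Bail on anything non-recoverable first, so the other topic-partitions' state is
        // untouched when the consumer is rebuilt and the recoverable errors are retried.
        fatal.foreach(e => throw new RuntimeException("Unrecoverable fetch error: " + e))

        // Not leader: abdicate so the consumer can go find the new leader.
        notLeaders.foreach(e => abdicate(e.tp))

        // Offset out of range: fetch a valid offset and keep the partition.
        outOfRange.foreach(e => replaceOffset(e.tp, getFreshOffset(e.tp)))
      }
    }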


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e8997846/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index f30128f..556a0a3 100644
--- a/build.gradle
+++ b/build.gradle
@@ -80,6 +80,9 @@ project(":samza-kafka_$scalaVersion") {
     // these can all go away when kafka is in maven
     testCompile files("lib/kafka_$scalaVersion-$kafkaVersion-test.jar")
     // end these can all go away when kafka is in maven
+
+    // Logging in tests is good.
+    testRuntime "org.slf4j:slf4j-simple:1.6.2"
   }
 
   test {
@@ -192,4 +195,3 @@ project(":samza-test_$scalaVersion") {
     maxHeapSize = "1024m"
   }
 }
-

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e8997846/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
index f4f616e..7db32c0 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -21,20 +21,15 @@
 
 package org.apache.samza.system.kafka
 
-import kafka.consumer.SimpleConsumer
 import kafka.api._
-import kafka.common.ErrorMapping
-import java.util.concurrent.{ CountDownLatch, ConcurrentHashMap }
+import kafka.common.{NotLeaderForPartitionException, UnknownTopicOrPartitionException, ErrorMapping, TopicAndPartition}
+import java.util.concurrent.{ConcurrentHashMap, CountDownLatch}
 import scala.collection.JavaConversions._
-import org.apache.samza.config.Config
-import org.apache.samza.util.KafkaUtil
-import org.apache.samza.config.KafkaConfig.Config2Kafka
-import org.apache.samza.config.StreamConfig.Config2Stream
-import org.apache.samza.metrics.MetricsRegistry
-import kafka.common.TopicAndPartition
 import kafka.message.MessageSet
 import grizzled.slf4j.Logging
 import java.nio.channels.ClosedByInterruptException
+import java.util.Map.Entry
+import scala.collection.mutable
 
 /**
  * A BrokerProxy consolidates Kafka fetches meant for a particular broker and 
retrieves them all at once, providing
@@ -58,7 +53,7 @@ abstract class BrokerProxy(
   val sleepMSWhileNoTopicPartitions = 1000
 
   /** What's the next offset for a particular partition? **/
-  val nextOffsets: ConcurrentHashMap[TopicAndPartition, Long] = new ConcurrentHashMap[TopicAndPartition, Long]()
+  val nextOffsets:mutable.ConcurrentMap[TopicAndPartition, Long] = new ConcurrentHashMap[TopicAndPartition, Long]()
 
   /** Block on the first call to get message if the fetcher has not yet 
returned its initial results **/
   // TODO: It should be sufficient to just use the count down latch and await 
on it for each of the calls, but
@@ -75,7 +70,7 @@ abstract class BrokerProxy(
     val hostString = "%s:%d" format (host, port)
     info("Creating new SimpleConsumer for host %s for system %s" format 
(hostString, system))
 
-    val sc = new SimpleConsumer(host, port, timeout, bufferSize, clientID) with DefaultFetch {
+    val sc = new DefaultFetchSimpleConsumer(host, port, timeout, bufferSize, clientID) {
       val fetchSize: Int = 256 * 1024
     }
 
@@ -98,7 +93,7 @@ abstract class BrokerProxy(
       metrics.topicPartitions(host, port).set(nextOffsets.size)
       debug("Removed %s" format tp)
     } else {
-      warn("Asked to remove topic and partition %s, but not in map (keys = 
%s)" format (tp, nextOffsets.keys().mkString(",")))
+      warn("Asked to remove topic and partition %s, but not in map (keys = 
%s)" format (tp, nextOffsets.keys.mkString(",")))
     }
   }
 
@@ -116,14 +111,14 @@ abstract class BrokerProxy(
           } catch {
             // If we're interrupted, don't try and reconnect. We should shut 
down.
             case e: InterruptedException =>
-              debug("Shutting down due to interrupt exception.")
+              warn("Shutting down due to interrupt exception.")
               Thread.currentThread.interrupt
             case e: ClosedByInterruptException =>
-              debug("Shutting down due to closed by interrupt exception.")
+              warn("Shutting down due to closed by interrupt exception.")
               Thread.currentThread.interrupt
             case e: Throwable => {
               warn("Recreating simple consumer and retrying connection")
-              debug("Stack trace for fetchMessages exception.", e)
+              warn("Stack trace for fetchMessages exception.", e)
               simpleConsumer.close()
               simpleConsumer = createSimpleConsumer()
               metrics.reconnects(host, port).inc
@@ -131,7 +126,6 @@ abstract class BrokerProxy(
           }
         }
       }
-
     }
   }, "BrokerProxy thread pointed at %s:%d for client %s" format (host, port, 
clientID))
 
@@ -140,78 +134,104 @@ abstract class BrokerProxy(
     val response: FetchResponse = simpleConsumer.defaultFetch(nextOffsets.filterKeys(messageSink.needsMoreMessages(_)).toList: _*)
     firstCall = false
     firstCallBarrier.countDown()
-    if (response.hasError) {
-      // FetchResponse should really return Option and a list of the errors so we don't have to find them ourselves
-      case class Error(tp: TopicAndPartition, code: Short, exception: Throwable)
-
-      val errors = for (
-        error <- response.data.entrySet.filter(_.getValue.error != ErrorMapping.NoError);
-        errorCode <- Option(response.errorCode(error.getKey.topic, error.getKey.partition)); // Scala's being cranky about referring to error.getKey values...
-        exception <- Option(ErrorMapping.exceptionFor(errorCode))
-      ) yield new Error(error.getKey, errorCode, exception)
-
-      val (notLeaders, otherErrors) = errors.partition(_.code == ErrorMapping.NotLeaderForPartitionCode)
-
-      if (!notLeaders.isEmpty) {
-        info("Abdicating. Got not leader exception for: " + notLeaders.mkString(","))
-
-        notLeaders.foreach(e => {
-          // Go back one message, since the fetch for nextOffset failed, and
-          // abdicate requires lastOffset, not nextOffset.
-          messageSink.abdicate(e.tp, nextOffsets.remove(e.tp) - 1)
-        })
-      }

-      if (!otherErrors.isEmpty) {
-        warn("Got error codes during multifetch. Throwing an exception to trigger reconnect. Errors: %s" format errors.mkString(","))
-        otherErrors.foreach(e => ErrorMapping.maybeThrowException(e.code)) // One will get thrown
-      }
-    }
+    // Split response into errors and non errors, processing the errors first
+    val (nonErrorResponses, errorResponses) = response.data.entrySet().partition(_.getValue.error == ErrorMapping.NoError)

-    def moveMessagesToTheirQueue(tp: TopicAndPartition, data: FetchResponsePartitionData) = {
-      val messageSet: MessageSet = data.messages
-      var nextOffset = nextOffsets(tp)
+    handleErrors(errorResponses, response)

-      messageSink.setIsAtHighWatermark(tp, data.hw == 0 || data.hw == nextOffset)
-
-      for (message <- messageSet.iterator) {
-        messageSink.addMessage(tp, message, data.hw) // TODO: Verify this is correct
+    nonErrorResponses.foreach { nonError => moveMessagesToTheirQueue(nonError.getKey, nonError.getValue) }
+  }

-        nextOffset = message.nextOffset
+  def handleErrors(errorResponses: mutable.Set[Entry[TopicAndPartition, FetchResponsePartitionData]], response:FetchResponse) = {
+    // Need to be mindful of a tp that was removed by another thread
+    def abdicate(tp:TopicAndPartition) = nextOffsets.remove(tp) match {
+        case Some(offset) => messageSink.abdicate(tp, offset -1)
+        case None         => warn("Tried to abdicate for topic partition not in map. Removed in interim?")
+      }

-        val bytesSize = message.message.payloadSize + message.message.keySize
-        metrics.reads(tp).inc
-        metrics.bytesRead(tp).inc(bytesSize)
-        metrics.brokerBytesRead(host, port).inc(bytesSize)
-        metrics.offsets(tp).set(nextOffset)
+    // FetchResponse should really return Option and a list of the errors so we don't have to find them ourselves
+    case class Error(tp: TopicAndPartition, code: Short, exception: Throwable)
+
+    // Now subdivide the errors into three types: non-recoverable, not leader (== abdicate) and offset out of range (== get new offset)
+
+    // Convert FetchResponse into easier-to-work-with Errors
+    val errors = for (
+      error <- errorResponses;
+      errorCode <- Option(response.errorCode(error.getKey.topic, error.getKey.partition)); // Scala's being cranky about referring to error.getKey values...
+      exception <- Option(ErrorMapping.exceptionFor(errorCode))
+    ) yield new Error(error.getKey, errorCode, exception)
+
+    val (notLeaders, otherErrors) = errors.partition(_.code == ErrorMapping.NotLeaderForPartitionCode)
+    val (offsetOutOfRangeErrors, remainingErrors) = otherErrors.partition(_.code == ErrorMapping.OffsetOutOfRangeCode)
+
+    // Can recover from two types of errors: not leader (go find the new leader) and offset out of range (go get the new offset)
+    // However, we want to bail as quickly as possible if there are non recoverable errors so that the state of the other
+    // topic-partitions remains the same.  That way, when we've rebuilt the simple consumer, we can come around and
+    // handle the recoverable errors.
+    remainingErrors.foreach(e => {
+      warn("Got non-recoverable error codes during multifetch. Throwing an exception to trigger reconnect. Errors: %s" format remainingErrors.mkString(","))
+      ErrorMapping.maybeThrowException(e.code) })
+
+    // Go back one message, since the fetch for nextOffset failed, and
+    // abdicate requires lastOffset, not nextOffset.
+    notLeaders.foreach(e => abdicate(e.tp))
+
+    offsetOutOfRangeErrors.foreach(e => {
+      warn("Received OffsetOutOfRange exception for %s. Current offset = %s" format (e.tp, nextOffsets.getOrElse(e.tp, "not found in map, likely removed in the interim")))
+
+      try {
+        val newOffset = offsetGetter.getNextOffset(simpleConsumer, e.tp, null)
+        // Put the new offset into the map (if the tp still exists).  Will catch it on the next go-around
+        nextOffsets.replace(e.tp, newOffset)
+      } catch {
+        // UnknownTopic or NotLeader are routine events and handled via abdication.  All others, bail.
+        case _ @ (_:UnknownTopicOrPartitionException | _: NotLeaderForPartitionException) => warn("Received (UnknownTopicOr|NotLeaderFor)Partition exception. Abdicating")
+                                                                                             abdicate(e.tp)
+        case other => throw other
       }
+    })
+  }

-      nextOffsets.replace(tp, nextOffset) // use replace rather than put in case this tp was removed while we were fetching.
+  def moveMessagesToTheirQueue(tp: TopicAndPartition, data: FetchResponsePartitionData) = {
+    val messageSet: MessageSet = data.messages
+    var nextOffset = nextOffsets(tp)

-      // Update high water mark
-      val hw = data.hw
-      if (hw >= 0) {
-        metrics.lag(tp).set(hw - nextOffset)
-      } else {
-        debug("Got a high water mark less than 0 (%d) for %s, so skipping." format (hw, tp))
-      }
+    messageSink.setIsAtHighWatermark(tp, data.hw == 0 || data.hw == nextOffset)
+    require(messageSet != null)
+    for (message <- messageSet.iterator) {
+      messageSink.addMessage(tp, message, data.hw) // TODO: Verify this is correct
+
+      nextOffset = message.nextOffset
+
+      val bytesSize = message.message.payloadSize + message.message.keySize
+      metrics.reads(tp).inc
+      metrics.bytesRead(tp).inc(bytesSize)
+      metrics.brokerBytesRead(host, port).inc(bytesSize)
+      metrics.offsets(tp).set(nextOffset)
     }

-    response.data.foreach { case (tp, data) => moveMessagesToTheirQueue(tp, data) }
+    nextOffsets.replace(tp, nextOffset) // use replace rather than put in case this tp was removed while we were fetching.

+    // Update high water mark
+    val hw = data.hw
+    if (hw >= 0) {
+      metrics.lag(tp).set(hw - nextOffset)
+    } else {
+      debug("Got a high water mark less than 0 (%d) for %s, so skipping." format (hw, tp))
+    }
   }
-
   override def toString() = "BrokerProxy for %s:%d" format (host, port)
 
   def start {
-    debug("Starting broker proxy for %s:%s." format (host, port))
+    info("Starting " + toString)
 
     thread.setDaemon(true)
     thread.start
   }
 
   def stop {
-    debug("Shutting down broker proxy for %s:%s." format (host, port))
+    info("Shutting down " + toString)
 
     thread.interrupt
     thread.join
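
One detail worth calling out from the diff above: nextOffsets is now typed as scala.collection.mutable.ConcurrentMap (still backed by a java.util.concurrent.ConcurrentHashMap via JavaConversions), so remove returns an Option and replace only touches keys that still exist. The new abdicate helper and the "use replace rather than put" comment both lean on that. A standalone sketch of the behaviour, assuming the same Scala 2.9/2.10-era JavaConversions the file itself uses, and keyed by a plain String here for brevity:

    import java.util.concurrent.ConcurrentHashMap
    import scala.collection.JavaConversions._
    import scala.collection.mutable

    object NextOffsetsSketch extends App {
      val nextOffsets: mutable.ConcurrentMap[String, Long] = new ConcurrentHashMap[String, Long]()
      nextOffsets.put("Redbird-2012", 43L)

      // remove returns Option, so a partition already pulled out by another thread
      // is handled explicitly instead of blowing up.
      nextOffsets.remove("Redbird-2012") match {
        case Some(nextOffset) => println("abdicating with lastOffset " + (nextOffset - 1))
        case None             => println("already removed in the interim; nothing to abdicate")
      }

      // replace only updates keys that are still present, which is why the diff prefers it over put.
      nextOffsets.replace("Redbird-2012", 1492L)
      println(nextOffsets.get("Redbird-2012")) // None: the partition stayed removed
    }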

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e8997846/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetch.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetch.scala 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetch.scala
deleted file mode 100644
index 41710f2..0000000
--- 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetch.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.samza.system.kafka
-
-import kafka.consumer.SimpleConsumer
-import kafka.api.FetchRequestBuilder
-import kafka.common.TopicAndPartition
-
-/**
- * Extension to a SimpleConsumer that defines the default parameters necessary 
for default fetch requests.  Builds
- * such a fetch request, requests the fetch and returns the result
- */
-trait DefaultFetch {
-  self:SimpleConsumer =>
-  val maxWait:Int = Int.MaxValue
-  val minBytes:Int = 1
-  val clientId:String
-  val fetchSize:Int
-
-  def defaultFetch(fetches:(TopicAndPartition, Long)*) = {
-    val fbr = new FetchRequestBuilder().maxWait(1000)
-                                       .minBytes(minBytes)
-                                       .clientId(clientId)
-
-    fetches.foreach(f => fbr.addFetch(f._1.topic, f._1.partition, f._2, 
fetchSize))
-
-    this.fetch(fbr.build())
-  }
-}
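
The DefaultFetch trait deleted above is replaced by a DefaultFetchSimpleConsumer class that the rest of this patch refers to but which does not appear in the diff (or in the file list at the top). Assuming it simply folds the deleted trait's defaultFetch into a SimpleConsumer subclass, it would look roughly like the following; treat the exact shape as a hypothetical reconstruction, not part of the commit:

    package org.apache.samza.system.kafka

    import kafka.api.{FetchRequestBuilder, FetchResponse}
    import kafka.common.TopicAndPartition
    import kafka.consumer.SimpleConsumer

    // Hypothetical sketch: a SimpleConsumer that knows how to build and issue the
    // "default" fetch request the deleted DefaultFetch trait used to provide.
    abstract class DefaultFetchSimpleConsumer(host: String, port: Int, timeout: Int, bufferSize: Int, clientID: String)
      extends SimpleConsumer(host, port, timeout, bufferSize, clientID) {

      val minBytes: Int = 1
      val fetchSize: Int // still left to the caller, as in "val fetchSize: Int = 256 * 1024" above

      def defaultFetch(fetches: (TopicAndPartition, Long)*): FetchResponse = {
        val fbr = new FetchRequestBuilder().maxWait(1000)
                                           .minBytes(minBytes)
                                           .clientId(clientID)

        fetches.foreach(f => fbr.addFetch(f._1.topic, f._1.partition, f._2, fetchSize))

        fetch(fbr.build())
      }
    }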

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e8997846/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala
index 326d6c9..7ad5435 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala
@@ -21,15 +21,18 @@
 
 package org.apache.samza.system.kafka
 
-import kafka.consumer.SimpleConsumer
 import kafka.common.{ OffsetOutOfRangeException, ErrorMapping }
 import kafka.api._
-import org.apache.samza.config.KafkaConfig
-import org.apache.samza.config.KafkaConfig.Config2Kafka
 import kafka.common.TopicAndPartition
 import kafka.api.PartitionOffsetRequestInfo
 import grizzled.slf4j.Logging
+import kafka.message.MessageAndOffset
 
+/**
+ * Obtain the correct offsets for topics, be it earliest or largest
+ * @param default Value to return if no offset has been specified for topic
+ * @param autoOffsetResetTopics Topics that have been specified as auto offset
+ */
 class GetOffset(default: String, autoOffsetResetTopics: Map[String, String] = 
Map()) extends Logging with Toss {
 
   private def getAutoOffset(topic: String): Long = {
@@ -48,11 +51,11 @@ class GetOffset(default: String, autoOffsetResetTopics: 
Map[String, String] = Ma
   /**
    *  An offset was provided but may not be valid.  Verify its validity.
    */
-  private def useLastCheckpointedOffset(sc: DefaultFetch, last: String, tp: TopicAndPartition): Option[Long] = {
+  private def useLastCheckpointedOffset(sc: DefaultFetchSimpleConsumer, last: String, tp: TopicAndPartition): Option[Long] = {
     try {
       info("Validating offset %s for topic and partition %s" format (last, tp))
 
-      val messages = sc.defaultFetch((tp, last.toLong))
+      val messages: FetchResponse = sc.defaultFetch((tp, last.toLong))
 
       if (messages.hasError) {
         ErrorMapping.maybeThrowException(messages.errorCode(tp.topic, 
tp.partition))
@@ -60,28 +63,38 @@ class GetOffset(default: String, autoOffsetResetTopics: 
Map[String, String] = Ma
 
       info("Able to successfully read from offset %s for topic and partition 
%s. Using it to instantiate consumer." format (last, tp))
 
-      val nextOffset = messages
-        .messageSet(tp.topic, tp.partition)
-        .head
-        .nextOffset
-
-      info("Got next offset %s for %s." format (nextOffset, tp))
+      val messageSet = messages.messageSet(tp.topic, tp.partition)
 
-      Some(nextOffset)
+      if(messageSet.isEmpty) { // No messages have been written since our 
checkpoint
+        info("Got empty response for checkpointed offset, using checkpointed")
+        Some(last.toLong)
+      } else {
+        val nextOffset = messageSet.head.nextOffset
+        info("Got next offset %s for %s." format (nextOffset, tp))
+        Some(nextOffset)
+      }
     } catch {
       case e: OffsetOutOfRangeException =>
-        info("An out of range Kafka offset (%s) was supplied for topic and 
partition %s, so falling back to autooffset.reset." format (last, tp))
+        info("An out-of-range Kafka offset (%s) was supplied for topic and 
partition %s, so falling back to autooffset.reset." format (last, tp))
         None
     }
   }
 
-  def getNextOffset(sc: SimpleConsumer with DefaultFetch, tp: TopicAndPartition, lastCheckpointedOffset: String): Long = {
+  /**
+   * Using the provided SimpleConsumer, obtain the next offset to read for the specified topic
+   * @param sc SimpleConsumer used to query the Kafka Broker
+   * @param tp TopicAndPartition we offset for
+   * @param lastCheckpointedOffset Null is acceptable. If not null, return the last checkpointed offset, after checking it is valid
+   * @return Next offset to read or throw an exception if one has been received via the simple consumer
+   */
+  def getNextOffset(sc: DefaultFetchSimpleConsumer, tp: TopicAndPartition, lastCheckpointedOffset: String): Long = {
     val offsetRequest = new OffsetRequest(Map(tp -> new PartitionOffsetRequestInfo(getAutoOffset(tp.topic), 1)))
     val offsetResponse = sc.getOffsetsBefore(offsetRequest)
     val partitionOffsetResponse = offsetResponse.partitionErrorAndOffsets.get(tp).getOrElse(toss("Unable to find offset information for %s" format tp))
-    val autoOffset = partitionOffsetResponse.offsets.headOption.getOrElse(toss("Got response, but no offsets defined for %s" format tp))

-    info("Got offset %d for topic and partition %s" format (autoOffset, tp))
+    ErrorMapping.maybeThrowException(partitionOffsetResponse.error)
+
+    val autoOffset = partitionOffsetResponse.offsets.headOption.getOrElse(toss("Got response, but no offsets defined for %s" format tp))

     val actualOffset = Option(lastCheckpointedOffset) match {
       case Some(last) => useLastCheckpointedOffset(sc, last, tp).getOrElse(autoOffset)
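
For context on how the revised signature is used: the BrokerProxy change earlier in this commit calls getNextOffset with a null checkpoint when it hits an offset-out-of-range error, which skips checkpoint validation entirely and falls straight through to the autooffset.reset policy. A small usage sketch follows; the helper name, the Option wrapper and the "largest"/"smallest" reset values are illustrative assumptions, not taken from the patch:

    package org.apache.samza.system.kafka

    import kafka.common.TopicAndPartition

    object GetOffsetUsageSketch {
      // consumer is assumed to be an already-connected DefaultFetchSimpleConsumer.
      def resolveStartingOffset(consumer: DefaultFetchSimpleConsumer,
                                tp: TopicAndPartition,
                                checkpointed: Option[String]): Long = {
        val getOffset = new GetOffset(default = "largest", autoOffsetResetTopics = Map(tp.topic -> "smallest"))

        // None => behave like the BrokerProxy recovery path: ignore checkpoints, use the reset policy.
        // Some(offset) => validate the checkpoint first and only fall back to the policy if it is stale.
        getOffset.getNextOffset(consumer, tp, checkpointed.orNull)
      }
    }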

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e8997846/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
index 7624a8e..2b73c61 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -19,14 +19,9 @@
 
 package org.apache.samza.system.kafka
 
-import org.apache.samza.util.{ KafkaUtil, ClientUtilTopicMetadataStore }
+import org.apache.samza.util.ClientUtilTopicMetadataStore
 import kafka.common.TopicAndPartition
-import org.apache.samza.config.{ KafkaConfig, Config }
-import org.apache.samza.SamzaException
-import org.apache.samza.config.KafkaConfig.Config2Kafka
-import org.apache.samza.metrics.MetricsRegistry
 import grizzled.slf4j.Logging
-import scala.collection.JavaConversions._
 import kafka.message.MessageAndOffset
 import org.apache.samza.Partition
 import kafka.utils.Utils
@@ -37,8 +32,6 @@ import kafka.serializer.Decoder
 import org.apache.samza.util.BlockingEnvelopeMap
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.system.IncomingMessageEnvelope
-import java.nio.charset.Charset
-import kafka.api.PartitionMetadata
 
 object KafkaSystemConsumer {
   def toTopicAndPartition(systemStreamPartition: SystemStreamPartition) = {
@@ -97,45 +90,48 @@ private[kafka] class KafkaSystemConsumer(
   }
 
   def refreshBrokers(topicPartitionsAndOffsets: Map[TopicAndPartition, 
String]) {
-    var done = false
-
-    while (!done) {
+    var tpToRefresh = topicPartitionsAndOffsets.keySet.toList
+    while (!tpToRefresh.isEmpty) {
       try {
         val getTopicMetadata = (topics: Set[String]) => {
           new ClientUtilTopicMetadataStore(brokerListString, 
clientId).getTopicInfo(topics)
         }
-
-        val partitionMetadata = TopicMetadataCache.getTopicMetadata(
-          topicPartitionsAndOffsets.keys.map(_.topic).toSet,
-          systemName,
-          getTopicMetadata)
-
-        topicPartitionsAndOffsets.map {
-          case (topicAndPartition, lastOffset) =>
-            // TODO whatever we do, we can't say Broker, even though we're 
-            // manipulating it here. Broker is a private type and Scala 
doesn't seem 
-            // to care about that as long as you don't explicitly declare its 
type.
-            val brokerOption = partitionMetadata(topicAndPartition.topic)
-              .partitionsMetadata
-              .find(_.partitionId == topicAndPartition.partition)
-              .flatMap(_.leader)
-
-            brokerOption match {
-              case Some(broker) =>
-                val brokerProxy = brokerProxies.getOrElseUpdate((broker.host, 
broker.port), new BrokerProxy(broker.host, broker.port, systemName, clientId, 
metrics, timeout, bufferSize, offsetGetter) {
-                  val messageSink: MessageSink = sink
-                })
-
-                brokerProxy.addTopicPartition(topicAndPartition, lastOffset)
-              case _ => warn("No such topic-partition: %s, dropping." format 
topicAndPartition)
-            }
+        val topics = tpToRefresh.map(_.topic).toSet
+        val partitionMetadata = TopicMetadataCache.getTopicMetadata(topics, systemName, getTopicMetadata)
+
+        // addTopicPartition one at a time, leaving the to-be-done list intact in case of exceptions.
+        // This avoids trying to re-add the same topic partition repeatedly
+        def refresh(tp:List[TopicAndPartition]) = {
+          val head :: rest = tpToRefresh
+          val lastOffset = topicPartitionsAndOffsets.get(head).get
+          // Whatever we do, we can't say Broker, even though we're
+          // manipulating it here. Broker is a private type and Scala doesn't seem
+          // to care about that as long as you don't explicitly declare its type.
+          val brokerOption = partitionMetadata(head.topic)
+                             .partitionsMetadata
+                             .find(_.partitionId == head.partition)
+                             .flatMap(_.leader)
+
+          brokerOption match {
+            case Some(broker) =>
+              val brokerProxy = brokerProxies.getOrElseUpdate((broker.host, broker.port), new BrokerProxy(broker.host, broker.port, systemName, clientId, metrics, timeout, bufferSize, offsetGetter) {
+                val messageSink: MessageSink = sink
+              })
+
+              brokerProxy.addTopicPartition(head, lastOffset)
+            case None => warn("No such topic-partition: %s, dropping." format head)
+          }
+          rest
         }
 
-        done = true
+
+        while(!tpToRefresh.isEmpty) {
+          tpToRefresh = refresh(tpToRefresh)
+        }
       } catch {
         case e: Throwable =>
-          warn("An exception was thrown while refreshing brokers for %s. 
Waiting a bit and retrying, since we can't continue without broker metadata." 
format topicPartitionsAndOffsets.keySet)
-          debug(e)
+          warn("An exception was thrown while refreshing brokers for %s. 
Waiting a bit and retrying, since we can't continue without broker metadata." 
format tpToRefresh.head)
+          debug("Exception while refreshing brokers", e)
 
           try {
             Thread.sleep(brokerMetadataFailureRefreshMs)
@@ -181,6 +177,7 @@ private[kafka] class KafkaSystemConsumer(
     }
 
     def abdicate(tp: TopicAndPartition, lastOffset: Long) {
+      info("Abdicating for %s" format (tp))
       refreshBrokers(Map(tp -> lastOffset.toString))
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e8997846/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
 
b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
index 85f5887..151c699 100644
--- 
a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
+++ 
b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
@@ -22,22 +22,25 @@ package org.apache.samza.system.kafka
 
 import org.junit._
 import org.junit.Assert._
-import org.apache.samza.config.MapConfig
-import org.mockito.Mockito
-import org.apache.samza.metrics._
+import org.mockito.{Matchers, Mockito}
 import scala.collection.JavaConversions._
 import kafka.consumer.SimpleConsumer
 import org.mockito.Mockito._
 import org.mockito.Matchers._
 import kafka.api._
-import kafka.message.{ Message, MessageSet, MessageAndOffset }
+import kafka.message.{MessageSet, Message, MessageAndOffset, ByteBufferMessageSet}
 import kafka.common.TopicAndPartition
 import kafka.api.PartitionOffsetsResponse
 import java.nio.ByteBuffer
 import org.apache.samza.SamzaException
-import kafka.message.ByteBufferMessageSet
+import grizzled.slf4j.Logging
+import kafka.common.ErrorMapping
+import org.mockito.stubbing.Answer
+import org.mockito.invocation.InvocationOnMock
+import java.util.concurrent.CountDownLatch
 
-class TestBrokerProxy {
+
+class TestBrokerProxy extends Logging {
   val tp2 = new TopicAndPartition("Redbird", 2013)
 
   def getMockBrokerProxy() = {
@@ -55,8 +58,6 @@ class TestBrokerProxy {
     }
 
     val system = "daSystem"
-    val config = new MapConfig(Map[String, String]("job.name" -> "Jobby McJob",
-      "systems.%s.Redbird.consumer.auto.offset.reset".format(system) -> 
"largest"))
     val host = "host"
     val port = 2222
     val tp = new TopicAndPartition("Redbird", 2012)
@@ -86,7 +87,7 @@ class TestBrokerProxy {
         }
         alreadyCreatedConsumer = true
 
-        new SimpleConsumer("a", 1, 2, 3, "b") with DefaultFetch {
+        new DefaultFetchSimpleConsumer("a", 1, 2, 3, "b") {
           val fetchSize: Int = 42
 
           val sc = Mockito.mock(classOf[SimpleConsumer])
@@ -114,6 +115,8 @@ class TestBrokerProxy {
               def getMessage() = new Message(Mockito.mock(classOf[ByteBuffer]))
               val messages = List(new MessageAndOffset(getMessage, 42), new 
MessageAndOffset(getMessage, 84))
 
+              when(messageSet.sizeInBytes).thenReturn(43)
+              when(messageSet.size).thenReturn(44)
               when(messageSet.iterator).thenReturn(messages.iterator)
               when(messageSet.head).thenReturn(messages.head)
               messageSet
@@ -178,7 +181,87 @@ class TestBrokerProxy {
       fail("Should have thrown an exception")
     } catch {
       case se: SamzaException => assertEquals(se.getMessage, "Already 
consuming TopicPartition [Redbird,2012]")
+      case other: Throwable => fail("Got some other exception than what we were expecting: " + other)
     }
   }
 
+  @Test def brokerProxyCorrectlyHandlesOffsetOutOfRange():Unit = {
+    // Need to wait for the thread to do some work before ending the test
+    val countdownLatch = new CountDownLatch(1)
+    var failString:String = null
+
+    val mockMessageSink = mock(classOf[MessageSink])
+    when(mockMessageSink.needsMoreMessages(any())).thenReturn(true)
+
+    val doNothingMetrics = new KafkaSystemConsumerMetrics()
+
+    val tp = new TopicAndPartition("topic", 42)
+
+    val mockOffsetGetter = mock(classOf[GetOffset])
+    // This will be used by the simple consumer below, and this is the response that simple consumer needs
+    when(mockOffsetGetter.getNextOffset(any(classOf[DefaultFetchSimpleConsumer]), Matchers.eq(tp), Matchers.eq(null))).thenReturn(1492l)
+
+    var callsToCreateSimpleConsumer = 0
+    val mockSimpleConsumer = mock(classOf[DefaultFetchSimpleConsumer])
+
+    // Create an answer that first indicates offset out of range on first invocation and on second
+    // verifies that the parameters have been updated to what we expect them to be
+    val answer = new Answer[FetchResponse](){
+      var invocationCount = 0
+      def answer(invocation: InvocationOnMock): FetchResponse = {
+        val arguments = invocation.getArguments()(0).asInstanceOf[List[Object]](0).asInstanceOf[(String, Long)]
+
+        if(invocationCount == 0) {
+          if(arguments != (tp, 0)) {
+            failString = "First invocation did not have the right arguments: " + arguments
+            countdownLatch.countDown()
+          }
+          val mfr = mock(classOf[FetchResponse])
+          when(mfr.hasError).thenReturn(true)
+          when(mfr.errorCode("topic", 42)).thenReturn(ErrorMapping.OffsetOutOfRangeCode)
+
+          val messageSet = mock(classOf[MessageSet])
+          when(messageSet.iterator).thenReturn(Iterator.empty)
+          val response = mock(classOf[FetchResponsePartitionData])
+          when(response.error).thenReturn(ErrorMapping.OffsetOutOfRangeCode)
+          val responseMap = Map(tp -> response)
+          when(mfr.data).thenReturn(responseMap)
+          invocationCount += 1
+          mfr
+        } else {
+          if(arguments != (tp, 1492)) {
+            failString = "On second invocation, arguments were not correct: " + arguments
+          }
+          countdownLatch.countDown()
+          Thread.currentThread().interrupt()
+          null
+        }
+      }
+    }
+
+    when(mockSimpleConsumer.defaultFetch(any())).thenAnswer(answer)
+
+    // So now we have a fetch response that will fail.  Prime the mockGetOffset to send us to a new offset
+
+    val bp = new BrokerProxy("host", 423, "system", "clientID", doNothingMetrics, Int.MaxValue, 1024000, mockOffsetGetter) {
+      val messageSink: MessageSink = mockMessageSink
+
+      override def createSimpleConsumer() = {
+        if(callsToCreateSimpleConsumer > 1) {
+          failString = "Tried to create more than one simple consumer"
+          countdownLatch.countDown()
+        }
+        callsToCreateSimpleConsumer += 1
+        mockSimpleConsumer
+      }
+    }
+
+    bp.addTopicPartition(tp, "earliest")
+    bp.start
+    countdownLatch.await()
+    bp.stop
+    if(failString != null) {
+      fail(failString)
+    }
+  }
 }
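
The heart of the new test is a stateful Mockito Answer: the first defaultFetch reports OffsetOutOfRange, the second asserts that the proxy came back with the refreshed offset and then interrupts the fetcher thread so the test can finish. A minimal, generic sketch of that per-invocation technique; the Fetcher trait here is an illustrative stand-in, not a Samza type:

    import org.mockito.Matchers.anyLong
    import org.mockito.Mockito.{mock, when}
    import org.mockito.invocation.InvocationOnMock
    import org.mockito.stubbing.Answer

    object StatefulAnswerSketch {
      trait Fetcher { def fetch(offset: Long): String }

      def fetcherThatFailsOnce(): Fetcher = {
        val fetcher = mock(classOf[Fetcher])
        val answer = new Answer[String] {
          var invocationCount = 0
          def answer(invocation: InvocationOnMock): String = {
            invocationCount += 1
            // First call simulates the broker error, later calls behave normally.
            if (invocationCount == 1) "error: offset out of range" else "ok"
          }
        }
        when(fetcher.fetch(anyLong())).thenAnswer(answer)
        fetcher
      }
    }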
