Updated Branches: refs/heads/master 527113b40 -> e89978461
SAMZA-87: BrokerProxy doesn't properly handle offset out of range exceptions.

Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/e8997846
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/e8997846
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/e8997846

Branch: refs/heads/master
Commit: e899784618e45a8b304b7277003dba1003c01e74
Parents: 527113b
Author: Jakob Homan <[email protected]>
Authored: Mon Dec 2 13:06:38 2013 -0800
Committer: Jakob Homan <[email protected]>
Committed: Mon Dec 2 13:06:38 2013 -0800

----------------------------------------------------------------------
 build.gradle                                    |   4 +-
 .../apache/samza/system/kafka/BrokerProxy.scala | 154 +++++++++++--------
 .../samza/system/kafka/DefaultFetch.scala       |  48 ------
 .../apache/samza/system/kafka/GetOffset.scala   |  45 ++++--
 .../system/kafka/KafkaSystemConsumer.scala      |  75 +++++----
 .../samza/system/kafka/TestBrokerProxy.scala    | 101 ++++++++++--
 6 files changed, 247 insertions(+), 180 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e8997846/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index f30128f..556a0a3 100644
--- a/build.gradle
+++ b/build.gradle
@@ -80,6 +80,9 @@ project(":samza-kafka_$scalaVersion") {
     // these can all go away when kafka is in maven
     testCompile files("lib/kafka_$scalaVersion-$kafkaVersion-test.jar")
     // end these can all go away when kafka is in maven
+
+    // Logging in tests is good.
+    testRuntime "org.slf4j:slf4j-simple:1.6.2"
   }

   test {
@@ -192,4 +195,3 @@ project(":samza-test_$scalaVersion") {
     maxHeapSize = "1024m"
   }
 }
-
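The slf4j-simple test-runtime dependency means any slf4j-backed logger (grizzled.slf4j is used throughout samza-kafka) prints during test runs with no further configuration; by default slf4j-simple writes INFO and above to stderr. A minimal sketch of what this enables — the class name here is hypothetical, not part of the commit:

import grizzled.slf4j.Logging

class LoggingSmokeTest extends Logging {   // hypothetical test class
  @org.junit.Test def logsAreVisible() {
    info("This now shows up in `gradle test` output via slf4j-simple.")
  }
}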
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e8997846/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
index f4f616e..7db32c0 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -21,20 +21,15 @@
 package org.apache.samza.system.kafka

-import kafka.consumer.SimpleConsumer
 import kafka.api._
-import kafka.common.ErrorMapping
-import java.util.concurrent.{ CountDownLatch, ConcurrentHashMap }
+import kafka.common.{NotLeaderForPartitionException, UnknownTopicOrPartitionException, ErrorMapping, TopicAndPartition}
+import java.util.concurrent.{ConcurrentHashMap, CountDownLatch}
 import scala.collection.JavaConversions._
-import org.apache.samza.config.Config
-import org.apache.samza.util.KafkaUtil
-import org.apache.samza.config.KafkaConfig.Config2Kafka
-import org.apache.samza.config.StreamConfig.Config2Stream
-import org.apache.samza.metrics.MetricsRegistry
-import kafka.common.TopicAndPartition
 import kafka.message.MessageSet
 import grizzled.slf4j.Logging
 import java.nio.channels.ClosedByInterruptException
+import java.util.Map.Entry
+import scala.collection.mutable

 /**
  * A BrokerProxy consolidates Kafka fetches meant for a particular broker and retrieves them all at once, providing
@@ -58,7 +53,7 @@ abstract class BrokerProxy(
   val sleepMSWhileNoTopicPartitions = 1000

   /** What's the next offset for a particular partition? **/
-  val nextOffsets: ConcurrentHashMap[TopicAndPartition, Long] = new ConcurrentHashMap[TopicAndPartition, Long]()
+  val nextOffsets:mutable.ConcurrentMap[TopicAndPartition, Long] = new ConcurrentHashMap[TopicAndPartition, Long]()

   /** Block on the first call to get message if the fetcher has not yet returned its initial results **/
   // TODO: It should be sufficient to just use the count down latch and await on it for each of the calls, but
@@ -75,7 +70,7 @@ abstract class BrokerProxy(
     val hostString = "%s:%d" format (host, port)
     info("Creating new SimpleConsumer for host %s for system %s" format (hostString, system))

-    val sc = new SimpleConsumer(host, port, timeout, bufferSize, clientID) with DefaultFetch {
+    val sc = new DefaultFetchSimpleConsumer(host, port, timeout, bufferSize, clientID) {
       val fetchSize: Int = 256 * 1024
     }

@@ -98,7 +93,7 @@ abstract class BrokerProxy(
       metrics.topicPartitions(host, port).set(nextOffsets.size)
       debug("Removed %s" format tp)
     } else {
-      warn("Asked to remove topic and partition %s, but not in map (keys = %s)" format (tp, nextOffsets.keys().mkString(",")))
+      warn("Asked to remove topic and partition %s, but not in map (keys = %s)" format (tp, nextOffsets.keys.mkString(",")))
     }
   }

@@ -116,14 +111,14 @@ abstract class BrokerProxy(
         } catch {
           // If we're interrupted, don't try and reconnect. We should shut down.
           case e: InterruptedException =>
-            debug("Shutting down due to interrupt exception.")
+            warn("Shutting down due to interrupt exception.")
             Thread.currentThread.interrupt
           case e: ClosedByInterruptException =>
-            debug("Shutting down due to closed by interrupt exception.")
+            warn("Shutting down due to closed by interrupt exception.")
             Thread.currentThread.interrupt
           case e: Throwable => {
             warn("Recreating simple consumer and retrying connection")
-            debug("Stack trace for fetchMessages exception.", e)
+            warn("Stack trace for fetchMessages exception.", e)
             simpleConsumer.close()
             simpleConsumer = createSimpleConsumer()
             metrics.reconnects(host, port).inc
@@ -131,7 +126,6 @@
           }
         }
       }
-    }
   }, "BrokerProxy thread pointed at %s:%d for client %s" format (host, port, clientID))

@@ -140,78 +134,104 @@
     val response: FetchResponse = simpleConsumer.defaultFetch(nextOffsets.filterKeys(messageSink.needsMoreMessages(_)).toList: _*)
     firstCall = false
     firstCallBarrier.countDown()
-    if (response.hasError) {
-      // FetchResponse should really return Option and a list of the errors so we don't have to find them ourselves
-      case class Error(tp: TopicAndPartition, code: Short, exception: Throwable)
-
-      val errors = for (
-        error <- response.data.entrySet.filter(_.getValue.error != ErrorMapping.NoError);
-        errorCode <- Option(response.errorCode(error.getKey.topic, error.getKey.partition)); // Scala's being cranky about referring to error.getKey values...
-        exception <- Option(ErrorMapping.exceptionFor(errorCode))
-      ) yield new Error(error.getKey, errorCode, exception)
-
-      val (notLeaders, otherErrors) = errors.partition(_.code == ErrorMapping.NotLeaderForPartitionCode)
-
-      if (!notLeaders.isEmpty) {
-        info("Abdicating. Got not leader exception for: " + notLeaders.mkString(","))
-
-        notLeaders.foreach(e => {
-          // Go back one message, since the fetch for nextOffset failed, and
-          // abdicate requires lastOffset, not nextOffset.
-          messageSink.abdicate(e.tp, nextOffsets.remove(e.tp) - 1)
-        })
-      }
-      if (!otherErrors.isEmpty) {
-        warn("Got error codes during multifetch. Throwing an exception to trigger reconnect. Errors: %s" format errors.mkString(","))
-        otherErrors.foreach(e => ErrorMapping.maybeThrowException(e.code)) // One will get thrown
-      }
-    }
+    // Split response into errors and non errors, processing the errors first
+    val (nonErrorResponses, errorResponses) = response.data.entrySet().partition(_.getValue.error == ErrorMapping.NoError)

-    def moveMessagesToTheirQueue(tp: TopicAndPartition, data: FetchResponsePartitionData) = {
-      val messageSet: MessageSet = data.messages
-      var nextOffset = nextOffsets(tp)
+    handleErrors(errorResponses, response)

-      messageSink.setIsAtHighWatermark(tp, data.hw == 0 || data.hw == nextOffset)
-
-      for (message <- messageSet.iterator) {
-        messageSink.addMessage(tp, message, data.hw) // TODO: Verify this is correct
+    nonErrorResponses.foreach { nonError => moveMessagesToTheirQueue(nonError.getKey, nonError.getValue) }
+  }

-        nextOffset = message.nextOffset
+  def handleErrors(errorResponses: mutable.Set[Entry[TopicAndPartition, FetchResponsePartitionData]], response:FetchResponse) = {
+    // Need to be mindful of a tp that was removed by another thread
+    def abdicate(tp:TopicAndPartition) = nextOffsets.remove(tp) match {
+      case Some(offset) => messageSink.abdicate(tp, offset -1)
+      case None => warn("Tried to abdicate for topic partition not in map. Removed in interim?")
+    }

-        val bytesSize = message.message.payloadSize + message.message.keySize
-        metrics.reads(tp).inc
-        metrics.bytesRead(tp).inc(bytesSize)
-        metrics.brokerBytesRead(host, port).inc(bytesSize)
-        metrics.offsets(tp).set(nextOffset)
+    // FetchResponse should really return Option and a list of the errors so we don't have to find them ourselves
+    case class Error(tp: TopicAndPartition, code: Short, exception: Throwable)
+
+    // Now subdivide the errors into three types: non-recoverable, not leader (== abdicate) and offset out of range (== get new offset)
+
+    // Convert FetchResponse into easier-to-work-with Errors
+    val errors = for (
+      error <- errorResponses;
+      errorCode <- Option(response.errorCode(error.getKey.topic, error.getKey.partition)); // Scala's being cranky about referring to error.getKey values...
+      exception <- Option(ErrorMapping.exceptionFor(errorCode))
+    ) yield new Error(error.getKey, errorCode, exception)
+
+    val (notLeaders, otherErrors) = errors.partition(_.code == ErrorMapping.NotLeaderForPartitionCode)
+    val (offsetOutOfRangeErrors, remainingErrors) = otherErrors.partition(_.code == ErrorMapping.OffsetOutOfRangeCode)
+
+    // Can recover from two types of errors: not leader (go find the new leader) and offset out of range (go get the new offset)
+    // However, we want to bail as quickly as possible if there are non recoverable errors so that the state of the other
+    // topic-partitions remains the same. That way, when we've rebuilt the simple consumer, we can come around and
+    // handle the recoverable errors.
+    remainingErrors.foreach(e => {
+      warn("Got non-recoverable error codes during multifetch. Throwing an exception to trigger reconnect. Errors: %s" format remainingErrors.mkString(","))
+      ErrorMapping.maybeThrowException(e.code)
     })
+
+    // Go back one message, since the fetch for nextOffset failed, and
+    // abdicate requires lastOffset, not nextOffset.
+    notLeaders.foreach(e => abdicate(e.tp))
+
+    offsetOutOfRangeErrors.foreach(e => {
+      warn("Received OffsetOutOfRange exception for %s. Current offset = %s" format (e.tp, nextOffsets.getOrElse(e.tp, "not found in map, likely removed in the interim")))
+
+      try {
+        val newOffset = offsetGetter.getNextOffset(simpleConsumer, e.tp, null)
+        // Put the new offset into the map (if the tp still exists). Will catch it on the next go-around
+        nextOffsets.replace(e.tp, newOffset)
+      } catch {
+        // UnknownTopic or NotLeader are routine events and handled via abdication. All others, bail.
+        case _ @ (_:UnknownTopicOrPartitionException | _: NotLeaderForPartitionException) =>
+          warn("Received (UnknownTopicOr|NotLeaderFor)Partition exception. Abdicating")
+          abdicate(e.tp)
+        case other => throw other
       }
+    })
+  }

-      nextOffsets.replace(tp, nextOffset) // use replace rather than put in case this tp was removed while we were fetching.
+  def moveMessagesToTheirQueue(tp: TopicAndPartition, data: FetchResponsePartitionData) = {
+    val messageSet: MessageSet = data.messages
+    var nextOffset = nextOffsets(tp)

-      // Update high water mark
-      val hw = data.hw
-      if (hw >= 0) {
-        metrics.lag(tp).set(hw - nextOffset)
-      } else {
-        debug("Got a high water mark less than 0 (%d) for %s, so skipping." format (hw, tp))
-      }
+    messageSink.setIsAtHighWatermark(tp, data.hw == 0 || data.hw == nextOffset)
+    require(messageSet != null)
+    for (message <- messageSet.iterator) {
+      messageSink.addMessage(tp, message, data.hw) // TODO: Verify this is correct
+
+      nextOffset = message.nextOffset
+
+      val bytesSize = message.message.payloadSize + message.message.keySize
+      metrics.reads(tp).inc
+      metrics.bytesRead(tp).inc(bytesSize)
+      metrics.brokerBytesRead(host, port).inc(bytesSize)
+      metrics.offsets(tp).set(nextOffset)
     }

-    response.data.foreach { case (tp, data) => moveMessagesToTheirQueue(tp, data) }
+    nextOffsets.replace(tp, nextOffset) // use replace rather than put in case this tp was removed while we were fetching.
+
+    // Update high water mark
+    val hw = data.hw
+    if (hw >= 0) {
+      metrics.lag(tp).set(hw - nextOffset)
+    } else {
+      debug("Got a high water mark less than 0 (%d) for %s, so skipping." format (hw, tp))
+    }
   }

-
   override def toString() = "BrokerProxy for %s:%d" format (host, port)

   def start {
-    debug("Starting broker proxy for %s:%s." format (host, port))
+    info("Starting " + toString)

     thread.setDaemon(true)
     thread.start
   }

   def stop {
-    debug("Shutting down broker proxy for %s:%s." format (host, port))
+    info("Shutting down " + toString)

     thread.interrupt
     thread.join
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e8997846/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetch.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetch.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetch.scala
deleted file mode 100644
index 41710f2..0000000
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetch.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.samza.system.kafka
-
-import kafka.consumer.SimpleConsumer
-import kafka.api.FetchRequestBuilder
-import kafka.common.TopicAndPartition
-
-/**
- * Extension to a SimpleConsumer that defines the default parameters necessary for default fetch requests. Builds
- * such a fetch request, requests the fetch and returns the result
- */
-trait DefaultFetch {
-  self:SimpleConsumer =>
-  val maxWait:Int = Int.MaxValue
-  val minBytes:Int = 1
-  val clientId:String
-  val fetchSize:Int
-
-  def defaultFetch(fetches:(TopicAndPartition, Long)*) = {
-    val fbr = new FetchRequestBuilder().maxWait(1000)
-                                       .minBytes(minBytes)
-                                       .clientId(clientId)
-
-    fetches.foreach(f => fbr.addFetch(f._1.topic, f._1.partition, f._2, fetchSize))
-
-    this.fetch(fbr.build())
-  }
-}
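The deleted DefaultFetch trait is replaced by a DefaultFetchSimpleConsumer class, which call sites now subclass directly; its source is not part of this diff. A plausible sketch of such a wrapper, reusing the deleted trait's body as a concrete subclass — the constructor signature is assumed from the call sites, and the maxWait(1000) quirk is carried over from the old trait:

import kafka.consumer.SimpleConsumer
import kafka.api.FetchRequestBuilder
import kafka.common.TopicAndPartition

abstract class DefaultFetchSimpleConsumer(host: String, port: Int, timeout: Int, bufferSize: Int, clientId: String)
  extends SimpleConsumer(host, port, timeout, bufferSize, clientId) {

  val maxWait: Int = Int.MaxValue
  val minBytes: Int = 1
  val fetchSize: Int   // still abstract; call sites supply it, e.g. 256 * 1024

  def defaultFetch(fetches: (TopicAndPartition, Long)*) = {
    val fbr = new FetchRequestBuilder().maxWait(1000)
                                       .minBytes(minBytes)
                                       .clientId(clientId)
    // One multifetch request covering every (topic-partition, offset) pair
    fetches.foreach(f => fbr.addFetch(f._1.topic, f._1.partition, f._2, fetchSize))
    fetch(fbr.build())
  }
}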
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e8997846/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala
index 326d6c9..7ad5435 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala
@@ -21,15 +21,18 @@
 package org.apache.samza.system.kafka

-import kafka.consumer.SimpleConsumer
 import kafka.common.{ OffsetOutOfRangeException, ErrorMapping }
 import kafka.api._
-import org.apache.samza.config.KafkaConfig
-import org.apache.samza.config.KafkaConfig.Config2Kafka
 import kafka.common.TopicAndPartition
 import kafka.api.PartitionOffsetRequestInfo
 import grizzled.slf4j.Logging
+import kafka.message.MessageAndOffset

+/**
+ * Obtain the correct offsets for topics, be it earliest or largest
+ * @param default Value to return if no offset has been specified for topic
+ * @param autoOffsetResetTopics Topics that have been specified as auto offset
+ */
 class GetOffset(default: String, autoOffsetResetTopics: Map[String, String] = Map()) extends Logging with Toss {

   private def getAutoOffset(topic: String): Long = {
@@ -48,11 +51,11 @@ class GetOffset(default: String, autoOffsetResetTopics: Map[String, String] = Ma
   /**
    * An offset was provided but may not be valid. Verify its validity.
    */
-  private def useLastCheckpointedOffset(sc: DefaultFetch, last: String, tp: TopicAndPartition): Option[Long] = {
+  private def useLastCheckpointedOffset(sc: DefaultFetchSimpleConsumer, last: String, tp: TopicAndPartition): Option[Long] = {
     try {
       info("Validating offset %s for topic and partition %s" format (last, tp))

-      val messages = sc.defaultFetch((tp, last.toLong))
+      val messages: FetchResponse = sc.defaultFetch((tp, last.toLong))

       if (messages.hasError) {
         ErrorMapping.maybeThrowException(messages.errorCode(tp.topic, tp.partition))
@@ -60,28 +63,38 @@ class GetOffset(default: String, autoOffsetResetTopics: Map[String, String] = Ma

       info("Able to successfully read from offset %s for topic and partition %s. Using it to instantiate consumer." format (last, tp))

-      val nextOffset = messages
-        .messageSet(tp.topic, tp.partition)
-        .head
-        .nextOffset
-
-      info("Got next offset %s for %s." format (nextOffset, tp))
+      val messageSet = messages.messageSet(tp.topic, tp.partition)

-      Some(nextOffset)
+      if(messageSet.isEmpty) { // No messages have been written since our checkpoint
+        info("Got empty response for checkpointed offset, using checkpointed")
+        Some(last.toLong)
+      } else {
+        val nextOffset = messageSet.head.nextOffset
+        info("Got next offset %s for %s." format (nextOffset, tp))
+        Some(nextOffset)
+      }
     } catch {
       case e: OffsetOutOfRangeException =>
-        info("An out of range Kafka offset (%s) was supplied for topic and partition %s, so falling back to autooffset.reset." format (last, tp))
+        info("An out-of-range Kafka offset (%s) was supplied for topic and partition %s, so falling back to autooffset.reset." format (last, tp))
         None
     }
   }

-  def getNextOffset(sc: SimpleConsumer with DefaultFetch, tp: TopicAndPartition, lastCheckpointedOffset: String): Long = {
+  /**
+   * Using the provided SimpleConsumer, obtain the next offset to read for the specified topic
+   * @param sc SimpleConsumer used to query the Kafka Broker
+   * @param tp TopicAndPartition we offset for
+   * @param lastCheckpointedOffset Null is acceptable. If not null, return the last checkpointed offset, after checking it is valid
+   * @return Next offset to read or throw an exception if one has been received via the simple consumer
+   */
+  def getNextOffset(sc: DefaultFetchSimpleConsumer, tp: TopicAndPartition, lastCheckpointedOffset: String): Long = {
     val offsetRequest = new OffsetRequest(Map(tp -> new PartitionOffsetRequestInfo(getAutoOffset(tp.topic), 1)))
     val offsetResponse = sc.getOffsetsBefore(offsetRequest)
     val partitionOffsetResponse = offsetResponse.partitionErrorAndOffsets.get(tp).getOrElse(toss("Unable to find offset information for %s" format tp))

-    val autoOffset = partitionOffsetResponse.offsets.headOption.getOrElse(toss("Got response, but no offsets defined for %s" format tp))
-    info("Got offset %d for topic and partition %s" format (autoOffset, tp))
+    ErrorMapping.maybeThrowException(partitionOffsetResponse.error)
+
+    val autoOffset = partitionOffsetResponse.offsets.headOption.getOrElse(toss("Got response, but no offsets defined for %s" format tp))

     val actualOffset = Option(lastCheckpointedOffset) match {
       case Some(last) => useLastCheckpointedOffset(sc, last, tp).getOrElse(autoOffset)
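The key behavioral change in GetOffset: an empty fetch at the checkpointed offset no longer fails validation (previously .head threw on an empty message set); it now means nothing has been written since the checkpoint, so the checkpoint itself is still the right place to resume. A sketch of that decision in isolation — validateCheckpoint and fetchHead are hypothetical names, with fetchHead standing in for the defaultFetch call:

def validateCheckpoint(checkpointed: Long, fetchHead: Long => Option[Long]): Option[Long] =
  try {
    fetchHead(checkpointed) match {
      case Some(nextOffset) => Some(nextOffset) // messages exist past the checkpoint; resume after them
      case None             => Some(checkpointed) // empty fetch: nothing written since; checkpoint still valid
    }
  } catch {
    case e: kafka.common.OffsetOutOfRangeException => None // fall back to auto.offset.reset
  }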
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e8997846/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
index 7624a8e..2b73c61 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -19,14 +19,9 @@
 package org.apache.samza.system.kafka

-import org.apache.samza.util.{ KafkaUtil, ClientUtilTopicMetadataStore }
+import org.apache.samza.util.ClientUtilTopicMetadataStore
 import kafka.common.TopicAndPartition
-import org.apache.samza.config.{ KafkaConfig, Config }
-import org.apache.samza.SamzaException
-import org.apache.samza.config.KafkaConfig.Config2Kafka
-import org.apache.samza.metrics.MetricsRegistry
 import grizzled.slf4j.Logging
-import scala.collection.JavaConversions._
 import kafka.message.MessageAndOffset
 import org.apache.samza.Partition
 import kafka.utils.Utils
@@ -37,8 +32,6 @@
 import kafka.serializer.Decoder
 import org.apache.samza.util.BlockingEnvelopeMap
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.system.IncomingMessageEnvelope
-import java.nio.charset.Charset
-import kafka.api.PartitionMetadata

 object KafkaSystemConsumer {
   def toTopicAndPartition(systemStreamPartition: SystemStreamPartition) = {
@@ -97,45 +90,48 @@ private[kafka] class KafkaSystemConsumer(
   }

   def refreshBrokers(topicPartitionsAndOffsets: Map[TopicAndPartition, String]) {
-    var done = false
-
-    while (!done) {
+    var tpToRefresh = topicPartitionsAndOffsets.keySet.toList
+    while (!tpToRefresh.isEmpty) {
       try {
         val getTopicMetadata = (topics: Set[String]) => {
           new ClientUtilTopicMetadataStore(brokerListString, clientId).getTopicInfo(topics)
         }
-
-        val partitionMetadata = TopicMetadataCache.getTopicMetadata(
-          topicPartitionsAndOffsets.keys.map(_.topic).toSet,
-          systemName,
-          getTopicMetadata)
-
-        topicPartitionsAndOffsets.map {
-          case (topicAndPartition, lastOffset) =>
-            // TODO whatever we do, we can't say Broker, even though we're
-            // manipulating it here. Broker is a private type and Scala doesn't seem
-            // to care about that as long as you don't explicitly declare its type.
-            val brokerOption = partitionMetadata(topicAndPartition.topic)
-              .partitionsMetadata
-              .find(_.partitionId == topicAndPartition.partition)
-              .flatMap(_.leader)
-
-            brokerOption match {
-              case Some(broker) =>
-                val brokerProxy = brokerProxies.getOrElseUpdate((broker.host, broker.port), new BrokerProxy(broker.host, broker.port, systemName, clientId, metrics, timeout, bufferSize, offsetGetter) {
-                  val messageSink: MessageSink = sink
-                })
-
-                brokerProxy.addTopicPartition(topicAndPartition, lastOffset)
-              case _ => warn("No such topic-partition: %s, dropping." format topicAndPartition)
-            }
+        val topics = tpToRefresh.map(_.topic).toSet
+        val partitionMetadata = TopicMetadataCache.getTopicMetadata(topics, systemName, getTopicMetadata)
+
+        // addTopicPartition one at a time, leaving the to-be-done list intact in case of exceptions.
+        // This avoids trying to re-add the same topic partition repeatedly
+        def refresh(tp:List[TopicAndPartition]) = {
+          val head :: rest = tpToRefresh
+          val lastOffset = topicPartitionsAndOffsets.get(head).get
+          // Whatever we do, we can't say Broker, even though we're
+          // manipulating it here. Broker is a private type and Scala doesn't seem
+          // to care about that as long as you don't explicitly declare its type.
+          val brokerOption = partitionMetadata(head.topic)
+            .partitionsMetadata
+            .find(_.partitionId == head.partition)
+            .flatMap(_.leader)
+
+          brokerOption match {
+            case Some(broker) =>
+              val brokerProxy = brokerProxies.getOrElseUpdate((broker.host, broker.port), new BrokerProxy(broker.host, broker.port, systemName, clientId, metrics, timeout, bufferSize, offsetGetter) {
+                val messageSink: MessageSink = sink
+              })
+
+              brokerProxy.addTopicPartition(head, lastOffset)
+            case None => warn("No such topic-partition: %s, dropping." format head)
+          }
+          rest
         }
-        done = true
+
+        while(!tpToRefresh.isEmpty) {
+          tpToRefresh = refresh(tpToRefresh)
+        }
       } catch {
         case e: Throwable =>
-          warn("An exception was thrown while refreshing brokers for %s. Waiting a bit and retrying, since we can't continue without broker metadata." format topicPartitionsAndOffsets.keySet)
-          debug(e)
+          warn("An exception was thrown while refreshing brokers for %s. Waiting a bit and retrying, since we can't continue without broker metadata." format tpToRefresh.head)
+          debug("Exception while refreshing brokers", e)

           try {
             Thread.sleep(brokerMetadataFailureRefreshMs)
@@ -181,6 +177,7 @@ private[kafka] class KafkaSystemConsumer(
   }

   def abdicate(tp: TopicAndPartition, lastOffset: Long) {
+    info("Abdicating for %s" format (tp))
     refreshBrokers(Map(tp -> lastOffset.toString))
   }
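The refreshBrokers rewrite replaces an all-or-nothing loop with a work list that only drops an item after it succeeds, so an exception mid-way leaves exactly the unprocessed topic-partitions for the retry. The pattern in isolation — drainWithRetry is a hypothetical name, and the fixed sleep stands in for brokerMetadataFailureRefreshMs:

def drainWithRetry[A](initial: List[A])(processOne: A => Unit) {
  var remaining = initial
  while (remaining.nonEmpty) {
    try {
      val head :: rest = remaining
      processOne(head)
      remaining = rest        // head is only removed after success
    } catch {
      case e: Throwable =>
        Thread.sleep(1000)    // assumed backoff; Samza uses brokerMetadataFailureRefreshMs
    }
  }
}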
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/e8997846/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
index 85f5887..151c699 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
@@ -22,22 +22,25 @@
 package org.apache.samza.system.kafka

 import org.junit._
 import org.junit.Assert._
-import org.apache.samza.config.MapConfig
-import org.mockito.Mockito
-import org.apache.samza.metrics._
+import org.mockito.{Matchers, Mockito}
 import scala.collection.JavaConversions._
 import kafka.consumer.SimpleConsumer
 import org.mockito.Mockito._
 import org.mockito.Matchers._
 import kafka.api._
-import kafka.message.{ Message, MessageSet, MessageAndOffset }
+import kafka.message.{MessageSet, Message, MessageAndOffset, ByteBufferMessageSet}
 import kafka.common.TopicAndPartition
 import kafka.api.PartitionOffsetsResponse
 import java.nio.ByteBuffer
 import org.apache.samza.SamzaException
-import kafka.message.ByteBufferMessageSet
+import grizzled.slf4j.Logging
+import kafka.common.ErrorMapping
+import org.mockito.stubbing.Answer
+import org.mockito.invocation.InvocationOnMock
+import java.util.concurrent.CountDownLatch

-class TestBrokerProxy {
+
+class TestBrokerProxy extends Logging {
   val tp2 = new TopicAndPartition("Redbird", 2013)

   def getMockBrokerProxy() = {
@@ -55,8 +58,6 @@ class TestBrokerProxy {
     }

     val system = "daSystem"
-    val config = new MapConfig(Map[String, String]("job.name" -> "Jobby McJob",
-      "systems.%s.Redbird.consumer.auto.offset.reset".format(system) -> "largest"))
     val host = "host"
     val port = 2222
     val tp = new TopicAndPartition("Redbird", 2012)
@@ -86,7 +87,7 @@ class TestBrokerProxy {
         }
         alreadyCreatedConsumer = true

-        new SimpleConsumer("a", 1, 2, 3, "b") with DefaultFetch {
+        new DefaultFetchSimpleConsumer("a", 1, 2, 3, "b") {
           val fetchSize: Int = 42

           val sc = Mockito.mock(classOf[SimpleConsumer])
@@ -114,6 +115,8 @@ class TestBrokerProxy {
               def getMessage() = new Message(Mockito.mock(classOf[ByteBuffer]))
               val messages = List(new MessageAndOffset(getMessage, 42), new MessageAndOffset(getMessage, 84))

+              when(messageSet.sizeInBytes).thenReturn(43)
+              when(messageSet.size).thenReturn(44)
               when(messageSet.iterator).thenReturn(messages.iterator)
               when(messageSet.head).thenReturn(messages.head)
               messageSet
@@ -178,7 +181,87 @@ class TestBrokerProxy {
       fail("Should have thrown an exception")
     } catch {
       case se: SamzaException => assertEquals(se.getMessage, "Already consuming TopicPartition [Redbird,2012]")
+      case other: Throwable => fail("Got some other exception than what we were expecting: " + other)
     }
   }

+  @Test def brokerProxyCorrectlyHandlesOffsetOutOfRange():Unit = {
+    // Need to wait for the thread to do some work before ending the test
+    val countdownLatch = new CountDownLatch(1)
+    var failString:String = null
+
+    val mockMessageSink = mock(classOf[MessageSink])
+    when(mockMessageSink.needsMoreMessages(any())).thenReturn(true)
+
+    val doNothingMetrics = new KafkaSystemConsumerMetrics()
+
+    val tp = new TopicAndPartition("topic", 42)
+
+    val mockOffsetGetter = mock(classOf[GetOffset])
+    // This will be used by the simple consumer below, and this is the response that simple consumer needs
+    when(mockOffsetGetter.getNextOffset(any(classOf[DefaultFetchSimpleConsumer]), Matchers.eq(tp), Matchers.eq(null))).thenReturn(1492l)
+
+    var callsToCreateSimpleConsumer = 0
+    val mockSimpleConsumer = mock(classOf[DefaultFetchSimpleConsumer])
+
+    // Create an answer that first indicates offset out of range on first invocation and on second
+    // verifies that the parameters have been updated to what we expect them to be
+    val answer = new Answer[FetchResponse](){
+      var invocationCount = 0
+      def answer(invocation: InvocationOnMock): FetchResponse = {
+        val arguments = invocation.getArguments()(0).asInstanceOf[List[Object]](0).asInstanceOf[(String, Long)]
+
+        if(invocationCount == 0) {
+          if(arguments != (tp, 0)) {
+            failString = "First invocation did not have the right arguments: " + arguments
+            countdownLatch.countDown()
+          }
+          val mfr = mock(classOf[FetchResponse])
+          when(mfr.hasError).thenReturn(true)
+          when(mfr.errorCode("topic", 42)).thenReturn(ErrorMapping.OffsetOutOfRangeCode)
+
+          val messageSet = mock(classOf[MessageSet])
+          when(messageSet.iterator).thenReturn(Iterator.empty)
+          val response = mock(classOf[FetchResponsePartitionData])
+          when(response.error).thenReturn(ErrorMapping.OffsetOutOfRangeCode)
+          val responseMap = Map(tp -> response)
+          when(mfr.data).thenReturn(responseMap)
+          invocationCount += 1
+          mfr
+        } else {
+          if(arguments != (tp, 1492)) {
+            failString = "On second invocation, arguments were not correct: " + arguments
+          }
+          countdownLatch.countDown()
+          Thread.currentThread().interrupt()
+          null
+        }
+      }
+    }
+
+    when(mockSimpleConsumer.defaultFetch(any())).thenAnswer(answer)
+
+    // So now we have a fetch response that will fail. Prime the mockGetOffset to send us to a new offset
+
+    val bp = new BrokerProxy("host", 423, "system", "clientID", doNothingMetrics, Int.MaxValue, 1024000, mockOffsetGetter) {
+      val messageSink: MessageSink = mockMessageSink
+
+      override def createSimpleConsumer() = {
+        if(callsToCreateSimpleConsumer > 1) {
+          failString = "Tried to create more than one simple consumer"
+          countdownLatch.countDown()
+        }
+        callsToCreateSimpleConsumer += 1
+        mockSimpleConsumer
+      }
+    }
+
+    bp.addTopicPartition(tp, "earliest")
+    bp.start
+    countdownLatch.await()
+    bp.stop
+    if(failString != null) {
+      fail(failString)
+    }
+  }
 }
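The new test's core trick is a stateful Mockito Answer: a single stub whose behavior changes across invocations, scripting "fail with OffsetOutOfRange first, then verify the retried fetch uses the refreshed offset". The same pattern in a generic, self-contained form — Fetcher and its method are illustrative, not part of the Samza test:

import org.mockito.Mockito._
import org.mockito.Matchers._
import org.mockito.stubbing.Answer
import org.mockito.invocation.InvocationOnMock

trait Fetcher { def fetch(offset: Long): String }

val fetcher = mock(classOf[Fetcher])
val scriptedFetch = new Answer[String] {
  var calls = 0
  def answer(invocation: InvocationOnMock): String = {
    calls += 1
    if (calls == 1) sys.error("offset out of range")  // first call: simulate the failure
    else "record@" + invocation.getArguments()(0)     // retry path: succeed and echo the offset
  }
}
when(fetcher.fetch(anyLong())).thenAnswer(scriptedFetch)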
