[ https://issues.apache.org/jira/browse/KAFKA-725?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15232514#comment-15232514 ]

Guozhang Wang commented on KAFKA-725:
-------------------------------------

Usually client code should handle OffsetOutOfRangeException gracefully by 
requesting the current log end offset and retrying the fetch. Note that with 
this handling the exception may still be triggered several times until the 
data is fully replicated and the HW (high watermark) has advanced; in most 
cases that is fine, since the condition is only transient. But I agree that a 
more ideal solution would be to park the request in purgatory, so that we can 
also reduce the retry round trips in this case.
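
For reference, a minimal sketch of that handling, assuming the 0.8 Scala 
SimpleConsumer API (the same FetchRequestBuilder / ErrorMapping classes used 
in the consumer code below; the helper name resetToLogEndOffset is 
hypothetical, and the OffsetResponse field names are from my reading of the 
0.8 source):

    import kafka.api.{FetchRequestBuilder, OffsetRequest, PartitionOffsetRequestInfo}
    import kafka.common.{ErrorMapping, TopicAndPartition}
    import kafka.consumer.SimpleConsumer

    // Hypothetical helper: ask the broker for the partition's current log end
    // offset, so the consumer can resume fetching from there.
    def resetToLogEndOffset(consumer: SimpleConsumer, topic: String, partition: Int): Long = {
      val tp = TopicAndPartition(topic, partition)
      val request = OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, maxNumOffsets = 1)))
      consumer.getOffsetsBefore(request).partitionErrorAndOffsets(tp).offsets.head
    }

    // In the fetch loop: on an out-of-range error, reset and retry instead of
    // failing; the retry may hit the same error again until the HW advances.
    val response = consumer.fetch(
      new FetchRequestBuilder().addFetch(topic, partition, nextOffset, fetchSize).build())
    if (response.errorCode(topic, partition) == ErrorMapping.OffsetOutOfRangeCode)
      nextOffset = resetToLogEndOffset(consumer, topic, partition)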

> Broker Exception: Attempt to read with a maximum offset less than start offset
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-725
>                 URL: https://issues.apache.org/jira/browse/KAFKA-725
>             Project: Kafka
>          Issue Type: Bug
>          Components: log
>    Affects Versions: 0.8.0
>            Reporter: Chris Riccomini
>            Assignee: Stig Rohde Døssing
>             Fix For: 0.10.0.0
>
>
> I have a simple consumer that's reading from a single topic/partition pair. 
> Running it seems to trigger these messages on the broker periodically:
> 2013/01/22 23:04:54.936 ERROR [KafkaApis] [kafka-request-handler-4] [kafka] []  [KafkaApi-466] error when processing request (MyTopic,4,7951732,2097152)
> java.lang.IllegalArgumentException: Attempt to read with a maximum offset (7951715) less than the start offset (7951732).
>         at kafka.log.LogSegment.read(LogSegment.scala:105)
>         at kafka.log.Log.read(Log.scala:390)
>         at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:372)
>         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:330)
>         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:326)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>         at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
>         at scala.collection.immutable.Map$Map1.map(Map.scala:93)
>         at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:326)
>         at kafka.server.KafkaApis$$anonfun$maybeUnblockDelayedFetchRequests$2.apply(KafkaApis.scala:165)
>         at kafka.server.KafkaApis$$anonfun$maybeUnblockDelayedFetchRequests$2.apply(KafkaApis.scala:164)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
>         at kafka.server.KafkaApis.maybeUnblockDelayedFetchRequests(KafkaApis.scala:164)
>         at kafka.server.KafkaApis$$anonfun$handleProducerRequest$2.apply(KafkaApis.scala:186)
>         at kafka.server.KafkaApis$$anonfun$handleProducerRequest$2.apply(KafkaApis.scala:185)
>         at scala.collection.immutable.Map$Map2.foreach(Map.scala:127)
>         at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:185)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:58)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:41)
>         at java.lang.Thread.run(Thread.java:619)
> When I shut the consumer down, I don't see the exceptions anymore.
> This is the code that my consumer is running:
>     while (true) {
>       // we believe the consumer to be connected, so try and use it for a fetch request
>       val request = new FetchRequestBuilder()
>         .addFetch(topic, partition, nextOffset, fetchSize)
>         .maxWait(Int.MaxValue)
>         // TODO for super high-throughput, might be worth waiting for more bytes
>         .minBytes(1)
>         .build
>       debug("Fetching messages for stream %s and offset %s." format (streamPartition, nextOffset))
>       val messages = connectedConsumer.fetch(request)
>       debug("Fetch complete for stream %s and offset %s. Got messages: %s" format (streamPartition, nextOffset, messages))
>       if (messages.hasError) {
>         warn("Got error code from broker for %s: %s. Shutting down consumer to trigger a reconnect." format (streamPartition, messages.errorCode(topic, partition)))
>         ErrorMapping.maybeThrowException(messages.errorCode(topic, partition))
>       }
>       messages.messageSet(topic, partition).foreach { msg =>
>         watchers.foreach(_.onMessagesReady(msg.offset.toString, msg.message.payload))
>         nextOffset = msg.nextOffset
>       }
>     }
> Any idea what might be causing this error?


