[ https://issues.apache.org/jira/browse/KAFKA-725?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15232503#comment-15232503 ]

Stig Rohde Døssing commented on KAFKA-725:
------------------------------------------

[~guozhang] Makes sense. I'm wondering if it would be better to put the request 
into purgatory then? If the requested offset falls between the high watermark 
and the end of the log, we can reasonably expect that offset to become readable 
shortly, while if the client gets a general OffsetOutOfRangeException, it 
probably makes more sense for the client to restart at either end of the log.
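To make the distinction concrete, here is the invariant that fails in the trace 
below, boiled down into a hand-written illustration (this is not the actual 
LogSegment.read code; the offsets are the ones from the report):

    // The delayed fetch is re-evaluated with maxOffset capped at the high
    // watermark, which can sit below the requested start offset while the
    // replicas are still catching up.
    val startOffset = 7951732L  // requested fetch offset (from the trace)
    val maxOffset   = 7951715L  // high watermark at read time (from the trace)
    require(maxOffset >= startOffset,
      "Attempt to read with a maximum offset (%d) less than the start offset (%d)."
        .format(maxOffset, startOffset))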

What I mean is basically: what does proper handling of this exception look like 
on the client side now?
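For concreteness, a minimal sketch of the client-side handling I have in mind, 
written against the 0.8 consumer API used in the code below. The 
fetchOffsetBefore helper is hypothetical; it stands in for the usual 
OffsetRequest round-trip via SimpleConsumer.getOffsetsBefore:

    import kafka.api.OffsetRequest
    import kafka.common.{ErrorMapping, OffsetOutOfRangeException}

    // Hypothetical helper: ask the broker for the earliest/latest available
    // offset (OffsetRequest.EarliestTime / OffsetRequest.LatestTime).
    def fetchOffsetBefore(topic: String, partition: Int, time: Long): Long =
      sys.error("hypothetical: wrap SimpleConsumer.getOffsetsBefore here")

    try {
      ErrorMapping.maybeThrowException(messages.errorCode(topic, partition))
    } catch {
      case _: OffsetOutOfRangeException =>
        // The requested offset is outside the log, so restart from one end;
        // earliest vs. latest is a policy choice (duplication vs. data loss).
        nextOffset = fetchOffsetBefore(topic, partition, OffsetRequest.LatestTime)
    }

If the offset is merely between the high watermark and the log end, parking the 
fetch in purgatory instead would spare clients this dance entirely.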

> Broker Exception: Attempt to read with a maximum offset less than start offset
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-725
>                 URL: https://issues.apache.org/jira/browse/KAFKA-725
>             Project: Kafka
>          Issue Type: Bug
>          Components: log
>    Affects Versions: 0.8.0
>            Reporter: Chris Riccomini
>            Assignee: Stig Rohde Døssing
>             Fix For: 0.10.0.0
>
>
> I have a simple consumer that's reading from a single topic/partition pair. 
> Running it seems to trigger these messages on the broker periodically:
> 2013/01/22 23:04:54.936 ERROR [KafkaApis] [kafka-request-handler-4] [kafka] [] [KafkaApi-466] error when processing request (MyTopic,4,7951732,2097152)
> java.lang.IllegalArgumentException: Attempt to read with a maximum offset (7951715) less than the start offset (7951732).
>         at kafka.log.LogSegment.read(LogSegment.scala:105)
>         at kafka.log.Log.read(Log.scala:390)
>         at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:372)
>         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:330)
>         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:326)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>         at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
>         at scala.collection.immutable.Map$Map1.map(Map.scala:93)
>         at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:326)
>         at kafka.server.KafkaApis$$anonfun$maybeUnblockDelayedFetchRequests$2.apply(KafkaApis.scala:165)
>         at kafka.server.KafkaApis$$anonfun$maybeUnblockDelayedFetchRequests$2.apply(KafkaApis.scala:164)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
>         at kafka.server.KafkaApis.maybeUnblockDelayedFetchRequests(KafkaApis.scala:164)
>         at kafka.server.KafkaApis$$anonfun$handleProducerRequest$2.apply(KafkaApis.scala:186)
>         at kafka.server.KafkaApis$$anonfun$handleProducerRequest$2.apply(KafkaApis.scala:185)
>         at scala.collection.immutable.Map$Map2.foreach(Map.scala:127)
>         at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:185)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:58)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:41)
>         at java.lang.Thread.run(Thread.java:619)
> When I shut the consumer down, I don't see the exceptions anymore.
> This is the code that my consumer is running:
>           while(true) {
>             // we believe the consumer to be connected, so try and use it for a fetch request
>             val request = new FetchRequestBuilder()
>               .addFetch(topic, partition, nextOffset, fetchSize)
>               .maxWait(Int.MaxValue)
>               // TODO for super high-throughput, might be worth waiting for more bytes
>               .minBytes(1)
>               .build
>             debug("Fetching messages for stream %s and offset %s." format (streamPartition, nextOffset))
>             val messages = connectedConsumer.fetch(request)
>             debug("Fetch complete for stream %s and offset %s. Got messages: %s" format (streamPartition, nextOffset, messages))
>             if (messages.hasError) {
>               warn("Got error code from broker for %s: %s. Shutting down consumer to trigger a reconnect." format (streamPartition, messages.errorCode(topic, partition)))
>               ErrorMapping.maybeThrowException(messages.errorCode(topic, partition))
>             }
>             messages.messageSet(topic, partition).foreach(msg => {
>               watchers.foreach(_.onMessagesReady(msg.offset.toString, msg.message.payload))
>               nextOffset = msg.nextOffset
>             })
>           }
> Any idea what might be causing this error?


