[ 
https://issues.apache.org/jira/browse/STORM-586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14236589#comment-14236589
 ] 

ASF GitHub Bot commented on STORM-586:
--------------------------------------

Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/339#discussion_r21414360
  
    --- Diff: external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java ---
    @@ -17,6 +17,9 @@
      */
     package storm.kafka;
     
    -public class UpdateOffsetException extends RuntimeException {
    --- End diff --
    
    I think UpdateOffsetException shouldn't extend FailedFetchException. They are
    meant to be different exceptions. The problem is that if a method throws
    UpdateOffsetException and there is a catch for FailedFetchException, it gets
    caught in that catch, but we want different behavior for
    FailedFetchException and UpdateOffsetException. Also, UpdateOffsetException
    could be named differently, maybe TopicOffsetOutOfRangeException, with
    FailedFetchException used for any other errors.


> Trident kafka spout fails instead of updating offset when kafka offset is out 
> of range.
> ---------------------------------------------------------------------------------------
>
>                 Key: STORM-586
>                 URL: https://issues.apache.org/jira/browse/STORM-586
>             Project: Apache Storm
>          Issue Type: Bug
>    Affects Versions: 0.9.3
>            Reporter: Parth Brahmbhatt
>            Assignee: Parth Brahmbhatt
>            Priority: Critical
>
> Trident KafkaEmitter does not catch the newly added UpdateOffsetException 
> which results in the spout failing repeatedly instead of automatically 
> updating the offset to earliest time. 
> PROBLEM: 
> Exception while using the Trident Kafka Spout.
> 2014-12-04 18:38:03 b.s.util ERROR Async loop died! 
> java.lang.RuntimeException: storm.kafka.UpdateOffsetException 
> at 
> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:107)
>  ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
> at 
> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:78)
>  ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
> at 
> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:77)
>  ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
> at 
> backtype.storm.daemon.executor$fn_4195$fn4207$fn_4254.invoke(executor.clj:745)
>  ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
> at backtype.storm.util$async_loop$fn__442.invoke(util.clj:436) 
> ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
> at clojure.lang.AFn.run(AFn.java:24) clojure-1.4.0.jar:na 
> at java.lang.Thread.run(Thread.java:745) na:1.7.0_71 
> Caused by: storm.kafka.UpdateOffsetException: null 
> at storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:186) ~stormjar.jar:na 
> at 
> storm.kafka.trident.TridentKafkaEmitter.fetchMessages(TridentKafkaEmitter.java:132)
>  ~stormjar.jar:na 
> at 
> storm.kafka.trident.TridentKafkaEmitter.doEmitNewPartitionBatch(TridentKafkaEmitter.java:113)
>  ~stormjar.jar:na 
> at 
> storm.kafka.trident.TridentKafkaEmitter.failFastEmitNewPartitionBatch(TridentKafkaEmitter.java:72)
>  ~stormjar.jar:na 
> at 
> storm.kafka.trident.TridentKafkaEmitter.access$400(TridentKafkaEmitter.java:46)
>  ~stormjar.jar:na 
> at 
> storm.kafka.trident.TridentKafkaEmitter$2.emitPartitionBatchNew(TridentKafkaEmitter.java:233)
>  ~stormjar.jar:na 
> at 
> storm.kafka.trident.TridentKafkaEmitter$2.emitPartitionBatchNew(TridentKafkaEmitter.java:225)
>  ~stormjar.jar:na 
> at 
> storm.trident.spout.PartitionedTridentSpoutExecutor$Emitter$1.init(PartitionedTridentSpoutExecutor.java:125)
>  ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
> at 
> storm.trident.topology.state.RotatingTransactionalState.getState(RotatingTransactionalState.java:83)
>  ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
> at 
> storm.trident.topology.state.RotatingTransactionalState.getStateOrCreate(RotatingTransactionalState.java:110)
>  ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
> at 
> storm.trident.spout.PartitionedTridentSpoutExecutor$Emitter.emitBatch(PartitionedTridentSpoutExecutor.java:121)
>  ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
> at 
> storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82)
>  ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
> at 
> storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:369)
>  ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
> at 
> backtype.storm.daemon.executor$fn_4195$tuple_action_fn_4197.invoke(executor.clj:630)
>  ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
> at 
> backtype.storm.daemon.executor$mk_task_receiver$fn__4118.invoke(executor.clj:398)
>  ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
> at 
> backtype.storm.disruptor$clojure_handler$reify__723.onEvent(disruptor.clj:58) 
> ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
> at 
> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:99)
>  ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
> ... 6 common frames omitted 
> 2014-12-04 18:38:03 b.s.d.executor ERROR 
> java.lang.RuntimeException: storm.kafka.UpdateOffsetException 
> at 
> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:107)
>  ~[storm-core-0.9.1.2.1.7.0



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
