[ https://issues.apache.org/jira/browse/STORM-586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14236589#comment-14236589 ]
ASF GitHub Bot commented on STORM-586: -------------------------------------- Github user harshach commented on a diff in the pull request: https://github.com/apache/storm/pull/339#discussion_r21414360 --- Diff: external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java --- @@ -17,6 +17,9 @@ */ package storm.kafka; -public class UpdateOffsetException extends RuntimeException { --- End diff -- I think UpdateOffsetException shouldn't extend FailedFetchException . They meant to be different exceptions . The problem here is if a method throws UpdateOffsetException and there is a catch for FailedFetchException its get caught in that catch but we want to have different behavior for FailedFetchException and UpdateOffsetException. Also UpdateOffsetException can be named differently, may be TopicOffsetOutOfRangeException and for any other errors use FailedFetchException. > Trident kafka spout fails instead of updating offset when kafka offset is out > of range. > --------------------------------------------------------------------------------------- > > Key: STORM-586 > URL: https://issues.apache.org/jira/browse/STORM-586 > Project: Apache Storm > Issue Type: Bug > Affects Versions: 0.9.3 > Reporter: Parth Brahmbhatt > Assignee: Parth Brahmbhatt > Priority: Critical > > Trident KafkaEmitter does not catch the newly added UpdateOffsetException > which results in the spout failing repeatedly instead of automatically > updating the offset to earliest time. > PROBLEM: > Exception while using the Trident Kafka Spout. > 2014-12-04 18:38:03 b.s.util ERROR Async loop died! > java.lang.RuntimeException: storm.kafka.UpdateOffsetException > at > backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:107) > ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 > at > backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:78) > ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 > at > backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:77) > ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 > at > backtype.storm.daemon.executor$fn_4195$fn4207$fn_4254.invoke(executor.clj:745) > ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 > at backtype.storm.util$async_loop$fn__442.invoke(util.clj:436) > ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 > at clojure.lang.AFn.run(AFn.java:24) clojure-1.4.0.jar:na > at java.lang.Thread.run(Thread.java:745) na:1.7.0_71 > Caused by: storm.kafka.UpdateOffsetException: null > at storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:186) ~stormjar.jar:na > at > storm.kafka.trident.TridentKafkaEmitter.fetchMessages(TridentKafkaEmitter.java:132) > ~stormjar.jar:na > at > storm.kafka.trident.TridentKafkaEmitter.doEmitNewPartitionBatch(TridentKafkaEmitter.java:113) > ~stormjar.jar:na > at > storm.kafka.trident.TridentKafkaEmitter.failFastEmitNewPartitionBatch(TridentKafkaEmitter.java:72) > ~stormjar.jar:na > at > storm.kafka.trident.TridentKafkaEmitter.access$400(TridentKafkaEmitter.java:46) > ~stormjar.jar:na > at > storm.kafka.trident.TridentKafkaEmitter$2.emitPartitionBatchNew(TridentKafkaEmitter.java:233) > ~stormjar.jar:na > at > storm.kafka.trident.TridentKafkaEmitter$2.emitPartitionBatchNew(TridentKafkaEmitter.java:225) > ~stormjar.jar:na > at > storm.trident.spout.PartitionedTridentSpoutExecutor$Emitter$1.init(PartitionedTridentSpoutExecutor.java:125) > ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 > at > storm.trident.topology.state.RotatingTransactionalState.getState(RotatingTransactionalState.java:83) > ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 > at > storm.trident.topology.state.RotatingTransactionalState.getStateOrCreate(RotatingTransactionalState.java:110) > ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 > at > storm.trident.spout.PartitionedTridentSpoutExecutor$Emitter.emitBatch(PartitionedTridentSpoutExecutor.java:121) > ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 > at > storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82) > ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 > at > storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:369) > ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 > at > backtype.storm.daemon.executor$fn_4195$tuple_action_fn_4197.invoke(executor.clj:630) > ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 > at > backtype.storm.daemon.executor$mk_task_receiver$fn__4118.invoke(executor.clj:398) > ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 > at > backtype.storm.disruptor$clojure_handler$reify__723.onEvent(disruptor.clj:58) > ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 > at > backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:99) > ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 > ... 6 common frames omitted > 2014-12-04 18:38:03 b.s.d.executor ERROR > java.lang.RuntimeException: storm.kafka.UpdateOffsetException > at > backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:107) > ~[storm-core-0.9.1.2.1.7.0 -- This message was sent by Atlassian JIRA (v6.3.4#6332)