[ https://issues.apache.org/jira/browse/STORM-586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14252638#comment-14252638 ]
ASF GitHub Bot commented on STORM-586:
--------------------------------------

Github user nathanmarz commented on the pull request:

    https://github.com/apache/storm/pull/339#issuecomment-67581379

-1

Why are offset out of range exceptions happening when re-emitting a batch? That shouldn't be possible so I'd like to know why this is happening.

Emitting a different batch than the first time actually breaks the contract of transactional spouts – which is unacceptable (though it's fine for opaque transactional spouts). Because it breaks the contract of transactional spouts, it would be better to just error repeatedly than silently do the wrong thing. The part of this code that catches the error and emits an empty batch for the same batch id should only be applied towards the opaque spout.

Additionally, by better understanding why those offset out of range exceptions are happening, there may be an alternative solution that doesn't involve breaking the contract of transactional spouts.

> Trident kafka spout fails instead of updating offset when kafka offset is out of range.
> ---------------------------------------------------------------------------------------
>
>                 Key: STORM-586
>                 URL: https://issues.apache.org/jira/browse/STORM-586
>             Project: Apache Storm
>          Issue Type: Bug
>   Affects Versions: 0.9.3
>            Reporter: Parth Brahmbhatt
>            Assignee: Parth Brahmbhatt
>            Priority: Critical
>
> Trident KafkaEmitter does not catch the newly added UpdateOffsetException
> which results in the spout failing repeatedly instead of automatically
> updating the offset to earliest time.
> PROBLEM:
> Exception while using the Trident Kafka Spout.
> 2014-12-04 18:38:03 b.s.util ERROR Async loop died!
> java.lang.RuntimeException: storm.kafka.UpdateOffsetException
>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:107) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784
>     at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:78) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784
>     at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:77) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784
>     at backtype.storm.daemon.executor$fn_4195$fn4207$fn_4254.invoke(executor.clj:745) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784
>     at backtype.storm.util$async_loop$fn__442.invoke(util.clj:436) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784
>     at clojure.lang.AFn.run(AFn.java:24) clojure-1.4.0.jar:na
>     at java.lang.Thread.run(Thread.java:745) na:1.7.0_71
> Caused by: storm.kafka.UpdateOffsetException: null
>     at storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:186) ~stormjar.jar:na
>     at storm.kafka.trident.TridentKafkaEmitter.fetchMessages(TridentKafkaEmitter.java:132) ~stormjar.jar:na
>     at storm.kafka.trident.TridentKafkaEmitter.doEmitNewPartitionBatch(TridentKafkaEmitter.java:113) ~stormjar.jar:na
>     at storm.kafka.trident.TridentKafkaEmitter.failFastEmitNewPartitionBatch(TridentKafkaEmitter.java:72) ~stormjar.jar:na
>     at storm.kafka.trident.TridentKafkaEmitter.access$400(TridentKafkaEmitter.java:46) ~stormjar.jar:na
>     at storm.kafka.trident.TridentKafkaEmitter$2.emitPartitionBatchNew(TridentKafkaEmitter.java:233) ~stormjar.jar:na
>     at storm.kafka.trident.TridentKafkaEmitter$2.emitPartitionBatchNew(TridentKafkaEmitter.java:225) ~stormjar.jar:na
>     at storm.trident.spout.PartitionedTridentSpoutExecutor$Emitter$1.init(PartitionedTridentSpoutExecutor.java:125) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784
>     at storm.trident.topology.state.RotatingTransactionalState.getState(RotatingTransactionalState.java:83) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784
>     at storm.trident.topology.state.RotatingTransactionalState.getStateOrCreate(RotatingTransactionalState.java:110) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784
>     at storm.trident.spout.PartitionedTridentSpoutExecutor$Emitter.emitBatch(PartitionedTridentSpoutExecutor.java:121) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784
>     at storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784
>     at storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:369) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784
>     at backtype.storm.daemon.executor$fn_4195$tuple_action_fn_4197.invoke(executor.clj:630) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784
>     at backtype.storm.daemon.executor$mk_task_receiver$fn__4118.invoke(executor.clj:398) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784
>     at backtype.storm.disruptor$clojure_handler$reify__723.onEvent(disruptor.clj:58) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784
>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:99) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784
>     ... 6 common frames omitted
> 2014-12-04 18:38:03 b.s.d.executor ERROR
> java.lang.RuntimeException: storm.kafka.UpdateOffsetException
>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:107) ~[storm-core-0.9.1.2.1.7.0

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
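
The distinction drawn in the review comment (a transactional spout must replay exactly the same batch or fail, while an opaque spout may emit a different, even empty, batch) can be illustrated with a minimal, self-contained sketch. Nothing below is Storm or storm-kafka code: `OffsetOutOfRangeSketchException`, `BatchReemitSketch`, `fetch`, `reemitTransactional`, and `reemitOpaque` are hypothetical names made up for illustration, and the in-memory "log" merely stands in for a Kafka partition whose retained offsets are assumed to be `[earliest, latest)`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for an "offset out of range" failure; not storm.kafka.UpdateOffsetException.
class OffsetOutOfRangeSketchException extends RuntimeException {
    OffsetOutOfRangeSketchException(String message) {
        super(message);
    }
}

final class BatchReemitSketch {

    // Hypothetical fetch: the simulated log only retains offsets in [earliest, latest).
    static List<String> fetch(long offset, long earliest, long latest) {
        if (offset < earliest || offset > latest) {
            throw new OffsetOutOfRangeSketchException(
                    "offset " + offset + " outside [" + earliest + ", " + latest + ")");
        }
        List<String> batch = new ArrayList<>();
        for (long o = offset; o < latest; o++) {
            batch.add("msg-" + o);
        }
        return batch;
    }

    // Transactional contract: a replayed batch must match the original,
    // so the failure is propagated rather than replaced with different data.
    static List<String> reemitTransactional(long offset, long earliest, long latest) {
        return fetch(offset, earliest, latest);
    }

    // Opaque contract: a replay may differ from the first attempt,
    // so emitting an empty batch for the same batch id is acceptable here.
    static List<String> reemitOpaque(long offset, long earliest, long latest) {
        try {
            return fetch(offset, earliest, latest);
        } catch (OffsetOutOfRangeSketchException e) {
            return Collections.emptyList();
        }
    }

    public static void main(String[] args) {
        // Offset 5 has been aged out of a log that now starts at offset 10.
        System.out.println("opaque replay size: " + reemitOpaque(5, 10, 20).size());
        try {
            reemitTransactional(5, 10, 20);
        } catch (OffsetOutOfRangeSketchException e) {
            System.out.println("transactional replay failed: " + e.getMessage());
        }
    }
}
```

In this sketch the only difference between the two paths is where the exception is handled, which mirrors the suggestion that the catch-and-emit-empty behaviour belongs on the opaque path only.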