[ 
https://issues.apache.org/jira/browse/STORM-586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rakesh Surendra reopened STORM-586:
-----------------------------------

Updated storm version from 0.9.5 to 0.10, but my spout throws 
storm.kafka.UpdateOffsetException. 

{code}
java.lang.RuntimeException: storm.kafka.UpdateOffsetException at 
backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:135)
 at 
backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:106)
 at 
backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) 
at 
backtype.storm.daemon.executor$fn__5694$fn__5707$fn__5758.invoke(executor.clj:819)
 at backtype.storm.util$async_loop$fn__545.invoke(util.clj:479) at 
clojure.lang.AFn.run(AFn.java:22) at java.lang.Thread.run(Thread.java:745) 
Caused by: storm.kafka.UpdateOffsetException at 
storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:186) at 
storm.kafka.trident.TridentKafkaEmitter.fetchMessages(TridentKafkaEmitter.java:132)
 at 
storm.kafka.trident.TridentKafkaEmitter.doEmitNewPartitionBatch(TridentKafkaEmitter.java:113)
 at 
storm.kafka.trident.TridentKafkaEmitter.failFastEmitNewPartitionBatch(TridentKafkaEmitter.java:72)
 at 
storm.kafka.trident.TridentKafkaEmitter.emitNewPartitionBatch(TridentKafkaEmitter.java:79)
 at 
storm.kafka.trident.TridentKafkaEmitter.access$000(TridentKafkaEmitter.java:46) 
at 
storm.kafka.trident.TridentKafkaEmitter$1.emitPartitionBatch(TridentKafkaEmitter.java:204)
 at 
storm.kafka.trident.TridentKafkaEmitter$1.emitPartitionBatch(TridentKafkaEmitter.java:194)
 at 
storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:127)
 at 
storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82) 
at 
storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:370)
 at 
backtype.storm.daemon.executor$fn__5694$tuple_action_fn__5696.invoke(executor.clj:690)
 at 
backtype.storm.daemon.executor$mk_task_receiver$fn__5615.invoke(executor.clj:436)
 at 
backtype.storm.disruptor$clojure_handler$reify__5189.onEvent(disruptor.clj:58) 
at 
backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:127)
 ... 6 more
{code}

> Trident kafka spout fails instead of updating offset when kafka offset is out 
> of range.
> ---------------------------------------------------------------------------------------
>
>                 Key: STORM-586
>                 URL: https://issues.apache.org/jira/browse/STORM-586
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka
>    Affects Versions: 0.9.3
>            Reporter: Parth Brahmbhatt
>            Assignee: Parth Brahmbhatt
>            Priority: Critical
>             Fix For: 0.10.0
>
>
> Trident KafkaEmitter does not catch the newly added UpdateOffsetException 
> which results in the spout failing repeatedly instead of automatically 
> updating the offset to earliest time. 
> PROBLEM:  Exception while using the Trident Kafka Spout.
> {code}
> 2014-12-04 18:38:03 b.s.util ERROR Async loop died! 
> java.lang.RuntimeException: storm.kafka.UpdateOffsetException 
> at 
> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:107)
>  ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
> at 
> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:78)
>  ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
> at 
> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:77)
>  ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
> at 
> backtype.storm.daemon.executor$fn_4195$fn4207$fn_4254.invoke(executor.clj:745)
>  ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
> at backtype.storm.util$async_loop$fn__442.invoke(util.clj:436) 
> ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
> at clojure.lang.AFn.run(AFn.java:24) clojure-1.4.0.jar:na 
> at java.lang.Thread.run(Thread.java:745) na:1.7.0_71 
> Caused by: storm.kafka.UpdateOffsetException: null 
> at storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:186) ~stormjar.jar:na 
> at 
> storm.kafka.trident.TridentKafkaEmitter.fetchMessages(TridentKafkaEmitter.java:132)
>  ~stormjar.jar:na 
> at 
> storm.kafka.trident.TridentKafkaEmitter.doEmitNewPartitionBatch(TridentKafkaEmitter.java:113)
>  ~stormjar.jar:na 
> at 
> storm.kafka.trident.TridentKafkaEmitter.failFastEmitNewPartitionBatch(TridentKafkaEmitter.java:72)
>  ~stormjar.jar:na 
> at 
> storm.kafka.trident.TridentKafkaEmitter.access$400(TridentKafkaEmitter.java:46)
>  ~stormjar.jar:na 
> at 
> storm.kafka.trident.TridentKafkaEmitter$2.emitPartitionBatchNew(TridentKafkaEmitter.java:233)
>  ~stormjar.jar:na 
> at 
> storm.kafka.trident.TridentKafkaEmitter$2.emitPartitionBatchNew(TridentKafkaEmitter.java:225)
>  ~stormjar.jar:na 
> at 
> storm.trident.spout.PartitionedTridentSpoutExecutor$Emitter$1.init(PartitionedTridentSpoutExecutor.java:125)
>  ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
> at 
> storm.trident.topology.state.RotatingTransactionalState.getState(RotatingTransactionalState.java:83)
>  ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
> at 
> storm.trident.topology.state.RotatingTransactionalState.getStateOrCreate(RotatingTransactionalState.java:110)
>  ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
> at 
> storm.trident.spout.PartitionedTridentSpoutExecutor$Emitter.emitBatch(PartitionedTridentSpoutExecutor.java:121)
>  ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
> at 
> storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82)
>  ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
> at 
> storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:369)
>  ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
> at 
> backtype.storm.daemon.executor$fn_4195$tuple_action_fn_4197.invoke(executor.clj:630)
>  ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
> at 
> backtype.storm.daemon.executor$mk_task_receiver$fn__4118.invoke(executor.clj:398)
>  ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
> at 
> backtype.storm.disruptor$clojure_handler$reify__723.onEvent(disruptor.clj:58) 
> ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
> at 
> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:99)
>  ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
> ... 6 common frames omitted 
> 2014-12-04 18:38:03 b.s.d.executor ERROR 
> java.lang.RuntimeException: storm.kafka.UpdateOffsetException 
> at 
> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:107)
>  ~[storm-core-0.9.1.2.1.7.0
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to