[ https://issues.apache.org/jira/browse/STORM-586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14238639#comment-14238639 ]
ASF GitHub Bot commented on STORM-586:
--------------------------------------
GitHub user Parth-Brahmbhatt reopened a pull request:
https://github.com/apache/storm/pull/339
STORM-586: TridentKafkaEmitter should catch updateOffsetException.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/Parth-Brahmbhatt/incubator-storm STORM-586
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/storm/pull/339.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #339
----
commit 65e9f0c814b2cddc772880042259b66194fd6fb7
Author: Parth Brahmbhatt <[email protected]>
Date: 2014-12-05T22:48:34Z
STORM-586: TridentKafkaEmitter should catch updateOffsetException.
commit 86839dc6b789045a13cf28cba008e52c4d835fa4
Author: Parth Brahmbhatt <[email protected]>
Date: 2014-12-08T22:49:29Z
Adding a special case for the retry batch: in the case of Trident, a
transaction retry should not jump the offset requested as part of the retry.
commit b2f48b41f19398498c7ae41c2059f3685b87ac22
Author: Parth Brahmbhatt <[email protected]>
Date: 2014-12-08T23:06:26Z
Renaming UpdateOffsetException to TopicOffsetOutOfRangeException.
commit fcf31350b62ca0efeeea96c8e1b0134edb81c1eb
Author: Parth Brahmbhatt <[email protected]>
Date: 2014-12-08T23:10:13Z
Reverting back to TopicOffsetOutOfRangeException extends RuntimeException.
----
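The commit notes above describe two behaviours: a new batch should recover
from an out-of-range offset by moving to the earliest available offset, while
a retried batch should not jump away from the offset it was asked to replay.
The following is a minimal, self-contained sketch of that distinction only.
It is not the code from the pull request: FakePartition, emitNewBatch and
reEmitBatch are hypothetical names, the exception class is a local stand-in
for the one named in the commits, and returning an empty batch on retry is an
assumption about one reasonable way to "not jump the offset".

```java
import java.util.Arrays;

public class OffsetRecoverySketch {

    /** Local stand-in for the exception named in the commits (not the storm-kafka class). */
    static class TopicOffsetOutOfRangeException extends RuntimeException {
        TopicOffsetOutOfRangeException(String message) {
            super(message);
        }
    }

    /** Hypothetical in-memory partition that retains offsets in [earliest, latest). */
    static class FakePartition {
        final long earliest;
        final long latest;

        FakePartition(long earliest, long latest) {
            this.earliest = earliest;
            this.latest = latest;
        }

        /** Returns up to max offsets starting at offset, or throws if offset is not retained. */
        long[] fetch(long offset, int max) {
            if (offset < earliest || offset > latest) {
                throw new TopicOffsetOutOfRangeException("offset " + offset + " is out of range");
            }
            int count = (int) Math.min(max, latest - offset);
            long[] batch = new long[count];
            for (int i = 0; i < count; i++) {
                batch[i] = offset + i;
            }
            return batch;
        }
    }

    /** New batch: on an out-of-range offset, fall back to the earliest retained offset. */
    static long[] emitNewBatch(FakePartition partition, long offset, int max) {
        try {
            return partition.fetch(offset, max);
        } catch (TopicOffsetOutOfRangeException e) {
            return partition.fetch(partition.earliest, max);
        }
    }

    /** Retry batch (assumed behaviour): never jump to another offset; emit nothing instead. */
    static long[] reEmitBatch(FakePartition partition, long offset, int max) {
        try {
            return partition.fetch(offset, max);
        } catch (TopicOffsetOutOfRangeException e) {
            return new long[0];
        }
    }

    public static void main(String[] args) {
        FakePartition partition = new FakePartition(100, 110);
        // Offset 5 has been deleted from the partition, so it is out of range.
        System.out.println("new batch:   " + Arrays.toString(emitNewBatch(partition, 5, 3)));
        System.out.println("retry batch: " + Arrays.toString(reEmitBatch(partition, 5, 3)));
    }
}
```

In this sketch the new-batch path silently skips data that is no longer
retained, while the retry path prefers an empty batch over replaying different
messages under the same transaction id. Whether the actual patch makes the same
trade-off should be checked against the pull request itself.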
> Trident kafka spout fails instead of updating offset when kafka offset is out of range.
> ---------------------------------------------------------------------------------------
>
> Key: STORM-586
> URL: https://issues.apache.org/jira/browse/STORM-586
> Project: Apache Storm
> Issue Type: Bug
> Affects Versions: 0.9.3
> Reporter: Parth Brahmbhatt
> Assignee: Parth Brahmbhatt
> Priority: Critical
>
> Trident KafkaEmitter does not catch the newly added UpdateOffsetException
> which results in the spout failing repeatedly instead of automatically
> updating the offset to earliest time.
> PROBLEM:
> Exception while using the Trident Kafka Spout.
> 2014-12-04 18:38:03 b.s.util ERROR Async loop died!
> java.lang.RuntimeException: storm.kafka.UpdateOffsetException
> at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:107) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784
> at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:78) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784
> at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:77) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784
> at backtype.storm.daemon.executor$fn_4195$fn4207$fn_4254.invoke(executor.clj:745) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784
> at backtype.storm.util$async_loop$fn__442.invoke(util.clj:436) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784
> at clojure.lang.AFn.run(AFn.java:24) clojure-1.4.0.jar:na
> at java.lang.Thread.run(Thread.java:745) na:1.7.0_71
> Caused by: storm.kafka.UpdateOffsetException: null
> at storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:186) ~stormjar.jar:na
> at storm.kafka.trident.TridentKafkaEmitter.fetchMessages(TridentKafkaEmitter.java:132) ~stormjar.jar:na
> at storm.kafka.trident.TridentKafkaEmitter.doEmitNewPartitionBatch(TridentKafkaEmitter.java:113) ~stormjar.jar:na
> at storm.kafka.trident.TridentKafkaEmitter.failFastEmitNewPartitionBatch(TridentKafkaEmitter.java:72) ~stormjar.jar:na
> at storm.kafka.trident.TridentKafkaEmitter.access$400(TridentKafkaEmitter.java:46) ~stormjar.jar:na
> at storm.kafka.trident.TridentKafkaEmitter$2.emitPartitionBatchNew(TridentKafkaEmitter.java:233) ~stormjar.jar:na
> at storm.kafka.trident.TridentKafkaEmitter$2.emitPartitionBatchNew(TridentKafkaEmitter.java:225) ~stormjar.jar:na
> at storm.trident.spout.PartitionedTridentSpoutExecutor$Emitter$1.init(PartitionedTridentSpoutExecutor.java:125) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784
> at storm.trident.topology.state.RotatingTransactionalState.getState(RotatingTransactionalState.java:83) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784
> at storm.trident.topology.state.RotatingTransactionalState.getStateOrCreate(RotatingTransactionalState.java:110) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784
> at storm.trident.spout.PartitionedTridentSpoutExecutor$Emitter.emitBatch(PartitionedTridentSpoutExecutor.java:121) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784
> at storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784
> at storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:369) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784
> at backtype.storm.daemon.executor$fn_4195$tuple_action_fn_4197.invoke(executor.clj:630) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784
> at backtype.storm.daemon.executor$mk_task_receiver$fn__4118.invoke(executor.clj:398) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784
> at backtype.storm.disruptor$clojure_handler$reify__723.onEvent(disruptor.clj:58) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784
> at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:99) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784
> ... 6 common frames omitted
> 2014-12-04 18:38:03 b.s.d.executor ERROR
> java.lang.RuntimeException: storm.kafka.UpdateOffsetException
> at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:107) ~[storm-core-0.9.1.2.1.7.0
--
This message was sent by Atlassian JIRA (v6.3.4#6332)