[
https://issues.apache.org/jira/browse/STORM-1367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Rakesh Surendra reassigned STORM-1367:
--------------------------------------
Assignee: Rakesh Surendra
> Issues with OpaqueTridentKafkaSpout - TridentKafkaConfig getting
> (java.lang.RuntimeException: kafka.common.OffsetOutOfRangeException)
> -------------------------------------------------------------------------------------------------------------------------------------
>
> Key: STORM-1367
> URL: https://issues.apache.org/jira/browse/STORM-1367
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-core, storm-kafka, storm-multilang
> Affects Versions: 0.10.0
> Reporter: Rakesh Surendra
> Assignee: Rakesh Surendra
> Labels: patch, performance
>
> I'm using a Trident topology with OpaqueTridentKafkaSpout.
> Code snippet of the TridentKafkaConfig I'm using:
> OpaqueTridentKafkaSpout kafkaSpout = null;
> TridentKafkaConfig spoutConfig = new TridentKafkaConfig(
>         new ZkHosts("xxx.x.x.9:2181,xxx.x.x.1:2181,xxx.x.x.2:2181"), "topic_name");
> spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
> spoutConfig.forceFromStart = true;
> spoutConfig.fetchSizeBytes = 147483600;
> kafkaSpout = new OpaqueTridentKafkaSpout(spoutConfig);
> I get this runtime exception from one of the workers:
> java.lang.RuntimeException: storm.kafka.UpdateOffsetException
>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:135)
>     at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:106)
>     at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
>     at backtype.storm.daemon.executor$fn__5694$fn__5707$fn__5758.invoke(executor.clj:819)
>     at backtype.storm.util$async_loop$fn__545.invoke(util.clj:479)
>     at clojure.lang.AFn.run(AFn.java:22)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: storm.kafka.UpdateOffsetException
>     at storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:186)
>     at storm.kafka.trident.TridentKafkaEmitter.fetchMessages(TridentKafkaEmitter.java:132)
>     at storm.kafka.trident.TridentKafkaEmitter.doEmitNewPartitionBatch(TridentKafkaEmitter.java:113)
>     at storm.kafka.trident.TridentKafkaEmitter.failFastEmitNewPartitionBatch(TridentKafkaEmitter.java:72)
>     at storm.kafka.trident.TridentKafkaEmitter.emitNewPartitionBatch(TridentKafkaEmitter.java:79)
>     at storm.kafka.trident.TridentKafkaEmitter.access$000(TridentKafkaEmitter.java:46)
>     at storm.kafka.trident.TridentKafkaEmitter$1.emitPartitionBatch(TridentKafkaEmitter.java:204)
>     at storm.kafka.trident.TridentKafkaEmitter$1.emitPartitionBatch(TridentKafkaEmitter.java:194)
>     at storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:127)
>     at storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82)
>     at storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:370)
>     at backtype.storm.daemon.executor$fn__5694$tuple_action_fn__5696.invoke(executor.clj:690)
>     at backtype.storm.daemon.executor$mk_task_receiver$fn__5615.invoke(executor.clj:436)
>     at backtype.storm.disruptor$clojure_handler$reify__5189.onEvent(disruptor.clj:58)
>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:127)
>     ... 6 more
> But when I set spoutConfig.forceFromStart = true, it works just fine. I
> need the Trident topology to give accurate exactly-once processing even
> when the topology is restarted.
> As suggested in some posts, I have tried setting these spoutConfig fields:
> spoutConfig.maxOffsetBehind = Long.MAX_VALUE;
> spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
> My Kafka retention time is the default, 168 hours (i.e. 7 days), and the
> Kafka producer is sending 6800 messages/second to the Storm/Trident
> topology. I have gone through most of the posts, but none of them seem to
> solve this issue.
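> For reference, here is a minimal sketch consolidating all of the settings
> described above into one place (the ZooKeeper hosts, topic name, and fetch
> size are the placeholders from this report; whether this combination avoids
> the UpdateOffsetException on restart is exactly what is in question):
>
> ```java
> import backtype.storm.spout.SchemeAsMultiScheme;
> import storm.kafka.StringScheme;
> import storm.kafka.ZkHosts;
> import storm.kafka.trident.OpaqueTridentKafkaSpout;
> import storm.kafka.trident.TridentKafkaConfig;
>
> public class SpoutConfigSketch {
>     public static OpaqueTridentKafkaSpout buildSpout() {
>         // Placeholder broker/ZK list and topic name from the report.
>         TridentKafkaConfig spoutConfig = new TridentKafkaConfig(
>                 new ZkHosts("xxx.x.x.9:2181,xxx.x.x.1:2181,xxx.x.x.2:2181"),
>                 "topic_name");
>         spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
>
>         // Settings tried in this report; with forceFromStart = false the
>         // spout should resume from the offsets stored in ZooKeeper, which
>         // is what exactly-once across restarts requires.
>         spoutConfig.forceFromStart = false;
>         spoutConfig.fetchSizeBytes = 147483600;
>         spoutConfig.maxOffsetBehind = Long.MAX_VALUE;
>         spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
>
>         return new OpaqueTridentKafkaSpout(spoutConfig);
>     }
> }
> ```
>
> Note that if the topology is down longer than the retention window, the
> stored offset can fall before the earliest available offset, which is one
> known way to hit an offset-out-of-range condition on restart.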
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)