Narendra Bidari created STORM-2281:
--------------------------------------
Summary: Running Multiple Spouts (Trident) we are getting Illegal
State Exception
Key: STORM-2281
URL: https://issues.apache.org/jira/browse/STORM-2281
Project: Apache Storm
Issue Type: Bug
Components: storm-kafka-client, trident
Affects Versions: 1.0.0
Environment: centos mac
Reporter: Narendra Bidari
Priority: Critical
Attachments: Screen Shot 2017-01-09 at 8.54.17 PM.png
For Kafka Spout New Consumer in Trident, if we increase the spout parallelism
more than one then we can see that the below error happens
It is reproducible most of the times, it it does not then just kill and restart
topology. (if spout parallelism is 1 there is no problem, it only happens with
multiple spouts)
Steps to Reproduce:
1. Create a Spout Only Trident Topology (or read write topology)
2. Create a topic with multiple partition (2 or more)
3. Pump some data and try to read with parallelism of 2 or more
ERROR b.s.util - Async loop died!
java.lang.RuntimeException: java.lang.IllegalStateException: No current
assignment for partition narendra44-1
at
backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:135)
~[storm-core-0.10.0.jar:0.10.0]
at
backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:106)
~[storm-core-0.10.0.jar:0.10.0]
at
backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
~[storm-core-0.10.0.jar:0.10.0]
at
backtype.storm.daemon.executor$fn__5694$fn__5707$fn__5758.invoke(executor.clj:819)
~[storm-core-0.10.0.jar:0.10.0]
at backtype.storm.util$async_loop$fn__545.invoke(util.clj:479)
[storm-core-0.10.0.jar:0.10.0]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.6.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.7.0_79]
Caused by: java.lang.IllegalStateException: No current assignment for partition
narendra44-1
at
org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:220)
~[kafka-clients-0.9.0.1.jar:?]
at
org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:284)
~[kafka-clients-0.9.0.1.jar:?]
at
org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1079)
~[kafka-clients-0.9.0.1.jar:?]
at
org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.seek(KafkaTridentSpoutEmitter.java:146)
~[classes/:?]
at
org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:89)
~[classes/:?]
at
org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:1)
~[classes/:?]
at
storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:127)
~[storm-core-0.10.0.jar:0.10.0]
at
storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82)
~[storm-core-0.10.0.jar:0.10.0]
at
storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:370)
~[storm-core-0.10.0.jar:0.10.0]
at
backtype.storm.daemon.executor$fn__5694$tuple_action_fn__5696.invoke(executor.clj:690)
~[storm-core-0.10.0.jar:0.10.0]
at
backtype.storm.daemon.executor$mk_task_receiver$fn__5615.invoke(executor.clj:436)
~[storm-core-0.10.0.jar:0.10.0]
at
backtype.storm.disruptor$clojure_handler$reify__5189.onEvent(disruptor.clj:58)
~[storm-core-0.10.0.jar:0.10.0]
at
backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:132)
~[storm-core-0.10.0.jar:0.10.0]
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)