[
https://issues.apache.org/jira/browse/STORM-2281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15820548#comment-15820548
]
Jungtaek Lim commented on STORM-2281:
-------------------------------------
Trident implementation of storm-kafka-client is only applied to 1.x and master
branch, and none of version line is released yet.
Did you build your own Storm with 1.x or master branch? If then please change
Affects Version/s accordingly.
And could you reproduce this with Storm 1.0.2 or Storm 1.x branch? I'm not sure
storm-kafka-client works with Storm 0.x version line.
> Running Multiple Kafka Spouts (Trident) we are getting Illegal State Exception
> ------------------------------------------------------------------------------
>
> Key: STORM-2281
> URL: https://issues.apache.org/jira/browse/STORM-2281
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-kafka-client, trident
> Affects Versions: 1.0.0
> Environment: centos mac
> Reporter: Narendra Bidari
> Priority: Critical
> Labels: kafka, trident
> Attachments: Screen Shot 2017-01-09 at 8.54.17 PM.png
>
>
> For Kafka Spout New Consumer in Trident, if we increase the spout parallelism
> more than one then we can see that the below error happens
> It is reproducible most of the times, it it does not then just kill and
> restart topology. (if spout parallelism is 1 there is no problem, it only
> happens with multiple spouts)
> Steps to Reproduce:
> 1. Create a Spout Only Trident Topology (or read write topology)
> 2. Create a topic with multiple partition (2 or more)
> 3. Pump some data and try to read with parallelism of 2 or more
> ERROR b.s.util - Async loop died!
> java.lang.RuntimeException: java.lang.IllegalStateException: No current
> assignment for partition narendra44-1
> at
> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:135)
> ~[storm-core-0.10.0.jar:0.10.0]
> at
> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:106)
> ~[storm-core-0.10.0.jar:0.10.0]
> at
> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
> ~[storm-core-0.10.0.jar:0.10.0]
> at
> backtype.storm.daemon.executor$fn__5694$fn__5707$fn__5758.invoke(executor.clj:819)
> ~[storm-core-0.10.0.jar:0.10.0]
> at backtype.storm.util$async_loop$fn__545.invoke(util.clj:479)
> [storm-core-0.10.0.jar:0.10.0]
> at clojure.lang.AFn.run(AFn.java:22) [clojure-1.6.0.jar:?]
> at java.lang.Thread.run(Thread.java:745) [?:1.7.0_79]
> Caused by: java.lang.IllegalStateException: No current assignment for
> partition narendra44-1
> at
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:220)
> ~[kafka-clients-0.9.0.1.jar:?]
> at
> org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:284)
> ~[kafka-clients-0.9.0.1.jar:?]
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1079)
> ~[kafka-clients-0.9.0.1.jar:?]
> at
> org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.seek(KafkaTridentSpoutEmitter.java:146)
> ~[classes/:?]
> at
> org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:89)
> ~[classes/:?]
> at
> org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:1)
> ~[classes/:?]
> at
> storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:127)
> ~[storm-core-0.10.0.jar:0.10.0]
> at
> storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82)
> ~[storm-core-0.10.0.jar:0.10.0]
> at
> storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:370)
> ~[storm-core-0.10.0.jar:0.10.0]
> at
> backtype.storm.daemon.executor$fn__5694$tuple_action_fn__5696.invoke(executor.clj:690)
> ~[storm-core-0.10.0.jar:0.10.0]
> at
> backtype.storm.daemon.executor$mk_task_receiver$fn__5615.invoke(executor.clj:436)
> ~[storm-core-0.10.0.jar:0.10.0]
> at
> backtype.storm.disruptor$clojure_handler$reify__5189.onEvent(disruptor.clj:58)
> ~[storm-core-0.10.0.jar:0.10.0]
> at
> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:132)
> ~[storm-core-0.10.0.jar:0.10.0]
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)