[ 
https://issues.apache.org/jira/browse/STORM-2281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Narendra Bidari updated STORM-2281:
-----------------------------------
    Description: 
For Kafka Spout New Consumer in Trident, if we increase the spout parallelism 
more than one then we can see that the below error happens

It is reproducible most of the times, it it does not then just kill and restart 
topology.  (if spout parallelism is 1 there is no problem, it only happens with 
multiple spouts)

Steps to Reproduce:
1. Create a Spout Only Trident Topology (or read write topology)
2. Create a topic with multiple partition (2 or more) 
3. Pump some data and try to read with parallelism of 2 or more


No current assignment for partition input-1 
at 
org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:464)
 ~[storm-core-1.0.2.jar:1.0.2] 
at 
org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:430)
 ~[storm-core-1.0.2.jar:1.0.2] 
at 
org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:73)
 ~[storm-core-1.0.2.jar:1.0.2] 
at 
org.apache.storm.daemon.executor$fn__8058$fn__8071$fn__8124.invoke(executor.clj:850)
 ~[storm-core-1.0.2.jar:1.0.2] 
at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) 
[storm-core-1.0.2.jar:1.0.2] 
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?] 
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_77] 
Caused by: java.lang.IllegalStateException: No current assignment for partition 
input-1 
at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:231)
 ~[kafka-clients-0.10.0.0.jar:?] 
at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:256)
 ~[kafka-clients-0.10.0.0.jar:?] 
at 
org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1134) 
~[kafka-clients-0.10.0.0.jar:?] 
at 
org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.seek(KafkaTridentSpoutEmitter.java:139)
 ~[storm-kafka-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT] 
at 
org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:88)
 ~[storm-kafka-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT] 
at 
org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:47)
 ~[storm-kafka-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT] 
at 
org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:128)
 ~[storm-core-1.0.2.jar:1.0.2] 
at 
org.apache.storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82)
 ~[storm-core-1.0.2.jar:1.0.2]

  was:
For Kafka Spout New Consumer in Trident, if we increase the spout parallelism 
more than one then we can see that the below error happens

It is reproducible most of the times, it it does not then just kill and restart 
topology.  (if spout parallelism is 1 there is no problem, it only happens with 
multiple spouts)

Steps to Reproduce:
1. Create a Spout Only Trident Topology (or read write topology)
2. Create a topic with multiple partition (2 or more) 
3. Pump some data and try to read with parallelism of 2 or more


ERROR b.s.util - Async loop died! 
java.lang.RuntimeException: java.lang.IllegalStateException: No current 
assignment for partition narendra44-1 
at 
backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:135)
 ~[storm-core-0.10.0.jar:0.10.0] 
at 
backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:106)
 ~[storm-core-0.10.0.jar:0.10.0] 
at 
backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) 
~[storm-core-0.10.0.jar:0.10.0] 
at 
backtype.storm.daemon.executor$fn__5694$fn__5707$fn__5758.invoke(executor.clj:819)
 ~[storm-core-0.10.0.jar:0.10.0] 
at backtype.storm.util$async_loop$fn__545.invoke(util.clj:479) 
[storm-core-0.10.0.jar:0.10.0] 
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.6.0.jar:?] 
at java.lang.Thread.run(Thread.java:745) [?:1.7.0_79] 
Caused by: java.lang.IllegalStateException: No current assignment for partition 
narendra44-1 
at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:220)
 ~[kafka-clients-0.9.0.1.jar:?] 
at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:284)
 ~[kafka-clients-0.9.0.1.jar:?] 
at 
org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1079)
 ~[kafka-clients-0.9.0.1.jar:?] 
at 
org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.seek(KafkaTridentSpoutEmitter.java:146)
 ~[classes/:?] 
at 
org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:89)
 ~[classes/:?] 
at 
org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:1)
 ~[classes/:?] 
at 
storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:127)
 ~[storm-core-0.10.0.jar:0.10.0] 
at 
storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82) 
~[storm-core-0.10.0.jar:0.10.0] 
at 
storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:370)
 ~[storm-core-0.10.0.jar:0.10.0] 
at 
backtype.storm.daemon.executor$fn__5694$tuple_action_fn__5696.invoke(executor.clj:690)
 ~[storm-core-0.10.0.jar:0.10.0] 
at 
backtype.storm.daemon.executor$mk_task_receiver$fn__5615.invoke(executor.clj:436)
 ~[storm-core-0.10.0.jar:0.10.0] 
at 
backtype.storm.disruptor$clojure_handler$reify__5189.onEvent(disruptor.clj:58) 
~[storm-core-0.10.0.jar:0.10.0] 
at 
backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:132)
 ~[storm-core-0.10.0.jar:0.10.0] 


> Running Multiple Kafka Spouts (Trident) we are getting Illegal State Exception
> ------------------------------------------------------------------------------
>
>                 Key: STORM-2281
>                 URL: https://issues.apache.org/jira/browse/STORM-2281
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client, trident
>    Affects Versions: 1.0.0
>         Environment: centos mac
>            Reporter: Narendra Bidari
>            Priority: Critical
>              Labels: kafka, trident
>         Attachments: Screen Shot 2017-01-09 at 8.54.17 PM.png
>
>
> For Kafka Spout New Consumer in Trident, if we increase the spout parallelism 
> more than one then we can see that the below error happens
> It is reproducible most of the times, it it does not then just kill and 
> restart topology.  (if spout parallelism is 1 there is no problem, it only 
> happens with multiple spouts)
> Steps to Reproduce:
> 1. Create a Spout Only Trident Topology (or read write topology)
> 2. Create a topic with multiple partition (2 or more) 
> 3. Pump some data and try to read with parallelism of 2 or more
> No current assignment for partition input-1 
> at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:464)
>  ~[storm-core-1.0.2.jar:1.0.2] 
> at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:430)
>  ~[storm-core-1.0.2.jar:1.0.2] 
> at 
> org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:73)
>  ~[storm-core-1.0.2.jar:1.0.2] 
> at 
> org.apache.storm.daemon.executor$fn__8058$fn__8071$fn__8124.invoke(executor.clj:850)
>  ~[storm-core-1.0.2.jar:1.0.2] 
> at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) 
> [storm-core-1.0.2.jar:1.0.2] 
> at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?] 
> at java.lang.Thread.run(Thread.java:745) [?:1.8.0_77] 
> Caused by: java.lang.IllegalStateException: No current assignment for 
> partition input-1 
> at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:231)
>  ~[kafka-clients-0.10.0.0.jar:?] 
> at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:256)
>  ~[kafka-clients-0.10.0.0.jar:?] 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1134) 
> ~[kafka-clients-0.10.0.0.jar:?] 
> at 
> org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.seek(KafkaTridentSpoutEmitter.java:139)
>  ~[storm-kafka-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT] 
> at 
> org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:88)
>  ~[storm-kafka-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT] 
> at 
> org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:47)
>  ~[storm-kafka-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT] 
> at 
> org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:128)
>  ~[storm-core-1.0.2.jar:1.0.2] 
> at 
> org.apache.storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82)
>  ~[storm-core-1.0.2.jar:1.0.2]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to