[ 
https://issues.apache.org/jira/browse/STORM-2847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16284286#comment-16284286
 ] 

Stig Rohde Døssing edited comment on STORM-2847 at 12/8/17 9:50 PM:
--------------------------------------------------------------------

I think I have a guess as to what could be wrong. If the spout is deactivated 
and then reactivated, we replace the KafkaConsumer with a new one. 
Unfortunately 
https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscription.java#L58
 will ensure that we don't assign partitions to the new consumer unless the 
assignment changes, because we reuse the old ManualPartitionSubscription 
instance.

I'll look at fixing this in the next few days. Hopefully it's what's causing 
this.


was (Author: srdo):
I think I have a guess as to what could be wrong. If the spout is deactivated 
and then reactivated, we replace the KafkaConsumer with a new one. 
Unfortunately 
https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscription.java#L58
 will ensure that we don't assign partitions to the new consumer unless the 
assignment changes.

I'll look at fixing this in the next few days. Hopefully it's what's causing 
this.

> Exception thrown after rebalance IllegalArgumentException
> ---------------------------------------------------------
>
>                 Key: STORM-2847
>                 URL: https://issues.apache.org/jira/browse/STORM-2847
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 1.2.0
>            Reporter: Evan Rosebrook
>            Assignee: Stig Rohde Døssing
>
> After rebalance the storm-kafka-client spout attempts to check the current 
> position of partitions that are no longer assigned to the current spout. This 
> occurs in a topology with multiple spout instances.
> java.lang.IllegalArgumentException: You can only check the position for 
> partitions assigned to this consumer. at 
> org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1262)
>  at 
> org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:473)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to