E. Sammer created KAFKA-2284:
--------------------------------
Summary: ConsumerRebalanceListener receives wrong type in partitionOwnership values
Key: KAFKA-2284
URL: https://issues.apache.org/jira/browse/KAFKA-2284
Project: Kafka
Issue Type: Bug
Components: consumer
Affects Versions: 0.8.2.0
Reporter: E. Sammer
Assignee: Neha Narkhede
Priority: Blocker
The ConsumerRebalanceListener's beforeReleasingPartitions() method is supposed
to receive an argument of type Map<String, Set<Integer>> (topic -> Set(partitions)).
Even though the map's value type is declared as java.util.Set, a
scala.collection.convert.Wrappers$JSetWrapper is passed instead. That class does
not implement java.util.Set, so a ClassCastException is thrown as soon as one
attempts to access any value of the map. It looks as if this method was never
tested against the actual types specified by the interface.
Here's what happens if you call {{Set<Integer> foo = partitionOwnership.get(topic)}}:
{code}
2015-06-18 07:28:43,776 (search-consumer_esammer-mbp.local-1434637723383-12126c1b_watcher_executor) [WARN - com.rocana.search.consumer.ConsumerService$1.beforeReleasingPartitions(ConsumerService.java:246)] Exception while rebalancing!
java.lang.ClassCastException: scala.collection.convert.Wrappers$JSetWrapper cannot be cast to java.util.Set
    at com.rocana.search.consumer.IndexConsumerWorker.onRebalance(IndexConsumerWorker.java:80)
    at com.rocana.search.consumer.ConsumerService$1.beforeReleasingPartitions(ConsumerService.java:244)
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:675)
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:625)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:142)
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:619)
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:616)
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:616)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:615)
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:568)
{code}
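For anyone wondering why the exception surfaces in the listener rather than
inside Kafka, here is a minimal, Kafka-free sketch of the likely mechanism.
Java generics are erased at runtime, so a map can be handed over holding values
of the wrong type, and the failure only appears where the caller's compiled code
casts the value to Set. ErasureDemo, NotAJavaSet, pollutedMap, typedAccessFails
and runtimeTypeOf are all hypothetical names made up for this illustration.
NotAJavaSet merely stands in for the Scala wrapper; nothing here is Kafka code.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class ErasureDemo {
    /** Hypothetical stand-in for any value that is not a java.util.Set. */
    static final class NotAJavaSet {}

    /** Builds a map whose declared value type is wrong for one entry. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    static Map<String, Set<Integer>> pollutedMap() {
        Map raw = new HashMap();
        raw.put("good", new HashSet<Integer>());
        raw.put("bad", new NotAJavaSet());
        // Unchecked cast: succeeds at runtime because generics are erased.
        return (Map<String, Set<Integer>>) raw;
    }

    /** Returns true if reading the value through its declared type throws. */
    static boolean typedAccessFails(Map<String, Set<Integer>> map, String topic) {
        try {
            // The compiler inserts a cast to Set here; this is where it fails.
            Set<Integer> partitions = map.get(topic);
            if (partitions != null) {
                partitions.size();
            }
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    /** Reads the value as Object, so no cast to Set is compiled in. */
    static String runtimeTypeOf(Map<String, ?> map, String topic) {
        Object value = map.get(topic);
        return value == null ? "null" : value.getClass().getSimpleName();
    }

    public static void main(String[] args) {
        Map<String, Set<Integer>> map = pollutedMap();
        System.out.println(typedAccessFails(map, "good"));
        System.out.println(typedAccessFails(map, "bad"));
        System.out.println(runtimeTypeOf(map, "bad"));
    }
}
```

Reading the value as Object, as runtimeTypeOf does, is only a way to inspect
what was actually passed. It is not a fix for the listener contract itself.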