[ 
https://issues.apache.org/jira/browse/KAFKA-2413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14661177#comment-14661177
 ] 

Ashish K Singh commented on KAFKA-2413:
---------------------------------------

[~hachikuji] I was planning to play with it to get more accustomed with the new 
consumer's intricacies. If you have not already worked out a patch, then is it 
OK I try to fix it by tonight?

> New consumer's subscribe(Topic...) api fails if called more than once
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-2413
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2413
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>            Reporter: Ashish K Singh
>            Assignee: Ashish K Singh
>
> I believe new consumer is supposed to allow adding to existing topic 
> subscriptions. If it is then the issue is that on trying to subscribe to a 
> topic when consumer is already subscribed to a topic, below exception is 
> thrown.
> {code}
> [2015-08-06 16:06:48,591] ERROR [KafkaApi-2] error when handling request null 
> (kafka.server.KafkaApis:103)
> java.util.NoSuchElementException: key not found: topic
>       at scala.collection.MapLike$class.default(MapLike.scala:228)
>       at scala.collection.AbstractMap.default(Map.scala:58)
>       at scala.collection.MapLike$class.apply(MapLike.scala:141)
>       at scala.collection.AbstractMap.apply(Map.scala:58)
>       at 
> kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:109)
>       at 
> kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:108)
>       at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>       at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>       at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>       at 
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>       at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>       at kafka.coordinator.RangeAssignor.assign(PartitionAssignor.scala:108)
>       at 
> kafka.coordinator.ConsumerCoordinator.reassignPartitions(ConsumerCoordinator.scala:378)
>       at 
> kafka.coordinator.ConsumerCoordinator.rebalance(ConsumerCoordinator.scala:360)
>       at 
> kafka.coordinator.ConsumerCoordinator.onCompleteRebalance(ConsumerCoordinator.scala:414)
>       at 
> kafka.coordinator.DelayedRebalance.onComplete(DelayedRebalance.scala:39)
>       at 
> kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:72)
>       at 
> kafka.coordinator.DelayedRebalance$$anonfun$tryComplete$1.apply$mcZ$sp(DelayedRebalance.scala:37)
>       at 
> kafka.coordinator.ConsumerCoordinator.tryCompleteRebalance(ConsumerCoordinator.scala:388)
>       at 
> kafka.coordinator.DelayedRebalance.tryComplete(DelayedRebalance.scala:37)
>       at 
> kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:307)
>       at 
> kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:227)
>       at 
> kafka.coordinator.ConsumerCoordinator.doJoinGroup(ConsumerCoordinator.scala:186)
>       at 
> kafka.coordinator.ConsumerCoordinator.handleJoinGroup(ConsumerCoordinator.scala:131)
>       at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:578)
>       at kafka.server.KafkaApis.handle(KafkaApis.scala:67)
>       at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>       at java.lang.Thread.run(Thread.java:745)
> Unexpected error in join group response: The server experienced an unexpected 
> error when processing the request
> org.apache.kafka.common.KafkaException: Unexpected error in join group 
> response: The server experienced an unexpected error when processing the 
> request
>       at 
> org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:362)
>       at 
> org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:311)
>       at 
> org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:703)
>       at 
> org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:677)
>       at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:163)
>       at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:129)
>       at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:105)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:293)
>       at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:237)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:274)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:182)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:145)
>       at 
> org.apache.kafka.clients.consumer.internals.Coordinator.reassignPartitions(Coordinator.java:197)
>       at 
> org.apache.kafka.clients.consumer.internals.Coordinator.ensurePartitionAssignment(Coordinator.java:172)
>       at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:764)
>       at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:725)
>       at 
> kafka.api.ConsumerTest$$anonfun$testRepetitiveTopicSubscription$2.apply$mcZ$sp(ConsumerTest.scala:80)
>       at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:616)
>       at 
> kafka.api.ConsumerTest.testRepetitiveTopicSubscription(ConsumerTest.scala:79)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:606)
>       at junit.framework.TestCase.runTest(TestCase.java:168)
>       at junit.framework.TestCase.runBare(TestCase.java:134)
>       at junit.framework.TestResult$1.protect(TestResult.java:110)
>       at junit.framework.TestResult.runProtected(TestResult.java:128)
>       at junit.framework.TestResult.run(TestResult.java:113)
>       at junit.framework.TestCase.run(TestCase.java:124)
>       at junit.framework.TestSuite.runTest(TestSuite.java:232)
>       at junit.framework.TestSuite.run(TestSuite.java:227)
>       at org.scalatest.junit.JUnit3Suite.run(JUnit3Suite.scala:309)
>       at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:55)
>       at 
> org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2563)
>       at 
> org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2557)
>       at scala.collection.immutable.List.foreach(List.scala:318)
>       at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:2557)
>       at 
> org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1044)
>       at 
> org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1043)
>       at 
> org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:2722)
>       at 
> org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1043)
>       at org.scalatest.tools.Runner$.run(Runner.scala:883)
>       at org.scalatest.tools.Runner.run(Runner.scala)
>       at 
> org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:138)
>       at 
> org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:606)
>       at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to