[ https://issues.apache.org/jira/browse/KAFKA-2413?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Onur Karaman reassigned KAFKA-2413: ----------------------------------- Assignee: Onur Karaman (was: Ashish K Singh) > New consumer's subscribe(Topic...) api fails if called more than once > --------------------------------------------------------------------- > > Key: KAFKA-2413 > URL: https://issues.apache.org/jira/browse/KAFKA-2413 > Project: Kafka > Issue Type: Bug > Components: consumer > Reporter: Ashish K Singh > Assignee: Onur Karaman > > I believe the new consumer is supposed to allow adding to existing topic > subscriptions. If so, the issue is that when trying to subscribe to a > topic while the consumer is already subscribed to a topic, the exception below is > thrown. > {code} > [2015-08-06 16:06:48,591] ERROR [KafkaApi-2] error when handling request null > (kafka.server.KafkaApis:103) > java.util.NoSuchElementException: key not found: topic > at scala.collection.MapLike$class.default(MapLike.scala:228) > at scala.collection.AbstractMap.default(Map.scala:58) > at scala.collection.MapLike$class.apply(MapLike.scala:141) > at scala.collection.AbstractMap.apply(Map.scala:58) > at > kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:109) > at > kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:108) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at > scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) > at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) > at kafka.coordinator.RangeAssignor.assign(PartitionAssignor.scala:108) > at > kafka.coordinator.ConsumerCoordinator.reassignPartitions(ConsumerCoordinator.scala:378) > at > 
kafka.coordinator.ConsumerCoordinator.rebalance(ConsumerCoordinator.scala:360) > at > kafka.coordinator.ConsumerCoordinator.onCompleteRebalance(ConsumerCoordinator.scala:414) > at > kafka.coordinator.DelayedRebalance.onComplete(DelayedRebalance.scala:39) > at > kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:72) > at > kafka.coordinator.DelayedRebalance$$anonfun$tryComplete$1.apply$mcZ$sp(DelayedRebalance.scala:37) > at > kafka.coordinator.ConsumerCoordinator.tryCompleteRebalance(ConsumerCoordinator.scala:388) > at > kafka.coordinator.DelayedRebalance.tryComplete(DelayedRebalance.scala:37) > at > kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:307) > at > kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:227) > at > kafka.coordinator.ConsumerCoordinator.doJoinGroup(ConsumerCoordinator.scala:186) > at > kafka.coordinator.ConsumerCoordinator.handleJoinGroup(ConsumerCoordinator.scala:131) > at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:578) > at kafka.server.KafkaApis.handle(KafkaApis.scala:67) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60) > at java.lang.Thread.run(Thread.java:745) > Unexpected error in join group response: The server experienced an unexpected > error when processing the request > org.apache.kafka.common.KafkaException: Unexpected error in join group > response: The server experienced an unexpected error when processing the > request > at > org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:362) > at > org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:311) > at > org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:703) > at > org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:677) > at > 
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:163) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:129) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:105) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:293) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:237) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:274) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:182) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:145) > at > org.apache.kafka.clients.consumer.internals.Coordinator.reassignPartitions(Coordinator.java:197) > at > org.apache.kafka.clients.consumer.internals.Coordinator.ensurePartitionAssignment(Coordinator.java:172) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:764) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:725) > at > kafka.api.ConsumerTest$$anonfun$testRepetitiveTopicSubscription$2.apply$mcZ$sp(ConsumerTest.scala:80) > at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:616) > at > kafka.api.ConsumerTest.testRepetitiveTopicSubscription(ConsumerTest.scala:79) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at junit.framework.TestCase.runTest(TestCase.java:168) > at 
junit.framework.TestCase.runBare(TestCase.java:134) > at junit.framework.TestResult$1.protect(TestResult.java:110) > at junit.framework.TestResult.runProtected(TestResult.java:128) > at junit.framework.TestResult.run(TestResult.java:113) > at junit.framework.TestCase.run(TestCase.java:124) > at junit.framework.TestSuite.runTest(TestSuite.java:232) > at junit.framework.TestSuite.run(TestSuite.java:227) > at org.scalatest.junit.JUnit3Suite.run(JUnit3Suite.scala:309) > at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:55) > at > org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2563) > at > org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2557) > at scala.collection.immutable.List.foreach(List.scala:318) > at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:2557) > at > org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1044) > at > org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1043) > at > org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:2722) > at > org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1043) > at org.scalatest.tools.Runner$.run(Runner.scala:883) > at org.scalatest.tools.Runner.run(Runner.scala) > at > org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:138) > at > org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)