Hi Pankaj,

>> After the second consumer group comes up

Do you mean a second consumer starts with the same consumer group as the first?
createDirectStream is overloaded. One of the methods doesn't need you to specify the partitions of a topic.

Cheers - Sree
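(For reference, a minimal sketch of the subscribe-style overload Sree is referring to, assuming the spark-streaming-kafka-0-10 integration on Spark 2.x; the broker address, group id, topic name, and batch interval below are illustrative placeholders, not values from this thread:)

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val conf = new SparkConf().setAppName("direct-stream-subscribe-sketch")
val ssc  = new StreamingContext(conf, Seconds(10))

// Plain consumer configuration; note there is no partition list anywhere.
val kafkaParams = Map[String, Object](
  "bootstrap.servers"  -> "broker1:9092",
  "key.deserializer"   -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id"           -> "my_group",
  "auto.offset.reset"  -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

// Subscribe takes topic names only; no TopicPartition list has to be
// supplied by hand.
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](Seq("myTopic_5"), kafkaParams)
)

stream.foreachRDD(rdd => println(s"batch size: ${rdd.count()}"))
ssc.start()
ssc.awaitTermination()

(This sketch is a single stream in a single application; as Cody notes further down the thread, running multiple streams under the same consumer group is documented as unsupported.)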
On Thursday, June 8, 2017 9:56 AM, "Rastogi, Pankaj" <pankaj.rast...@verizon.com> wrote:

Hi,
Thank you for your reply! You got it right! I am trying to run multiple streams using the same consumer group, so that I can distribute different partitions among different instances of the consumer group. I don't want to provide the list of partitions in the createDirectStream API. If I do that, then it becomes difficult to handle consumer failure, as those partitions won't be read by any consumer. I would also have to handle the addition of new partitions. I wanted to see if I can use the partition rebalance feature.

Pankaj

On 6/8/17, 8:24 AM, "Cody Koeninger" <c...@koeninger.org> wrote:

>Can you explain in more detail what you mean by "distribute Kafka
>topics among different instances of same consumer group"?
>
>If you're trying to run multiple streams using the same consumer
>group, it's already documented that you shouldn't do that.
>
>On Thu, Jun 8, 2017 at 12:43 AM, Rastogi, Pankaj
><pankaj.rast...@verizon.com> wrote:
>> Hi,
>> I have been trying to distribute Kafka topics among different instances of
>> the same consumer group. I am using the KafkaDirectStream API for creating DStreams.
>> After the second consumer group comes up, Kafka does a partition rebalance, and
>> then the Spark driver of the first consumer dies with the following exception:
>>
>> java.lang.IllegalStateException: No current assignment for partition myTopic_5-0
>>   at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:264)
>>   at org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:336)
>>   at org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1236)
>>   at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:197)
>>   at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:214)
>>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>>   at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>>   at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
>>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
>>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
>>   at scala.Option.orElse(Option.scala:257)
>>   at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
>>   at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
>>   at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
>>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>   at scala.collection.immutable.List.foreach(List.scala:318)
>>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>>   at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
>>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>>   at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>>   at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
>>   at org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
>>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
>>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
>>   at scala.Option.orElse(Option.scala:257)
>>   at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
>>   at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
>>   at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
>>   at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
>>   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>>   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>   at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>>   at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>>   at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
>>   at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
>>   at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
>>   at scala.util.Try$.apply(Try.scala:161)
>>   at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
>>   at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
>>   at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
>>   at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
>>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>
>> I see that there is a Spark ticket open for the same issue
>> (https://issues.apache.org/jira/browse/SPARK-19547), but it has been
>> marked as INVALID. Can someone explain why this ticket was marked INVALID?
>>
>> Thanks,
>> Pankaj
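(For contrast, a rough sketch of the assign-style strategy Pankaj says he wants to avoid, where the partition list is fixed up front and does not adjust when partitions are added or another instance fails; again assuming spark-streaming-kafka-0-10, with the topic name, partition numbers, and other settings purely illustrative:)

import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Assign

val conf = new SparkConf().setAppName("direct-stream-assign-sketch")
val ssc  = new StreamingContext(conf, Seconds(10))

val kafkaParams = Map[String, Object](
  "bootstrap.servers"  -> "broker1:9092",
  "key.deserializer"   -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id"           -> "my_group",
  "auto.offset.reset"  -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

// Every partition this instance should read must be listed explicitly.
// If the topic gains partitions, or the instance owning other partitions
// dies, this list does not rebalance itself -- the bookkeeping Pankaj
// describes wanting to avoid.
val myPartitions = Seq(
  new TopicPartition("myTopic_5", 0),
  new TopicPartition("myTopic_5", 1)
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Assign[String, String](myPartitions, kafkaParams)
)

stream.foreachRDD(rdd => println(s"batch size: ${rdd.count()}"))
ssc.start()
ssc.awaitTermination()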