Hi Pankaj,
>> After the second consumer group comes upDo you mean a second consumer starts 
>> with the same consumer group as the first ?

createDirectStream is overloaded. One of the method, doesn't need you to 
specify partitions of a topic.

Cheers
- Sree 

    On Thursday, June 8, 2017 9:56 AM, "Rastogi, Pankaj" 
<pankaj.rast...@verizon.com> wrote:
 

 Hi,
 Thank you for your reply!
 You got it right! I am trying to run multiple streams using the same
consumer, so that I can distribute different partitions among different
instances of the consumer group. I don¹t want to provide the list of
partitions in createDirectStream API. If I do that then it will become
difficult to handle consumer failure as those partitions won¹t be ready by
any consumer. Also I will have to handle addition of new partitions.

I wanted to see if I can use partition rebalance feature.

Pankaj

On 6/8/17, 8:24 AM, "Cody Koeninger" <c...@koeninger.org> wrote:

>Can you explain in more detail what you mean by "distribute Kafka
>topics among different instances of same consumer group"?
>
>If you're trying to run multiple streams using the same consumer
>group, it's already documented that you shouldn't do that.
>
>On Thu, Jun 8, 2017 at 12:43 AM, Rastogi, Pankaj
><pankaj.rast...@verizon.com> wrote:
>> Hi,
>>  I have been trying to distribute Kafka topics among different
>>instances of
>> same consumer group. I am using KafkaDirectStream API for creating
>>DStreams.
>> After the second consumer group comes up, Kafka does partition
>>rebalance and
>> then Spark driver of the first consumer dies with the following
>>exception:
>>
>> java.lang.IllegalStateException: No current assignment for partition
>> myTopic_5-0
>> at
>> 
>>org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedSta
>>te(SubscriptionState.java:264)
>> at
>> 
>>org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetR
>>eset(SubscriptionState.java:336)
>> at
>> 
>>org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.j
>>ava:1236)
>> at
>> 
>>org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets
>>(DirectKafkaInputDStream.scala:197)
>> at
>> 
>>org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(Direc
>>tKafkaInputDStream.scala:214)
>> at
>> 
>>org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonf
>>un$1$$anonfun$apply$7.apply(DStream.scala:341)
>> at
>> 
>>org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonf
>>un$1$$anonfun$apply$7.apply(DStream.scala:341)
>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>> at
>> 
>>org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonf
>>un$1.apply(DStream.scala:340)
>> at
>> 
>>org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonf
>>un$1.apply(DStream.scala:340)
>> at
>> 
>>org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(D
>>Stream.scala:415)
>> at
>> 
>>org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(
>>DStream.scala:335)
>> at
>> 
>>org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(
>>DStream.scala:333)
>> at scala.Option.orElse(Option.scala:257)
>> at
>> 
>>org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330
>>)
>> at
>> 
>>org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(Tr
>>ansformedDStream.scala:42)
>> at
>> 
>>org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(Tr
>>ansformedDStream.scala:42)
>> at
>> 
>>scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.sca
>>la:244)
>> at
>> 
>>scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.sca
>>la:244)
>> at scala.collection.immutable.List.foreach(List.scala:318)
>> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>> at
>> 
>>org.apache.spark.streaming.dstream.TransformedDStream.compute(Transformed
>>DStream.scala:42)
>> at
>> 
>>org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonf
>>un$1$$anonfun$apply$7.apply(DStream.scala:341)
>> at
>> 
>>org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonf
>>un$1$$anonfun$apply$7.apply(DStream.scala:341)
>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>> at
>> 
>>org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonf
>>un$1.apply(DStream.scala:340)
>> at
>> 
>>org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonf
>>un$1.apply(DStream.scala:340)
>> at
>> 
>>org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(D
>>Stream.scala:415)
>> at
>> 
>>org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalP
>>roperties(TransformedDStream.scala:65)
>> at
>> 
>>org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(
>>DStream.scala:335)
>> at
>> 
>>org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(
>>DStream.scala:333)
>> at scala.Option.orElse(Option.scala:257)
>> at
>> 
>>org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330
>>)
>> at
>> 
>>org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStr
>>eam.scala:48)
>> at
>> 
>>org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.sca
>>la:117)
>> at
>> 
>>org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.sca
>>la:116)
>> at
>> 
>>scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike
>>.scala:251)
>> at
>> 
>>scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike
>>.scala:251)
>> at
>> 
>>scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scal
>>a:59)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at 
>>scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>> at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>> at
>> 
>>org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:1
>>16)
>> at
>> 
>>org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGen
>>erator.scala:249)
>> at
>> 
>>org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGen
>>erator.scala:247)
>> at scala.util.Try$.apply(Try.scala:161)
>> at
>> 
>>org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerat
>>or.scala:247)
>> at
>> 
>>org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$stream
>>ing$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
>> at
>> 
>>org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGe
>>nerator.scala:89)
>> at
>> 
>>org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGe
>>nerator.scala:88)
>> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>
>> I see that there is Spark ticket opened with the same
>> 
>>issue(https://urldefense.proofpoint.com/v2/url?u=https-3A__issues.apache.
>>org_jira_browse_SPARK-2D19547&d=DwIBaQ&c=udBTRvFvXC5Dhqg7UHpJlPps3mZ3LRxp
>>b6__0PomBTQ&r=zQqmwCNxd6rBWnFRMGXIzVL1nRVw40AD5ViBUj89NkA&m=z6Y6ytitXzXsa
>>VNguwUlEw9bqH1xFzdB41wcJAbNex4&s=wKLdTZtkzJCT8c4egqXfosrZ3KJAC0rNSZG_DPAL
>>OYw&e= ) but it has been
>> marked as INVALID. Can someone explain why this ticket is marked
>>INVALID.
>>
>> Thanks,
>> Pankaj
>
>---------------------------------------------------------------------
>To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>


---------------------------------------------------------------------
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org


   

Reply via email to