[ 
https://issues.apache.org/jira/browse/SPARK-32151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17159294#comment-17159294
 ] 

Ed Mitchell commented on SPARK-32151:
-------------------------------------

I could do that if I wanted to restart at the beginning or the end of the 
topic, but in this case I would like it to resume from the offsets stored in 
my datastore.

> Kafka does not allow Partition Rebalance Handling
> -------------------------------------------------
>
>                 Key: SPARK-32151
>                 URL: https://issues.apache.org/jira/browse/SPARK-32151
>             Project: Spark
>          Issue Type: Improvement
>          Components: DStreams
>    Affects Versions: 2.4.5
>            Reporter: Ed Mitchell
>            Priority: Minor
>
> When a consumer group rebalance occurs while the Spark driver is using the 
> Subscribe or SubscribePattern ConsumerStrategy, the driver's offsets are 
> cleared when partitions are revoked and then reassigned.
> While this doesn't happen in the normal rebalance scenario of more consumers 
> joining the group (though it could), it does happen when the partition leader 
> is reelected because of a Kafka node being stopped or decommissioned.
> This seems to occur only when users specify their own offsets and do not use 
> Kafka as the persistent store of offsets (for example, they use their own 
> database; it may also occur when using checkpointing).
> This could probably affect Structured Streaming.
> This presents itself as a "NoOffsetForPartitionException":
> {code:java}
> 20/05/13 01:37:00 ERROR JobScheduler: Error generating jobs for time 1589333820000 ms
> org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [production-ad-metrics-1, production-ad-metrics-2, production-ad-metrics-0, production-ad-metrics-5, production-ad-metrics-6, production-ad-metrics-3, production-ad-metrics-4, production-ad-metrics-7]
>   at org.apache.kafka.clients.consumer.internals.SubscriptionState.resetMissingPositions(SubscriptionState.java:391)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2185)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1222)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
>   at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:172)
>   at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:191)
>   at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:228)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>   at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>   at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
>   at scala.Option.orElse(Option.scala:289)
>   at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
>   at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
>   at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
>   at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
>   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
>   at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>   at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
>   at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
>   at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
>   at scala.util.Try$.apply(Try.scala:192)
>   at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
>   at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
>   at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
>   at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
> {code}
> This can be fixed by allowing the user to specify an
> {code:java}
> org.apache.kafka.clients.consumer.ConsumerRebalanceListener{code}
> in the KafkaConsumer#subscribe method.
> The documentation for ConsumerRebalanceListener states that you can use 
> KafkaConsumer#seek with fetched offsets.
> I'm suggesting adding a new ConsumerStrategy that allows users to specify a 
> function to fetch offsets with a Collection of TopicPartitions. The reason 
> for this is to keep the Spark user from having to interact with the Kafka API 
> directly.
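
For illustration, the proposal quoted above might be sketched roughly as
follows. This is only a sketch under stated assumptions, not Spark or Kafka
code: TopicPartitionKey, Seekable, StoredOffsetRebalanceHandler and
RebalanceSketch are hypothetical stand-ins so that the example compiles
without kafka-clients, and the offset value is made up. In a real
implementation the handler would presumably implement
org.apache.kafka.clients.consumer.ConsumerRebalanceListener, use
org.apache.kafka.common.TopicPartition, and call KafkaConsumer#seek.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

// Small demo: the "datastore" is an in-memory map with a made-up offset.
final class RebalanceSketch {
    public static void main(String[] args) {
        TopicPartitionKey p0 = new TopicPartitionKey("production-ad-metrics", 0);
        TopicPartitionKey p1 = new TopicPartitionKey("production-ad-metrics", 1);
        Map<TopicPartitionKey, Long> datastore = new HashMap<>();
        datastore.put(p0, 1200L); // illustrative value only

        Seekable consumer = (tp, offset) -> System.out.println("seek " + tp + " -> " + offset);
        StoredOffsetRebalanceHandler handler =
            new StoredOffsetRebalanceHandler(consumer, assigned -> datastore);

        List<TopicPartitionKey> missing = handler.onPartitionsAssigned(Arrays.asList(p0, p1));
        System.out.println("no stored offset for: " + missing);
    }
}

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
final class TopicPartitionKey {
    final String topic;
    final int partition;

    TopicPartitionKey(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override public boolean equals(Object o) {
        if (!(o instanceof TopicPartitionKey)) return false;
        TopicPartitionKey other = (TopicPartitionKey) o;
        return topic.equals(other.topic) && partition == other.partition;
    }

    @Override public int hashCode() { return Objects.hash(topic, partition); }

    @Override public String toString() { return topic + "-" + partition; }
}

// Hypothetical stand-in for the single consumer call needed (KafkaConsumer#seek).
interface Seekable {
    void seek(TopicPartitionKey partition, long offset);
}

// Mirrors the shape of ConsumerRebalanceListener#onPartitionsAssigned:
// after a rebalance, look up offsets via the user-supplied function and seek to them.
final class StoredOffsetRebalanceHandler {
    private final Seekable consumer;
    private final Function<Collection<TopicPartitionKey>, Map<TopicPartitionKey, Long>> fetchOffsets;

    StoredOffsetRebalanceHandler(
            Seekable consumer,
            Function<Collection<TopicPartitionKey>, Map<TopicPartitionKey, Long>> fetchOffsets) {
        this.consumer = consumer;
        this.fetchOffsets = fetchOffsets;
    }

    // Returns the partitions with no stored offset; the return value is an
    // addition for this sketch (the real callback returns void).
    List<TopicPartitionKey> onPartitionsAssigned(Collection<TopicPartitionKey> assigned) {
        Map<TopicPartitionKey, Long> stored = fetchOffsets.apply(assigned);
        List<TopicPartitionKey> missing = new ArrayList<>();
        for (TopicPartitionKey tp : assigned) {
            Long offset = stored.get(tp);
            if (offset != null) {
                consumer.seek(tp, offset);
            } else {
                missing.add(tp);
            }
        }
        return missing;
    }
}
```

The user only supplies the lookup function, which matches the stated goal of
keeping Spark users from interacting with the Kafka API directly. How
partitions without a stored offset should be handled is left open here.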



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
