[ 
https://issues.apache.org/jira/browse/SPARK-32151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ed Mitchell updated SPARK-32151:
--------------------------------
    Description: 
When a consumer group rebalance occurs while the Spark driver is using the 
Subscribe or SubscribePattern ConsumerStrategy, the driver's offsets are 
cleared when partitions are revoked and then reassigned.

While this doesn't happen in the normal rebalance scenario of more consumers 
joining the group (though it could), it does happen when the partition leader 
is re-elected because a Kafka node is stopped or decommissioned.

This seems to occur only when users specify their own offsets and do not use 
Kafka as the persistent offset store (for example, they keep offsets in their 
own database; it may also occur when checkpointing is used).
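
For context, here is a minimal sketch of that setup in the DStreams API. The 
topic name, group id, and offset values are placeholders, not taken from the 
affected job:
{code:scala}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

val ssc = new StreamingContext(new SparkConf().setAppName("offset-demo"), Seconds(60))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "example-group",
  "enable.auto.commit" -> (false: java.lang.Boolean),
  // With no reset policy, an undefined position after a rebalance surfaces
  // as NoOffsetForPartitionException, as in the stack trace below.
  "auto.offset.reset" -> "none"
)

// Starting offsets loaded from the user's own store (values are placeholders).
val fromOffsets = Map(new TopicPartition("production-ad-metrics", 0) -> 42L)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](
    Seq("production-ad-metrics"), kafkaParams, fromOffsets)
)
{code}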

This could probably affect Structured Streaming.

This presents itself as a "NoOffsetForPartitionException":
{code:java}
20/05/13 01:37:00 ERROR JobScheduler: Error generating jobs for time 1589333820000 ms
org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [production-ad-metrics-1, production-ad-metrics-2, production-ad-metrics-0, production-ad-metrics-5, production-ad-metrics-6, production-ad-metrics-3, production-ad-metrics-4, production-ad-metrics-7]
  at org.apache.kafka.clients.consumer.internals.SubscriptionState.resetMissingPositions(SubscriptionState.java:391)
  at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2185)
  at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1222)
  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181)
  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
  at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:172)
  at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:191)
  at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:228)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
  at scala.Option.orElse(Option.scala:289)
  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
  at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
  at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
  at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
  at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
  at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
  at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
  at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
  at scala.util.Try$.apply(Try.scala:192)
  at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
  at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
  at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
  at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
{code}
This can be fixed by allowing the user to pass an
{code:java}
org.apache.kafka.clients.consumer.ConsumerRebalanceListener{code}
to the KafkaConsumer#subscribe method.

The documentation for ConsumerRebalanceListener states that you can call 
KafkaConsumer#seek with offsets fetched from an external store inside the 
onPartitionsAssigned callback.
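
For illustration, a minimal sketch of that pattern; the OffsetStore trait 
here is a hypothetical stand-in for the user's own database:
{code:scala}
import java.{util => ju}

import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer}
import org.apache.kafka.common.TopicPartition

// Hypothetical external offset store backed by the user's own database.
trait OffsetStore {
  def load(tp: TopicPartition): Long
  def save(tp: TopicPartition, offset: Long): Unit
}

def subscribeWithListener[K, V](
    consumer: KafkaConsumer[K, V],
    topics: ju.Collection[String],
    store: OffsetStore): Unit = {
  consumer.subscribe(topics, new ConsumerRebalanceListener {
    override def onPartitionsRevoked(partitions: ju.Collection[TopicPartition]): Unit = {
      // Persist the current position of each partition before it is revoked.
      partitions.asScala.foreach(tp => store.save(tp, consumer.position(tp)))
    }
    override def onPartitionsAssigned(partitions: ju.Collection[TopicPartition]): Unit = {
      // Seek to externally stored offsets instead of relying on Kafka's
      // committed offsets or the auto.offset.reset policy.
      partitions.asScala.foreach(tp => consumer.seek(tp, store.load(tp)))
    }
  })
}
{code}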

I'm suggesting adding a new ConsumerStrategy that allows users to supply a 
function that, given a Collection of TopicPartitions, returns the offsets to 
seek to. The reason for this is to keep the Spark user from having to 
interact with the Kafka API directly.
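
A rough sketch of what such a strategy might look like; the class name, 
constructor shape, and offset handling are illustrative only, not a proposed 
final API:
{code:scala}
import java.{util => ju}

import org.apache.kafka.clients.consumer.{Consumer, ConsumerRebalanceListener, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.ConsumerStrategy

// Hypothetical strategy: subscribes with a rebalance listener that seeks to
// offsets supplied by a user-provided fetch function.
class SubscribeWithOffsetFetch[K, V](
    topics: ju.Collection[String],
    kafkaParams: ju.Map[String, Object],
    fetchOffsets: ju.Collection[TopicPartition] => Map[TopicPartition, Long])
  extends ConsumerStrategy[K, V] {

  override def executorKafkaParams: ju.Map[String, Object] = kafkaParams

  override def onStart(currentOffsets: ju.Map[TopicPartition, java.lang.Long]): Consumer[K, V] = {
    val consumer = new KafkaConsumer[K, V](kafkaParams)
    consumer.subscribe(topics, new ConsumerRebalanceListener {
      override def onPartitionsRevoked(partitions: ju.Collection[TopicPartition]): Unit = ()
      override def onPartitionsAssigned(partitions: ju.Collection[TopicPartition]): Unit = {
        // On (re)assignment, restore positions via the user's function so the
        // driver never falls back to an undefined offset with no reset policy.
        fetchOffsets(partitions).foreach { case (tp, offset) => consumer.seek(tp, offset) }
      }
    })
    // A full implementation would also need to honor currentOffsets, as the
    // built-in Subscribe strategy does.
    consumer
  }
}
{code}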

> Kafka does not allow Partition Rebalance Handling
> -------------------------------------------------
>
>                 Key: SPARK-32151
>                 URL: https://issues.apache.org/jira/browse/SPARK-32151
>             Project: Spark
>          Issue Type: Improvement
>          Components: DStreams
>    Affects Versions: 2.4.5
>            Reporter: Ed Mitchell
>            Priority: Minor
>