[ 
https://issues.apache.org/jira/browse/SPARK-35915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17371817#comment-17371817
 ] 

Jungtaek Lim edited comment on SPARK-35915 at 6/30/21, 5:27 AM:
----------------------------------------------------------------

OK, the code comment was referring to the old behavior, and it may still be 
better to follow the comment. That looks to be something I missed when 
introducing the consumer pool.

So we don't expect anything other than the pool to call close(), but I admit 
it's an easy mistake to make. (I meant in the Spark codebase.) We probably 
need to change something to advise callers not to do it.
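
One way to read "advise callers not to do it" is to hand callers a type that has no close() at all, so that only the pool can close a consumer. The following is a minimal sketch of that idea, not Spark's implementation: ConsumerLease, FakeConsumer, ConsumerPool and invalidateAndBorrow are all hypothetical names invented for illustration, and FakeConsumer only imitates the "already been closed" check from the stack trace.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch: none of these types exist in Spark or Kafka.
public class PoolOwnedClose {

    /** What callers see. There is deliberately no close() here. */
    interface ConsumerLease {
        String poll();
    }

    /** Stand-in for a real consumer that refuses to work once closed. */
    static final class FakeConsumer implements ConsumerLease {
        private boolean closed = false;

        @Override
        public String poll() {
            if (closed) {
                throw new IllegalStateException("This consumer has already been closed.");
            }
            return "record";
        }

        // Not part of ConsumerLease, so code holding a lease cannot call it.
        void close() {
            closed = true;
        }

        boolean isClosed() {
            return closed;
        }
    }

    /** The only component that closes consumers. */
    static final class ConsumerPool {
        private final Deque<FakeConsumer> idle = new ArrayDeque<>();

        ConsumerLease borrow() {
            FakeConsumer cached = idle.poll(); // null when nothing is cached
            return cached != null ? cached : new FakeConsumer();
        }

        void giveBack(ConsumerLease lease) {
            idle.push((FakeConsumer) lease);
        }

        /** Drops a consumer whose cached state is suspect and returns a fresh one. */
        ConsumerLease invalidateAndBorrow(ConsumerLease lease) {
            ((FakeConsumer) lease).close();
            return new FakeConsumer();
        }
    }

    public static void main(String[] args) {
        ConsumerPool pool = new ConsumerPool();
        ConsumerLease lease = pool.borrow();
        System.out.println(lease.poll());

        // After an error, ask the pool for a replacement instead of closing directly.
        lease = pool.invalidateAndBorrow(lease);
        System.out.println(lease.poll());
        pool.giveBack(lease);
    }
}
```

With this shape, a caller that wants to drop a consumer has to go through the pool, so it always ends up holding a usable reference afterwards.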


was (Author: kabhwan):
OK, the code comment was referring to the old behavior, and it may still be 
better to follow the comment. That looks to be something I missed when 
introducing the consumer pool.

So we don't expect anything other than the pool to call close(), but I admit 
it's an easy mistake to make. We probably need to change something to advise 
callers not to do it.

> Kafka doesn't recover from data loss
> ------------------------------------
>
>                 Key: SPARK-35915
>                 URL: https://issues.apache.org/jira/browse/SPARK-35915
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 3.1.1
>            Reporter: Yuval Yellin
>            Priority: Major
>
> I configured a structured streaming source for Kafka with 
> failOnDataLoss=false. 
> I get this error when checkpoint offsets are not found:
>  
> {code:java}
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 5.0 failed 1 times, most recent failure: Lost task 7.0 in stage 5.0 (TID 113) ( executor driver): java.lang.IllegalStateException: This consumer has already been closed.
>   at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2439)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.seekToBeginning(KafkaConsumer.java:1656)
>   at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumer.getAvailableOffsetRange(KafkaDataConsumer.scala:108)
>   at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.getEarliestAvailableOffsetBetween(KafkaDataConsumer.scala:385)
>   at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.$anonfun$get$1(KafkaDataConsumer.scala:332)
>   at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
>   at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.runUninterruptiblyIfPossible(KafkaDataConsumer.scala:604)
>   at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.get(KafkaDataConsumer.scala:287)
>   at org.apache.spark.sql.kafka010.KafkaBatchPartitionReader.next(KafkaBatchPartitionReader.scala:63)
>   at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:79)
>   at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:112)
>   at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
> {code}
>  
> The issue seems to me to be related to the handling of 
> OffsetOutOfRangeException (line 323 in KafkaDataConsumer): 
>  
> {code:java}
>  case e: OffsetOutOfRangeException =>
>     // When there is some error thrown, it's better to use a new consumer to drop all cached
>     // states in the old consumer. We don't need to worry about the performance because this
>     // is not a common path.
>     releaseConsumer()
>     fetchedData.reset()
>     reportDataLoss(topicPartition, groupId, failOnDataLoss,
>       s"Cannot fetch offset $toFetchOffset", e)
>     toFetchOffset = getEarliestAvailableOffsetBetween(consumer, toFetchOffset, untilOffset)
> }
> {code}
> It seems that releaseConsumer will destroy the consumer, which is used later ...
>  
>  
>  
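
The pattern the report describes (release the consumer, then keep using the old reference) can be reproduced in isolation. The sketch below is only an illustration under the assumption, taken from the report, that releasing ends up closing the underlying consumer. FakeConsumer, Reader and both recover methods are hypothetical stand-ins, not Spark or Kafka code.

```java
// Hypothetical sketch of the reported failure mode; not Spark or Kafka code.
public class StaleConsumerDemo {

    /** Stand-in for a consumer that rejects calls once it has been closed. */
    static final class FakeConsumer {
        private boolean closed = false;

        long seekToBeginning() {
            if (closed) {
                throw new IllegalStateException("This consumer has already been closed.");
            }
            return 0L; // pretend the earliest available offset is 0
        }

        void close() {
            closed = true;
        }
    }

    static final class Reader {
        private FakeConsumer consumer = new FakeConsumer();

        /** Mirrors the reported pattern: release first, then reuse the old reference. */
        long recoverWithStaleReference() {
            FakeConsumer stale = consumer;
            stale.close();                  // stands in for releaseConsumer()
            return stale.seekToBeginning(); // fails: the consumer is already closed
        }

        /** One possible remedy: obtain a fresh consumer before using it again. */
        long recoverWithFreshConsumer() {
            consumer.close();
            consumer = new FakeConsumer();
            return consumer.seekToBeginning();
        }
    }

    public static void main(String[] args) {
        try {
            new Reader().recoverWithStaleReference();
        } catch (IllegalStateException e) {
            System.out.println("stale reference failed: " + e.getMessage());
        }
        System.out.println("fresh consumer offset: " + new Reader().recoverWithFreshConsumer());
    }
}
```

The second method is just one way to avoid the stale reference. The actual fix in Spark may look different.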



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
