[ 
https://issues.apache.org/jira/browse/SPARK-24351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huangtengfei updated SPARK-24351:
---------------------------------
    Description: 
In Structured Streaming, the conf spark.sql.streaming.minBatchesToRetain 
specifies 'The minimum number of batches that must be retained 
and made recoverable', as described in 
[SQLConf|https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L802].
 In continuous processing, the metadata purge is triggered when an epoch is 
committed in 
[ContinuousExecution|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala#L306].
 
 Since currentBatchId increases independently in continuous processing (CP) 
mode, the currently committed epoch may be far behind currentBatchId if some 
task hangs for some time. It is not safe to discard the metadata with a 
thresholdBatchId computed from currentBatchId, because that may remove all the 
metadata in the checkpoint directory.
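
The arithmetic behind this can be sketched as follows. This is a minimal, language-neutral illustration in Python, not Spark's code: purge_threshold, purge, and the sample values (committed epoch 50, currentBatchId 500, 100 batches to retain) are hypothetical, and the sketch assumes the purge removes every log entry whose id is below thresholdBatchId.

```python
# Illustrative model only; the names and numbers below are assumptions.
MIN_BATCHES_TO_RETAIN = 100  # stands in for spark.sql.streaming.minBatchesToRetain


def purge_threshold(reference_id, min_batches_to_retain=MIN_BATCHES_TO_RETAIN):
    """Return the id below which log entries would be discarded."""
    return reference_id - min_batches_to_retain


def purge(log, threshold_batch_id):
    """Keep only entries with id >= threshold (assumed purge semantics)."""
    return {i: v for i, v in log.items() if i >= threshold_batch_id}


# A task hangs: only epochs 0..50 are committed, but currentBatchId is already 500.
committed_epoch = 50
current_batch_id = 500
offset_log = {epoch: f"offsets-{epoch}" for epoch in range(committed_epoch + 1)}

# Threshold from currentBatchId: 500 - 100 = 400, so every entry is dropped.
unsafe = purge(offset_log, purge_threshold(current_batch_id))

# Threshold from the committed epoch: 50 - 100 = -50, so nothing is dropped.
safe = purge(offset_log, purge_threshold(committed_epoch))

print(len(unsafe), len(safe))
```

Under these assumptions, the threshold derived from currentBatchId leaves no recoverable entry, while the one derived from the committed epoch keeps all of them.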

  was:
In structured streaming, there is a conf spark.sql.streaming.minBatchesToRetain 
which is set to specify 'The minimum number of batches that must be retained 
and made recoverable' as described in 
[SQLConf](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L802).
 In continuous processing, the metadata purge is triggered when an epoch is 
committed in 
[ContinuousExecution](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala#L306).
 
Since currentBatchId increases independently in cp mode, the current committed 
epoch may be far behind currentBatchId if some task hangs for some time. It is 
not safe to discard the metadata with thresholdBatchId computed based on 
currentBatchId because we may clean all the metadata in the checkpoint 
directory.


> offsetLog/commitLog purge thresholdBatchId should be computed with current 
> committed epoch but not currentBatchId in CP mode
> ----------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-24351
>                 URL: https://issues.apache.org/jira/browse/SPARK-24351
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.3.0
>            Reporter: huangtengfei
>            Priority: Major



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

