[ 
https://issues.apache.org/jira/browse/SPARK-28781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

CacheCheck updated SPARK-28781:
-------------------------------
    Description: 
Once update() is called, newData is persisted (line 82). However, the persisted 
data is only read a second time when the checkpoint branch is taken (the 
condition at line 94 holds and checkpoint() runs at line 97). Data that never 
satisfies the checkpoint condition therefore does not need to be cached. The 
persistedQueue bounds how many unnecessarily cached Datasets accumulate, but it 
would be better to avoid every unnecessary persist operation in the first place.
{code:scala}
def update(newData: T): Unit = {
    persist(newData)
    persistedQueue.enqueue(newData)
    // We try to maintain 2 Datasets in persistedQueue to support the semantics of this class:
    // Users should call [[update()]] when a new Dataset has been created,
    // before the Dataset has been materialized.
    while (persistedQueue.size > 3) {
      val dataToUnpersist = persistedQueue.dequeue()
      unpersist(dataToUnpersist)
    }
    updateCount += 1

    // Handle checkpointing (after persisting)
    if (checkpointInterval != -1 && (updateCount % checkpointInterval) == 0
      && sc.getCheckpointDir.nonEmpty) {
      // Add new checkpoint before removing old checkpoints.
      checkpoint(newData)
      // ... (remaining checkpoint bookkeeping elided) ...
    }
  }
{code}
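
For illustration only, a rough sketch of what avoiding the eager persist could 
look like. It assumes the cached data is only ever re-read by checkpoint(), which 
is the premise above; whether callers also rely on newData being cached when they 
materialize it after update() is a separate question, so this is not a proposed 
patch. All identifiers are the existing members of PeriodicCheckpointer shown 
above.
{code:scala}
// Sketch (not the actual Spark code): persist newData only when the
// checkpoint branch will read it a second time.
def update(newData: T): Unit = {
    updateCount += 1
    val willCheckpoint = checkpointInterval != -1 &&
      (updateCount % checkpointInterval) == 0 &&
      sc.getCheckpointDir.nonEmpty

    if (willCheckpoint) {
      // Cache only data that checkpoint() will actually read again.
      persist(newData)
      persistedQueue.enqueue(newData)
      while (persistedQueue.size > 3) {
        unpersist(persistedQueue.dequeue())
      }
      // Add new checkpoint before removing old checkpoints.
      checkpoint(newData)
      // ... (remaining checkpoint bookkeeping unchanged) ...
    }
  }
{code}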

  was:Once update() is called, newData is persisted (line 82). However, the 
persisted data is only read a second time when the checkpoint branch is taken 
(the condition at line 94 holds and checkpoint() runs at line 97). Data that 
never satisfies the checkpoint condition therefore does not need to be cached. 
The persistedQueue bounds how many unnecessarily cached Datasets accumulate, but 
it would be better to avoid every unnecessary persist operation in the first place.


> Unnecessary persist in PeriodicCheckpointer.update()
> ----------------------------------------------------
>
>                 Key: SPARK-28781
>                 URL: https://issues.apache.org/jira/browse/SPARK-28781
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.4.3
>            Reporter: CacheCheck
>            Priority: Major
>
> Once update() is called, newData is persisted (line 82). However, the 
> persisted data is only read a second time when the checkpoint branch is taken 
> (the condition at line 94 holds and checkpoint() runs at line 97). Data that 
> never satisfies the checkpoint condition therefore does not need to be cached. 
> The persistedQueue bounds how many unnecessarily cached Datasets accumulate, 
> but it would be better to avoid every unnecessary persist operation in the first place.
> {code:scala}
> def update(newData: T): Unit = {
>     persist(newData)
>     persistedQueue.enqueue(newData)
>     // We try to maintain 2 Datasets in persistedQueue to support the semantics of this class:
>     // Users should call [[update()]] when a new Dataset has been created,
>     // before the Dataset has been materialized.
>     while (persistedQueue.size > 3) {
>       val dataToUnpersist = persistedQueue.dequeue()
>       unpersist(dataToUnpersist)
>     }
>     updateCount += 1
>     // Handle checkpointing (after persisting)
>     if (checkpointInterval != -1 && (updateCount % checkpointInterval) == 0
>       && sc.getCheckpointDir.nonEmpty) {
>       // Add new checkpoint before removing old checkpoints.
>       checkpoint(newData)
>       // ... (remaining checkpoint bookkeeping elided) ...
>     }
>   }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.2#803003)
