[
https://issues.apache.org/jira/browse/SPARK-28781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
CacheCheck updated SPARK-28781:
-------------------------------
Description:
Each call to update() persists newData at line 82 (persist(newData)). However,
the persisted data is used a second time only when a checkpoint is taken, that
is, when the condition at line 94 holds and checkpoint(newData) runs at line 97.
Data that does not meet the checkpoint condition does not need to be cached. The
persistedQueue limits how much unnecessarily cached data accumulates, but it
would be better to avoid every unnecessary persist operation.
{code:scala}
def update(newData: T): Unit = {
  persist(newData)
  persistedQueue.enqueue(newData)
  // We try to maintain 2 Datasets in persistedQueue to support the semantics
  // of this class:
  // Users should call [[update()]] when a new Dataset has been created,
  // before the Dataset has been materialized.
  while (persistedQueue.size > 3) {
    val dataToUnpersist = persistedQueue.dequeue()
    unpersist(dataToUnpersist)
  }
  updateCount += 1
  // Handle checkpointing (after persisting)
  if (checkpointInterval != -1 && (updateCount % checkpointInterval) == 0
    && sc.getCheckpointDir.nonEmpty) {
    // Add new checkpoint before removing old checkpoints.
    checkpoint(newData)
{code}
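To make the cost described above concrete, here is a minimal sketch that models
update() without Spark and only counts operations. It is not the Spark
implementation and not a proposed patch: ToyCheckpointer, the
persistOnlyWhenCheckpointing flag, the counters, and ToyCheckpointerDemo are
hypothetical names, the data is represented by plain Int values, and the
sc.getCheckpointDir.nonEmpty check is assumed to be true. Whether skipping the
persist is safe in practice depends on whether callers reuse the persisted data
outside update(), which the excerpt does not show.

```scala
import scala.collection.mutable

// Toy stand-in for PeriodicCheckpointer that counts operations only (no Spark).
// persistOnlyWhenCheckpointing is a hypothetical switch used for comparison.
class ToyCheckpointer(checkpointInterval: Int, persistOnlyWhenCheckpointing: Boolean) {
  private val persistedQueue = mutable.Queue.empty[Int]
  private var updateCount = 0
  var persistCalls = 0
  var unpersistCalls = 0
  var checkpointCalls = 0

  def update(newData: Int): Unit = {
    // Evaluate the checkpoint condition up front so it can gate the persist.
    // The checkpoint directory is assumed to be set.
    val nextCount = updateCount + 1
    val willCheckpoint =
      checkpointInterval != -1 && (nextCount % checkpointInterval) == 0

    if (!persistOnlyWhenCheckpointing || willCheckpoint) {
      persistCalls += 1
      persistedQueue.enqueue(newData)
      // Same queue bound as in the excerpt.
      while (persistedQueue.size > 3) {
        persistedQueue.dequeue()
        unpersistCalls += 1
      }
    }
    updateCount = nextCount

    if (willCheckpoint) {
      checkpointCalls += 1
    }
  }
}

object ToyCheckpointerDemo {
  // Runs 30 updates with a checkpoint interval of 10 and returns the counters.
  def run(persistOnlyWhenCheckpointing: Boolean): ToyCheckpointer = {
    val c = new ToyCheckpointer(
      checkpointInterval = 10,
      persistOnlyWhenCheckpointing = persistOnlyWhenCheckpointing)
    (1 to 30).foreach(i => c.update(i))
    c
  }
}
```

Under these assumptions, ToyCheckpointerDemo.run(false) mirrors the behavior in
the excerpt and persists on every update, while ToyCheckpointerDemo.run(true)
persists only on the updates that also checkpoint. Both variants take the same
number of checkpoints.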
was: Each call to update() persists newData at line 82 (persist(newData)).
However, the persisted data is used a second time only when a checkpoint is
taken, that is, when the condition at line 94 holds and checkpoint(newData) runs
at line 97. Data that does not meet the checkpoint condition does not need to be
cached. The persistedQueue limits how much unnecessarily cached data
accumulates, but it would be better to avoid every unnecessary persist
operation.
> Unnecessary persist in PeriodicCheckpointer.update()
> ----------------------------------------------------
>
> Key: SPARK-28781
> URL: https://issues.apache.org/jira/browse/SPARK-28781
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.4.3
> Reporter: CacheCheck
> Priority: Major