[ https://issues.apache.org/jira/browse/SPARK-18925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Vladimir Pchelko updated SPARK-18925:
-------------------------------------
    Description: 
With default settings, mapWithState leads to storing up to 10 copies of 
MapWithStateRDD in memory; this follows from the interplay of DStream, 
InternalMapWithStateDStream, DEFAULT_CHECKPOINT_DURATION_MULTIPLIER, 
rememberDuration and minRememberDuration.
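
For context, a minimal sketch of the kind of setup that runs into this. The 
state function, socket source and 1-second batch interval are illustrative 
assumptions, not taken from our actual job:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming._

    val conf = new SparkConf().setAppName("MapWithStateSketch")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint("/tmp/checkpoint") // mapWithState requires a checkpoint dir

    // Illustrative state function: running count per key.
    def trackStateFunc(key: String, value: Option[Int], state: State[Long]): (String, Long) = {
      val sum = state.getOption.getOrElse(0L) + value.getOrElse(0)
      state.update(sum)
      (key, sum)
    }

    val events = ssc.socketTextStream("localhost", 9999).map(line => (line, 1))
    val stateStream = events.mapWithState(StateSpec.function(trackStateFunc _))
    stateStream.print()

    // InternalMapWithStateDStream.initialize sets
    //   checkpointDuration = slideDuration * DEFAULT_CHECKPOINT_DURATION_MULTIPLIER (10),
    // so with 1-second batches up to ~10 MapWithStateRDDs stay cached between checkpoints.

    ssc.start()
    ssc.awaitTermination()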

In my project we quickly hit OutOfMemoryError, because we have to track many 
millions of events at 2-3 KB per event - about 50 GB per MapWithStateRDD, so 
up to 10 retained copies approach 500 GB. Using a cluster with 500+ GB of 
memory is unacceptable for our task.
Reducing CHECKPOINT_DURATION_MULTIPLIER is also unacceptable: it slightly 
'fixes' the memory issue, but creates a new one - we can no longer process 
data in real time, because checkpointing takes several times longer than the 
batchInterval.
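
To be concrete about what was tried: the multiplier itself is not 
configurable, but MapWithStateDStreamImpl forwards checkpoint() to its 
internal stream, so the effective checkpoint interval can be shortened from 
user code. A sketch, continuing the example above - this is the workaround 
that proved too slow for us:

    // Checkpoint every 2 batches instead of the default 10: fewer cached
    // MapWithStateRDDs, but each checkpoint still serializes the full state,
    // so the job can no longer keep up with the batch interval.
    stateStream.checkpoint(Seconds(2))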

So I investigated how mapWithState works and concluded that, for its proper 
functioning, we need only the current MapWithStateRDD and the last 
checkpointed MapWithStateRDD.
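
The reasoning: each MapWithStateRDD depends only on its predecessor, and a 
checkpoint truncates that lineage. A worked timeline (1-second batches and 
the default multiplier of 10, both just for illustration):

    t = 1 .. 9  : MapWithStateRDD(t) is cached; each depends only on MapWithStateRDD(t-1)
    t = 10      : MapWithStateRDD(10) is checkpointed, cutting the lineage
    t = 11      : computing it needs only MapWithStateRDD(10); RDDs 1..9 are never read again
                  (if MapWithStateRDD(10) is lost, it is reloaded from the checkpoint files)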

To fix the memory issue in my project, I overrode clearMetadata in 
InternalMapWithStateDStream to unpersist all old RDDs:

    val oldRDDs = generatedRDDs.filter(_._1 <= (time - slideDuration))

except the last checkpointed one:

    val checkpointedKeys = oldRDDs.filter(_._2.isCheckpointed).keys
    if (checkpointedKeys.nonEmpty) {
      oldRDDs -= checkpointedKeys.max
    }
    ... (the rest is copied from DStream.clearMetadata)
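
For completeness, roughly the full override (the surrounding body is the 
standard DStream.clearMetadata from Spark 2.0, reproduced here from memory, 
so treat it as a sketch rather than an exact patch):

    import org.apache.spark.rdd.BlockRDD

    // inside a patched InternalMapWithStateDStream
    override private[streaming] def clearMetadata(time: Time): Unit = {
      val unpersistData = ssc.conf.getBoolean("spark.streaming.unpersist", true)

      // everything older than the previous batch...
      val oldRDDs = generatedRDDs.filter(_._1 <= (time - slideDuration))

      // ...except the most recent checkpointed RDD, kept for recovery
      val checkpointedKeys = oldRDDs.filter(_._2.isCheckpointed).keys
      if (checkpointedKeys.nonEmpty) {
        oldRDDs -= checkpointedKeys.max
      }

      generatedRDDs --= oldRDDs.keys
      if (unpersistData) {
        oldRDDs.values.foreach { rdd =>
          rdd.unpersist(false)
          rdd match {
            case b: BlockRDD[_] => b.removeBlocks() // drop cached blocks eagerly
            case _ =>
          }
        }
      }
      dependencies.foreach(_.clearMetadata(time))
    }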


Please correct me if I am wrong.

> Reduce memory usage of mapWithState
> -----------------------------------
>
>                 Key: SPARK-18925
>                 URL: https://issues.apache.org/jira/browse/SPARK-18925
>             Project: Spark
>          Issue Type: Improvement
>          Components: DStreams
>    Affects Versions: 2.0.0, 2.0.1, 2.0.2
>            Reporter: Vladimir Pchelko