[
https://issues.apache.org/jira/browse/SPARK-18925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Vladimir Pchelko updated SPARK-18925:
-------------------------------------
Description:
With default settings, mapWithState leads to storing up to 10 copies of
MapWithStateRDD in memory (see DStream, InternalMapWithStateDStream,
DEFAULT_CHECKPOINT_DURATION_MULTIPLIER, rememberDuration,
minRememberDuration).
In my project we quickly get an OutOfMemoryError, because we have to track
many millions of events at 2-3 KB per event, which is about 50 GB per
MapWithStateRDD. Using a cluster with 500+ GB of memory is unacceptable for
our task.
Reducing DEFAULT_CHECKPOINT_DURATION_MULTIPLIER is also unacceptable: it
slightly 'fixes' the memory issue but introduces a new one - we become unable
to process data in real time, because the checkpointing duration is several
times longer than the batchInterval.
So I investigated the mapWithState process and concluded that for proper
functioning of mapWithState, only the current and the last checkpointed
MapWithStateRDD are needed.
To fix the memory issue in my project, I override clearMetadata for
InternalMapWithStateDStream and unpersist all old RDDs:
val oldRDDs = generatedRDDs.filter(_._1 <= (time - slideDuration))
except the last checkpointed one:
val checkpointedKeys = oldRDDs.filter(_._2.isCheckpointed).keys
if (checkpointedKeys.nonEmpty) {
  oldRDDs -= checkpointedKeys.max
}
... (copy/paste of the rest of DStream.clearMetadata)
Please correct me if I am wrong.
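Putting the fragments above together, the proposed override could be sketched
roughly as follows. This is an untested sketch, not an actual patch:
generatedRDDs, clearMetadata and InternalMapWithStateDStream are
private[streaming] Spark internals (generatedRDDs is a mutable
HashMap[Time, RDD[T]] inside DStream), so a real implementation must live in
the org.apache.spark.streaming package and may differ between Spark versions.

```scala
// Sketch of the proposed InternalMapWithStateDStream.clearMetadata override,
// modeled on DStream.clearMetadata in Spark 2.0.x. Untested; assumes access
// to the private[streaming] generatedRDDs map.
override private[streaming] def clearMetadata(time: Time): Unit = {
  // All RDDs older than the current slide are candidates for removal...
  val oldRDDs = generatedRDDs.filter(_._1 <= (time - slideDuration))

  // ...except the most recent checkpointed one, which mapWithState
  // still needs together with the current MapWithStateRDD.
  val checkpointedKeys = oldRDDs.filter(_._2.isCheckpointed).keys
  if (checkpointedKeys.nonEmpty) {
    oldRDDs -= checkpointedKeys.max
  }

  // Unpersist and forget the remaining old RDDs, as the original
  // DStream.clearMetadata does.
  oldRDDs.values.foreach(_.unpersist(blocking = false))
  generatedRDDs --= oldRDDs.keys
}
```

Whether dropping everything except the last checkpointed RDD is safe in all
recovery scenarios is exactly the question raised above.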
> Reduce memory usage of mapWithState
> -----------------------------------
>
> Key: SPARK-18925
> URL: https://issues.apache.org/jira/browse/SPARK-18925
> Project: Spark
> Issue Type: Improvement
> Components: DStreams
> Affects Versions: 2.0.0, 2.0.1, 2.0.2
> Reporter: Vladimir Pchelko
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]