[ https://issues.apache.org/jira/browse/SPARK-18925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Vladimir Pchelko updated SPARK-18925:
-------------------------------------
    Description:
With default settings, mapWithState leads to storing up to 10 copies of MapWithStateRDD in memory
(DStream, InternalMapWithStateDStream, DEFAULT_CHECKPOINT_DURATION_MULTIPLIER, rememberDuration, minRememberDuration).

In my project we quickly get OutOfMemory, because we have to track many millions of events at 2-3 KB per event, which is about 50 GB per MapWithStateRDD. Using a cluster with 500+ GB of memory is unacceptable for our task.

Reducing CHECKPOINT_DURATION_MULTIPLIER is not an option either: it slightly 'fixes' the memory issue but leads to a new one. We are unable to process data in real time, because the checkpointing duration is several times longer than batchInterval.

So I investigated the mapWithState process and concluded that for proper functioning of mapWithState, we need the current MapWithStateRDD and the last checkpointed MapWithStateRDD.

To fix the memory issues in my project, I override clearMetadata for InternalMapWithStateDStream and unpersist all oldRDDs:
    val oldRDDs = generatedRDDs.filter(_._1 <= (time - slideDuration))
except the last checkpointed one:
    val checkpointedKeys = oldRDDs.filter(_._2.isCheckpointed).keys
    if (checkpointedKeys.nonEmpty) {
      oldRDDs -= checkpointedKeys.max
    }
    ... (C/P of DStream clearMetadata)

Please correct me if I am wrong.

  was:
With default settings, mapWithState leads to storing up to 10 copies of MapWithStateRDD in memory
(DStream, InternalMapWithStateDStream, DEFAULT_CHECKPOINT_DURATION_MULTIPLIER, rememberDuration, minRememberDuration).

In my project we quickly get OutOfMemory, because we have to track many millions of events at 2-3 KB per event, which is about 50 GB per MapWithStateRDD. Using a cluster with 500+ GB of memory is unacceptable for our task.

Reducing CHECKPOINT_DURATION_MULTIPLIER is not an option either: it slightly 'fixes' the memory issue but leads to a new one. We are unable to process data in real time, because the checkpointing duration is several times longer than batchInterval.
So I investigated the mapWithState process and concluded that for proper functioning of mapWithState, we need the current and the last checkpointed MapWithStateRDD.

To fix the memory issues in my project, I override clearMetadata for InternalMapWithStateDStream and unpersist all oldRDDs:
    val oldRDDs = generatedRDDs.filter(_._1 <= (time - slideDuration))
except the last checkpointed one:
    val checkpointedKeys = oldRDDs.filter(_._2.isCheckpointed).keys
    if (checkpointedKeys.nonEmpty) {
      oldRDDs -= checkpointedKeys.max
    }
    ... (C/P of DStream clearMetadata)

Please correct me if I am wrong.


> Reduce memory usage of mapWithState
> -----------------------------------
>
>                 Key: SPARK-18925
>                 URL: https://issues.apache.org/jira/browse/SPARK-18925
>             Project: Spark
>          Issue Type: Improvement
>          Components: DStreams
>    Affects Versions: 2.0.0, 2.0.1, 2.0.2
>            Reporter: Vladimir Pchelko
>
> With default settings, mapWithState leads to storing up to 10 copies of
> MapWithStateRDD in memory
> (DStream, InternalMapWithStateDStream, DEFAULT_CHECKPOINT_DURATION_MULTIPLIER,
> rememberDuration, minRememberDuration).
> In my project we quickly get OutOfMemory, because we have to track many
> millions of events at 2-3 KB per event, which is about 50 GB per
> MapWithStateRDD.
> Using a cluster with 500+ GB of memory is unacceptable for our task.
> Reducing CHECKPOINT_DURATION_MULTIPLIER is not an option either: it slightly
> 'fixes' the memory issue but leads to a new one. We are unable to process
> data in real time, because the checkpointing duration is several times
> longer than batchInterval.
> So I investigated the mapWithState process and concluded that for proper
> functioning of mapWithState, we need the current MapWithStateRDD and the last
> checkpointed MapWithStateRDD.
> To fix the memory issues in my project, I override clearMetadata for
> InternalMapWithStateDStream and unpersist all oldRDDs:
>     val oldRDDs = generatedRDDs.filter(_._1 <= (time - slideDuration))
> except the last checkpointed one:
>     val checkpointedKeys = oldRDDs.filter(_._2.isCheckpointed).keys
>     if (checkpointedKeys.nonEmpty) {
>       oldRDDs -= checkpointedKeys.max
>     }
>     ... (C/P of DStream clearMetadata)
> Please correct me if I am wrong.
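
The retention rule proposed in the description (keep only the current RDD and the most recent checkpointed one, unpersist the rest) can be modeled in plain Scala without a Spark dependency. The sketch below is only an illustration of that selection logic under stated assumptions: FakeRdd, StatePruning and prune are hypothetical names invented here, the map stands in for generatedRDDs, and batch times are plain Long values. It is not the reporter's actual override and not Spark's clearMetadata.

```scala
import scala.collection.mutable

// Hypothetical stand-in for an RDD: it only tracks the two flags this sketch needs.
final case class FakeRdd(id: Int, isCheckpointed: Boolean) {
  var persisted: Boolean = true
  def unpersist(): Unit = persisted = false
}

object StatePruning {
  /** Removes every entry at or before `time - slide` from `generated`, except the
    * most recent checkpointed one, unpersists the removed entries and returns
    * their batch times in ascending order.
    */
  def prune(generated: mutable.HashMap[Long, FakeRdd], time: Long, slide: Long): Seq[Long] = {
    // Everything strictly older than the current batch is a candidate for removal.
    val old = generated.filter { case (t, _) => t <= time - slide }

    // Among the candidates, spare the latest checkpointed entry (if there is one).
    val checkpointedTimes = old.filter { case (_, rdd) => rdd.isCheckpointed }.keys
    val keep: Option[Long] =
      if (checkpointedTimes.nonEmpty) Some(checkpointedTimes.max) else None

    val toDrop = old.keys.filterNot(t => keep.contains(t)).toSeq.sorted
    toDrop.foreach { t => generated.remove(t).foreach(_.unpersist()) }
    toDrop
  }
}

object Demo extends App {
  // Six batches at times 10..60; batches 20 and 40 are checkpointed.
  val generated = mutable.HashMap(
    10L -> FakeRdd(1, isCheckpointed = false),
    20L -> FakeRdd(2, isCheckpointed = true),
    30L -> FakeRdd(3, isCheckpointed = false),
    40L -> FakeRdd(4, isCheckpointed = true),
    50L -> FakeRdd(5, isCheckpointed = false),
    60L -> FakeRdd(6, isCheckpointed = false)
  )

  val dropped = StatePruning.prune(generated, time = 60L, slide = 10L)
  println(s"dropped batch times: ${dropped.mkString(", ")}")
  println(s"retained batch times: ${generated.keys.toSeq.sorted.mkString(", ")}")
}
```

In this model only the batch at time 60 (current) and the batch at time 40 (last checkpointed) stay in the map. Whether the same rule is safe inside Spark depends on whether any later batch still needs an uncheckpointed parent to recompute lost partitions, which is exactly the question the issue asks reviewers to confirm.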