[
https://issues.apache.org/jira/browse/SPARK-17503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Josh Rosen resolved SPARK-17503.
--------------------------------
Resolution: Fixed
Fix Version/s: 2.1.0
2.0.1
Fixed for 2.0.1 / 2.1.0 by Sean's PR.
> Memory leak in Memory store when unable to cache the whole RDD in memory
> ------------------------------------------------------------------------
>
> Key: SPARK-17503
> URL: https://issues.apache.org/jira/browse/SPARK-17503
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.1.0
> Reporter: Sean Zhong
> Assignee: Sean Zhong
> Fix For: 2.0.1, 2.1.0
>
> Attachments: Screen Shot 2016-09-12 at 4.16.15 PM.png, Screen Shot
> 2016-09-12 at 4.34.19 PM.png
>
>
> h2.Problem description:
> The following query triggers out of memory error.
> {code}
> sc.parallelize(1 to 1000000000, 100).map(x => new
> Array[Long](1000)).cache().count()
> {code}
> This is not expected, we should fallback to use disk instead if there is not
> enough memory for cache.
> Stacktrace:
> {code}
> scala> sc.parallelize(1 to 1000000000, 100).map(x => new
> Array[Long](1000)).cache().count()
> [Stage 0:> (0 + 5) /
> 5]16/09/11 17:27:20 WARN MemoryStore: Not enough space to cache rdd_1_4 in
> memory! (computed 631.5 MB so far)
> 16/09/11 17:27:20 WARN MemoryStore: Not enough space to cache rdd_1_0 in
> memory! (computed 631.5 MB so far)
> 16/09/11 17:27:20 WARN BlockManager: Putting block rdd_1_0 failed
> 16/09/11 17:27:20 WARN BlockManager: Putting block rdd_1_4 failed
> 16/09/11 17:27:21 WARN MemoryStore: Not enough space to cache rdd_1_1 in
> memory! (computed 947.3 MB so far)
> 16/09/11 17:27:21 WARN BlockManager: Putting block rdd_1_1 failed
> 16/09/11 17:27:22 WARN MemoryStore: Not enough space to cache rdd_1_3 in
> memory! (computed 1423.7 MB so far)
> 16/09/11 17:27:22 WARN BlockManager: Putting block rdd_1_3 failed
> java.lang.OutOfMemoryError: Java heap space
> Dumping heap to java_pid26528.hprof ...
> Heap dump file created [6551021666 bytes in 9.876 secs]
> 16/09/11 17:28:15 WARN NettyRpcEnv: Ignored message: HeartbeatResponse(false)
> 16/09/11 17:28:15 WARN NettyRpcEndpointRef: Error sending message [message =
> Heartbeat(driver,[Lscala.Tuple2;@46c9ce96,BlockManagerId(driver, 127.0.0.1,
> 55360))] in 1 attempts
> org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10
> seconds]. This timeout is controlled by spark.executor.heartbeatInterval
> at
> org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
> at
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
> at
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
> at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
> at
> org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
> at
> org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:523)
> at
> org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:552)
> at
> org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:552)
> at
> org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:552)
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1857)
> at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:552)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10
> seconds]
> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
> at
> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
> at
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
> at scala.concurrent.Await$.result(package.scala:190)
> at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:81)
> ... 14 more
> 16/09/11 17:28:15 ERROR Executor: Exception in task 3.0 in stage 0.0 (TID 3)
> java.lang.OutOfMemoryError: Java heap space
> at
> $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:24)
> at
> $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:23)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> at scala.collection.Iterator$JoinIterator.next(Iterator.scala:232)
> at
> org.apache.spark.storage.memory.PartiallyUnrolledIterator.next(MemoryStore.scala:683)
> at
> org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
> at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1684)
> at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
> at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1915)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1915)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
> at org.apache.spark.scheduler.Task.run(Task.scala:86)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 16/09/11 17:28:15 ERROR Executor: Exception in task 4.0 in stage 0.0 (TID 4)
> java.lang.OutOfMemoryError: Java heap space
> at
> $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:24)
> at
> $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:23)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> at scala.collection.Iterator$JoinIterator.next(Iterator.scala:232)
> at
> org.apache.spark.storage.memory.PartiallyUnrolledIterator.next(MemoryStore.scala:683)
> at
> org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
> at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1684)
> at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
> at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1915)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1915)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
> at org.apache.spark.scheduler.Task.run(Task.scala:86)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 16/09/11 17:28:15 ERROR SparkUncaughtExceptionHandler: Uncaught exception in
> thread Thread[Executor task launch worker-3,5,main]
> java.lang.OutOfMemoryError: Java heap space
> at
> $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:24)
> at
> $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:23)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> at scala.collection.Iterator$JoinIterator.next(Iterator.scala:232)
> at
> org.apache.spark.storage.memory.PartiallyUnrolledIterator.next(MemoryStore.scala:683)
> at
> org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
> at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1684)
> at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
> at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1915)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1915)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
> at org.apache.spark.scheduler.Task.run(Task.scala:86)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 16/09/11 17:28:15 ERROR SparkUncaughtExceptionHandler: Uncaught exception in
> thread Thread[Executor task launch worker-4,5,main]
> java.lang.OutOfMemoryError: Java heap space
> at
> $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:24)
> at
> $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:23)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> at scala.collection.Iterator$JoinIterator.next(Iterator.scala:232)
> at
> org.apache.spark.storage.memory.PartiallyUnrolledIterator.next(MemoryStore.scala:683)
> at
> org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
> at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1684)
> at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
> at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1915)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1915)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
> at org.apache.spark.scheduler.Task.run(Task.scala:86)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 16/09/11 17:28:15 WARN TaskSetManager: Lost task 4.0 in stage 0.0 (TID 4,
> localhost): java.lang.OutOfMemoryError: Java heap space
> at
> $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:24)
> at
> $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:23)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> at scala.collection.Iterator$JoinIterator.next(Iterator.scala:232)
> at
> org.apache.spark.storage.memory.PartiallyUnrolledIterator.next(MemoryStore.scala:683)
> at
> org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
> at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1684)
> at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
> at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1915)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1915)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
> at org.apache.spark.scheduler.Task.run(Task.scala:86)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {code}
> h2.Analysis:
> When the RDD is too big to cache, Spark returns a PartiallyUnrolledIterator.
> {code}
> // line 287, in file MemoryStore.scala
> } else {
> // We ran out of space while unrolling the values for this block
> logUnrollFailureMessage(blockId, vector.estimateSize())
> Left(new PartiallyUnrolledIterator(
> this, unrollMemoryUsedByThisBlock, unrolled = vector.iterator, rest =
> values))
> }
> {code}
> Parameter 'unrolled' points to a vector array buffer, which stores all input
> values we have read so far when trying to cache the RDD. Parameter 'rest' is
> a iterator over all unread input values.
> For example, if the input RDD partition has 100GB bytes, and Spark executor
> has a 10GB cache, then parameter 'unrolled' will points to a array of 10GB
> bytes, the parameter 'rest' iterator points to unread 90GB input data.
> We expect the 10GB 'unrolled' memory to be garbage collected immediately
> after all values in 'unrolled' have been consumed by
> PartiallyUnrolledIterator. But current Spark code will not collect the 10GB
> 'unrolled' until all 100GB input data has been processed.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]