Repository: spark
Updated Branches:
  refs/heads/branch-1.6 52d8837c6 -> d2518acc1


[SPARK-16873][CORE] Fix SpillReader NPE when spillFile has no data

## What changes were proposed in this pull request?

SpillReader throws a NullPointerException when the spill file contains no data. See the following logs:

16/07/31 20:54:04 INFO collection.ExternalSorter: spill memory to file:/data4/yarnenv/local/usercache/tesla/appcache/application_1465785263942_56138/blockmgr-db5f46c3-d7a4-4f93-8b77-565e469696fb/09/temp_shuffle_ec3ece08-4569-4197-893a-4a5dfcbbf9fa, fileSize:0.0 B
16/07/31 20:54:04 WARN memory.TaskMemoryManager: leak 164.3 MB memory from org.apache.spark.util.collection.ExternalSorter@3db4b52d
16/07/31 20:54:04 ERROR executor.Executor: Managed memory leak detected; size = 190458101 bytes, TID = 23585
16/07/31 20:54:04 ERROR executor.Executor: Exception in task 1013.0 in stage 18.0 (TID 23585)
java.lang.NullPointerException
        at org.apache.spark.util.collection.ExternalSorter$SpillReader.cleanup(ExternalSorter.scala:624)
        at org.apache.spark.util.collection.ExternalSorter$SpillReader.nextBatchStream(ExternalSorter.scala:539)
        at org.apache.spark.util.collection.ExternalSorter$SpillReader.<init>(ExternalSorter.scala:507)
        at org.apache.spark.util.collection.ExternalSorter$SpillableIterator.spill(ExternalSorter.scala:816)
        at org.apache.spark.util.collection.ExternalSorter.forceSpill(ExternalSorter.scala:251)
        at org.apache.spark.util.collection.Spillable.spill(Spillable.scala:109)
        at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:154)
        at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:249)
        at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:112)
        at org.apache.spark.shuffle.sort.ShuffleExternalSorter.acquireNewPageIfNecessary(ShuffleExternalSorter.java:346)
        at org.apache.spark.shuffle.sort.ShuffleExternalSorter.insertRecord(ShuffleExternalSorter.java:367)
        at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.insertRecordIntoSorter(UnsafeShuffleWriter.java:237)
        at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:164)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)
16/07/31 20:54:30 INFO executor.Executor: Executor is trying to kill task 1090.1 in stage 18.0 (TID 23793)
16/07/31 20:54:30 INFO executor.CoarseGrainedExecutorBackend: Driver commanded a shutdown
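
The stack trace shows cleanup() being reached from nextBatchStream() inside the SpillReader constructor. The following is a minimal standalone sketch of that path (SpillReaderSketch and Repro are simplified, hypothetical names, not the actual Spark source): for an empty spill file there are zero batches, so the very first call to nextBatchStream() falls straight through to cleanup() while deserializeStream is still null.

import java.io.{Closeable, InputStream}

// Simplified sketch of SpillReader's control flow. An empty spill file
// yields zero batches, so batchOffsets holds only the starting offset 0.
class SpillReaderSketch(batchOffsets: Array[Long]) {
  private var batchId = 0
  private var fileStream: InputStream = null
  private var deserializeStream: Closeable = null  // never assigned for an empty file

  // Mirrors nextBatchStream(): with no batches left, it falls through to cleanup().
  def nextBatchStream(): Closeable = {
    if (batchId < batchOffsets.length - 1) {
      deserializeStream  // the real code would open the next batch here
    } else {
      cleanup()          // reached on the very first call for an empty file
      null
    }
  }

  // Mirrors the unpatched cleanup(): ds is null, so ds.close() throws.
  private def cleanup(): Unit = {
    batchId = batchOffsets.length
    val ds = deserializeStream
    deserializeStream = null
    fileStream = null
    ds.close()           // java.lang.NullPointerException
  }
}

object Repro extends App {
  // batchOffsets = Array(0L) models a 0-byte spill file: no batches.
  new SpillReaderSketch(Array(0L)).nextBatchStream()
}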

## How was this patch tested?

Manual test.

Author: sharkd <sharkd...@gmail.com>
Author: sharkdtu <shark...@tencent.com>

Closes #14479 from sharkdtu/master.

(cherry picked from commit 583d91a1957f4258a64184cc6b9007588791d332)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d2518acc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d2518acc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d2518acc

Branch: refs/heads/branch-1.6
Commit: d2518acc1df44b1ecb8eed20404bcc1277f358a4
Parents: 52d8837
Author: sharkd <sharkd...@gmail.com>
Authored: Wed Aug 3 19:20:34 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Wed Aug 3 19:21:16 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/util/collection/ExternalSorter.scala  | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d2518acc/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 44b1d90..60ec1ca 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -592,7 +592,9 @@ private[spark] class ExternalSorter[K, V, C](
       val ds = deserializeStream
       deserializeStream = null
       fileStream = null
-      ds.close()
+      if (ds != null) {
+        ds.close()
+      }
      // NOTE: We don't do file.delete() here because that is done in ExternalSorter.stop().
       // This should also be fixed in ExternalAppendOnlyMap.
     }
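
As an aside, the same guard can be spelled with Scala's Option; this is merely an equivalent idiom, not what the patch uses:

    Option(ds).foreach(_.close())   // close() runs only when ds is non-null

Either form skips close() when no deserialization stream was ever opened, which is exactly the empty-spill-file case above.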

