Updated Branches:
  refs/heads/branch-0.8 c59ce1808 -> 994956183

Merge pull request #197 from aarondav/patrick-fix

Fix 'timeWriting' stat for shuffle files

Due to concurrent git branches, changes from the shuffle file consolidation
patch caused the shuffle write timing patch to no longer actually measure the
time, since the time must be measured after the stream has been closed.
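For illustration only, a minimal Scala sketch of the ordering constraint
described above. The TimedWriter trait and totalWriteTime helper are
hypothetical stand-ins, not the actual BlockObjectWriter API; only commit(),
close() and timeWriting() correspond to methods that appear in the diff below.

// Hypothetical, simplified writer interface: buffered data is flushed on
// close(), so timeWriting() only reflects the full write cost once the
// stream has been closed.
trait TimedWriter {
  def commit(): Unit        // make the written bytes visible
  def close(): Unit         // flush buffers and stop the write-time clock
  def timeWriting(): Long   // total time spent writing, valid after close()
}

def totalWriteTime(writers: Seq[TimedWriter]): Long = {
  var totalTime = 0L
  writers.foreach { w =>
    w.commit()
    w.close()                     // close before reading the timer...
    totalTime += w.timeWriting()  // ...otherwise flush/close time is missed
  }
  totalTime
}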

(cherry picked from commit 972171b9d93b07e8511a2da3a33f897ba033484b)
Signed-off-by: Reynold Xin <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/99495618
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/99495618
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/99495618

Branch: refs/heads/branch-0.8
Commit: 994956183a6fd565bd585ba6e566b2fbe6cb4d59
Parents: c59ce18
Author: Reynold Xin <[email protected]>
Authored: Mon Nov 25 07:50:46 2013 +0800
Committer: Reynold Xin <[email protected]>
Committed: Mon Nov 25 07:51:23 2013 +0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/scheduler/ShuffleMapTask.scala   | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/99495618/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index 1dc71a0..0f2deb4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -167,6 +167,7 @@ private[spark] class ShuffleMapTask(
       var totalTime = 0L
      val compressedSizes: Array[Byte] = shuffle.writers.map { writer: BlockObjectWriter =>
         writer.commit()
+        writer.close()
         val size = writer.fileSegment().length
         totalBytes += size
         totalTime += writer.timeWriting()
@@ -184,14 +185,16 @@ private[spark] class ShuffleMapTask(
     } catch { case e: Exception =>
      // If there is an exception from running the task, revert the partial writes
       // and throw the exception upstream to Spark.
-      if (shuffle != null) {
-        shuffle.writers.foreach(_.revertPartialWrites())
+      if (shuffle != null && shuffle.writers != null) {
+        for (writer <- shuffle.writers) {
+          writer.revertPartialWrites()
+          writer.close()
+        }
       }
       throw e
     } finally {
       // Release the writers back to the shuffle block manager.
       if (shuffle != null && shuffle.writers != null) {
-        shuffle.writers.foreach(_.close())
         shuffle.releaseWriters(success)
       }
       // Execute the callbacks on task completion.
