Updated Branches: refs/heads/branch-0.8 c59ce1808 -> 994956183
Merge pull request #197 from aarondav/patrick-fix. Fix 'timeWriting' stat for shuffle files. Due to concurrent git branches, changes from the shuffle file consolidation patch caused the shuffle write timing patch to no longer actually measure the time, since it requires that the time be measured after the stream has been closed. (cherry picked from commit 972171b9d93b07e8511a2da3a33f897ba033484b) Signed-off-by: Reynold Xin <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/99495618 Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/99495618 Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/99495618 Branch: refs/heads/branch-0.8 Commit: 994956183a6fd565bd585ba6e566b2fbe6cb4d59 Parents: c59ce18 Author: Reynold Xin <[email protected]> Authored: Mon Nov 25 07:50:46 2013 +0800 Committer: Reynold Xin <[email protected]> Committed: Mon Nov 25 07:51:23 2013 +0800 ---------------------------------------------------------------------- .../scala/org/apache/spark/scheduler/ShuffleMapTask.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/99495618/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala index 1dc71a0..0f2deb4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala @@ -167,6 +167,7 @@ private[spark] class ShuffleMapTask( var totalTime = 0L val compressedSizes: Array[Byte] = shuffle.writers.map { writer: BlockObjectWriter => writer.commit() + writer.close() val size = 
writer.fileSegment().length totalBytes += size totalTime += writer.timeWriting() @@ -184,14 +185,16 @@ private[spark] class ShuffleMapTask( } catch { case e: Exception => // If there is an exception from running the task, revert the partial writes // and throw the exception upstream to Spark. - if (shuffle != null) { - shuffle.writers.foreach(_.revertPartialWrites()) + if (shuffle != null && shuffle.writers != null) { + for (writer <- shuffle.writers) { + writer.revertPartialWrites() + writer.close() + } } throw e } finally { // Release the writers back to the shuffle block manager. if (shuffle != null && shuffle.writers != null) { - shuffle.writers.foreach(_.close()) shuffle.releaseWriters(success) } // Execute the callbacks on task completion.
