Repository: spark
Updated Branches:
  refs/heads/branch-2.2 098aaec30 -> 7a04def92


[SPARK-21621][CORE] Reset numRecordsWritten after DiskBlockObjectWriter.commitAndGet called

## What changes were proposed in this pull request?

We should reset `numRecordsWritten` to zero after
`DiskBlockObjectWriter.commitAndGet` is called.
When `revertPartialWritesAndClose` is called, we decrease the records written
in `ShuffleWriteMetrics` by `numRecordsWritten`. Because `numRecordsWritten`
was never reset, this also subtracted records that had already been committed
(in the tested case, decreasing the count to zero). That is wrong: we should
only subtract the records written since the last `commitAndGet` call.
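
To make the accounting concrete, here is a minimal Scala sketch. It is a
hypothetical, simplified model, not Spark's `DiskBlockObjectWriter`: the names
`ToyWriteMetrics`, `ToyObjectWriter`, `ToyScenario`, and the `resetOnCommit`
flag are invented for illustration, and files, streams, and byte counts are
left out. It assumes, as described above, that each write increments both
counters and that the revert subtracts `numRecordsWritten`. The scenario is
modeled loosely on the modified test: commit one record, write a second, then
revert.

```scala
// Hypothetical, simplified model of DiskBlockObjectWriter's record accounting.
// Not Spark's actual classes: no files, streams, or byte counting.

// Stand-in for the records counter in ShuffleWriteMetrics.
final class ToyWriteMetrics {
  var recordsWritten: Long = 0L
  def incRecordsWritten(n: Long): Unit = recordsWritten += n
  def decRecordsWritten(n: Long): Unit = recordsWritten -= n
}

// resetOnCommit = false reproduces the behavior before this patch;
// resetOnCommit = true mirrors the fix (numRecordsWritten = 0 in commitAndGet).
final class ToyObjectWriter(metrics: ToyWriteMetrics, resetOnCommit: Boolean) {
  private var numRecordsWritten = 0

  // Every write bumps both the local counter and the shared metrics.
  def write(record: Any): Unit = {
    numRecordsWritten += 1
    metrics.incRecordsWritten(1)
  }

  // The real method also flushes and returns a FileSegment; omitted here.
  def commitAndGet(): Unit = {
    if (resetOnCommit) numRecordsWritten = 0
  }

  // Subtracts whatever the local counter holds, assumed to be "uncommitted".
  def revertPartialWritesAndClose(): Unit = {
    metrics.decRecordsWritten(numRecordsWritten)
    numRecordsWritten = 0
  }
}

object ToyScenario {
  // Commit one record, write a second one, then revert the uncommitted write.
  def recordsAfterRevert(resetOnCommit: Boolean): Long = {
    val metrics = new ToyWriteMetrics
    val writer = new ToyObjectWriter(metrics, resetOnCommit)
    writer.write("committed")
    writer.commitAndGet()
    writer.write("reverted")
    writer.revertPartialWritesAndClose()
    metrics.recordsWritten
  }

  def main(args: Array[String]): Unit = {
    println(s"without reset: ${recordsAfterRevert(resetOnCommit = false)}") // 0
    println(s"with reset:    ${recordsAfterRevert(resetOnCommit = true)}")  // 1
  }
}
```

Without the reset, the revert subtracts both records and the count drops to 0;
with the reset, only the uncommitted record is subtracted and the count stays
at 1, which is what the new `recordsWritten == 1` assertion checks.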

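## How was this patch tested?
Modified an existing test in `DiskBlockObjectWriterSuite` to also assert
`writeMetrics.recordsWritten == 1` after `revertPartialWritesAndClose()`.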


Author: Xianyang Liu <xianyang....@intel.com>

Closes #18830 from ConeyLiu/DiskBlockObjectWriter.

(cherry picked from commit 534a063f7c693158437d13224f50d4ae789ff6fb)
Signed-off-by: Wenchen Fan <wenc...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7a04def9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7a04def9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7a04def9

Branch: refs/heads/branch-2.2
Commit: 7a04def920438ef0e08b66a95befeec981e5571e
Parents: 098aaec
Author: Xianyang Liu <xianyang....@intel.com>
Authored: Mon Aug 7 17:04:53 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Mon Aug 7 17:05:02 2017 +0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/storage/DiskBlockObjectWriter.scala     | 2 ++
 .../org/apache/spark/storage/DiskBlockObjectWriterSuite.scala      | 1 +
 2 files changed, 3 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7a04def9/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
index eb3ff92..a024c83 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
@@ -95,6 +95,7 @@ private[spark] class DiskBlockObjectWriter(
   /**
    * Keep track of number of records written and also use this to periodically
    * output bytes written since the latter is expensive to do for each record.
+   * And we reset it after every commitAndGet called.
    */
   private var numRecordsWritten = 0
 
@@ -185,6 +186,7 @@ private[spark] class DiskBlockObjectWriter(
      // In certain compression codecs, more bytes are written after streams are closed
       writeMetrics.incBytesWritten(committedPosition - reportedPosition)
       reportedPosition = committedPosition
+      numRecordsWritten = 0
       fileSegment
     } else {
       new FileSegment(file, committedPosition, 0)

http://git-wip-us.apache.org/repos/asf/spark/blob/7a04def9/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
index bfb3ac4..cea5501 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
@@ -116,6 +116,7 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
     writer.revertPartialWritesAndClose()
     assert(firstSegment.length === file.length())
     assert(writeMetrics.bytesWritten === file.length())
+    assert(writeMetrics.recordsWritten == 1)
   }
 
   test("calling revertPartialWritesAndClose() after commit() should have no effect") {

