Repository: spark
Updated Branches:
  refs/heads/branch-2.4 cd4065596 -> e80ab130e


[SPARK-25674][SQL] If the records are incremented by more than 1 at a time, the
number of bytes might rarely ever get updated

## What changes were proposed in this pull request?
If the records are incremented by more than 1 at a time (for example, when a
ColumnarBatch is read), the number of bytes might rarely ever get updated,
because the record count can skip over every value that is an exact multiple of
UPDATE_INPUT_METRICS_INTERVAL_RECORDS.

This PR instead checks whether the increment moves the record count past a
multiple of UPDATE_INPUT_METRICS_INTERVAL_RECORDS, by comparing the count
divided by the interval before and after the increment.
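
The difference between the two checks can be illustrated with a minimal,
standalone sketch. It is not Spark code: `MetricsIntervalSketch`, both method
names, the interval of 1000, and the batch size of 4096 are assumptions made
for illustration only.

```scala
// Minimal sketch, not Spark code. `interval` stands in for
// SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS (value assumed here),
// and each element of `batchSizes` is a hypothetical per-call record increment.
object MetricsIntervalSketch {

  // Old check: fires only when the running count lands exactly on a multiple.
  def updatesWithModulo(batchSizes: Seq[Long], interval: Long): Int = {
    var recordsRead = 0L
    var updates = 0
    for (n <- batchSizes) {
      recordsRead += n
      if (recordsRead % interval == 0) updates += 1
    }
    updates
  }

  // New check: fires whenever the increment moves the count into a
  // different interval bucket, however large the increment is.
  def updatesWithBucketCompare(batchSizes: Seq[Long], interval: Long): Int = {
    var recordsRead = 0L
    var updates = 0
    for (n <- batchSizes) {
      val preNumRecordsRead = recordsRead
      recordsRead += n
      if (preNumRecordsRead / interval != recordsRead / interval) updates += 1
    }
    updates
  }

  def main(args: Array[String]): Unit = {
    val interval = 1000L                 // assumed interval
    val batches = Seq.fill(10)(4096L)    // hypothetical batch sizes
    println(s"modulo check updates: ${updatesWithModulo(batches, interval)}")
    println(s"bucket check updates: ${updatesWithBucketCompare(batches, interval)}")
  }
}
```

With these assumed values the modulo check never fires, because no multiple of
4096 up to 40960 is also a multiple of 1000, while the bucket comparison fires
on every batch. For row-at-a-time increments of 1, both checks fire at the same
counts.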

## How was this patch tested?
Existing unit tests.

Closes #22594 from 10110346/inputMetrics.

Authored-by: liuxian <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
(cherry picked from commit 69f5e9cce14632a1f912c3632243a4e20b275365)
Signed-off-by: Sean Owen <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e80ab130
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e80ab130
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e80ab130

Branch: refs/heads/branch-2.4
Commit: e80ab130e2327949224d433a14ba1e33310ee24c
Parents: cd40655
Author: liuxian <[email protected]>
Authored: Thu Oct 11 14:24:15 2018 -0700
Committer: Sean Owen <[email protected]>
Committed: Thu Oct 11 14:24:31 2018 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/execution/datasources/FileScanRDD.scala    | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e80ab130/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 345c9d8..dd3c154 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -104,12 +104,15 @@ class FileScanRDD(
         val nextElement = currentIterator.next()
         // TODO: we should have a better separation of row based and batch based scan, so that we
         // don't need to run this `if` for every record.
+        val preNumRecordsRead = inputMetrics.recordsRead
         if (nextElement.isInstanceOf[ColumnarBatch]) {
           inputMetrics.incRecordsRead(nextElement.asInstanceOf[ColumnarBatch].numRows())
         } else {
           inputMetrics.incRecordsRead(1)
         }
-        if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
+        // The records may be incremented by more than 1 at a time.
+        if (preNumRecordsRead / SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS !=
+          inputMetrics.recordsRead / SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS) {
           updateBytesRead()
         }
         nextElement

