Repository: spark
Updated Branches:
  refs/heads/branch-2.3 7102aeeb2 -> 5324a85a2


[SPARK-25674][SQL] If the records are incremented by more than 1 at a time,the 
number of bytes might rarely ever get updated

## What changes were proposed in this pull request?
If the records are incremented by more than 1 at a time,the number of bytes 
might rarely ever get updated,because it might skip over the count that is an 
exact multiple of UPDATE_INPUT_METRICS_INTERVAL_RECORDS.

This PR just checks whether the increment causes the value to exceed a higher 
multiple of UPDATE_INPUT_METRICS_INTERVAL_RECORDS.

## How was this patch tested?
existed unit tests

Closes #22594 from 10110346/inputMetrics.

Authored-by: liuxian <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
(cherry picked from commit 69f5e9cce14632a1f912c3632243a4e20b275365)
Signed-off-by: Sean Owen <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5324a85a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5324a85a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5324a85a

Branch: refs/heads/branch-2.3
Commit: 5324a85a2fb0d632dff028c7d1d2ea8a4ead94f4
Parents: 7102aee
Author: liuxian <[email protected]>
Authored: Thu Oct 11 14:24:15 2018 -0700
Committer: Sean Owen <[email protected]>
Committed: Thu Oct 11 14:24:58 2018 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/execution/datasources/FileScanRDD.scala    | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5324a85a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 28c36b6..d691715 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -112,12 +112,15 @@ class FileScanRDD(
         val nextElement = currentIterator.next()
         // TODO: we should have a better separation of row based and batch 
based scan, so that we
         // don't need to run this `if` for every record.
+        val preNumRecordsRead = inputMetrics.recordsRead
         if (nextElement.isInstanceOf[ColumnarBatch]) {
           
inputMetrics.incRecordsRead(nextElement.asInstanceOf[ColumnarBatch].numRows())
         } else {
           inputMetrics.incRecordsRead(1)
         }
-        if (inputMetrics.recordsRead % 
SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
+        // The records may be incremented by more than 1 at a time.
+        if (preNumRecordsRead / 
SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS !=
+          inputMetrics.recordsRead / 
SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS) {
           updateBytesRead()
         }
         nextElement


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to