Repository: spark
Updated Branches:
  refs/heads/master adf648b5b -> 69f5e9cce


[SPARK-25674][SQL] If the records are incremented by more than 1 at a time,the 
number of bytes might rarely ever get updated

## What changes were proposed in this pull request?
If the records are incremented by more than 1 at a time,the number of bytes 
might rarely ever get updated,because it might skip over the count that is an 
exact multiple of UPDATE_INPUT_METRICS_INTERVAL_RECORDS.

This PR just checks whether the increment causes the value to exceed a higher 
multiple of UPDATE_INPUT_METRICS_INTERVAL_RECORDS.

## How was this patch tested?
existed unit tests

Closes #22594 from 10110346/inputMetrics.

Authored-by: liuxian <[email protected]>
Signed-off-by: Sean Owen <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/69f5e9cc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/69f5e9cc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/69f5e9cc

Branch: refs/heads/master
Commit: 69f5e9cce14632a1f912c3632243a4e20b275365
Parents: adf648b
Author: liuxian <[email protected]>
Authored: Thu Oct 11 14:24:15 2018 -0700
Committer: Sean Owen <[email protected]>
Committed: Thu Oct 11 14:24:15 2018 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/execution/datasources/FileScanRDD.scala    | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/69f5e9cc/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 345c9d8..dd3c154 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -104,12 +104,15 @@ class FileScanRDD(
         val nextElement = currentIterator.next()
         // TODO: we should have a better separation of row based and batch 
based scan, so that we
         // don't need to run this `if` for every record.
+        val preNumRecordsRead = inputMetrics.recordsRead
         if (nextElement.isInstanceOf[ColumnarBatch]) {
           
inputMetrics.incRecordsRead(nextElement.asInstanceOf[ColumnarBatch].numRows())
         } else {
           inputMetrics.incRecordsRead(1)
         }
-        if (inputMetrics.recordsRead % 
SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
+        // The records may be incremented by more than 1 at a time.
+        if (preNumRecordsRead / 
SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS !=
+          inputMetrics.recordsRead / 
SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS) {
           updateBytesRead()
         }
         nextElement


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to