Repository: spark Updated Branches: refs/heads/branch-2.4 cd4065596 -> e80ab130e
[SPARK-25674][SQL] If the records are incremented by more than 1 at a time,the number of bytes might rarely ever get updated ## What changes were proposed in this pull request? If the records are incremented by more than 1 at a time,the number of bytes might rarely ever get updatedï¼because it might skip over the count that is an exact multiple of UPDATE_INPUT_METRICS_INTERVAL_RECORDS. This PR just checks whether the increment causes the value to exceed a higher multiple of UPDATE_INPUT_METRICS_INTERVAL_RECORDS. ## How was this patch tested? existed unit tests Closes #22594 from 10110346/inputMetrics. Authored-by: liuxian <[email protected]> Signed-off-by: Sean Owen <[email protected]> (cherry picked from commit 69f5e9cce14632a1f912c3632243a4e20b275365) Signed-off-by: Sean Owen <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e80ab130 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e80ab130 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e80ab130 Branch: refs/heads/branch-2.4 Commit: e80ab130e2327949224d433a14ba1e33310ee24c Parents: cd40655 Author: liuxian <[email protected]> Authored: Thu Oct 11 14:24:15 2018 -0700 Committer: Sean Owen <[email protected]> Committed: Thu Oct 11 14:24:31 2018 -0700 ---------------------------------------------------------------------- .../apache/spark/sql/execution/datasources/FileScanRDD.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/e80ab130/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala index 345c9d8..dd3c154 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala @@ -104,12 +104,15 @@ class FileScanRDD( val nextElement = currentIterator.next() // TODO: we should have a better separation of row based and batch based scan, so that we // don't need to run this `if` for every record. + val preNumRecordsRead = inputMetrics.recordsRead if (nextElement.isInstanceOf[ColumnarBatch]) { inputMetrics.incRecordsRead(nextElement.asInstanceOf[ColumnarBatch].numRows()) } else { inputMetrics.incRecordsRead(1) } - if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) { + // The records may be incremented by more than 1 at a time. + if (preNumRecordsRead / SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS != + inputMetrics.recordsRead / SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS) { updateBytesRead() } nextElement --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
