Repository: spark
Updated Branches:
  refs/heads/branch-2.3 7102aeeb2 -> 5324a85a2

[SPARK-25674][SQL] If the records are incremented by more than 1 at a time,the number of bytes might rarely ever get updated

## What changes were proposed in this pull request?
If the records are incremented by more than 1 at a time, the number of bytes might rarely ever get updated, because the record count might skip over the values that are exact multiples of UPDATE_INPUT_METRICS_INTERVAL_RECORDS.

This PR instead checks whether the increment moves the record count past a higher multiple of UPDATE_INPUT_METRICS_INTERVAL_RECORDS.

## How was this patch tested?
Existing unit tests.

Closes #22594 from 10110346/inputMetrics.

Authored-by: liuxian <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
(cherry picked from commit 69f5e9cce14632a1f912c3632243a4e20b275365)
Signed-off-by: Sean Owen <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5324a85a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5324a85a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5324a85a

Branch: refs/heads/branch-2.3
Commit: 5324a85a2fb0d632dff028c7d1d2ea8a4ead94f4
Parents: 7102aee
Author: liuxian <[email protected]>
Authored: Thu Oct 11 14:24:15 2018 -0700
Committer: Sean Owen <[email protected]>
Committed: Thu Oct 11 14:24:58 2018 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/execution/datasources/FileScanRDD.scala | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/5324a85a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 28c36b6..d691715 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -112,12 +112,15 @@ class FileScanRDD(
         val nextElement = currentIterator.next()
         // TODO: we should have a better separation of row based and batch based scan, so that we
         // don't need to run this `if` for every record.
+        val preNumRecordsRead = inputMetrics.recordsRead
         if (nextElement.isInstanceOf[ColumnarBatch]) {
           inputMetrics.incRecordsRead(nextElement.asInstanceOf[ColumnarBatch].numRows())
         } else {
           inputMetrics.incRecordsRead(1)
         }
-        if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
+        // The records may be incremented by more than 1 at a time.
+        if (preNumRecordsRead / SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS !=
+          inputMetrics.recordsRead / SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS) {
           updateBytesRead()
         }
         nextElement
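
For illustration only, the difference between the old modulo check and the new quotient comparison can be reproduced outside Spark. The sketch below is not part of the patch: the object name, the helper names, the interval value of 1000 and the batch size of 4096 are all assumptions chosen for the example, and the real constant lives in SparkHadoopUtil.

```scala
object MetricsIntervalSketch {
  // Assumed interval for illustration; stands in for
  // SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS.
  val Interval = 1000L

  /** Old check: fires only when the running total is an exact multiple of the interval. */
  def oldCheck(prev: Long, curr: Long): Boolean = curr % Interval == 0

  /** New check: fires whenever the increment moves the total into a higher multiple. */
  def newCheck(prev: Long, curr: Long): Boolean = prev / Interval != curr / Interval

  /** Counts how often `check` fires while reading `batches` batches of `batchSize` rows. */
  def countUpdates(batchSize: Long, batches: Int, check: (Long, Long) => Boolean): Int = {
    var recordsRead = 0L
    var updates = 0
    for (_ <- 1 to batches) {
      val prev = recordsRead
      recordsRead += batchSize
      if (check(prev, recordsRead)) updates += 1
    }
    updates
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical columnar batches of 4096 rows: no total up to 409600 is a multiple of 1000.
    println(countUpdates(4096L, 100, oldCheck)) // prints 0
    println(countUpdates(4096L, 100, newCheck)) // prints 100
    // Row-at-a-time reads: both checks agree.
    println(countUpdates(1L, 5000, oldCheck))   // prints 5
    println(countUpdates(1L, 5000, newCheck))   // prints 5
  }
}
```

With single-row increments the two checks fire at the same points, so the row-based path should behave as before; only batch-sized increments are affected.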
