Repository: spark
Updated Branches:
  refs/heads/branch-2.4 d64b35588 -> 8bc7ab03d
[SPARK-25674][FOLLOW-UP] Update the stats for each ColumnarBatch

## What changes were proposed in this pull request?

This PR is a follow-up of https://github.com/apache/spark/pull/22594 . This alternative can avoid the unneeded computation in the hot code path.

- For row-based scan, we keep the original way.
- For the columnar scan, we just need to update the stats after each batch.

## How was this patch tested?

N/A

Closes #22731 from gatorsmile/udpateStatsFileScanRDD.

Authored-by: gatorsmile <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 4cee191c04f14d7272347e4b29201763c6cfb6bf)
Signed-off-by: Sean Owen <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8bc7ab03
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8bc7ab03
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8bc7ab03

Branch: refs/heads/branch-2.4
Commit: 8bc7ab03dd417f0f9cca509df4566d621e633305
Parents: d64b355
Author: gatorsmile <[email protected]>
Authored: Tue Oct 16 08:58:29 2018 +0800
Committer: Sean Owen <[email protected]>
Committed: Mon Oct 15 21:20:49 2018 -0500

----------------------------------------------------------------------
 .../sql/execution/datasources/FileScanRDD.scala | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/8bc7ab03/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index dd3c154..ffea33c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -85,7 +85,7 @@ class FileScanRDD(
       // If we do a coalesce, however, we are likely to compute multiple partitions in the same
       // task and in the same thread, in which case we need to avoid override values written by
       // previous partitions (SPARK-13071).
-      private def updateBytesRead(): Unit = {
+      private def incTaskInputMetricsBytesRead(): Unit = {
         inputMetrics.setBytesRead(existingBytesRead + getBytesReadCallback())
       }
 
@@ -106,15 +106,16 @@ class FileScanRDD(
         // don't need to run this `if` for every record.
         val preNumRecordsRead = inputMetrics.recordsRead
         if (nextElement.isInstanceOf[ColumnarBatch]) {
+          incTaskInputMetricsBytesRead()
           inputMetrics.incRecordsRead(nextElement.asInstanceOf[ColumnarBatch].numRows())
         } else {
+          // too costly to update every record
+          if (inputMetrics.recordsRead %
+              SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
+            incTaskInputMetricsBytesRead()
+          }
           inputMetrics.incRecordsRead(1)
         }
-        // The records may be incremented by more than 1 at a time.
-        if (preNumRecordsRead / SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS !=
-          inputMetrics.recordsRead / SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS) {
-          updateBytesRead()
-        }
         nextElement
       }
 
@@ -201,7 +202,7 @@ class FileScanRDD(
       }
 
       override def close(): Unit = {
-        updateBytesRead()
+        incTaskInputMetricsBytesRead()
         InputFileBlockHolder.unset()
       }
     }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
