Repository: spark Updated Branches: refs/heads/master 4e3365b57 -> ed249db9c
[SPARK-25237][SQL] Remove updateBytesReadWithFileSize in FileScanRDD ## What changes were proposed in this pull request? This pr removed the method `updateBytesReadWithFileSize` in `FileScanRDD` because it computes input metrics by file size supported in Hadoop 2.5 and earlier. The current Spark does not support the versions, so it causes wrong input metric numbers. This is rework from #22232. Closes #22232 ## How was this patch tested? Added tests in `FileBasedDataSourceSuite`. Closes #22324 from maropu/pr22232-2. Lead-authored-by: dujunling <dujunl...@huawei.com> Co-authored-by: Takeshi Yamamuro <yamam...@apache.org> Signed-off-by: Sean Owen <sean.o...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ed249db9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ed249db9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ed249db9 Branch: refs/heads/master Commit: ed249db9c464062fbab7c6f68ad24caaa95cec82 Parents: 4e3365b Author: dujunling <dujunl...@huawei.com> Authored: Thu Sep 6 21:44:46 2018 -0700 Committer: Sean Owen <sean.o...@databricks.com> Committed: Thu Sep 6 21:44:46 2018 -0700 ---------------------------------------------------------------------- .../sql/execution/datasources/FileScanRDD.scala | 10 -------- .../spark/sql/FileBasedDataSourceSuite.scala | 24 ++++++++++++++++++++ 2 files changed, 24 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/ed249db9/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala index 99fc78f..345c9d8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala @@ -89,14 +89,6 @@ class FileScanRDD( inputMetrics.setBytesRead(existingBytesRead + getBytesReadCallback()) } - // If we can't get the bytes read from the FS stats, fall back to the file size, - // which may be inaccurate. - private def updateBytesReadWithFileSize(): Unit = { - if (currentFile != null) { - inputMetrics.incBytesRead(currentFile.length) - } - } - private[this] val files = split.asInstanceOf[FilePartition].files.toIterator private[this] var currentFile: PartitionedFile = null private[this] var currentIterator: Iterator[Object] = null @@ -139,7 +131,6 @@ class FileScanRDD( /** Advances to the next file. Returns true if a new non-empty iterator is available. */ private def nextIterator(): Boolean = { - updateBytesReadWithFileSize() if (files.hasNext) { currentFile = files.next() logInfo(s"Reading File $currentFile") @@ -208,7 +199,6 @@ class FileScanRDD( override def close(): Unit = { updateBytesRead() - updateBytesReadWithFileSize() InputFileBlockHolder.unset() } } http://git-wip-us.apache.org/repos/asf/spark/blob/ed249db9/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala index 4aa6afd..304ede9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -20,10 +20,13 @@ package org.apache.spark.sql import java.io.{File, FileNotFoundException} import java.util.Locale +import scala.collection.mutable + import org.apache.hadoop.fs.Path import org.scalatest.BeforeAndAfterAll import org.apache.spark.SparkException +import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} import org.apache.spark.sql.TestingUDT.{IntervalData, IntervalUDT, NullData, NullUDT} import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf @@ -473,6 +476,27 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo } } } + + test("SPARK-25237 compute correct input metrics in FileScanRDD") { + withTempPath { p => + val path = p.getAbsolutePath + spark.range(1000).repartition(1).write.csv(path) + val bytesReads = new mutable.ArrayBuffer[Long]() + val bytesReadListener = new SparkListener() { + override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { + bytesReads += taskEnd.taskMetrics.inputMetrics.bytesRead + } + } + sparkContext.addSparkListener(bytesReadListener) + try { + spark.read.csv(path).limit(1).collect() + sparkContext.listenerBus.waitUntilEmpty(1000L) + assert(bytesReads.sum === 7860) + } finally { + sparkContext.removeSparkListener(bytesReadListener) + } + } + } } object TestingUDT { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org