Repository: spark
Updated Branches:
  refs/heads/master 4e3365b57 -> ed249db9c


[SPARK-25237][SQL] Remove updateBytesReadWithFileSize in FileScanRDD

## What changes were proposed in this pull request?
This PR removes the method `updateBytesReadWithFileSize` in `FileScanRDD`.
The method computed input metrics from the file size, a fallback only needed
on Hadoop 2.5 and earlier, where bytes read cannot be obtained from the
FileSystem statistics. Current Spark no longer supports those versions, so
the fallback ran in addition to the statistics-based update and produced
wrong (inflated) input metric numbers.
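
For context, a minimal self-contained sketch of the double count (hypothetical
stand-in names, not the real `FileScanRDD` code): `close()` first set the
accurate byte count via the FS-statistics callback, and the removed fallback
then added the whole file length on top.

```scala
// Sketch only: `bytesRead` stands in for inputMetrics, `fileLength` for
// currentFile.length. Numbers are illustrative.
object DoubleCountSketch {
  var bytesRead: Long = 0L
  val fileLength: Long = 7860L // e.g. the CSV written by the new test

  // Kept path: setBytesRead(existingBytesRead + getBytesReadCallback())
  def updateBytesRead(): Unit = bytesRead = fileLength

  // Removed fallback: incBytesRead(currentFile.length), on top of the above
  def updateBytesReadWithFileSize(): Unit = bytesRead += fileLength

  def main(args: Array[String]): Unit = {
    updateBytesRead()             // accurate on Hadoop 2.6+
    updateBytesReadWithFileSize() // removed by this patch
    println(bytesRead)            // 15720 instead of 7860
  }
}
```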

This is a rework of #22232.

Closes #22232

## How was this patch tested?
Added tests in `FileBasedDataSourceSuite`.

Closes #22324 from maropu/pr22232-2.

Lead-authored-by: dujunling <dujunl...@huawei.com>
Co-authored-by: Takeshi Yamamuro <yamam...@apache.org>
Signed-off-by: Sean Owen <sean.o...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ed249db9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ed249db9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ed249db9

Branch: refs/heads/master
Commit: ed249db9c464062fbab7c6f68ad24caaa95cec82
Parents: 4e3365b
Author: dujunling <dujunl...@huawei.com>
Authored: Thu Sep 6 21:44:46 2018 -0700
Committer: Sean Owen <sean.o...@databricks.com>
Committed: Thu Sep 6 21:44:46 2018 -0700

----------------------------------------------------------------------
 .../sql/execution/datasources/FileScanRDD.scala | 10 --------
 .../spark/sql/FileBasedDataSourceSuite.scala    | 24 ++++++++++++++++++++
 2 files changed, 24 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ed249db9/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 99fc78f..345c9d8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -89,14 +89,6 @@ class FileScanRDD(
         inputMetrics.setBytesRead(existingBytesRead + getBytesReadCallback())
       }
 
-      // If we can't get the bytes read from the FS stats, fall back to the file size,
-      // which may be inaccurate.
-      private def updateBytesReadWithFileSize(): Unit = {
-        if (currentFile != null) {
-          inputMetrics.incBytesRead(currentFile.length)
-        }
-      }
-
       private[this] val files = split.asInstanceOf[FilePartition].files.toIterator
       private[this] var currentFile: PartitionedFile = null
       private[this] var currentIterator: Iterator[Object] = null
@@ -139,7 +131,6 @@ class FileScanRDD(
 
       /** Advances to the next file. Returns true if a new non-empty iterator is available. */
       private def nextIterator(): Boolean = {
-        updateBytesReadWithFileSize()
         if (files.hasNext) {
           currentFile = files.next()
           logInfo(s"Reading File $currentFile")
@@ -208,7 +199,6 @@ class FileScanRDD(
 
       override def close(): Unit = {
         updateBytesRead()
-        updateBytesReadWithFileSize()
         InputFileBlockHolder.unset()
       }
     }
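
(For reference: the retained `updateBytesRead()` above relies on Hadoop's
per-thread FileSystem statistics, which the removed fallback was meant to
cover on Hadoop 2.5 and earlier. A rough sketch of such a callback,
approximating but not identical to Spark's `SparkHadoopUtil` code:)

```scala
import scala.collection.JavaConverters._
import org.apache.hadoop.fs.FileSystem

object BytesReadCallbackSketch {
  // Returns a closure reporting bytes read on this thread since the call,
  // summed across all registered FileSystem implementations.
  def fsBytesReadOnThreadCallback(): () => Long = {
    def currentBytes: Long =
      FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics.getBytesRead).sum
    val baseline = currentBytes // snapshot before the task reads anything
    () => currentBytes - baseline
  }
}
```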

http://git-wip-us.apache.org/repos/asf/spark/blob/ed249db9/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 4aa6afd..304ede9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -20,10 +20,13 @@ package org.apache.spark.sql
 import java.io.{File, FileNotFoundException}
 import java.util.Locale
 
+import scala.collection.mutable
+
 import org.apache.hadoop.fs.Path
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.SparkException
+import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.sql.TestingUDT.{IntervalData, IntervalUDT, NullData, NullUDT}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -473,6 +476,27 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
       }
     }
   }
+
+  test("SPARK-25237 compute correct input metrics in FileScanRDD") {
+    withTempPath { p =>
+      val path = p.getAbsolutePath
+      spark.range(1000).repartition(1).write.csv(path)
+      val bytesReads = new mutable.ArrayBuffer[Long]()
+      val bytesReadListener = new SparkListener() {
+        override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+          bytesReads += taskEnd.taskMetrics.inputMetrics.bytesRead
+        }
+      }
+      sparkContext.addSparkListener(bytesReadListener)
+      try {
+        spark.read.csv(path).limit(1).collect()
+        sparkContext.listenerBus.waitUntilEmpty(1000L)
+        assert(bytesReads.sum === 7860)
+      } finally {
+        sparkContext.removeSparkListener(bytesReadListener)
+      }
+    }
+  }
 }
 
 object TestingUDT {

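A note on the test above: it registers a SparkListener to collect
`inputMetrics.bytesRead` per task, calls `listenerBus.waitUntilEmpty` so all
task-end events are delivered before the assertion, and removes the listener
in `finally` so it cannot leak into other suites. With the fallback gone, the
summed `bytesRead` matches the expected 7860 bytes rather than an inflated
figure.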
