Repository: spark
Updated Branches:
  refs/heads/master 70e69fc4d -> 12252d1da


[SPARK-13071] Coalescing HadoopRDD overwrites existing input metrics

This issue is causing tests to fail consistently in master with Hadoop 2.6 / 
2.7. This is because for Hadoop 2.5+ we overwrite existing values of 
`InputMetrics#bytesRead` in each call to `HadoopRDD#compute`. In the case of 
coalesce, e.g.
```
sc.textFile(..., 4).coalesce(2).count()
```
we will call `compute` multiple times in the same task, overwriting `bytesRead` 
values from previous calls to `compute`.

For a regression test, see `InputOutputMetricsSuite.input metrics for old 
hadoop with coalesce`. I did not add a new regression test because it's 
impossible without significant refactoring; there's a lot of existing duplicate 
code in this corner of Spark.

This was caused by #10835.

Author: Andrew Or <[email protected]>

Closes #10973 from andrewor14/fix-input-metrics-coalesce.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/12252d1d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/12252d1d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/12252d1d

Branch: refs/heads/master
Commit: 12252d1da90fa7d2dffa3a7c249ecc8821dee130
Parents: 70e69fc
Author: Andrew Or <[email protected]>
Authored: Fri Jan 29 18:03:04 2016 -0800
Committer: Andrew Or <[email protected]>
Committed: Fri Jan 29 18:03:08 2016 -0800

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala      | 7 ++++++-
 core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala   | 7 ++++++-
 .../spark/sql/execution/datasources/SqlNewHadoopRDD.scala     | 7 ++++++-
 3 files changed, 18 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/12252d1d/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 3204e6a..e2ebd7f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -215,6 +215,7 @@ class HadoopRDD[K, V](
       // TODO: there is a lot of duplicate code between this and NewHadoopRDD 
and SqlNewHadoopRDD
 
       val inputMetrics = 
context.taskMetrics().registerInputMetrics(DataReadMethod.Hadoop)
+      val existingBytesRead = inputMetrics.bytesRead
 
       // Sets the thread local variable for the file's name
       split.inputSplit.value match {
@@ -230,9 +231,13 @@ class HadoopRDD[K, V](
         case _ => None
       }
 
+      // For Hadoop 2.5+, we get our input bytes from thread-local Hadoop 
FileSystem statistics.
+      // If we do a coalesce, however, we are likely to compute multiple 
partitions in the same
+      // task and in the same thread, in which case we need to avoid override 
values written by
+      // previous partitions (SPARK-13071).
       def updateBytesRead(): Unit = {
         getBytesReadCallback.foreach { getBytesRead =>
-          inputMetrics.setBytesRead(getBytesRead())
+          inputMetrics.setBytesRead(existingBytesRead + getBytesRead())
         }
       }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/12252d1d/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 4d2816e..e71d340 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -130,6 +130,7 @@ class NewHadoopRDD[K, V](
       val conf = getConf
 
       val inputMetrics = 
context.taskMetrics().registerInputMetrics(DataReadMethod.Hadoop)
+      val existingBytesRead = inputMetrics.bytesRead
 
       // Find a function that will return the FileSystem bytes read by this 
thread. Do this before
       // creating RecordReader, because RecordReader's constructor might read 
some bytes
@@ -139,9 +140,13 @@ class NewHadoopRDD[K, V](
         case _ => None
       }
 
+      // For Hadoop 2.5+, we get our input bytes from thread-local Hadoop 
FileSystem statistics.
+      // If we do a coalesce, however, we are likely to compute multiple 
partitions in the same
+      // task and in the same thread, in which case we need to avoid override 
values written by
+      // previous partitions (SPARK-13071).
       def updateBytesRead(): Unit = {
         getBytesReadCallback.foreach { getBytesRead =>
-          inputMetrics.setBytesRead(getBytesRead())
+          inputMetrics.setBytesRead(existingBytesRead + getBytesRead())
         }
       }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/12252d1d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
index edd87c2..9703b16 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
@@ -127,6 +127,7 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
       val conf = getConf(isDriverSide = false)
 
       val inputMetrics = 
context.taskMetrics().registerInputMetrics(DataReadMethod.Hadoop)
+      val existingBytesRead = inputMetrics.bytesRead
 
       // Sets the thread local variable for the file's name
       split.serializableHadoopSplit.value match {
@@ -142,9 +143,13 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
         case _ => None
       }
 
+      // For Hadoop 2.5+, we get our input bytes from thread-local Hadoop 
FileSystem statistics.
+      // If we do a coalesce, however, we are likely to compute multiple 
partitions in the same
+      // task and in the same thread, in which case we need to avoid override 
values written by
+      // previous partitions (SPARK-13071).
       def updateBytesRead(): Unit = {
         getBytesReadCallback.foreach { getBytesRead =>
-          inputMetrics.setBytesRead(getBytesRead())
+          inputMetrics.setBytesRead(existingBytesRead + getBytesRead())
         }
       }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to