Repository: spark
Updated Branches:
  refs/heads/branch-1.1 2cd40db2b -> 12a61d820


[SPARK-3948][Shuffle]Fix stream corruption bug in sort-based shuffle

A bug in kernel 2.6.32 leads to unexpected behavior of transferTo in copyStream, 
which can corrupt the shuffle output file in sort-based shuffle and in turn 
introduce PARSING_ERROR(2), deserialization errors, or offset-out-of-range 
errors. This is fixed here by adding an append flag, along with some 
position-checking code. Details can be seen in 
[SPARK-3948](https://issues.apache.org/jira/browse/SPARK-3948).

Author: jerryshao <[email protected]>

Closes #2824 from jerryshao/SPARK-3948 and squashes the following commits:

be0533a [jerryshao] Address the comments
a82b184 [jerryshao] add configuration to control the NIO way of copying stream
e17ada2 [jerryshao] Fix kernel 2.6.32 bug led unexpected behavior of transferTo

(cherry picked from commit c7aeecd08fd329085760fa89025ec0d9c04f5e3f)
Signed-off-by: Josh Rosen <[email protected]>

Conflicts:
        core/src/main/scala/org/apache/spark/util/Utils.scala


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/12a61d82
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/12a61d82
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/12a61d82

Branch: refs/heads/branch-1.1
Commit: 12a61d8208b3f01047299e7f803d2bc7985ff3d0
Parents: 2cd40db
Author: jerryshao <[email protected]>
Authored: Mon Oct 20 10:20:21 2014 -0700
Committer: Josh Rosen <[email protected]>
Committed: Mon Oct 20 10:22:11 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/util/Utils.scala     | 27 +++++++++++++++++---
 .../spark/util/collection/ExternalSorter.scala  |  5 ++--
 2 files changed, 27 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/12a61d82/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 3f0a80b..0b52d72 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -292,23 +292,44 @@ private[spark] object Utils extends Logging {
     dir
   }
 
-  /** Copy all data from an InputStream to an OutputStream */
+  /** Copy all data from an InputStream to an OutputStream. NIO way of file 
stream to file stream
+    * copying is disabled by default unless explicitly set transferToEnabled 
as true,
+    * the parameter transferToEnabled should be configured by 
spark.file.transferTo = [true|false].
+    */
   def copyStream(in: InputStream,
                  out: OutputStream,
-                 closeStreams: Boolean = false): Long =
+                 closeStreams: Boolean = false,
+                 transferToEnabled: Boolean = false): Long =
   {
     var count = 0L
     try {
-      if (in.isInstanceOf[FileInputStream] && 
out.isInstanceOf[FileOutputStream]) {
+      if (in.isInstanceOf[FileInputStream] && 
out.isInstanceOf[FileOutputStream]
+        && transferToEnabled) {
         // When both streams are File stream, use transferTo to improve copy 
performance.
         val inChannel = in.asInstanceOf[FileInputStream].getChannel()
         val outChannel = out.asInstanceOf[FileOutputStream].getChannel()
+        val initialPos = outChannel.position()
         val size = inChannel.size()
 
         // In case transferTo method transferred less data than we have 
required.
         while (count < size) {
           count += inChannel.transferTo(count, size - count, outChannel)
         }
+
+        // Check the position after transferTo loop to see if it is in the 
right position and
+        // give user information if not.
+        // Position will not be increased to the expected length after calling 
transferTo in
+        // kernel version 2.6.32, this issue can be seen in
+        // https://bugs.openjdk.java.net/browse/JDK-7052359
+        // This will lead to stream corruption issue when using sort-based 
shuffle (SPARK-3948).
+        val finalPos = outChannel.position()
+        assert(finalPos == initialPos + size,
+          s"""
+             |Current position $finalPos do not equal to expected position 
${initialPos + size}
+             |after transferTo, please check your kernel version to see if it 
is 2.6.32,
+             |this is a kernel bug which will lead to unexpected behavior when 
using transferTo.
+             |You can set spark.file.transferTo = false to disable this NIO 
feature.
+           """.stripMargin)
       } else {
         val buf = new Array[Byte](8192)
         var n = 0

http://git-wip-us.apache.org/repos/asf/spark/blob/12a61d82/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala 
b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index b58c7dd..43bbc68 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -93,6 +93,7 @@ private[spark] class ExternalSorter[K, V, C](
   private val conf = SparkEnv.get.conf
   private val spillingEnabled = conf.getBoolean("spark.shuffle.spill", true)
   private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 32) 
* 1024
+  private val transferToEnabled = conf.getBoolean("spark.file.transferTo", 
true)
 
   // Size of object batches when reading/writing from serializers.
   //
@@ -743,10 +744,10 @@ private[spark] class ExternalSorter[K, V, C](
       var out: FileOutputStream = null
       var in: FileInputStream = null
       try {
-        out = new FileOutputStream(outputFile)
+        out = new FileOutputStream(outputFile, true)
         for (i <- 0 until numPartitions) {
           in = new FileInputStream(partitionWriters(i).fileSegment().file)
-          val size = org.apache.spark.util.Utils.copyStream(in, out, false)
+          val size = org.apache.spark.util.Utils.copyStream(in, out, false, 
transferToEnabled)
           in.close()
           in = null
           lengths(i) = size


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to