Repository: spark
Updated Branches:
  refs/heads/master 4ceb048b3 -> 46c63417c


[SPARK-4107] Fix incorrect handling of read() and skip() return values

`read()` may return fewer bytes than requested; when this happened, the old 
code silently returned less data than requested, which could cause stream 
corruption errors.  `skip()` has the same problem: it may skip fewer bytes 
than requested.

This patch fixes several cases where we mis-handle these methods' return values.

Author: Josh Rosen <[email protected]>

Closes #2969 from JoshRosen/file-channel-read-fix and squashes the following 
commits:

e724a9f [Josh Rosen] Fix similar issue of not checking skip() return value.
cbc03ce [Josh Rosen] Update the other log message, too.
01e6015 [Josh Rosen] file.getName -> file.getAbsolutePath
d961d95 [Josh Rosen] Fix another issue in FileServerSuite.
b9265d2 [Josh Rosen] Fix a similar (minor) issue in TestUtils.
cd9d76f [Josh Rosen] Fix a similar error in Tachyon:
3db0008 [Josh Rosen] Fix a similar read() error in Utils.offsetBytes().
db985ed [Josh Rosen] Fix unsafe usage of FileChannel.read():


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/46c63417
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/46c63417
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/46c63417

Branch: refs/heads/master
Commit: 46c63417c1bb1aea07baf9036cc5b8f1c3781bbe
Parents: 4ceb048
Author: Josh Rosen <[email protected]>
Authored: Tue Oct 28 00:04:16 2014 -0700
Committer: Reynold Xin <[email protected]>
Committed: Tue Oct 28 00:04:16 2014 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/TestUtils.scala |  9 ++-------
 .../apache/spark/network/ManagedBuffer.scala    | 10 ++++++++--
 .../shuffle/IndexShuffleBlockManager.scala      |  4 +++-
 .../org/apache/spark/storage/DiskStore.scala    | 10 ++++++++--
 .../org/apache/spark/storage/TachyonStore.scala | 21 +++++++-------------
 .../scala/org/apache/spark/util/Utils.scala     |  6 +++---
 .../org/apache/spark/FileServerSuite.scala      |  8 ++------
 7 files changed, 33 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/46c63417/core/src/main/scala/org/apache/spark/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala 
b/core/src/main/scala/org/apache/spark/TestUtils.scala
index e72826d..3407814 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -23,8 +23,8 @@ import java.util.jar.{JarEntry, JarOutputStream}
 
 import scala.collection.JavaConversions._
 
+import com.google.common.io.{ByteStreams, Files}
 import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider}
-import com.google.common.io.Files
 
 import org.apache.spark.util.Utils
 
@@ -64,12 +64,7 @@ private[spark] object TestUtils {
       jarStream.putNextEntry(jarEntry)
 
       val in = new FileInputStream(file)
-      val buffer = new Array[Byte](10240)
-      var nRead = 0
-      while (nRead <= 0) {
-        nRead = in.read(buffer, 0, buffer.length)
-        jarStream.write(buffer, 0, nRead)
-      }
+      ByteStreams.copy(in, jarStream)
       in.close()
     }
     jarStream.close()

http://git-wip-us.apache.org/repos/asf/spark/blob/46c63417/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala 
b/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
index 4c9ca97..4211ba4 100644
--- a/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
@@ -81,7 +81,13 @@ final class FileSegmentManagedBuffer(val file: File, val 
offset: Long, val lengt
       // Just copy the buffer if it's sufficiently small, as memory mapping 
has a high overhead.
       if (length < MIN_MEMORY_MAP_BYTES) {
         val buf = ByteBuffer.allocate(length.toInt)
-        channel.read(buf, offset)
+        channel.position(offset)
+        while (buf.remaining() != 0) {
+          if (channel.read(buf) == -1) {
+            throw new IOException("Reached EOF before filling buffer\n" +
+              
s"offset=$offset\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
+          }
+        }
         buf.flip()
         buf
       } else {
@@ -106,7 +112,7 @@ final class FileSegmentManagedBuffer(val file: File, val 
offset: Long, val lengt
     var is: FileInputStream = null
     try {
       is = new FileInputStream(file)
-      is.skip(offset)
+      ByteStreams.skipFully(is, offset)
       ByteStreams.limit(is, length)
     } catch {
       case e: IOException =>

http://git-wip-us.apache.org/repos/asf/spark/blob/46c63417/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala 
b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
index 4ab3433..b5cd34c 100644
--- 
a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
+++ 
b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
@@ -20,6 +20,8 @@ package org.apache.spark.shuffle
 import java.io._
 import java.nio.ByteBuffer
 
+import com.google.common.io.ByteStreams
+
 import org.apache.spark.SparkEnv
 import org.apache.spark.network.{ManagedBuffer, FileSegmentManagedBuffer}
 import org.apache.spark.storage._
@@ -101,7 +103,7 @@ class IndexShuffleBlockManager extends ShuffleBlockManager {
 
     val in = new DataInputStream(new FileInputStream(indexFile))
     try {
-      in.skip(blockId.reduceId * 8)
+      ByteStreams.skipFully(in, blockId.reduceId * 8)
       val offset = in.readLong()
       val nextOffset = in.readLong()
       new FileSegmentManagedBuffer(

http://git-wip-us.apache.org/repos/asf/spark/blob/46c63417/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala 
b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index bac459e..8dadf67 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.storage
 
-import java.io.{File, FileOutputStream, RandomAccessFile}
+import java.io.{IOException, File, FileOutputStream, RandomAccessFile}
 import java.nio.ByteBuffer
 import java.nio.channels.FileChannel.MapMode
 
@@ -110,7 +110,13 @@ private[spark] class DiskStore(blockManager: BlockManager, 
diskManager: DiskBloc
       // For small files, directly read rather than memory map
       if (length < minMemoryMapBytes) {
         val buf = ByteBuffer.allocate(length.toInt)
-        channel.read(buf, offset)
+        channel.position(offset)
+        while (buf.remaining() != 0) {
+          if (channel.read(buf) == -1) {
+            throw new IOException("Reached EOF before filling buffer\n" +
+              
s"offset=$offset\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
+          }
+        }
         buf.flip()
         Some(buf)
       } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/46c63417/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala 
b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
index 932b561..6dbad5f 100644
--- a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
@@ -20,6 +20,7 @@ package org.apache.spark.storage
 import java.io.IOException
 import java.nio.ByteBuffer
 
+import com.google.common.io.ByteStreams
 import tachyon.client.{ReadType, WriteType}
 
 import org.apache.spark.Logging
@@ -105,25 +106,17 @@ private[spark] class TachyonStore(
       return None
     }
     val is = file.getInStream(ReadType.CACHE)
-    var buffer: ByteBuffer = null
+    assert (is != null)
     try {
-      if (is != null) {
-        val size = file.length
-        val bs = new Array[Byte](size.asInstanceOf[Int])
-        val fetchSize = is.read(bs, 0, size.asInstanceOf[Int])
-        buffer = ByteBuffer.wrap(bs)
-        if (fetchSize != size) {
-          logWarning(s"Failed to fetch the block $blockId from Tachyon: Size 
$size " +
-            s"is not equal to fetched size $fetchSize")
-          return None
-        }
-      }
+      val size = file.length
+      val bs = new Array[Byte](size.asInstanceOf[Int])
+      ByteStreams.readFully(is, bs)
+      Some(ByteBuffer.wrap(bs))
     } catch {
       case ioe: IOException =>
         logWarning(s"Failed to fetch the block $blockId from Tachyon", ioe)
-        return None
+        None
     }
-    Some(buffer)
   }
 
   override def contains(blockId: BlockId): Boolean = {

http://git-wip-us.apache.org/repos/asf/spark/blob/46c63417/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 93ac9f1..4660030 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -33,7 +33,7 @@ import scala.reflect.ClassTag
 import scala.util.Try
 import scala.util.control.{ControlThrowable, NonFatal}
 
-import com.google.common.io.Files
+import com.google.common.io.{ByteStreams, Files}
 import com.google.common.util.concurrent.ThreadFactoryBuilder
 import org.apache.commons.lang3.SystemUtils
 import org.apache.hadoop.conf.Configuration
@@ -1062,8 +1062,8 @@ private[spark] object Utils extends Logging {
     val stream = new FileInputStream(file)
 
     try {
-      stream.skip(effectiveStart)
-      stream.read(buff)
+      ByteStreams.skipFully(stream, effectiveStart)
+      ByteStreams.readFully(stream, buff)
     } finally {
       stream.close()
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/46c63417/core/src/test/scala/org/apache/spark/FileServerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala 
b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
index a886702..379c2a6 100644
--- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark
 import java.io._
 import java.util.jar.{JarEntry, JarOutputStream}
 
+import com.google.common.io.ByteStreams
 import org.scalatest.FunSuite
 
 import org.apache.spark.SparkContext._
@@ -58,12 +59,7 @@ class FileServerSuite extends FunSuite with 
LocalSparkContext {
     jar.putNextEntry(jarEntry)
 
     val in = new FileInputStream(textFile)
-    val buffer = new Array[Byte](10240)
-    var nRead = 0
-    while (nRead <= 0) {
-      nRead = in.read(buffer, 0, buffer.length)
-      jar.write(buffer, 0, nRead)
-    }
+    ByteStreams.copy(in, jar)
 
     in.close()
     jar.close()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to