Repository: spark
Updated Branches:
  refs/heads/branch-1.1 dee331738 -> 286f1efb0


[SPARK-4107] Fix incorrect handling of read() and skip() return values 
(branch-1.1 backport)

`read()` may return fewer bytes than requested; when this occurs, the old 
code silently returns less data than requested, which can cause stream 
corruption errors.  `skip()` faces similar issues, too.

This patch fixes several cases where we mishandle these methods' return values.

This is a backport of #2969 to `branch-1.1`.

Author: Josh Rosen <[email protected]>

Closes #2974 from JoshRosen/spark-4107-branch-1.1-backport and squashes the 
following commits:

d82c05b [Josh Rosen] [SPARK-4107] Fix incorrect handling of read() and skip() 
return values


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/286f1efb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/286f1efb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/286f1efb

Branch: refs/heads/branch-1.1
Commit: 286f1efb0554f055de5dfc0b317b1dff120ce5a0
Parents: dee3317
Author: Josh Rosen <[email protected]>
Authored: Tue Oct 28 12:30:12 2014 -0700
Committer: Reynold Xin <[email protected]>
Committed: Tue Oct 28 12:30:12 2014 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/TestUtils.scala |  9 ++-------
 .../org/apache/spark/storage/DiskStore.scala    | 10 ++++++++--
 .../org/apache/spark/storage/TachyonStore.scala | 21 +++++++-------------
 .../scala/org/apache/spark/util/Utils.scala     |  6 +++---
 .../org/apache/spark/FileServerSuite.scala      |  8 ++------
 5 files changed, 22 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/286f1efb/core/src/main/scala/org/apache/spark/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala 
b/core/src/main/scala/org/apache/spark/TestUtils.scala
index 8ca7310..c5e7a73 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -23,8 +23,8 @@ import java.util.jar.{JarEntry, JarOutputStream}
 
 import scala.collection.JavaConversions._
 
+import com.google.common.io.{ByteStreams, Files}
 import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider}
-import com.google.common.io.Files
 
 /**
  * Utilities for tests. Included in main codebase since it's used by multiple
@@ -63,12 +63,7 @@ private[spark] object TestUtils {
       jarStream.putNextEntry(jarEntry)
 
       val in = new FileInputStream(file)
-      val buffer = new Array[Byte](10240)
-      var nRead = 0
-      while (nRead <= 0) {
-        nRead = in.read(buffer, 0, buffer.length)
-        jarStream.write(buffer, 0, nRead)
-      }
+      ByteStreams.copy(in, jarStream)
       in.close()
     }
     jarStream.close()

http://git-wip-us.apache.org/repos/asf/spark/blob/286f1efb/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala 
b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index 295c706..247e240 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.storage
 
-import java.io.{FileOutputStream, RandomAccessFile}
+import java.io.{IOException, FileOutputStream, RandomAccessFile}
 import java.nio.ByteBuffer
 import java.nio.channels.FileChannel.MapMode
 
@@ -111,7 +111,13 @@ private[spark] class DiskStore(blockManager: BlockManager, 
diskManager: DiskBloc
       // For small files, directly read rather than memory map
       if (segment.length < minMemoryMapBytes) {
         val buf = ByteBuffer.allocate(segment.length.toInt)
-        channel.read(buf, segment.offset)
+        channel.position(segment.offset)
+        while (buf.remaining() != 0) {
+          if (channel.read(buf) == -1) {
+            throw new IOException("Reached EOF before filling buffer\n" +
+              
s"offset=${segment.offset}\nblockId=$blockId\nbuf.remaining=${buf.remaining}")
+          }
+        }
         buf.flip()
         Some(buf)
       } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/286f1efb/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala 
b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
index 932b561..6dbad5f 100644
--- a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
@@ -20,6 +20,7 @@ package org.apache.spark.storage
 import java.io.IOException
 import java.nio.ByteBuffer
 
+import com.google.common.io.ByteStreams
 import tachyon.client.{ReadType, WriteType}
 
 import org.apache.spark.Logging
@@ -105,25 +106,17 @@ private[spark] class TachyonStore(
       return None
     }
     val is = file.getInStream(ReadType.CACHE)
-    var buffer: ByteBuffer = null
+    assert (is != null)
     try {
-      if (is != null) {
-        val size = file.length
-        val bs = new Array[Byte](size.asInstanceOf[Int])
-        val fetchSize = is.read(bs, 0, size.asInstanceOf[Int])
-        buffer = ByteBuffer.wrap(bs)
-        if (fetchSize != size) {
-          logWarning(s"Failed to fetch the block $blockId from Tachyon: Size 
$size " +
-            s"is not equal to fetched size $fetchSize")
-          return None
-        }
-      }
+      val size = file.length
+      val bs = new Array[Byte](size.asInstanceOf[Int])
+      ByteStreams.readFully(is, bs)
+      Some(ByteBuffer.wrap(bs))
     } catch {
       case ioe: IOException =>
         logWarning(s"Failed to fetch the block $blockId from Tachyon", ioe)
-        return None
+        None
     }
-    Some(buffer)
   }
 
   override def contains(blockId: BlockId): Boolean = {

http://git-wip-us.apache.org/repos/asf/spark/blob/286f1efb/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index aefbb24..6d3bef0 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -33,7 +33,7 @@ import scala.reflect.ClassTag
 import scala.util.Try
 import scala.util.control.{ControlThrowable, NonFatal}
 
-import com.google.common.io.Files
+import com.google.common.io.{ByteStreams, Files}
 import com.google.common.util.concurrent.ThreadFactoryBuilder
 import org.apache.commons.lang3.SystemUtils
 import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
@@ -1005,8 +1005,8 @@ private[spark] object Utils extends Logging {
     val stream = new FileInputStream(file)
 
     try {
-      stream.skip(effectiveStart)
-      stream.read(buff)
+      ByteStreams.skipFully(stream, effectiveStart)
+      ByteStreams.readFully(stream, buff)
     } finally {
       stream.close()
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/286f1efb/core/src/test/scala/org/apache/spark/FileServerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala 
b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
index 7e18f45..5997e01 100644
--- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark
 import java.io._
 import java.util.jar.{JarEntry, JarOutputStream}
 
+import com.google.common.io.ByteStreams
 import com.google.common.io.Files
 import org.scalatest.FunSuite
 
@@ -60,12 +61,7 @@ class FileServerSuite extends FunSuite with 
LocalSparkContext {
     jar.putNextEntry(jarEntry)
 
     val in = new FileInputStream(textFile)
-    val buffer = new Array[Byte](10240)
-    var nRead = 0
-    while (nRead <= 0) {
-      nRead = in.read(buffer, 0, buffer.length)
-      jar.write(buffer, 0, nRead)
-    }
+    ByteStreams.copy(in, jar)
 
     in.close()
     jar.close()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to