Repository: spark
Updated Branches:
  refs/heads/branch-1.1 dee331738 -> 286f1efb0
[SPARK-4107] Fix incorrect handling of read() and skip() return values (branch-1.1 backport) `read()` may return fewer bytes than requested; when this occurred, the old code would silently return less data than requested, which might cause stream corruption errors. `skip()` faces similar issues, too. This patch fixes several cases where we mis-handle these methods' return values. This is a backport of #2969 to `branch-1.1`. Author: Josh Rosen <[email protected]> Closes #2974 from JoshRosen/spark-4107-branch-1.1-backport and squashes the following commits: d82c05b [Josh Rosen] [SPARK-4107] Fix incorrect handling of read() and skip() return values Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/286f1efb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/286f1efb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/286f1efb Branch: refs/heads/branch-1.1 Commit: 286f1efb0554f055de5dfc0b317b1dff120ce5a0 Parents: dee3317 Author: Josh Rosen <[email protected]> Authored: Tue Oct 28 12:30:12 2014 -0700 Committer: Reynold Xin <[email protected]> Committed: Tue Oct 28 12:30:12 2014 -0700 ---------------------------------------------------------------------- .../main/scala/org/apache/spark/TestUtils.scala | 9 ++------- .../org/apache/spark/storage/DiskStore.scala | 10 ++++++++-- .../org/apache/spark/storage/TachyonStore.scala | 21 +++++++------------- .../scala/org/apache/spark/util/Utils.scala | 6 +++--- .../org/apache/spark/FileServerSuite.scala | 8 ++------ 5 files changed, 22 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/286f1efb/core/src/main/scala/org/apache/spark/TestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala index 
8ca7310..c5e7a73 100644 --- a/core/src/main/scala/org/apache/spark/TestUtils.scala +++ b/core/src/main/scala/org/apache/spark/TestUtils.scala @@ -23,8 +23,8 @@ import java.util.jar.{JarEntry, JarOutputStream} import scala.collection.JavaConversions._ +import com.google.common.io.{ByteStreams, Files} import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider} -import com.google.common.io.Files /** * Utilities for tests. Included in main codebase since it's used by multiple @@ -63,12 +63,7 @@ private[spark] object TestUtils { jarStream.putNextEntry(jarEntry) val in = new FileInputStream(file) - val buffer = new Array[Byte](10240) - var nRead = 0 - while (nRead <= 0) { - nRead = in.read(buffer, 0, buffer.length) - jarStream.write(buffer, 0, nRead) - } + ByteStreams.copy(in, jarStream) in.close() } jarStream.close() http://git-wip-us.apache.org/repos/asf/spark/blob/286f1efb/core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index 295c706..247e240 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -17,7 +17,7 @@ package org.apache.spark.storage -import java.io.{FileOutputStream, RandomAccessFile} +import java.io.{IOException, FileOutputStream, RandomAccessFile} import java.nio.ByteBuffer import java.nio.channels.FileChannel.MapMode @@ -111,7 +111,13 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc // For small files, directly read rather than memory map if (segment.length < minMemoryMapBytes) { val buf = ByteBuffer.allocate(segment.length.toInt) - channel.read(buf, segment.offset) + channel.position(segment.offset) + while (buf.remaining() != 0) { + if (channel.read(buf) == -1) { + throw new IOException("Reached EOF 
before filling buffer\n" + + s"offset=${segment.offset}\nblockId=$blockId\nbuf.remaining=${buf.remaining}") + } + } buf.flip() Some(buf) } else { http://git-wip-us.apache.org/repos/asf/spark/blob/286f1efb/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala index 932b561..6dbad5f 100644 --- a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala @@ -20,6 +20,7 @@ package org.apache.spark.storage import java.io.IOException import java.nio.ByteBuffer +import com.google.common.io.ByteStreams import tachyon.client.{ReadType, WriteType} import org.apache.spark.Logging @@ -105,25 +106,17 @@ private[spark] class TachyonStore( return None } val is = file.getInStream(ReadType.CACHE) - var buffer: ByteBuffer = null + assert (is != null) try { - if (is != null) { - val size = file.length - val bs = new Array[Byte](size.asInstanceOf[Int]) - val fetchSize = is.read(bs, 0, size.asInstanceOf[Int]) - buffer = ByteBuffer.wrap(bs) - if (fetchSize != size) { - logWarning(s"Failed to fetch the block $blockId from Tachyon: Size $size " + - s"is not equal to fetched size $fetchSize") - return None - } - } + val size = file.length + val bs = new Array[Byte](size.asInstanceOf[Int]) + ByteStreams.readFully(is, bs) + Some(ByteBuffer.wrap(bs)) } catch { case ioe: IOException => logWarning(s"Failed to fetch the block $blockId from Tachyon", ioe) - return None + None } - Some(buffer) } override def contains(blockId: BlockId): Boolean = { http://git-wip-us.apache.org/repos/asf/spark/blob/286f1efb/core/src/main/scala/org/apache/spark/util/Utils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala index aefbb24..6d3bef0 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -33,7 +33,7 @@ import scala.reflect.ClassTag import scala.util.Try import scala.util.control.{ControlThrowable, NonFatal} -import com.google.common.io.Files +import com.google.common.io.{ByteStreams, Files} import com.google.common.util.concurrent.ThreadFactoryBuilder import org.apache.commons.lang3.SystemUtils import org.apache.hadoop.fs.{FileSystem, FileUtil, Path} @@ -1005,8 +1005,8 @@ private[spark] object Utils extends Logging { val stream = new FileInputStream(file) try { - stream.skip(effectiveStart) - stream.read(buff) + ByteStreams.skipFully(stream, effectiveStart) + ByteStreams.readFully(stream, buff) } finally { stream.close() } http://git-wip-us.apache.org/repos/asf/spark/blob/286f1efb/core/src/test/scala/org/apache/spark/FileServerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala index 7e18f45..5997e01 100644 --- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileServerSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark import java.io._ import java.util.jar.{JarEntry, JarOutputStream} +import com.google.common.io.ByteStreams import com.google.common.io.Files import org.scalatest.FunSuite @@ -60,12 +61,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext { jar.putNextEntry(jarEntry) val in = new FileInputStream(textFile) - val buffer = new Array[Byte](10240) - var nRead = 0 - while (nRead <= 0) { - nRead = in.read(buffer, 0, buffer.length) - jar.write(buffer, 0, nRead) - } + ByteStreams.copy(in, jar) in.close() jar.close() --------------------------------------------------------------------- To unsubscribe, 
e-mail: [email protected] For additional commands, e-mail: [email protected]
