Repository: spark
Updated Branches:
  refs/heads/master 342b57db6 -> 5a8f64f33


[SPARK-3958] TorrentBroadcast cleanup / debugging improvements.

This PR makes several changes to TorrentBroadcast in order to make
it easier to reason about, which should help when debugging SPARK-3958.
The key changes:

- Remove all state from the global TorrentBroadcast object.  This state
  consisted mainly of configuration options, like the block size and
  compression codec, and was read by the blockify / unblockify methods.
  Unfortunately, the use of `lazy val` for `BLOCK_SIZE` meant that the block
  size was always determined by the first SparkConf that TorrentBroadast was
  initialized with; as a result, unit tests could not properly test
  TorrentBroadcast with different block sizes.

  Instead, blockifyObject and unBlockifyObject now accept compression codecs
  and blockSizes as arguments.  These arguments are supplied at the call sites
  inside of TorrentBroadcast instances.  Each TorrentBroadcast instance
  determines these values from SparkEnv's SparkConf.  I was careful to ensure
  that we do not accidentally serialize CompressionCodec or SparkConf objects
  as part of the TorrentBroadcast object.

- Remove special-case handling of local-mode in TorrentBroadcast.  I don't
  think that broadcast implementations should know about whether we're running
  in local mode.  If we want to optimize the performance of broadcast in local
  mode, then we should detect this at a higher level and use a dummy
  LocalBroadcastFactory implementation instead.

  Removing this code fixes a subtle error condition: in the old local mode
  code, a failure to find the broadcast in the local BlockManager would lead
  to an attempt to deblockify zero blocks, which could lead to confusing
  deserialization or decompression errors when we attempted to decompress
  an empty byte array.  This should never have happened, though: a failure to
  find the block in local mode is evidence of some other error.  The changes
  here will make it easier to debug those errors if they ever happen.

- Add a check that throws an exception when attempting to deblockify an
  empty array.

- Use ScalaCheck to add a test to check that TorrentBroadcast's
  blockifyObject and unBlockifyObject methods are inverses.

- Misc. cleanup and logging improvements.

Author: Josh Rosen <[email protected]>

Closes #2844 from JoshRosen/torrentbroadcast-bugfix and squashes the following 
commits:

1e8268d [Josh Rosen] Address Reynold's review comments
2a9fdfd [Josh Rosen] Address Reynold's review comments.
c3b08f9 [Josh Rosen] Update TorrentBroadcast tests to reflect removal of 
special local-mode optimizations.
5c22782 [Josh Rosen] Store broadcast variable's value in the driver.
33fc754 [Josh Rosen] Change blockify/unblockifyObject to accept serializer as 
argument.
618a872 [Josh Rosen] [SPARK-3958] TorrentBroadcast cleanup / debugging 
improvements.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5a8f64f3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5a8f64f3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5a8f64f3

Branch: refs/heads/master
Commit: 5a8f64f33632fbf89d16cade2e0e66c5ed60760b
Parents: 342b57d
Author: Josh Rosen <[email protected]>
Authored: Tue Oct 21 00:49:11 2014 -0700
Committer: Josh Rosen <[email protected]>
Committed: Tue Oct 21 00:49:11 2014 -0700

----------------------------------------------------------------------
 .../spark/broadcast/TorrentBroadcast.scala      | 136 +++++++++----------
 .../broadcast/TorrentBroadcastFactory.scala     |  11 +-
 .../apache/spark/broadcast/BroadcastSuite.scala |  42 ++++--
 3 files changed, 97 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5a8f64f3/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala 
b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index 42d5868..99af2e9 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -26,6 +26,7 @@ import scala.util.Random
 
 import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}
 import org.apache.spark.io.CompressionCodec
+import org.apache.spark.serializer.Serializer
 import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
 import org.apache.spark.util.ByteBufferInputStream
 import org.apache.spark.util.io.ByteArrayChunkOutputStream
@@ -46,14 +47,12 @@ import org.apache.spark.util.io.ByteArrayChunkOutputStream
  * This prevents the driver from being the bottleneck in sending out multiple 
copies of the
  * broadcast data (one per executor) as done by the 
[[org.apache.spark.broadcast.HttpBroadcast]].
  *
+ * When initialized, TorrentBroadcast objects read SparkEnv.get.conf.
+ *
  * @param obj object to broadcast
- * @param isLocal whether Spark is running in local mode (single JVM process).
  * @param id A unique identifier for the broadcast variable.
  */
-private[spark] class TorrentBroadcast[T: ClassTag](
-    obj : T,
-    @transient private val isLocal: Boolean,
-    id: Long)
+private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
   extends Broadcast[T](id) with Logging with Serializable {
 
   /**
@@ -62,6 +61,20 @@ private[spark] class TorrentBroadcast[T: ClassTag](
    * blocks from the driver and/or other executors.
    */
   @transient private var _value: T = obj
+  /** The compression codec to use, or None if compression is disabled */
+  @transient private var compressionCodec: Option[CompressionCodec] = _
+  /** Size of each block. Default value is 4MB.  This value is only read by 
the broadcaster. */
+  @transient private var blockSize: Int = _
+
+  private def setConf(conf: SparkConf) {
+    compressionCodec = if (conf.getBoolean("spark.broadcast.compress", true)) {
+      Some(CompressionCodec.createCodec(conf))
+    } else {
+      None
+    }
+    blockSize = conf.getInt("spark.broadcast.blockSize", 4096) * 1024
+  }
+  setConf(SparkEnv.get.conf)
 
   private val broadcastId = BroadcastBlockId(id)
 
@@ -76,23 +89,20 @@ private[spark] class TorrentBroadcast[T: ClassTag](
    * @return number of blocks this broadcast variable is divided into
    */
   private def writeBlocks(): Int = {
-    // For local mode, just put the object in the BlockManager so we can find 
it later.
-    SparkEnv.get.blockManager.putSingle(
-      broadcastId, _value, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
-
-    if (!isLocal) {
-      val blocks = TorrentBroadcast.blockifyObject(_value)
-      blocks.zipWithIndex.foreach { case (block, i) =>
-        SparkEnv.get.blockManager.putBytes(
-          BroadcastBlockId(id, "piece" + i),
-          block,
-          StorageLevel.MEMORY_AND_DISK_SER,
-          tellMaster = true)
-      }
-      blocks.length
-    } else {
-      0
+    // Store a copy of the broadcast variable in the driver so that tasks run 
on the driver
+    // do not create a duplicate copy of the broadcast variable's value.
+    SparkEnv.get.blockManager.putSingle(broadcastId, _value, 
StorageLevel.MEMORY_AND_DISK,
+      tellMaster = false)
+    val blocks =
+      TorrentBroadcast.blockifyObject(_value, blockSize, 
SparkEnv.get.serializer, compressionCodec)
+    blocks.zipWithIndex.foreach { case (block, i) =>
+      SparkEnv.get.blockManager.putBytes(
+        BroadcastBlockId(id, "piece" + i),
+        block,
+        StorageLevel.MEMORY_AND_DISK_SER,
+        tellMaster = true)
     }
+    blocks.length
   }
 
   /** Fetch torrent blocks from the driver and/or other executors. */
@@ -104,29 +114,24 @@ private[spark] class TorrentBroadcast[T: ClassTag](
 
     for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
       val pieceId = BroadcastBlockId(id, "piece" + pid)
-
-      // First try getLocalBytes because  there is a chance that previous 
attempts to fetch the
+      logDebug(s"Reading piece $pieceId of $broadcastId")
+      // First try getLocalBytes because there is a chance that previous 
attempts to fetch the
       // broadcast blocks have already fetched some of the blocks. In that 
case, some blocks
       // would be available locally (on this executor).
-      var blockOpt = bm.getLocalBytes(pieceId)
-      if (!blockOpt.isDefined) {
-        blockOpt = bm.getRemoteBytes(pieceId)
-        blockOpt match {
-          case Some(block) =>
-            // If we found the block from remote executors/driver's 
BlockManager, put the block
-            // in this executor's BlockManager.
-            SparkEnv.get.blockManager.putBytes(
-              pieceId,
-              block,
-              StorageLevel.MEMORY_AND_DISK_SER,
-              tellMaster = true)
-
-          case None =>
-            throw new SparkException("Failed to get " + pieceId + " of " + 
broadcastId)
-        }
+      def getLocal: Option[ByteBuffer] = bm.getLocalBytes(pieceId)
+      def getRemote: Option[ByteBuffer] = bm.getRemoteBytes(pieceId).map { 
block =>
+        // If we found the block from remote executors/driver's BlockManager, 
put the block
+        // in this executor's BlockManager.
+        SparkEnv.get.blockManager.putBytes(
+          pieceId,
+          block,
+          StorageLevel.MEMORY_AND_DISK_SER,
+          tellMaster = true)
+        block
       }
-      // If we get here, the option is defined.
-      blocks(pid) = blockOpt.get
+      val block: ByteBuffer = getLocal.orElse(getRemote).getOrElse(
+        throw new SparkException(s"Failed to get $pieceId of $broadcastId"))
+      blocks(pid) = block
     }
     blocks
   }
@@ -156,6 +161,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](
   private def readObject(in: ObjectInputStream) {
     in.defaultReadObject()
     TorrentBroadcast.synchronized {
+      setConf(SparkEnv.get.conf)
       SparkEnv.get.blockManager.getLocal(broadcastId).map(_.data.next()) match 
{
         case Some(x) =>
           _value = x.asInstanceOf[T]
@@ -167,7 +173,8 @@ private[spark] class TorrentBroadcast[T: ClassTag](
           val time = (System.nanoTime() - start) / 1e9
           logInfo("Reading broadcast variable " + id + " took " + time + " s")
 
-          _value = TorrentBroadcast.unBlockifyObject[T](blocks)
+          _value =
+            TorrentBroadcast.unBlockifyObject[T](blocks, 
SparkEnv.get.serializer, compressionCodec)
           // Store the merged copy in BlockManager so other tasks on this 
executor don't
           // need to re-fetch it.
           SparkEnv.get.blockManager.putSingle(
@@ -179,43 +186,29 @@ private[spark] class TorrentBroadcast[T: ClassTag](
 
 
 private object TorrentBroadcast extends Logging {
-  /** Size of each block. Default value is 4MB. */
-  private lazy val BLOCK_SIZE = conf.getInt("spark.broadcast.blockSize", 4096) 
* 1024
-  private var initialized = false
-  private var conf: SparkConf = null
-  private var compress: Boolean = false
-  private var compressionCodec: CompressionCodec = null
-
-  def initialize(_isDriver: Boolean, conf: SparkConf) {
-    TorrentBroadcast.conf = conf // TODO: we might have to fix it in tests
-    synchronized {
-      if (!initialized) {
-        compress = conf.getBoolean("spark.broadcast.compress", true)
-        compressionCodec = CompressionCodec.createCodec(conf)
-        initialized = true
-      }
-    }
-  }
 
-  def stop() {
-    initialized = false
-  }
-
-  def blockifyObject[T: ClassTag](obj: T): Array[ByteBuffer] = {
-    val bos = new ByteArrayChunkOutputStream(BLOCK_SIZE)
-    val out: OutputStream = if (compress) 
compressionCodec.compressedOutputStream(bos) else bos
-    val ser = SparkEnv.get.serializer.newInstance()
+  def blockifyObject[T: ClassTag](
+      obj: T,
+      blockSize: Int,
+      serializer: Serializer,
+      compressionCodec: Option[CompressionCodec]): Array[ByteBuffer] = {
+    val bos = new ByteArrayChunkOutputStream(blockSize)
+    val out: OutputStream = compressionCodec.map(c => 
c.compressedOutputStream(bos)).getOrElse(bos)
+    val ser = serializer.newInstance()
     val serOut = ser.serializeStream(out)
     serOut.writeObject[T](obj).close()
     bos.toArrays.map(ByteBuffer.wrap)
   }
 
-  def unBlockifyObject[T: ClassTag](blocks: Array[ByteBuffer]): T = {
+  def unBlockifyObject[T: ClassTag](
+      blocks: Array[ByteBuffer],
+      serializer: Serializer,
+      compressionCodec: Option[CompressionCodec]): T = {
+    require(blocks.nonEmpty, "Cannot unblockify an empty array of blocks")
     val is = new SequenceInputStream(
       asJavaEnumeration(blocks.iterator.map(block => new 
ByteBufferInputStream(block))))
-    val in: InputStream = if (compress) 
compressionCodec.compressedInputStream(is) else is
-
-    val ser = SparkEnv.get.serializer.newInstance()
+    val in: InputStream = compressionCodec.map(c => 
c.compressedInputStream(is)).getOrElse(is)
+    val ser = serializer.newInstance()
     val serIn = ser.deserializeStream(in)
     val obj = serIn.readObject[T]()
     serIn.close()
@@ -227,6 +220,7 @@ private object TorrentBroadcast extends Logging {
    * If removeFromDriver is true, also remove these persisted blocks on the 
driver.
    */
   def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) = {
+    logDebug(s"Unpersisting TorrentBroadcast $id")
     SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, 
blocking)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/5a8f64f3/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala 
b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala
index ad0f701..fb024c1 100644
--- 
a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala
+++ 
b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala
@@ -28,14 +28,13 @@ import org.apache.spark.{SecurityManager, SparkConf}
  */
 class TorrentBroadcastFactory extends BroadcastFactory {
 
-  override def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: 
SecurityManager) {
-    TorrentBroadcast.initialize(isDriver, conf)
-  }
+  override def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: 
SecurityManager) { }
 
-  override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: 
Long) =
-    new TorrentBroadcast[T](value_, isLocal, id)
+  override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: 
Long) = {
+    new TorrentBroadcast[T](value_, id)
+  }
 
-  override def stop() { TorrentBroadcast.stop() }
+  override def stop() { }
 
   /**
    * Remove all persisted state associated with the torrent broadcast with the 
given ID.

http://git-wip-us.apache.org/repos/asf/spark/blob/5a8f64f3/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala 
b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
index acaf321..e096c8c 100644
--- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
+++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
@@ -17,12 +17,15 @@
 
 package org.apache.spark.broadcast
 
+import scala.util.Random
+
 import org.scalatest.FunSuite
 
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, 
SparkException}
+import org.apache.spark.io.SnappyCompressionCodec
+import org.apache.spark.serializer.JavaSerializer
 import org.apache.spark.storage._
 
-
 class BroadcastSuite extends FunSuite with LocalSparkContext {
 
   private val httpConf = broadcastConf("HttpBroadcastFactory")
@@ -84,6 +87,24 @@ class BroadcastSuite extends FunSuite with LocalSparkContext 
{
     assert(results.collect().toSet === (1 to numSlaves).map(x => (x, 
10)).toSet)
   }
 
+  test("TorrentBroadcast's blockifyObject and unblockifyObject are inverses") {
+    import org.apache.spark.broadcast.TorrentBroadcast._
+    val blockSize = 1024
+    val conf = new SparkConf()
+    val compressionCodec = Some(new SnappyCompressionCodec(conf))
+    val serializer = new JavaSerializer(conf)
+    val seed = 42
+    val rand = new Random(seed)
+    for (trial <- 1 to 100) {
+      val size = 1 + rand.nextInt(1024 * 10)
+      val data: Array[Byte] = new Array[Byte](size)
+      rand.nextBytes(data)
+      val blocks = blockifyObject(data, blockSize, serializer, 
compressionCodec)
+      val unblockified = unBlockifyObject[Array[Byte]](blocks, serializer, 
compressionCodec)
+      assert(unblockified === data)
+    }
+  }
+
   test("Unpersisting HttpBroadcast on executors only in local mode") {
     testUnpersistHttpBroadcast(distributed = false, removeFromDriver = false)
   }
@@ -193,26 +214,17 @@ class BroadcastSuite extends FunSuite with 
LocalSparkContext {
 
       blockId = BroadcastBlockId(broadcastId, "piece0")
       statuses = bmm.getBlockStatus(blockId, askSlaves = true)
-      assert(statuses.size === (if (distributed) 1 else 0))
+      assert(statuses.size === 1)
     }
 
     // Verify that blocks are persisted in both the executors and the driver
     def afterUsingBroadcast(broadcastId: Long, bmm: BlockManagerMaster) {
       var blockId = BroadcastBlockId(broadcastId)
-      var statuses = bmm.getBlockStatus(blockId, askSlaves = true)
-      if (distributed) {
-        assert(statuses.size === numSlaves + 1)
-      } else {
-        assert(statuses.size === 1)
-      }
+      val statuses = bmm.getBlockStatus(blockId, askSlaves = true)
+      assert(statuses.size === numSlaves + 1)
 
       blockId = BroadcastBlockId(broadcastId, "piece0")
-      statuses = bmm.getBlockStatus(blockId, askSlaves = true)
-      if (distributed) {
-        assert(statuses.size === numSlaves + 1)
-      } else {
-        assert(statuses.size === 0)
-      }
+      assert(statuses.size === numSlaves + 1)
     }
 
     // Verify that blocks are unpersisted on all executors, and on all nodes 
if removeFromDriver
@@ -224,7 +236,7 @@ class BroadcastSuite extends FunSuite with 
LocalSparkContext {
       assert(statuses.size === expectedNumBlocks)
 
       blockId = BroadcastBlockId(broadcastId, "piece0")
-      expectedNumBlocks = if (removeFromDriver || !distributed) 0 else 1
+      expectedNumBlocks = if (removeFromDriver) 0 else 1
       statuses = bmm.getBlockStatus(blockId, askSlaves = true)
       assert(statuses.size === expectedNumBlocks)
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to