Repository: spark Updated Branches: refs/heads/branch-1.0 d5f24aae1 -> bae35b12e
[SPARK-2897][SPARK-2920]TorrentBroadcast does use the serializer class specified in the spark option "spark.serializer" Author: GuoQiang Li <[email protected]> Closes #1836 from witgo/SPARK-2897 and squashes the following commits: 23cdc5b [GuoQiang Li] review commit ada4fba [GuoQiang Li] TorrentBroadcast does not support broadcast compression fb91792 [GuoQiang Li] org.apache.spark.broadcast.TorrentBroadcast does use the serializer class specified in the spark option "spark.serializer" (cherry picked from commit ec79063fad44751a6689f5e58d47886babeaecff) Signed-off-by: Reynold Xin <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bae35b12 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bae35b12 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bae35b12 Branch: refs/heads/branch-1.0 Commit: bae35b12e65428379763c31e028cd6c059caacd8 Parents: d5f24aa Author: GuoQiang Li <[email protected]> Authored: Fri Aug 8 16:57:26 2014 -0700 Committer: Reynold Xin <[email protected]> Committed: Fri Aug 8 16:58:16 2014 -0700 ---------------------------------------------------------------------- .../spark/broadcast/TorrentBroadcast.scala | 31 ++++++++++++++++---- .../scala/org/apache/spark/BroadcastSuite.scala | 10 +++++-- 2 files changed, 33 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/bae35b12/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala index 734de37..3484fa2 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala @@ -17,15 +17,16 @@ package org.apache.spark.broadcast -import java.io.{ByteArrayInputStream, ObjectInputStream, ObjectOutputStream} +import java.io.{ByteArrayOutputStream, ByteArrayInputStream, InputStream, + ObjectInputStream, ObjectOutputStream, OutputStream} import scala.reflect.ClassTag import scala.math import scala.util.Random import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException} +import org.apache.spark.io.CompressionCodec import org.apache.spark.storage.{BroadcastBlockId, StorageLevel} -import org.apache.spark.util.Utils /** * A [[org.apache.spark.broadcast.Broadcast]] implementation that uses a BitTorrent-like @@ -215,11 +216,15 @@ private[spark] object TorrentBroadcast extends Logging { private lazy val BLOCK_SIZE = conf.getInt("spark.broadcast.blockSize", 4096) * 1024 private var initialized = false private var conf: SparkConf = null + private var compress: Boolean = false + private var compressionCodec: CompressionCodec = null def initialize(_isDriver: Boolean, conf: SparkConf) { TorrentBroadcast.conf = conf // TODO: we might have to fix it in tests synchronized { if (!initialized) { + compress = conf.getBoolean("spark.broadcast.compress", true) + compressionCodec = CompressionCodec.createCodec(conf) initialized = true } } @@ -229,8 +234,13 @@ private[spark] object TorrentBroadcast extends Logging { initialized = false } - def blockifyObject[T](obj: T): TorrentInfo = { - val byteArray = Utils.serialize[T](obj) + def blockifyObject[T: ClassTag](obj: T): TorrentInfo = { + val bos = new ByteArrayOutputStream() + val out: OutputStream = if (compress) compressionCodec.compressedOutputStream(bos) else bos + val ser = SparkEnv.get.serializer.newInstance() + val serOut = ser.serializeStream(out) + serOut.writeObject[T](obj).close() + val byteArray = bos.toByteArray val bais = new ByteArrayInputStream(byteArray) var blockNum = byteArray.length / BLOCK_SIZE @@ -256,7 +266,7 @@ private[spark] object TorrentBroadcast extends Logging { info } - def unBlockifyObject[T]( + def unBlockifyObject[T: ClassTag]( arrayOfBlocks: Array[TorrentBlock], totalBytes: Int, totalBlocks: Int): T = { @@ -265,7 +275,16 @@ private[spark] object TorrentBroadcast extends Logging { System.arraycopy(arrayOfBlocks(i).byteArray, 0, retByteArray, i * BLOCK_SIZE, arrayOfBlocks(i).byteArray.length) } - Utils.deserialize[T](retByteArray, Thread.currentThread.getContextClassLoader) + + val in: InputStream = { + val arrIn = new ByteArrayInputStream(retByteArray) + if (compress) compressionCodec.compressedInputStream(arrIn) else arrIn + } + val ser = SparkEnv.get.serializer.newInstance() + val serIn = ser.deserializeStream(in) + val obj = serIn.readObject[T]() + serIn.close() + obj } /** http://git-wip-us.apache.org/repos/asf/spark/blob/bae35b12/core/src/test/scala/org/apache/spark/BroadcastSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/BroadcastSuite.scala index c993625..97f3ee5 100644 --- a/core/src/test/scala/org/apache/spark/BroadcastSuite.scala +++ b/core/src/test/scala/org/apache/spark/BroadcastSuite.scala @@ -46,7 +46,10 @@ class BroadcastSuite extends FunSuite with LocalSparkContext { test("Accessing HttpBroadcast variables in a local cluster") { val numSlaves = 4 - sc = new SparkContext("local-cluster[%d, 1, 512]".format(numSlaves), "test", httpConf) + val conf = httpConf.clone + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + conf.set("spark.broadcast.compress", "true") + sc = new SparkContext("local-cluster[%d, 1, 512]".format(numSlaves), "test", conf) val list = List[Int](1, 2, 3, 4) val broadcast = sc.broadcast(list) val results = sc.parallelize(1 to numSlaves).map(x => (x, broadcast.value.sum)) @@ -71,7 +74,10 @@ class BroadcastSuite extends FunSuite with LocalSparkContext { test("Accessing TorrentBroadcast variables in a local cluster") { val numSlaves = 4 - sc = new SparkContext("local-cluster[%d, 1, 512]".format(numSlaves), "test", torrentConf) + val conf = torrentConf.clone + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + conf.set("spark.broadcast.compress", "true") + sc = new SparkContext("local-cluster[%d, 1, 512]".format(numSlaves), "test", conf) val list = List[Int](1, 2, 3, 4) val broadcast = sc.broadcast(list) val results = sc.parallelize(1 to numSlaves).map(x => (x, broadcast.value.sum)) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
