BroadcastSuite updated to test both HttpBroadcast and TorrentBroadcast in local, local[N], and local-cluster settings.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/e178ae4e Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/e178ae4e Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/e178ae4e Branch: refs/heads/master Commit: e178ae4e9b052803aaef851e0c5496a2aa1e6fb3 Parents: 6a84e40 Author: Mosharaf Chowdhury <mosha...@cs.berkeley.edu> Authored: Thu Oct 17 16:38:43 2013 -0700 Committer: Mosharaf Chowdhury <mosha...@cs.berkeley.edu> Committed: Thu Oct 17 16:38:43 2013 -0700 ---------------------------------------------------------------------- .../scala/org/apache/spark/BroadcastSuite.scala | 47 ++++++++++++++++++-- 1 file changed, 44 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e178ae4e/core/src/test/scala/org/apache/spark/BroadcastSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/BroadcastSuite.scala index b3a53d9..6a5d2fa 100644 --- a/core/src/test/scala/org/apache/spark/BroadcastSuite.scala +++ b/core/src/test/scala/org/apache/spark/BroadcastSuite.scala @@ -20,8 +20,37 @@ package org.apache.spark import org.scalatest.FunSuite class BroadcastSuite extends FunSuite with LocalSparkContext { - - test("basic broadcast") { + + test("Using HttpBroadcast locally") { + System.setProperty("spark.broadcast.factory", "org.apache.spark.broadcast.HttpBroadcastFactory") + sc = new SparkContext("local", "test") + val list = List(1, 2, 3, 4) + val listBroadcast = sc.broadcast(list) + val results = sc.parallelize(1 to 2).map(x => (x, listBroadcast.value.sum)) + assert(results.collect.toSet === Set((1, 10), (2, 10))) + } + + test("Accessing HttpBroadcast variables from multiple threads") { + 
System.setProperty("spark.broadcast.factory", "org.apache.spark.broadcast.HttpBroadcastFactory") + sc = new SparkContext("local[10]", "test") + val list = List(1, 2, 3, 4) + val listBroadcast = sc.broadcast(list) + val results = sc.parallelize(1 to 10).map(x => (x, listBroadcast.value.sum)) + assert(results.collect.toSet === (1 to 10).map(x => (x, 10)).toSet) + } + + test("Accessing HttpBroadcast variables in a local cluster") { + System.setProperty("spark.broadcast.factory", "org.apache.spark.broadcast.HttpBroadcastFactory") + val numSlaves = 4 + sc = new SparkContext("local-cluster[%d, 1, 512]".format(numSlaves), "test") + val list = List(1, 2, 3, 4) + val listBroadcast = sc.broadcast(list) + val results = sc.parallelize(1 to numSlaves).map(x => (x, listBroadcast.value.sum)) + assert(results.collect.toSet === (1 to numSlaves).map(x => (x, 10)).toSet) + } + + test("Using TorrentBroadcast locally") { + System.setProperty("spark.broadcast.factory", "org.apache.spark.broadcast.TorrentBroadcastFactory") sc = new SparkContext("local", "test") val list = List(1, 2, 3, 4) val listBroadcast = sc.broadcast(list) @@ -29,11 +58,23 @@ class BroadcastSuite extends FunSuite with LocalSparkContext { assert(results.collect.toSet === Set((1, 10), (2, 10))) } - test("broadcast variables accessed in multiple threads") { + test("Accessing TorrentBroadcast variables from multiple threads") { + System.setProperty("spark.broadcast.factory", "org.apache.spark.broadcast.TorrentBroadcastFactory") sc = new SparkContext("local[10]", "test") val list = List(1, 2, 3, 4) val listBroadcast = sc.broadcast(list) val results = sc.parallelize(1 to 10).map(x => (x, listBroadcast.value.sum)) assert(results.collect.toSet === (1 to 10).map(x => (x, 10)).toSet) } + + test("Accessing TorrentBroadcast variables in a local cluster") { + System.setProperty("spark.broadcast.factory", "org.apache.spark.broadcast.TorrentBroadcastFactory") + val numSlaves = 4 + sc = new SparkContext("local-cluster[%d, 1, 
512]".format(numSlaves), "test") + val list = List(1, 2, 3, 4) + val listBroadcast = sc.broadcast(list) + val results = sc.parallelize(1 to numSlaves).map(x => (x, listBroadcast.value.sum)) + assert(results.collect.toSet === (1 to numSlaves).map(x => (x, 10)).toSet) + } + }