BroadcastSuite updated to test both HttpBroadcast and TorrentBroadcast in 
local, local[N], local-cluster settings.

Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/e178ae4e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/e178ae4e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/e178ae4e

Branch: refs/heads/master
Commit: e178ae4e9b052803aaef851e0c5496a2aa1e6fb3
Parents: 6a84e40
Author: Mosharaf Chowdhury <mosha...@cs.berkeley.edu>
Authored: Thu Oct 17 16:38:43 2013 -0700
Committer: Mosharaf Chowdhury <mosha...@cs.berkeley.edu>
Committed: Thu Oct 17 16:38:43 2013 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/BroadcastSuite.scala | 47 ++++++++++++++++++--
 1 file changed, 44 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e178ae4e/core/src/test/scala/org/apache/spark/BroadcastSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/BroadcastSuite.scala 
b/core/src/test/scala/org/apache/spark/BroadcastSuite.scala
index b3a53d9..6a5d2fa 100644
--- a/core/src/test/scala/org/apache/spark/BroadcastSuite.scala
+++ b/core/src/test/scala/org/apache/spark/BroadcastSuite.scala
@@ -20,8 +20,37 @@ package org.apache.spark
 import org.scalatest.FunSuite
 
 class BroadcastSuite extends FunSuite with LocalSparkContext {
-  
-  test("basic broadcast") {
+
+  test("Using HttpBroadcast locally") {
+    System.setProperty("spark.broadcast.factory", 
"org.apache.spark.broadcast.HttpBroadcastFactory")
+    sc = new SparkContext("local", "test")
+    val list = List(1, 2, 3, 4)
+    val listBroadcast = sc.broadcast(list)
+    val results = sc.parallelize(1 to 2).map(x => (x, listBroadcast.value.sum))
+    assert(results.collect.toSet === Set((1, 10), (2, 10)))
+  }
+
+  test("Accessing HttpBroadcast variables from multiple threads") {
+    System.setProperty("spark.broadcast.factory", 
"org.apache.spark.broadcast.HttpBroadcastFactory")
+    sc = new SparkContext("local[10]", "test")
+    val list = List(1, 2, 3, 4)
+    val listBroadcast = sc.broadcast(list)
+    val results = sc.parallelize(1 to 10).map(x => (x, 
listBroadcast.value.sum))
+    assert(results.collect.toSet === (1 to 10).map(x => (x, 10)).toSet)
+  }
+
+  test("Accessing HttpBroadcast variables in a local cluster") {
+    System.setProperty("spark.broadcast.factory", 
"org.apache.spark.broadcast.HttpBroadcastFactory")
+    val numSlaves = 4
+    sc = new SparkContext("local-cluster[%d, 1, 512]".format(numSlaves), 
"test")
+    val list = List(1, 2, 3, 4)
+    val listBroadcast = sc.broadcast(list)
+    val results = sc.parallelize(1 to numSlaves).map(x => (x, 
listBroadcast.value.sum))
+    assert(results.collect.toSet === (1 to numSlaves).map(x => (x, 10)).toSet)
+  }
+
+  test("Using TorrentBroadcast locally") {
+    System.setProperty("spark.broadcast.factory", 
"org.apache.spark.broadcast.TorrentBroadcastFactory")
     sc = new SparkContext("local", "test")
     val list = List(1, 2, 3, 4)
     val listBroadcast = sc.broadcast(list)
@@ -29,11 +58,23 @@ class BroadcastSuite extends FunSuite with 
LocalSparkContext {
     assert(results.collect.toSet === Set((1, 10), (2, 10)))
   }
 
-  test("broadcast variables accessed in multiple threads") {
+  test("Accessing TorrentBroadcast variables from multiple threads") {
+    System.setProperty("spark.broadcast.factory", 
"org.apache.spark.broadcast.TorrentBroadcastFactory")
     sc = new SparkContext("local[10]", "test")
     val list = List(1, 2, 3, 4)
     val listBroadcast = sc.broadcast(list)
     val results = sc.parallelize(1 to 10).map(x => (x, 
listBroadcast.value.sum))
     assert(results.collect.toSet === (1 to 10).map(x => (x, 10)).toSet)
   }
+
+  test("Accessing TorrentBroadcast variables in a local cluster") {
+    System.setProperty("spark.broadcast.factory", 
"org.apache.spark.broadcast.TorrentBroadcastFactory")
+    val numSlaves = 4
+    sc = new SparkContext("local-cluster[%d, 1, 512]".format(numSlaves), 
"test")
+    val list = List(1, 2, 3, 4)
+    val listBroadcast = sc.broadcast(list)
+    val results = sc.parallelize(1 to numSlaves).map(x => (x, 
listBroadcast.value.sum))
+    assert(results.collect.toSet === (1 to numSlaves).map(x => (x, 10)).toSet)
+  }
+
 }

Reply via email to