Raise the default TorrentBroadcast block size (spark.broadcast.blockSize) from 2 MB to 4 MB. Add a BroadcastTest2 example for testing broadcasts.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/feb45d39 Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/feb45d39 Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/feb45d39 Branch: refs/heads/master Commit: feb45d391f8d09c120d7d43e72e96e9bf9784fa0 Parents: 6e5a60f Author: Mosharaf Chowdhury <mosha...@cs.berkeley.edu> Authored: Mon Oct 14 14:50:49 2013 -0700 Committer: Mosharaf Chowdhury <mosha...@cs.berkeley.edu> Committed: Wed Oct 16 21:33:33 2013 -0700 ---------------------------------------------------------------------- .../spark/broadcast/TorrentBroadcast.scala | 2 +- .../apache/spark/examples/BroadcastTest2.scala | 59 ++++++++++++++++++++ 2 files changed, 60 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/feb45d39/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala index 29e0dd2..c174804 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala @@ -179,7 +179,7 @@ extends Logging { initialized = false } - val BlockSize = System.getProperty("spark.broadcast.blockSize", "2048").toInt * 1024 + val BlockSize = System.getProperty("spark.broadcast.blockSize", "4096").toInt * 1024 def blockifyObject[IN](obj: IN): TorrentInfo = { val byteArray = Utils.serialize[IN](obj) http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/feb45d39/examples/src/main/scala/org/apache/spark/examples/BroadcastTest2.scala ---------------------------------------------------------------------- diff --git 
a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest2.scala b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest2.scala new file mode 100644 index 0000000..4b96d0c --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest2.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples + +import org.apache.spark.SparkContext + +object BroadcastTest2 { + def main(args: Array[String]) { + if (args.length == 0) { + System.err.println("Usage: BroadcastTest2 <master> [slices] [numElem] [broadcastAlgo] [blockSize]") + System.exit(1) + } + + val bcName = if (args.length > 3) args(3) else "Http" + val blockSize = if (args.length > 4) args(4) else "4096" + + System.setProperty("spark.broadcast.factory", "org.apache.spark.broadcast." 
+ bcName + "BroadcastFactory") + System.setProperty("spark.broadcast.blockSize", blockSize) + + val sc = new SparkContext(args(0), "Broadcast Test 2", + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) + + val slices = if (args.length > 1) args(1).toInt else 2 + val num = if (args.length > 2) args(2).toInt else 1000000 + + var arr1 = new Array[Int](num) + for (i <- 0 until arr1.length) { + arr1(i) = i + } + + for (i <- 0 until 3) { + println("Iteration " + i) + println("===========") + val startTime = System.nanoTime + val barr1 = sc.broadcast(arr1) + sc.parallelize(1 to 10, slices).foreach { + i => println(barr1.value.size) + } + println("Iteration %d took %.0f milliseconds".format(i, (System.nanoTime - startTime) / 1E6)) + } + + System.exit(0) + } +}