Repository: spark Updated Branches: refs/heads/master 19c8fb02b -> 44dd57fb6
http://git-wip-us.apache.org/repos/asf/spark/blob/44dd57fb/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala index 47bf1e5..3a10daa 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala @@ -24,6 +24,7 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.mqtt._ +import org.apache.spark.SparkConf /** * A simple Mqtt publisher for demonstration purposes, repeatedly publishes @@ -64,7 +65,6 @@ object MQTTPublisher { } } -// scalastyle:off /** * A sample wordcount with MqttStream stream * @@ -74,30 +74,28 @@ object MQTTPublisher { * Eclipse paho project provides Java library for Mqtt Client http://www.eclipse.org/paho/ * Example Java code for Mqtt Publisher and Subscriber can be found here * https://bitbucket.org/mkjinesh/mqttclient - * Usage: MQTTWordCount <master> <MqttbrokerUrl> <topic> - * In local mode, <master> should be 'local[n]' with n > 1 - * <MqttbrokerUrl> and <topic> describe where Mqtt publisher is running. + * Usage: MQTTWordCount <MqttbrokerUrl> <topic> +\ * <MqttbrokerUrl> and <topic> describe where Mqtt publisher is running. * * To run this example locally, you may run publisher as - * `$ ./bin/run-example org.apache.spark.examples.streaming.MQTTPublisher tcp://localhost:1883 foo` + * `$ ./bin/spark-submit examples.jar \ + * --class org.apache.spark.examples.streaming.MQTTPublisher tcp://localhost:1883 foo` * and run the example as - * `$ ./bin/run-example org.apache.spark.examples.streaming.MQTTWordCount local[2] tcp://localhost:1883 foo` + * `$ ./bin/spark-submit examples.jar \ + * --class org.apache.spark.examples.streaming.MQTTWordCount tcp://localhost:1883 foo` */ -// scalastyle:on object MQTTWordCount { def main(args: Array[String]) { - if (args.length < 3) { + if (args.length < 2) { System.err.println( - "Usage: MQTTWordCount <master> <MqttbrokerUrl> <topic>" + - " In local mode, <master> should be 'local[n]' with n > 1") + "Usage: MQTTWordCount <MqttbrokerUrl> <topic>") System.exit(1) } - val Seq(master, brokerUrl, topic) = args.toSeq - - val ssc = new StreamingContext(master, "MqttWordCount", Seconds(2), System.getenv("SPARK_HOME"), - StreamingContext.jarOfClass(this.getClass).toSeq) + val Seq(brokerUrl, topic) = args.toSeq + val sparkConf = new SparkConf().setAppName("MQTTWordCount") + val ssc = new StreamingContext(sparkConf, Seconds(2)) val lines = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_ONLY_SER_2) val words = lines.flatMap(x => x.toString.split(" ")) http://git-wip-us.apache.org/repos/asf/spark/blob/44dd57fb/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala index acfe9a4..ad7a199 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala @@ -17,41 +17,38 @@ package org.apache.spark.examples.streaming +import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.storage.StorageLevel -// scalastyle:off /** * Counts words in text encoded with UTF8 received from the network every second. * - * Usage: NetworkWordCount <master> <hostname> <port> - * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1. - * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data. + * Usage: NetworkWordCount <hostname> <port> + * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data. * * To run this on your local machine, you need to first run a Netcat server * `$ nc -lk 9999` * and then run the example - * `$ ./bin/run-example org.apache.spark.examples.streaming.NetworkWordCount local[2] localhost 9999` + * `$ ./bin/spark-submit examples.jar \ + * --class org.apache.spark.examples.streaming.NetworkWordCount localhost 9999` */ -// scalastyle:on object NetworkWordCount { def main(args: Array[String]) { - if (args.length < 3) { - System.err.println("Usage: NetworkWordCount <master> <hostname> <port>\n" + - "In local mode, <master> should be 'local[n]' with n > 1") + if (args.length < 2) { + System.err.println("Usage: NetworkWordCount <hostname> <port>") System.exit(1) } StreamingExamples.setStreamingLogLevels() - + val sparkConf = new SparkConf().setAppName("NetworkWordCount"); // Create the context with a 1 second batch size - val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1), - System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) + val ssc = new StreamingContext(sparkConf, Seconds(1)) // Create a NetworkInputDStream on target ip:port and count the // words in input stream of \n delimited text (eg. generated by 'nc') - val lines = ssc.socketTextStream(args(1), args(2).toInt, StorageLevel.MEMORY_ONLY_SER) + val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_ONLY_SER) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() http://git-wip-us.apache.org/repos/asf/spark/blob/44dd57fb/examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala index f92f72f..4caa906 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala @@ -19,6 +19,7 @@ package org.apache.spark.examples.streaming import scala.collection.mutable.SynchronizedQueue +import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext._ @@ -26,16 +27,11 @@ import org.apache.spark.streaming.StreamingContext._ object QueueStream { def main(args: Array[String]) { - if (args.length < 1) { - System.err.println("Usage: QueueStream <master>") - System.exit(1) - } StreamingExamples.setStreamingLogLevels() - + val sparkConf = new SparkConf().setAppName("QueueStream") // Create the context - val ssc = new StreamingContext(args(0), "QueueStream", Seconds(1), - System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) + val ssc = new StreamingContext(sparkConf, Seconds(1)) // Create the queue through which RDDs can be pushed to // a QueueInputDStream http://git-wip-us.apache.org/repos/asf/spark/blob/44dd57fb/examples/src/main/scala/org/apache/spark/examples/streaming/RawNetworkGrep.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/RawNetworkGrep.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/RawNetworkGrep.scala index 1b0319a..a9aaa44 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/RawNetworkGrep.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/RawNetworkGrep.scala @@ -17,6 +17,7 @@ package org.apache.spark.examples.streaming +import org.apache.spark.SparkConf import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming._ import org.apache.spark.util.IntParam @@ -27,29 +28,26 @@ import org.apache.spark.util.IntParam * will only work with spark.streaming.util.RawTextSender running on all worker nodes * and with Spark using Kryo serialization (set Java property "spark.serializer" to * "org.apache.spark.serializer.KryoSerializer"). - * Usage: RawNetworkGrep <master> <numStreams> <host> <port> <batchMillis> - * <master> is the Spark master URL + * Usage: RawNetworkGrep <numStreams> <host> <port> <batchMillis> * <numStream> is the number rawNetworkStreams, which should be same as number * of work nodes in the cluster * <host> is "localhost". * <port> is the port on which RawTextSender is running in the worker nodes. * <batchMillise> is the Spark Streaming batch duration in milliseconds. */ - object RawNetworkGrep { def main(args: Array[String]) { - if (args.length != 5) { - System.err.println("Usage: RawNetworkGrep <master> <numStreams> <host> <port> <batchMillis>") + if (args.length != 4) { + System.err.println("Usage: RawNetworkGrep <numStreams> <host> <port> <batchMillis>") System.exit(1) } StreamingExamples.setStreamingLogLevels() - val Array(master, IntParam(numStreams), host, IntParam(port), IntParam(batchMillis)) = args - + val Array(IntParam(numStreams), host, IntParam(port), IntParam(batchMillis)) = args + val sparkConf = new SparkConf().setAppName("RawNetworkGrep") // Create the context - val ssc = new StreamingContext(master, "RawNetworkGrep", Milliseconds(batchMillis), - System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) + val ssc = new StreamingContext(sparkConf, Duration(batchMillis)) val rawStreams = (1 to numStreams).map(_ => ssc.rawSocketStream[String](host, port, StorageLevel.MEMORY_ONLY_SER_2)).toArray http://git-wip-us.apache.org/repos/asf/spark/blob/44dd57fb/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala index b0bc31c..ace785d 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala @@ -17,19 +17,21 @@ package org.apache.spark.examples.streaming +import java.io.File +import java.nio.charset.Charset + +import com.google.common.io.Files + +import org.apache.spark.SparkConf +import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Time, Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.util.IntParam -import java.io.File -import org.apache.spark.rdd.RDD -import com.google.common.io.Files -import java.nio.charset.Charset /** * Counts words in text encoded with UTF8 received from the network every second. * - * Usage: NetworkWordCount <master> <hostname> <port> <checkpoint-directory> <output-file> - * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1. + * Usage: NetworkWordCount <hostname> <port> <checkpoint-directory> <output-file> * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive * data. <checkpoint-directory> directory to HDFS-compatible file system which checkpoint data * <output-file> file to which the word counts will be appended @@ -44,8 +46,9 @@ import java.nio.charset.Charset * * and run the example as * - * `$ ./run-example org.apache.spark.examples.streaming.RecoverableNetworkWordCount \ - * local[2] localhost 9999 ~/checkpoint/ ~/out` + * `$ ./bin/spark-submit examples.jar \ + * --class org.apache.spark.examples.streaming.RecoverableNetworkWordCount \ + * localhost 9999 ~/checkpoint/ ~/out` * * If the directory ~/checkpoint/ does not exist (e.g. running for the first time), it will create * a new StreamingContext (will print "Creating new context" to the console). Otherwise, if @@ -67,17 +70,16 @@ import java.nio.charset.Charset object RecoverableNetworkWordCount { - def createContext(master: String, ip: String, port: Int, outputPath: String) = { + def createContext(ip: String, port: Int, outputPath: String) = { // If you do not see this printed, that means the StreamingContext has been loaded // from the new checkpoint println("Creating new context") val outputFile = new File(outputPath) if (outputFile.exists()) outputFile.delete() - + val sparkConf = new SparkConf().setAppName("RecoverableNetworkWordCount") // Create the context with a 1 second batch size - val ssc = new StreamingContext(master, "RecoverableNetworkWordCount", Seconds(1), - System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) + val ssc = new StreamingContext(sparkConf, Seconds(1)) // Create a NetworkInputDStream on target ip:port and count the // words in input stream of \n delimited text (eg. generated by 'nc') @@ -94,13 +96,12 @@ object RecoverableNetworkWordCount { } def main(args: Array[String]) { - if (args.length != 5) { + if (args.length != 4) { System.err.println("You arguments were " + args.mkString("[", ", ", "]")) System.err.println( """ - |Usage: RecoverableNetworkWordCount <master> <hostname> <port> <checkpoint-directory> - | <output-file> <master> is the Spark master URL. In local mode, <master> should be - | 'local[n]' with n > 1. <hostname> and <port> describe the TCP server that Spark + |Usage: RecoverableNetworkWordCount <hostname> <port> <checkpoint-directory> + | <output-file>. <hostname> and <port> describe the TCP server that Spark | Streaming would connect to receive data. <checkpoint-directory> directory to | HDFS-compatible file system which checkpoint data <output-file> file to which the | word counts will be appended @@ -111,10 +112,10 @@ object RecoverableNetworkWordCount { ) System.exit(1) } - val Array(master, ip, IntParam(port), checkpointDirectory, outputPath) = args + val Array(ip, IntParam(port), checkpointDirectory, outputPath) = args val ssc = StreamingContext.getOrCreate(checkpointDirectory, () => { - createContext(master, ip, port, outputPath) + createContext(ip, port, outputPath) }) ssc.start() ssc.awaitTermination() http://git-wip-us.apache.org/repos/asf/spark/blob/44dd57fb/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala index 8001d56..5e1415f 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala @@ -17,28 +17,27 @@ package org.apache.spark.examples.streaming +import org.apache.spark.SparkConf import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ -// scalastyle:off + /** * Counts words cumulatively in UTF8 encoded, '\n' delimited text received from the network every * second. - * Usage: StatefulNetworkWordCount <master> <hostname> <port> - * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1. + * Usage: StatefulNetworkWordCount <hostname> <port> * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive * data. * * To run this on your local machine, you need to first run a Netcat server * `$ nc -lk 9999` * and then run the example - * `$ ./bin/run-example org.apache.spark.examples.streaming.StatefulNetworkWordCount local[2] localhost 9999` + * `$ ./bin/spark-submit examples.jar + * --class org.apache.spark.examples.streaming.StatefulNetworkWordCount localhost 9999` */ -// scalastyle:on object StatefulNetworkWordCount { def main(args: Array[String]) { - if (args.length < 3) { - System.err.println("Usage: StatefulNetworkWordCount <master> <hostname> <port>\n" + - "In local mode, <master> should be 'local[n]' with n > 1") + if (args.length < 2) { + System.err.println("Usage: StatefulNetworkWordCount <hostname> <port>") System.exit(1) } @@ -52,14 +51,14 @@ object StatefulNetworkWordCount { Some(currentCount + previousCount) } + val sparkConf = new SparkConf().setAppName("NetworkWordCumulativeCountUpdateStateByKey") // Create the context with a 1 second batch size - val ssc = new StreamingContext(args(0), "NetworkWordCumulativeCountUpdateStateByKey", - Seconds(1), System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) + val ssc = new StreamingContext(sparkConf, Seconds(1)) ssc.checkpoint(".") // Create a NetworkInputDStream on target ip:port and count the // words in input stream of \n delimited test (eg. generated by 'nc') - val lines = ssc.socketTextStream(args(1), args(2).toInt) + val lines = ssc.socketTextStream(args(0), args(1).toInt) val words = lines.flatMap(_.split(" ")) val wordDstream = words.map(x => (x, 1)) http://git-wip-us.apache.org/repos/asf/spark/blob/44dd57fb/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala index b12617d..683752a 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala @@ -19,11 +19,13 @@ package org.apache.spark.examples.streaming import com.twitter.algebird._ +import org.apache.spark.SparkConf import org.apache.spark.SparkContext._ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.twitter._ + // scalastyle:off /** * Illustrates the use of the Count-Min Sketch, from Twitter's Algebird library, to compute @@ -49,12 +51,6 @@ import org.apache.spark.streaming.twitter._ // scalastyle:on object TwitterAlgebirdCMS { def main(args: Array[String]) { - if (args.length < 1) { - System.err.println("Usage: TwitterAlgebirdCMS <master>" + - " [filter1] [filter2] ... [filter n]") - System.exit(1) - } - StreamingExamples.setStreamingLogLevels() // CMS parameters @@ -65,10 +61,9 @@ object TwitterAlgebirdCMS { // K highest frequency elements to take val TOPK = 10 - val (master, filters) = (args.head, args.tail) - - val ssc = new StreamingContext(master, "TwitterAlgebirdCMS", Seconds(10), - System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) + val filters = args + val sparkConf = new SparkConf().setAppName("TwitterAlgebirdCMS") + val ssc = new StreamingContext(sparkConf, Seconds(10)) val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER_2) val users = stream.map(status => status.getUser.getId) http://git-wip-us.apache.org/repos/asf/spark/blob/44dd57fb/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala index 22f232c..62db5e6 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala @@ -23,6 +23,8 @@ import com.twitter.algebird.HyperLogLog._ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.twitter._ +import org.apache.spark.SparkConf + // scalastyle:off /** * Illustrates the use of the HyperLogLog algorithm, from Twitter's Algebird library, to compute @@ -42,20 +44,14 @@ import org.apache.spark.streaming.twitter._ // scalastyle:on object TwitterAlgebirdHLL { def main(args: Array[String]) { - if (args.length < 1) { - System.err.println("Usage: TwitterAlgebirdHLL <master>" + - " [filter1] [filter2] ... [filter n]") - System.exit(1) - } StreamingExamples.setStreamingLogLevels() /** Bit size parameter for HyperLogLog, trades off accuracy vs size */ val BIT_SIZE = 12 - val (master, filters) = (args.head, args.tail) - - val ssc = new StreamingContext(master, "TwitterAlgebirdHLL", Seconds(5), - System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) + val filters = args + val sparkConf = new SparkConf().setAppName("TwitterAlgebirdHLL") + val ssc = new StreamingContext(sparkConf, Seconds(5)) val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER) val users = stream.map(status => status.getUser.getId) http://git-wip-us.apache.org/repos/asf/spark/blob/44dd57fb/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala index 5b58e94..1ddff22 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala @@ -21,6 +21,7 @@ import org.apache.spark.streaming.{Seconds, StreamingContext} import StreamingContext._ import org.apache.spark.SparkContext._ import org.apache.spark.streaming.twitter._ +import org.apache.spark.SparkConf /** * Calculates popular hashtags (topics) over sliding 10 and 60 second windows from a Twitter @@ -30,18 +31,12 @@ import org.apache.spark.streaming.twitter._ */ object TwitterPopularTags { def main(args: Array[String]) { - if (args.length < 1) { - System.err.println("Usage: TwitterPopularTags <master>" + - " [filter1] [filter2] ... [filter n]") - System.exit(1) - } StreamingExamples.setStreamingLogLevels() - val (master, filters) = (args.head, args.tail) - - val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(2), - System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) + val filters = args + val sparkConf = new SparkConf().setAppName("TwitterPopularTags") + val ssc = new StreamingContext(sparkConf, Seconds(2)) val stream = TwitterUtils.createStream(ssc, None, filters) val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#"))) http://git-wip-us.apache.org/repos/asf/spark/blob/44dd57fb/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala index de46e5f..7ade3f1 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala @@ -28,6 +28,7 @@ import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.zeromq._ import scala.language.implicitConversions +import org.apache.spark.SparkConf /** * A simple publisher for demonstration purposes, repeatedly publishes random Messages @@ -63,30 +64,28 @@ object SimpleZeroMQPublisher { * Install zeroMQ (release 2.1) core libraries. [ZeroMQ Install guide] * (http://www.zeromq.org/intro:get-the-software) * - * Usage: ZeroMQWordCount <master> <zeroMQurl> <topic> - * In local mode, <master> should be 'local[n]' with n > 1 + * Usage: ZeroMQWordCount <zeroMQurl> <topic> * <zeroMQurl> and <topic> describe where zeroMq publisher is running. * * To run this example locally, you may run publisher as - * `$ ./bin/run-example org.apache.spark.examples.streaming.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar` + * `$ ./bin/spark-submit examples.jar \ + * --class org.apache.spark.examples.streaming.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar` * and run the example as - * `$ ./bin/run-example org.apache.spark.examples.streaming.ZeroMQWordCount local[2] tcp://127.0.1.1:1234 foo` + * `$ ./bin/spark-submit examples.jar \ + * --class org.apache.spark.examples.streaming.ZeroMQWordCount tcp://127.0.1.1:1234 foo` */ // scalastyle:on object ZeroMQWordCount { def main(args: Array[String]) { - if (args.length < 3) { - System.err.println( - "Usage: ZeroMQWordCount <master> <zeroMQurl> <topic>" + - "In local mode, <master> should be 'local[n]' with n > 1") + if (args.length < 2) { + System.err.println("Usage: ZeroMQWordCount <zeroMQurl> <topic>") System.exit(1) } StreamingExamples.setStreamingLogLevels() - val Seq(master, url, topic) = args.toSeq - + val Seq(url, topic) = args.toSeq + val sparkConf = new SparkConf().setAppName("ZeroMQWordCount") // Create the context and set the batch size - val ssc = new StreamingContext(master, "ZeroMQWordCount", Seconds(2), - System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) + val ssc = new StreamingContext(sparkConf, Seconds(2)) def bytesToStringIterator(x: Seq[ByteString]) = (x.map(_.utf8String)).iterator http://git-wip-us.apache.org/repos/asf/spark/blob/44dd57fb/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala ---------------------------------------------------------------------- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala index fa533a5..d901d4f 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala @@ -27,10 +27,14 @@ import org.apache.spark.graphx.PartitionStrategy._ object Analytics extends Logging { def main(args: Array[String]): Unit = { - val host = args(0) - val taskType = args(1) - val fname = args(2) - val options = args.drop(3).map { arg => + if (args.length < 2) { + System.err.println("Usage: Analytics <taskType> <file> [other options]") + System.exit(1) + } + + val taskType = args(0) + val fname = args(1) + val options = args.drop(2).map { arg => arg.dropWhile(_ == '-').split('=') match { case Array(opt, v) => (opt -> v) case _ => throw new IllegalArgumentException("Invalid argument: " + arg) @@ -71,7 +75,7 @@ object Analytics extends Logging { println("| PageRank |") println("======================================") - val sc = new SparkContext(host, "PageRank(" + fname + ")", conf) + val sc = new SparkContext(conf.setAppName("PageRank(" + fname + ")")) val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname, minEdgePartitions = numEPart).cache() @@ -115,7 +119,7 @@ object Analytics extends Logging { println("| Connected Components |") println("======================================") - val sc = new SparkContext(host, "ConnectedComponents(" + fname + ")", conf) + val sc = new SparkContext(conf.setAppName("ConnectedComponents(" + fname + ")")) val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname, minEdgePartitions = numEPart).cache() val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_)) @@ -137,7 +141,7 @@ object Analytics extends Logging { println("======================================") println("| Triangle Count |") println("======================================") - val sc = new SparkContext(host, "TriangleCount(" + fname + ")", conf) + val sc = new SparkContext(conf.setAppName("TriangleCount(" + fname + ")")) val graph = GraphLoader.edgeListFile(sc, fname, canonicalOrientation = true, minEdgePartitions = numEPart).partitionBy(partitionStrategy).cache() val triangles = TriangleCount.run(graph)