Repository: bahir Updated Branches: refs/heads/master 4a993afaa -> 1abeab29c
[BAHIR-42] Refactor sql-streaming-mqtt scala example Project: http://git-wip-us.apache.org/repos/asf/bahir/repo Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/1abeab29 Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/1abeab29 Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/1abeab29 Branch: refs/heads/master Commit: 1abeab29c8a5e884f4603ef12abd85971a9105b0 Parents: 4a993af Author: Luciano Resende <[email protected]> Authored: Sat Aug 6 20:24:01 2016 +0300 Committer: Luciano Resende <[email protected]> Committed: Sat Aug 6 20:24:01 2016 +0300 ---------------------------------------------------------------------- .../streaming/mqtt/MQTTStreamWordCount.scala | 73 ++++++++++++++++++++ .../mqtt/examples/MQTTStreamWordCount.scala | 73 -------------------- 2 files changed, 73 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bahir/blob/1abeab29/sql-streaming-mqtt/examples/src/main/scala/org/apache/bahir/examples/sql/streaming/mqtt/MQTTStreamWordCount.scala ---------------------------------------------------------------------- diff --git a/sql-streaming-mqtt/examples/src/main/scala/org/apache/bahir/examples/sql/streaming/mqtt/MQTTStreamWordCount.scala b/sql-streaming-mqtt/examples/src/main/scala/org/apache/bahir/examples/sql/streaming/mqtt/MQTTStreamWordCount.scala new file mode 100644 index 0000000..237a8fa --- /dev/null +++ b/sql-streaming-mqtt/examples/src/main/scala/org/apache/bahir/examples/sql/streaming/mqtt/MQTTStreamWordCount.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.bahir.examples.sql.streaming.mqtt + +import java.sql.Timestamp + +import org.apache.spark.sql.SparkSession + +/** + * Counts words in UTF8 encoded, '\n' delimited text received from MQTT Server. + * + * Usage: MQTTStreamWordCount <brokerUrl> <topic> + * <brokerUrl> and <topic> describe the MQTT server that Structured Streaming + * would connect to receive data. + * + * To run this on your local machine, a MQTT Server should be up and running. + * + */ +object MQTTStreamWordCount { + def main(args: Array[String]) { + if (args.length < 2) { + System.err.println("Usage: MQTTStreamWordCount <brokerUrl> <topic>") // scalastyle:off println + System.exit(1) + } + + val brokerUrl = args(0) + val topic = args(1) + + val spark = SparkSession + .builder + .appName("MQTTStreamWordCount") + .master("local[4]") + .getOrCreate() + + import spark.implicits._ + + // Create DataFrame representing the stream of input lines from connection to mqtt server + val lines = spark.readStream + .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider") + .option("topic", topic) + .load(brokerUrl).as[(String, Timestamp)] + + // Split the lines into words + val words = lines.map(_._1).flatMap(_.split(" ")) + + // Generate running word count + val wordCounts = words.groupBy("value").count() + + // Start running the query that prints the running counts to the console + val query = wordCounts.writeStream + .outputMode("complete") + .format("console") + .start() + + query.awaitTermination() + } +} + http://git-wip-us.apache.org/repos/asf/bahir/blob/1abeab29/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/examples/MQTTStreamWordCount.scala ---------------------------------------------------------------------- diff --git a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/examples/MQTTStreamWordCount.scala b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/examples/MQTTStreamWordCount.scala deleted file mode 100644 index c792858..0000000 --- a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/examples/MQTTStreamWordCount.scala +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.bahir.sql.streaming.mqtt.examples - -import java.sql.Timestamp - -import org.apache.spark.sql.SparkSession - -/** - * Counts words in UTF8 encoded, '\n' delimited text received from MQTT Server. - * - * Usage: MQTTStreamWordCount <brokerUrl> <topic> - * <brokerUrl> and <topic> describe the MQTT server that Structured Streaming - * would connect to receive data. - * - * To run this on your local machine, a MQTT Server should be up and running. - * - */ -object MQTTStreamWordCount { - def main(args: Array[String]) { - if (args.length < 2) { - System.err.println("Usage: MQTTStreamWordCount <brokerUrl> <topic>") // scalastyle:off println - System.exit(1) - } - - val brokerUrl = args(0) - val topic = args(1) - - val spark = SparkSession - .builder - .appName("MQTTStreamWordCount") - .master("local[4]") - .getOrCreate() - - import spark.implicits._ - - // Create DataFrame representing the stream of input lines from connection to mqtt server - val lines = spark.readStream - .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider") - .option("topic", topic) - .load(brokerUrl).as[(String, Timestamp)] - - // Split the lines into words - val words = lines.map(_._1).flatMap(_.split(" ")) - - // Generate running word count - val wordCounts = words.groupBy("value").count() - - // Start running the query that prints the running counts to the console - val query = wordCounts.writeStream - .outputMode("complete") - .format("console") - .start() - - query.awaitTermination() - } -} -
