[ https://issues.apache.org/jira/browse/BAHIR-39?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15406494#comment-15406494 ]
ASF GitHub Bot commented on BAHIR-39:
-------------------------------------
Github user lresende commented on a diff in the pull request:
https://github.com/apache/bahir/pull/13#discussion_r73406376
--- Diff: sql-streaming-mqtt/README.md ---
@@ -0,0 +1,121 @@
+A library for reading data from MQTT servers using Spark SQL Streaming (or Structured Streaming).
+
+## Linking
+
+Using SBT:
+
+```scala
+libraryDependencies += "org.apache.bahir" %% "spark-sql-streaming-mqtt" % "2.0.0"
+```
+
+Using Maven:
+
+```xml
+<dependency>
+ <groupId>org.apache.bahir</groupId>
+ <artifactId>spark-sql-streaming-mqtt_2.11</artifactId>
+ <version>2.0.0</version>
+</dependency>
+```
+
+This library can also be added to Spark jobs launched through `spark-shell` or `spark-submit` by using the `--packages` command line option.
+For example, to include it when starting the Spark shell:
+
+```
+$ bin/spark-shell --packages org.apache.bahir:spark-sql-streaming-mqtt_2.11:2.0.0
+```
+
+Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath.
+The `--packages` argument can also be used with `bin/spark-submit`.
+
+This library is compiled for Scala 2.11 only, and is intended to support Spark 2.0 onwards.
+
+## Examples
+
+A SQL stream can be created from data received through an MQTT server using:
+
+```scala
+sqlContext.readStream
+ .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
+ .option("topic", "mytopic")
+ .load("tcp://localhost:1883")
+
+```
+
+## Enable recovering from failures
+
+Setting values for the options `localStorage` and `clientId` helps the source recover after a restart, by restoring the state where it left off before the shutdown.
+
+```scala
+sqlContext.readStream
+ .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
+ .option("topic", "mytopic")
+ .option("localStorage", "/path/to/localdir")
+ .option("clientId", "some-client-id")
+ .load("tcp://localhost:1883")
+
+```
+
+### Scala API
+
+An example that uses the Scala API to count words from the incoming message stream.
+
+```scala
+ // Create DataFrame representing the stream of input lines from connection to MQTT server
+ val lines = spark.readStream
+ .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
+ .option("topic", topic)
+ .load(brokerUrl).as[(String, Timestamp)]
+
+ // Split the lines into words
+ val words = lines.map(_._1).flatMap(_.split(" "))
+
+ // Generate running word count
+ val wordCounts = words.groupBy("value").count()
+
+ // Start running the query that prints the running counts to the console
+ val query = wordCounts.writeStream
+ .outputMode("complete")
+ .format("console")
+ .start()
+
+ query.awaitTermination()
+
+```
+Please see `MQTTStreamWordCount.scala` for the full example.
+
+### Java API
+
+An example that uses the Java API to count words from the incoming message stream.
+
+```java
+
+ // Create DataFrame representing the stream of input lines from connection to MQTT server.
+ Dataset<String> lines = spark
+ .readStream()
+ .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
+ .option("topic", topic)
+ .load(brokerUrl).select("value").as(Encoders.STRING());
+
+ // Split the lines into words
+ Dataset<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+ @Override
+ public Iterator<String> call(String x) {
+ return Arrays.asList(x.split(" ")).iterator();
+ }
+ }, Encoders.STRING());
+
+ // Generate running word count
+ Dataset<Row> wordCounts = words.groupBy("value").count();
+
+ // Start running the query that prints the running counts to the console
+ StreamingQuery query = wordCounts.writeStream()
+ .outputMode("complete")
+ .format("console")
+ .start();
+
+ query.awaitTermination();
+```
+
+Please see `JavaMQTTStreamWordCount.java` for the full example.
--- End diff --
Please add a link to the example directory. Also, the example file name is MQTTStreamWordCount.java.
> MQTT as a streaming source for SQL Streaming.
> ---------------------------------------------
>
> Key: BAHIR-39
> URL: https://issues.apache.org/jira/browse/BAHIR-39
> Project: Bahir
> Issue Type: New Feature
> Components: Spark SQL Data Sources
> Affects Versions: 2.1.0
> Reporter: Prashant Sharma
>
> MQTT compatible streaming source for Spark SQL Streaming.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)