[ 
https://issues.apache.org/jira/browse/BAHIR-39?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15406494#comment-15406494
 ] 

ASF GitHub Bot commented on BAHIR-39:
-------------------------------------

Github user lresende commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/13#discussion_r73406376
  
    --- Diff: sql-streaming-mqtt/README.md ---
    @@ -0,0 +1,121 @@
    +A library for reading data from MQTT Servers using Spark SQL Streaming ( 
or Structured streaming.). 
    +
    +## Linking
    +
    +Using SBT:
    +
    +```scala
    +libraryDependencies += "org.apache.bahir" %% "spark-sql-streaming-mqtt" % 
"2.0.0"
    +```
    +
    +Using Maven:
    +
    +```xml
    +<dependency>
    +    <groupId>org.apache.bahir</groupId>
    +    <artifactId>spark-sql-streaming-mqtt_2.11</artifactId>
    +    <version>2.0.0</version>
    +</dependency>
    +```
    +
    +This library can also be added to Spark jobs launched through 
`spark-shell` or `spark-submit` by using the `--packages` command line option.
    +For example, to include it when starting the spark shell:
    +
    +```
    +$ bin/spark-shell --packages 
org.apache.bahir:spark-sql-streaming-mqtt_2.11:2.0.0
    +```
    +
    +Unlike using `--jars`, using `--packages` ensures that this library and 
its dependencies will be added to the classpath.
    +The `--packages` argument can also be used with `bin/spark-submit`.
    +
    +This library is compiled for Scala 2.11 only, and intends to support Spark 
2.0 onwards.
    +
    +## Examples
    +
    +A SQL Stream can be created with data streams received through MQTT Server 
using,
    +
    +```scala
    +sqlContext.readStream
    +  .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
    +  .option("topic", "mytopic")
    +  .load("tcp://localhost:1883")
    +
    +```
    +
    +## Enable recovering from failures.
    +
    +Setting values for option `localStorage` and `clientId` helps in 
recovering in case of a restart, by restoring the state where it left off 
before the shutdown.
    +
    +```scala
    +sqlContext.readStream
    +  .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
    +  .option("topic", "mytopic")
    +  .option("localStorage", "/path/to/localdir")
    +  .option("clientId", "some-client-id")
    +  .load("tcp://localhost:1883")
    +
    +```
    +
    +### Scala API
    +
    +An example, for scala API to count words from incoming message stream. 
    +
    +```scala
    +    // Create DataFrame representing the stream of input lines from 
connection to mqtt server
    +    val lines = spark.readStream
    +      
.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
    +      .option("topic", topic)
    +      .load(brokerUrl).as[(String, Timestamp)]
    +
    +    // Split the lines into words
    +    val words = lines.map(_._1).flatMap(_.split(" "))
    +
    +    // Generate running word count
    +    val wordCounts = words.groupBy("value").count()
    +
    +    // Start running the query that prints the running counts to the 
console
    +    val query = wordCounts.writeStream
    +      .outputMode("complete")
    +      .format("console")
    +      .start()
    +
    +    query.awaitTermination()
    +
    +```
    +Please see `MQTTStreamWordCount.scala` for full example.
    +
    +### Java API
    +
    +An example, for Java API to count words from incoming message stream. 
    +
    +```java
    +   
    +        // Create DataFrame representing the stream of input lines from 
connection to mqtt server.
    +        Dataset<String> lines = spark
    +                .readStream()
    +                
.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
    +                .option("topic", topic)
    +                .load(brokerUrl).select("value").as(Encoders.STRING());
    +
    +        // Split the lines into words
    +        Dataset<String> words = lines.flatMap(new FlatMapFunction<String, 
String>() {
    +            @Override
    +            public Iterator<String> call(String x) {
    +                return Arrays.asList(x.split(" ")).iterator();
    +            }
    +        }, Encoders.STRING());
    +
    +        // Generate running word count
    +        Dataset<Row> wordCounts = words.groupBy("value").count();
    +
    +        // Start running the query that prints the running counts to the 
console
    +        StreamingQuery query = wordCounts.writeStream()
    +                .outputMode("complete")
    +                .format("console")
    +                .start();
    +
    +        query.awaitTermination();
    +```
    +
    +Please see `JavaMQTTStreamWordCount.java` for full example.
    --- End diff --
    
    Please add a link to the example directory. Also, the example file name is 
MQTTStreamWordCount.java


> MQTT as a streaming source for SQL Streaming.
> ---------------------------------------------
>
>                 Key: BAHIR-39
>                 URL: https://issues.apache.org/jira/browse/BAHIR-39
>             Project: Bahir
>          Issue Type: New Feature
>          Components: Spark SQL Data Sources
>    Affects Versions: 2.1.0
>            Reporter: Prashant Sharma
>
> MQTT compatible streaming source for Spark SQL Streaming.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to