[BAHIR-53] Add new configuration options to MQTTInputDStream

Add new configuration options to enable secured connections and
other quality of services.

Closes #23


Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/28f034f4
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/28f034f4
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/28f034f4

Branch: refs/heads/master
Commit: 28f034f49d19034b596f7f04ca4fc2698a21ad6c
Parents: ab62371
Author: Sebastian Woehrl <sebastian.woe...@maibornwolff.de>
Authored: Sat Aug 13 15:00:13 2016 +0200
Committer: Luciano Resende <lrese...@apache.org>
Committed: Mon Aug 22 12:38:28 2016 -0700

----------------------------------------------------------------------
 scalastyle-config.xml                           |   2 +-
 streaming-mqtt/README.md                        |  23 ++++
 .../spark/streaming/mqtt/MQTTInputDStream.scala |  67 ++++++++--
 .../apache/spark/streaming/mqtt/MQTTUtils.scala | 129 ++++++++++++++++++-
 .../streaming/mqtt/JavaMQTTStreamSuite.java     |   6 +
 5 files changed, 211 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir/blob/28f034f4/scalastyle-config.xml
----------------------------------------------------------------------
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index 1db5977..c6aa3d9 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -94,7 +94,7 @@ This file is divided into 3 sections:
   </check>
 
   <check level="error" 
class="org.scalastyle.scalariform.ParameterNumberChecker" enabled="true">
-    <parameters><parameter 
name="maxParameters"><![CDATA[10]]></parameter></parameters>
+    <parameters><parameter 
name="maxParameters"><![CDATA[15]]></parameter></parameters>
   </check>
 
   <check level="error" class="org.scalastyle.scalariform.NoFinalizeChecker" 
enabled="true"></check>

http://git-wip-us.apache.org/repos/asf/bahir/blob/28f034f4/streaming-mqtt/README.md
----------------------------------------------------------------------
diff --git a/streaming-mqtt/README.md b/streaming-mqtt/README.md
index 9482648..2ec0128 100644
--- a/streaming-mqtt/README.md
+++ b/streaming-mqtt/README.md
@@ -25,6 +25,23 @@ The `--packages` argument can also be used with 
`bin/spark-submit`.
 
 This library is cross-published for Scala 2.10 and Scala 2.11, so users should 
replace the proper Scala version (2.10 or 2.11) in the commands listed above.
 
+## Configuration options.
+
+This source uses the [Eclipse Paho Java 
Client](https://eclipse.org/paho/clients/java/). Client API documentation is 
located [here](http://www.eclipse.org/paho/files/javadoc/index.html).
+
+ * `brokerUrl` A url MqttClient connects to. Set this as the url of the Mqtt 
Server. e.g. tcp://localhost:1883.
+ * `storageLevel` By default it is used for storing incoming messages on disk.
+ * `topic` Topic MqttClient subscribes to.
+ * `clientId` clientId, this client is assoicated with. Provide the same value 
to recover a stopped client.
+ * `QoS` The maximum quality of service to subscribe each topic at. Messages 
published at a lower quality of service will be received at the published QoS. 
Messages published at a higher quality of service will be received using the 
QoS specified on the subscribe.
+ * `username` Sets the user name to use for the connection to Mqtt Server. Do 
not set it, if server does not need this. Setting it empty will lead to errors.
+ * `password` Sets the password to use for the connection.
+ * `cleanSession` Setting it true starts a clean session, removes all 
checkpointed messages by a previous run of this source. This is set to false by 
default.
+ * `connectionTimeout` Sets the connection timeout, a value of 0 is 
interpreted as wait until client connects. See 
`MqttConnectOptions.setConnectionTimeout` for more information.
+ * `keepAlive` Same as `MqttConnectOptions.setKeepAliveInterval`.
+ * `mqttVersion` Same as `MqttConnectOptions.setMqttVersion`.
+
+
 ## Examples
 
 ### Scala API
@@ -34,6 +51,12 @@ this actor can be configured to handle failures, etc.
 
     val lines = MQTTUtils.createStream(ssc, brokerUrl, topic)
 
+Additional mqtt connection options can be provided:
+
+```Scala
+val lines = MQTTUtils.createStream(ssc, brokerUrl, topic, storageLevel, 
clientId, username, password, cleanSession, qos, connectionTimeout, 
keepAliveInterval, mqttVersion)
+```
+
 ### Java API
 
 You need to extend `JavaActorReceiver` so as to store received data into Spark 
using `store(...)` methods. The supervisor strategy of

http://git-wip-us.apache.org/repos/asf/bahir/blob/28f034f4/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
----------------------------------------------------------------------
diff --git 
a/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
 
b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
index cbad6f7..328656b 100644
--- 
a/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
+++ 
b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
@@ -19,10 +19,7 @@ package org.apache.spark.streaming.mqtt
 
 import java.nio.charset.StandardCharsets
 
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken
-import org.eclipse.paho.client.mqttv3.MqttCallback
-import org.eclipse.paho.client.mqttv3.MqttClient
-import org.eclipse.paho.client.mqttv3.MqttMessage
+import org.eclipse.paho.client.mqttv3._
 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
 
 import org.apache.spark.storage.StorageLevel
@@ -33,23 +30,39 @@ import org.apache.spark.streaming.receiver.Receiver
 /**
  * Input stream that subscribe messages from a Mqtt Broker.
  * Uses eclipse paho as MqttClient http://www.eclipse.org/paho/
- * @param brokerUrl Url of remote mqtt publisher
- * @param topic topic name to subscribe to
- * @param storageLevel RDD storage level.
+ * @param brokerUrl          Url of remote mqtt publisher
+ * @param topic              topic name to subscribe to
+ * @param storageLevel       RDD storage level.
+ * @param clientId           ClientId to use for the mqtt connection
+ * @param username           Username for authentication to the mqtt publisher
+ * @param password           Password for authentication to the mqtt publisher
+ * @param cleanSession       Sets the mqtt cleanSession parameter
+ * @param qos                Quality of service to use for the topic 
subscription
+ * @param connectionTimeout  Connection timeout for the mqtt connection
+ * @param keepAliveInterval  Keepalive interal for the mqtt connection
+ * @param mqttVersion        Version to use for the mqtt connection
  */
-
 private[streaming]
 class MQTTInputDStream(
     _ssc: StreamingContext,
     brokerUrl: String,
     topic: String,
-    storageLevel: StorageLevel
+    storageLevel: StorageLevel,
+    clientId: Option[String] = None,
+    username: Option[String] = None,
+    password: Option[String] = None,
+    cleanSession: Option[Boolean] = None,
+    qos: Option[Int] = None,
+    connectionTimeout: Option[Int] = None,
+    keepAliveInterval: Option[Int] = None,
+    mqttVersion: Option[Int] = None
   ) extends ReceiverInputDStream[String](_ssc) {
 
   private[streaming] override def name: String = s"MQTT stream [$id]"
 
   def getReceiver(): Receiver[String] = {
-    new MQTTReceiver(brokerUrl, topic, storageLevel)
+    new MQTTReceiver(brokerUrl, topic, storageLevel, clientId, username, 
password, cleanSession,
+      qos, connectionTimeout, keepAliveInterval, mqttVersion)
   }
 }
 
@@ -57,7 +70,15 @@ private[streaming]
 class MQTTReceiver(
     brokerUrl: String,
     topic: String,
-    storageLevel: StorageLevel
+    storageLevel: StorageLevel,
+    clientId: Option[String],
+    username: Option[String],
+    password: Option[String],
+    cleanSession: Option[Boolean],
+    qos: Option[Int],
+    connectionTimeout: Option[Int],
+    keepAliveInterval: Option[Int],
+    mqttVersion: Option[Int]
   ) extends Receiver[String](storageLevel) {
 
   def onStop() {
@@ -70,7 +91,25 @@ class MQTTReceiver(
     val persistence = new MemoryPersistence()
 
     // Initializing Mqtt Client specifying brokerUrl, clientID and 
MqttClientPersistance
-    val client = new MqttClient(brokerUrl, MqttClient.generateClientId(), 
persistence)
+    val client = new MqttClient(brokerUrl, 
clientId.getOrElse(MqttClient.generateClientId()),
+                                persistence)
+
+    // Initialize mqtt parameters
+    val mqttConnectionOptions = new MqttConnectOptions()
+    if (username.isDefined && password.isDefined) {
+      mqttConnectionOptions.setUserName(username.get)
+      mqttConnectionOptions.setPassword(password.get.toCharArray)
+    }
+    mqttConnectionOptions.setCleanSession(cleanSession.getOrElse(true))
+    if (connectionTimeout.isDefined) {
+      mqttConnectionOptions.setConnectionTimeout(connectionTimeout.get)
+    }
+    if (keepAliveInterval.isDefined) {
+      mqttConnectionOptions.setKeepAliveInterval(keepAliveInterval.get)
+    }
+    if (mqttVersion.isDefined) {
+      mqttConnectionOptions.setMqttVersion(mqttVersion.get)
+    }
 
     // Callback automatically triggers as and when new message arrives on 
specified topic
     val callback = new MqttCallback() {
@@ -93,10 +132,10 @@ class MQTTReceiver(
     client.setCallback(callback)
 
     // Connect to MqttBroker
-    client.connect()
+    client.connect(mqttConnectionOptions)
 
     // Subscribe to Mqtt topic
-    client.subscribe(topic)
+    client.subscribe(topic, qos.getOrElse(1))
 
   }
 }

http://git-wip-us.apache.org/repos/asf/bahir/blob/28f034f4/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
----------------------------------------------------------------------
diff --git 
a/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala 
b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
index 7b8d56d..7e2f5c7 100644
--- 
a/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
+++ 
b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
@@ -41,6 +41,40 @@ object MQTTUtils {
     new MQTTInputDStream(ssc, brokerUrl, topic, storageLevel)
   }
 
+
+  /**
+   * Create an input stream that receives messages pushed by a MQTT publisher.
+   * @param ssc                StreamingContext object
+   * @param brokerUrl          Url of remote MQTT publisher
+   * @param topic              Topic name to subscribe to
+   * @param storageLevel       RDD storage level. Defaults to 
StorageLevel.MEMORY_AND_DISK_SER_2.
+   * @param clientId           ClientId to use for the mqtt connection
+   * @param username           Username for authentication to the mqtt 
publisher
+   * @param password           Password for authentication to the mqtt 
publisher
+   * @param cleanSession       Sets the mqtt cleanSession parameter
+   * @param qos                Quality of service to use for the topic 
subscription
+   * @param connectionTimeout  Connection timeout for the mqtt connection
+   * @param keepAliveInterval  Keepalive interal for the mqtt connection
+   * @param mqttVersion        Version to use for the mqtt connection
+   */
+  def createStream(
+      ssc: StreamingContext,
+      brokerUrl: String,
+      topic: String,
+      storageLevel: StorageLevel,
+      clientId: Option[String],
+      username: Option[String],
+      password: Option[String],
+      cleanSession: Option[Boolean],
+      qos: Option[Int],
+      connectionTimeout: Option[Int],
+      keepAliveInterval: Option[Int],
+      mqttVersion: Option[Int]
+    ): ReceiverInputDStream[String] = {
+    new MQTTInputDStream(ssc, brokerUrl, topic, storageLevel, clientId, 
username, password,
+          cleanSession, qos, connectionTimeout, keepAliveInterval, mqttVersion)
+  }
+
   /**
    * Create an input stream that receives messages pushed by a MQTT publisher.
    * Storage level of the data will be the default 
StorageLevel.MEMORY_AND_DISK_SER_2.
@@ -59,7 +93,7 @@ object MQTTUtils {
 
   /**
    * Create an input stream that receives messages pushed by a MQTT publisher.
-   * @param jssc      JavaStreamingContext object
+   * @param jssc          JavaStreamingContext object
    * @param brokerUrl     Url of remote MQTT publisher
    * @param topic         Topic name to subscribe to
    * @param storageLevel  RDD storage level.
@@ -73,6 +107,99 @@ object MQTTUtils {
     implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
     createStream(jssc.ssc, brokerUrl, topic, storageLevel)
   }
+
+  /**
+   * Create an input stream that receives messages pushed by a MQTT publisher.
+   * @param jssc               JavaStreamingContext object
+   * @param brokerUrl          Url of remote MQTT publisher
+   * @param topic              Topic name to subscribe to
+   * @param storageLevel       RDD storage level.
+   * @param clientId           ClientId to use for the mqtt connection
+   * @param username           Username for authentication to the mqtt 
publisher
+   * @param password           Password for authentication to the mqtt 
publisher
+   * @param cleanSession       Sets the mqtt cleanSession parameter
+   * @param qos                Quality of service to use for the topic 
subscription
+   * @param connectionTimeout  Connection timeout for the mqtt connection
+   * @param keepAliveInterval  Keepalive interal for the mqtt connection
+   * @param mqttVersion        Version to use for the mqtt connection
+   */
+  def createStream(
+      jssc: JavaStreamingContext,
+      brokerUrl: String,
+      topic: String,
+      storageLevel: StorageLevel,
+      clientId: String,
+      username: String,
+      password: String,
+      cleanSession: Boolean,
+      qos: Int,
+      connectionTimeout: Int,
+      keepAliveInterval: Int,
+      mqttVersion: Int
+    ): JavaReceiverInputDStream[String] = {
+    implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+    createStream(jssc.ssc, brokerUrl, topic, storageLevel, Option(clientId), 
Option(username),
+        Option(password), Option(cleanSession), Option(qos), 
Option(connectionTimeout),
+        Option(keepAliveInterval), Option(mqttVersion))
+  }
+
+  /**
+   * Create an input stream that receives messages pushed by a MQTT publisher.
+   * @param jssc               JavaStreamingContext object
+   * @param brokerUrl          Url of remote MQTT publisher
+   * @param topic              Topic name to subscribe to
+   * @param clientId           ClientId to use for the mqtt connection
+   * @param username           Username for authentication to the mqtt 
publisher
+   * @param password           Password for authentication to the mqtt 
publisher
+   * @param cleanSession       Sets the mqtt cleanSession parameter
+   * @param qos                Quality of service to use for the topic 
subscription
+   * @param connectionTimeout  Connection timeout for the mqtt connection
+   * @param keepAliveInterval  Keepalive interal for the mqtt connection
+   * @param mqttVersion        Version to use for the mqtt connection
+   */
+  def createStream(
+      jssc: JavaStreamingContext,
+      brokerUrl: String,
+      topic: String,
+      clientId: String,
+      username: String,
+      password: String,
+      cleanSession: Boolean,
+      qos: Int,
+      connectionTimeout: Int,
+      keepAliveInterval: Int,
+      mqttVersion: Int
+    ): JavaReceiverInputDStream[String] = {
+    implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+    createStream(jssc.ssc, brokerUrl, topic, 
StorageLevel.MEMORY_AND_DISK_SER_2, Option(clientId),
+      Option(username), Option(password), Option(cleanSession), Option(qos),
+      Option(connectionTimeout), Option(keepAliveInterval), 
Option(mqttVersion))
+  }
+
+  /**
+   * Create an input stream that receives messages pushed by a MQTT publisher.
+   * @param jssc               JavaStreamingContext object
+   * @param brokerUrl          Url of remote MQTT publisher
+   * @param topic              Topic name to subscribe to
+   * @param clientId           ClientId to use for the mqtt connection
+   * @param username           Username for authentication to the mqtt 
publisher
+   * @param password           Password for authentication to the mqtt 
publisher
+   * @param cleanSession       Sets the mqtt cleanSession parameter
+   */
+  def createStream(
+      jssc: JavaStreamingContext,
+      brokerUrl: String,
+      topic: String,
+      clientId: String,
+      username: String,
+      password: String,
+      cleanSession: Boolean
+    ): JavaReceiverInputDStream[String] = {
+    implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+    createStream(jssc.ssc, brokerUrl, topic, 
StorageLevel.MEMORY_AND_DISK_SER_2, Option(clientId),
+      Option(username), Option(password), Option(cleanSession), None, None, 
None, None)
+  }
+
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/bahir/blob/28f034f4/streaming-mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
----------------------------------------------------------------------
diff --git 
a/streaming-mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
 
b/streaming-mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
index ce5aa1e..45332d9 100644
--- 
a/streaming-mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
+++ 
b/streaming-mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
@@ -33,5 +33,11 @@ public class JavaMQTTStreamSuite extends 
LocalJavaStreamingContext {
     JavaReceiverInputDStream<String> test1 = MQTTUtils.createStream(ssc, 
brokerUrl, topic);
     JavaReceiverInputDStream<String> test2 = MQTTUtils.createStream(ssc, 
brokerUrl, topic,
       StorageLevel.MEMORY_AND_DISK_SER_2());
+    JavaReceiverInputDStream<String> test3 = MQTTUtils.createStream(ssc, 
brokerUrl, topic,
+      StorageLevel.MEMORY_AND_DISK_SER_2(), "testid", "user", "password", 
true, 1, 10, 30, 3);
+    JavaReceiverInputDStream<String> test4 = MQTTUtils.createStream(ssc, 
brokerUrl, topic,
+      "testid", "user", "password", true, 1, 10, 30, 3);
+    JavaReceiverInputDStream<String> test5 = MQTTUtils.createStream(ssc, 
brokerUrl, topic,
+      "testid", "user", "password", true);
   }
 }

Reply via email to