Repository: bahir Updated Branches: refs/heads/master 1c0f4afcb -> a351549cf
[BAHIR-51] Add new configuration options to MqttStreamSource. Add new configuration options to enable secured connections and other quality of services. Closes #22 Project: http://git-wip-us.apache.org/repos/asf/bahir/repo Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/a351549c Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/a351549c Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/a351549c Branch: refs/heads/master Commit: a351549cf634adf5249862599166ef9ed9073725 Parents: 1c0f4af Author: Prashant Sharma <[email protected]> Authored: Thu Aug 11 15:56:20 2016 +0530 Committer: Luciano Resende <[email protected]> Committed: Mon Aug 22 09:58:02 2016 -0700 ---------------------------------------------------------------------- sql-streaming-mqtt/README.md | 16 ++++++ .../sql/streaming/mqtt/MQTTStreamSource.scala | 60 +++++++++++++++++--- .../streaming/mqtt/MQTTStreamSourceSuite.scala | 2 +- 3 files changed, 69 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bahir/blob/a351549c/sql-streaming-mqtt/README.md ---------------------------------------------------------------------- diff --git a/sql-streaming-mqtt/README.md b/sql-streaming-mqtt/README.md index c1a078b..c9ea8cf 100644 --- a/sql-streaming-mqtt/README.md +++ b/sql-streaming-mqtt/README.md @@ -44,6 +44,22 @@ Setting values for option `localStorage` and `clientId` helps in recovering in c .option("clientId", "some-client-id") .load("tcp://localhost:1883") +## Configuration options. + +This source uses [Eclipse Paho Java Client](https://eclipse.org/paho/clients/java/). Client API documentation is located [here](http://www.eclipse.org/paho/files/javadoc/index.html). + + * `brokerUrl` A url MqttClient connects to. Set this or `path` as the url of the Mqtt Server. e.g. tcp://localhost:1883. + * `persistence` By default it is used for storing incoming messages on disk. 
If `memory` is provided as the value for this option, then recovery on restart is not supported. + * `topic` Topic MqttClient subscribes to. + * `clientId` The client ID this client is associated with. Provide the same value to recover a stopped client. + * `QoS` The maximum quality of service to subscribe each topic at. Messages published at a lower quality of service will be received at the published QoS. Messages published at a higher quality of service will be received using the QoS specified on the subscribe. Defaults to 1. + * `username` Sets the user name to use for the connection to the Mqtt Server. Do not set it if the server does not need it; setting it to an empty value will lead to errors. + * `password` Sets the password to use for the connection. The credentials are applied only when both `username` and `password` are set. + * `cleanSession` Setting it to true starts a clean session and removes all checkpointed messages from a previous run of this source. This is set to false by default. + * `connectionTimeout` Sets the connection timeout; a value of 0 is interpreted as waiting until the client connects. See `MqttConnectOptions.setConnectionTimeout` for more information. + * `keepAlive` Same as `MqttConnectOptions.setKeepAliveInterval`. + * `mqttVersion` Same as `MqttConnectOptions.setMqttVersion`. + ### Scala API An example, for scala API to count words from incoming message stream. 
http://git-wip-us.apache.org/repos/asf/bahir/blob/a351549c/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala ---------------------------------------------------------------------- diff --git a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala index 471886a..8857edb 100644 --- a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala +++ b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala @@ -17,6 +17,7 @@ package org.apache.bahir.sql.streaming.mqtt +import java.nio.charset.Charset import java.sql.Timestamp import java.text.SimpleDateFormat import java.util.Calendar @@ -44,9 +45,31 @@ object MQTTStreamConstants { :: StructField("timestamp", TimestampType) :: Nil) } +/** + * A Text based mqtt stream source, it interprets the payload of each incoming message by converting + * the bytes to String using Charset.defaultCharset as charset. Each value is associated with a + * timestamp of arrival of the message on the source. It can be used to operate a window on the + * incoming stream. + * + * @param brokerUrl url MqttClient connects to. + * @param persistence an instance of MqttClientPersistence. By default it is used for storing + * incoming messages on disk. If memory is provided as option, then recovery on + * restart is not supported. + * @param topic topic MqttClient subscribes to. + * @param clientId clientId, this client is associated with. Provide the same value to recover + * a stopped client. + * @param messageParser parsing logic for processing incoming messages from Mqtt Server. + * @param sqlContext Spark-provided SQLContext. + * @param mqttConnectOptions an instance of MqttConnectOptions for this Source. 
+ * @param qos the maximum quality of service to subscribe each topic at. Messages published at + * a lower quality of service will be received at the published QoS. Messages + * published at a higher quality of service will be received using the QoS specified + * on the subscribe. + */ class MQTTTextStreamSource(brokerUrl: String, persistence: MqttClientPersistence, topic: String, clientId: String, messageParser: Array[Byte] => (String, Timestamp), - sqlContext: SQLContext) extends Source with Logging { + sqlContext: SQLContext, mqttConnectOptions: MqttConnectOptions, qos: Int) + extends Source with Logging { override def schema: StructType = MQTTStreamConstants.SCHEMA_DEFAULT @@ -73,10 +96,6 @@ class MQTTTextStreamSource(brokerUrl: String, persistence: MqttClientPersistence private def initialize(): Unit = { client = new MqttClient(brokerUrl, clientId, persistence) - val mqttConnectOptions: MqttConnectOptions = new MqttConnectOptions() - mqttConnectOptions.setAutomaticReconnect(true) - // This is required to support recovery. TODO: configurable ? - mqttConnectOptions.setCleanSession(false) val callback = new MqttCallbackExtended() { @@ -101,7 +120,7 @@ class MQTTTextStreamSource(brokerUrl: String, persistence: MqttClientPersistence } client.setCallback(callback) client.connect(mqttConnectOptions) - client.subscribe(topic) + client.subscribe(topic, qos) // It is not possible to initialize offset without `client.connect` offset = fetchLastProcessedOffset() initLock.countDown() // Release. @@ -171,7 +190,8 @@ class MQTTStreamSourceProvider extends StreamSourceProvider with DataSourceRegis } } - val messageParserWithTimeStamp = (x: Array[Byte]) => (new String(x), Timestamp.valueOf( + val messageParserWithTimeStamp = (x: Array[Byte]) => + (new String(x, Charset.defaultCharset()), Timestamp.valueOf( MQTTStreamConstants.DATE_FORMAT.format(Calendar.getInstance().getTime))) // if default is subscribe everything, it leads to getting lot unwanted system messages. 
@@ -183,8 +203,32 @@ class MQTTStreamSourceProvider extends StreamSourceProvider with DataSourceRegis "\nRecovering from failure is not supported in such a case.") MqttClient.generateClientId()}) + val username: Option[String] = parameters.get("username") + val password: Option[String] = parameters.get("password") + val connectionTimeout: Int = parameters.getOrElse("connectionTimeout", + MqttConnectOptions.CONNECTION_TIMEOUT_DEFAULT.toString).toInt + val keepAlive: Int = parameters.getOrElse("keepAlive", MqttConnectOptions + .KEEP_ALIVE_INTERVAL_DEFAULT.toString).toInt + val mqttVersion: Int = parameters.getOrElse("mqttVersion", MqttConnectOptions + .MQTT_VERSION_DEFAULT.toString).toInt + val cleanSession: Boolean = parameters.getOrElse("cleanSession", "false").toBoolean + val qos: Int = parameters.getOrElse("QoS", "1").toInt + + val mqttConnectOptions: MqttConnectOptions = new MqttConnectOptions() + mqttConnectOptions.setAutomaticReconnect(true) + mqttConnectOptions.setCleanSession(cleanSession) + mqttConnectOptions.setConnectionTimeout(connectionTimeout) + mqttConnectOptions.setKeepAliveInterval(keepAlive) + mqttConnectOptions.setMqttVersion(mqttVersion) + (username, password) match { + case (Some(u: String), Some(p: String)) => + mqttConnectOptions.setUserName(u) + mqttConnectOptions.setPassword(p.toCharArray) + case _ => + } + new MQTTTextStreamSource(brokerUrl, persistence, topic, clientId, - messageParserWithTimeStamp, sqlContext) + messageParserWithTimeStamp, sqlContext, mqttConnectOptions, qos) } override def shortName(): String = "mqtt" http://git-wip-us.apache.org/repos/asf/bahir/blob/a351549c/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala ---------------------------------------------------------------------- diff --git a/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala 
b/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala index f6f5ff6..257bd0b 100644 --- a/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala +++ b/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala @@ -59,7 +59,7 @@ class MQTTStreamSourceSuite extends SparkFunSuite with SharedSparkContext with B val dataFrame: DataFrame = sqlContext.readStream.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider") .option("topic", "test").option("localStorage", dir).option("clientId", "clientId") - .load("tcp://" + mqttTestUtils.brokerUri) + .option("QoS", "2").load("tcp://" + mqttTestUtils.brokerUri) (sqlContext, dataFrame) }
