Repository: bahir Updated Branches: refs/heads/master dca8d4c2d -> e3d9e6960
[BAHIR-100] Enhance MQTT connector to support byte arrays Closes #47 Project: http://git-wip-us.apache.org/repos/asf/bahir/repo Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/e3d9e696 Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/e3d9e696 Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/e3d9e696 Branch: refs/heads/master Commit: e3d9e6960941696ba073735e9d039c85146c217a Parents: dca8d4c Author: drosenst <[email protected]> Authored: Wed Jul 5 23:41:02 2017 +0300 Committer: Luciano Resende <[email protected]> Committed: Wed Jul 19 16:47:26 2017 -0700 ---------------------------------------------------------------------- streaming-mqtt/README.md | 3 + .../spark/streaming/mqtt/MQTTInputDStream.scala | 1 + .../mqtt/MQTTPairedByteArrayInputDStream.scala | 144 ++++++++ .../streaming/mqtt/MQTTPairedInputDStream.scala | 1 + .../apache/spark/streaming/mqtt/MQTTUtils.scala | 366 ++++++++++++++----- .../streaming/mqtt/JavaMQTTStreamSuite.java | 20 +- .../spark/streaming/mqtt/MQTTStreamSuite.scala | 26 ++ 7 files changed, 477 insertions(+), 84 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bahir/blob/e3d9e696/streaming-mqtt/README.md ---------------------------------------------------------------------- diff --git a/streaming-mqtt/README.md b/streaming-mqtt/README.md index 6b89136..eb08b51 100644 --- a/streaming-mqtt/README.md +++ b/streaming-mqtt/README.md @@ -52,12 +52,14 @@ this actor can be configured to handle failures, etc. val lines = MQTTUtils.createStream(ssc, brokerUrl, topic) val lines = MQTTUtils.createPairedStream(ssc, brokerUrl, topic) + val lines = MQTTUtils.createPairedByteArrayStreamStream(ssc, brokerUrl, topic) Additional mqtt connection options can be provided: ```Scala val lines = MQTTUtils.createStream(ssc, brokerUrl, topic, storageLevel, clientId, username, password, cleanSession, qos, connectionTimeout, keepAliveInterval, mqttVersion) val lines = MQTTUtils.createPairedStream(ssc, brokerUrl, topics, storageLevel, clientId, username, password, cleanSession, qos, connectionTimeout, keepAliveInterval, mqttVersion) +val lines = MQTTUtils.createPairedByteArrayStream(ssc, brokerUrl, topics, storageLevel, clientId, username, password, cleanSession, qos, connectionTimeout, keepAliveInterval, mqttVersion) ``` ### Java API @@ -67,5 +69,6 @@ this actor can be configured to handle failures, etc. JavaDStream<String> lines = MQTTUtils.createStream(jssc, brokerUrl, topic); JavaReceiverInputDStream<Tuple2<String, String>> lines = MQTTUtils.createPairedStream(jssc, brokerUrl, topics); + JavaReceiverInputDStream<Tuple2<String, String>> lines = MQTTUtils.createPairedByteArrayStream(jssc, brokerUrl, topics); See end-to-end examples at [MQTT Examples](https://github.com/apache/bahir/tree/master/streaming-mqtt/examples) http://git-wip-us.apache.org/repos/asf/bahir/blob/e3d9e696/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala ---------------------------------------------------------------------- diff --git a/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala index 328656b..cf27440 100644 --- a/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala +++ b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala @@ -30,6 +30,7 @@ import org.apache.spark.streaming.receiver.Receiver /** * Input stream that subscribe messages from a Mqtt Broker. * Uses eclipse paho as MqttClient http://www.eclipse.org/paho/ + * @param _ssc Spark Streaming StreamingContext * @param brokerUrl Url of remote mqtt publisher * @param topic topic name to subscribe to * @param storageLevel RDD storage level. http://git-wip-us.apache.org/repos/asf/bahir/blob/e3d9e696/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTPairedByteArrayInputDStream.scala ---------------------------------------------------------------------- diff --git a/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTPairedByteArrayInputDStream.scala b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTPairedByteArrayInputDStream.scala new file mode 100644 index 0000000..07c0b18 --- /dev/null +++ b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTPairedByteArrayInputDStream.scala @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.mqtt + +import org.eclipse.paho.client.mqttv3._ +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.dstream._ +import org.apache.spark.streaming.receiver.Receiver + +/** + * Input stream that subscribe messages from a Mqtt Broker. + * Uses eclipse paho as MqttClient http://www.eclipse.org/paho/ + * @param _ssc: Spark Streaming StreamingContext, + * @param brokerUrl Url of remote mqtt publisher + * @param topics topic name Array to subscribe to + * @param storageLevel RDD storage level. + * @param clientId ClientId to use for the mqtt connection + * @param username Username for authentication to the mqtt publisher + * @param password Password for authentication to the mqtt publisher + * @param cleanSession Sets the mqtt cleanSession parameter + * @param qos Quality of service to use for the topic subscription + * @param connectionTimeout Connection timeout for the mqtt connection + * @param keepAliveInterval Keepalive interal for the mqtt connection + * @param mqttVersion Version to use for the mqtt connection + */ +private[streaming] class MQTTPairedByteArrayInputDStream( + _ssc: StreamingContext, + brokerUrl: String, + topics: Array[String], + storageLevel: StorageLevel, + clientId: Option[String] = None, + username: Option[String] = None, + password: Option[String] = None, + cleanSession: Option[Boolean] = None, + qos: Option[Int] = None, + connectionTimeout: Option[Int] = None, + keepAliveInterval: Option[Int] = None, + mqttVersion: Option[Int] = None) extends ReceiverInputDStream[(String, Array[Byte])](_ssc) { + + private[streaming] override def name: String = s"MQTT stream [$id]" + + def getReceiver(): Receiver[(String, Array[Byte])] = { + new MQTTByteArrayPairReceiver(brokerUrl, topics, storageLevel, clientId, username, + password, cleanSession, qos, connectionTimeout, keepAliveInterval, mqttVersion) + } +} + +private[streaming] class MQTTByteArrayPairReceiver( + brokerUrl: String, + topics: Array[String], + storageLevel: StorageLevel, + clientId: Option[String], + username: Option[String], + password: Option[String], + cleanSession: Option[Boolean], + qos: Option[Int], + connectionTimeout: Option[Int], + keepAliveInterval: Option[Int], + mqttVersion: Option[Int]) extends Receiver[(String, Array[Byte])](storageLevel) { + + def onStop() { + + } + + def onStart() { + + // Set up persistence for messages + val persistence = new MemoryPersistence() + + // Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance + val client = new MqttClient(brokerUrl, clientId.getOrElse(MqttClient.generateClientId()), + persistence) + + // Initialize mqtt parameters + val mqttConnectionOptions = new MqttConnectOptions() + if (username.isDefined && password.isDefined) { + mqttConnectionOptions.setUserName(username.get) + mqttConnectionOptions.setPassword(password.get.toCharArray) + } + mqttConnectionOptions.setCleanSession(cleanSession.getOrElse(true)) + if (connectionTimeout.isDefined) { + mqttConnectionOptions.setConnectionTimeout(connectionTimeout.get) + } + if (keepAliveInterval.isDefined) { + mqttConnectionOptions.setKeepAliveInterval(keepAliveInterval.get) + } + if (mqttVersion.isDefined) { + mqttConnectionOptions.setMqttVersion(mqttVersion.get) + } + + // Callback automatically triggers as and when new message arrives on specified topic + val callback = new MqttCallback() { + + // Handles Mqtt message + override def messageArrived(topic: String, message: MqttMessage) { + store((topic, message.getPayload())) + } + + override def deliveryComplete(token: IMqttDeliveryToken) { + } + + override def connectionLost(cause: Throwable) { + restart("Connection lost ", cause) + } + } + + // Set up callback for MqttClient. This needs to happen before + // connecting or subscribing, otherwise messages may be lost + client.setCallback(callback) + + // Connect to MqttBroker + client.connect(mqttConnectionOptions) + + // Subscribe to Mqtt topic + var i = 0 + val qosArray = Array.ofDim[Int](topics.length) + for (i <- qosArray.indices) { + qosArray(i) = qos.getOrElse(1) + } + client.subscribe(topics, qosArray) + + } +} + + + http://git-wip-us.apache.org/repos/asf/bahir/blob/e3d9e696/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTPairedInputDStream.scala ---------------------------------------------------------------------- diff --git a/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTPairedInputDStream.scala b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTPairedInputDStream.scala index 050777b..ec89ed7 100644 --- a/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTPairedInputDStream.scala +++ b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTPairedInputDStream.scala @@ -30,6 +30,7 @@ import org.apache.spark.streaming.receiver.Receiver /** * Input stream that subscribe messages from a Mqtt Broker. * Uses eclipse paho as MqttClient http://www.eclipse.org/paho/ + * @param _ssc Spark Streaming StreamingContext * @param brokerUrl Url of remote mqtt publisher * @param topics topic name Array to subscribe to * @param storageLevel RDD storage level. http://git-wip-us.apache.org/repos/asf/bahir/blob/e3d9e696/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala ---------------------------------------------------------------------- diff --git a/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala index 0accb80..f42275f 100644 --- a/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala +++ b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala @@ -215,22 +215,39 @@ object MQTTUtils { new MQTTPairedInputDStream(ssc, brokerUrl, topics, storageLevel) } +/** +* Create an input stream that receives messages pushed by a MQTT publisher. + * @param ssc StreamingContext object + * @param brokerUrl Url of remote MQTT publisher + * @param topics Array of topic names to subscribe to + * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2. + */ - /** - * Create an input stream that receives messages pushed by a MQTT publisher. - * @param ssc StreamingContext object - * @param brokerUrl Url of remote MQTT publisher - * @param topics Array of topic names to subscribe to - * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2. - * @param clientId ClientId to use for the mqtt connection - * @param username Username for authentication to the mqtt publisher - * @param password Password for authentication to the mqtt publisher - * @param cleanSession Sets the mqtt cleanSession parameter - * @param qos Quality of service to use for the topic subscription - * @param connectionTimeout Connection timeout for the mqtt connection - * @param keepAliveInterval Keepalive interal for the mqtt connection - * @param mqttVersion Version to use for the mqtt connection - */ + def createPairedByteArrayStream( + ssc: StreamingContext, + brokerUrl: String, + topics: Array[String], + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 + ): ReceiverInputDStream[(String, Array[Byte])] = { + new MQTTPairedByteArrayInputDStream(ssc, brokerUrl, topics, storageLevel) + } + +/** + * Create an input stream that receives messages pushed by a MQTT publisher. + * + * @param ssc StreamingContext object + * @param brokerUrl Url of remote MQTT publisher + * @param topics Array of topic names to subscribe to + * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2. + * @param clientId ClientId to use for the mqtt connection + * @param username Username for authentication to the mqtt publisher + * @param password Password for authentication to the mqtt publisher + * @param cleanSession Sets the mqtt cleanSession parameter + * @param qos Quality of service to use for the topic subscription + * @param connectionTimeout Connection timeout for the mqtt connection + * @param keepAliveInterval Keepalive interal for the mqtt connection + * @param mqttVersion Version to use for the mqtt connection + */ def createPairedStream( ssc: StreamingContext, brokerUrl: String, @@ -246,57 +263,130 @@ object MQTTUtils { mqttVersion: Option[Int] ): ReceiverInputDStream[(String, String)] = { new MQTTPairedInputDStream(ssc, brokerUrl, topics, storageLevel, clientId, username, password, - cleanSession, qos, connectionTimeout, keepAliveInterval, mqttVersion) + cleanSession, qos, connectionTimeout, keepAliveInterval, mqttVersion) } - /** - * Create an input stream that receives messages pushed by a MQTT publisher. - * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2. - * @param jssc JavaStreamingContext object - * @param brokerUrl Url of remote MQTT publisher - * @param topics Array of topic names to subscribe to - */ +/** +* Create an input stream that receives messages pushed by a MQTT publisher. + * + * @param ssc StreamingContext object + * @param brokerUrl Url of remote MQTT publisher + * @param topics Array of topic names to subscribe to + * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2. + * @param clientId ClientId to use for the mqtt connection + * @param username Username for authentication to the mqtt publisher + * @param password Password for authentication to the mqtt publisher + * @param cleanSession Sets the mqtt cleanSession parameter + * @param qos Quality of service to use for the topic subscription + * @param connectionTimeout Connection timeout for the mqtt connection + * @param keepAliveInterval Keepalive interal for the mqtt connection + * @param mqttVersion Version to use for the mqtt connection + */ + def createPairedByteArrayStream( + ssc: StreamingContext, + brokerUrl: String, + topics: Array[String], + storageLevel: StorageLevel, + clientId: Option[String], + username: Option[String], + password: Option[String], + cleanSession: Option[Boolean], + qos: Option[Int], + connectionTimeout: Option[Int], + keepAliveInterval: Option[Int], + mqttVersion: Option[Int] + ): ReceiverInputDStream[(String, Array[Byte])] = { + new MQTTPairedByteArrayInputDStream(ssc, brokerUrl, topics, storageLevel, + clientId, username, password, cleanSession, qos, connectionTimeout, + keepAliveInterval, mqttVersion) + } + +/** +* Create an input stream that receives messages pushed by a MQTT publisher. + * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2. + * + * @param jssc JavaStreamingContext object + * @param brokerUrl Url of remote MQTT publisher + * @param topics Array of topic names to subscribe to + */ def createPairedStream( - jssc: JavaStreamingContext, - brokerUrl: String, - topics: Array[String] - ): JavaReceiverInputDStream[(String, String)] = { + jssc: JavaStreamingContext, + brokerUrl: String, + topics: Array[String] + ): JavaReceiverInputDStream[(String, String)] = { implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] createPairedStream(jssc.ssc, brokerUrl, topics) } - /** - * Create an input stream that receives messages pushed by a MQTT publisher. - * @param jssc JavaStreamingContext object - * @param brokerUrl Url of remote MQTT publisher - * @param topics Array of topic names to subscribe to - * @param storageLevel RDD storage level. - */ +/** +* Create an input stream that receives messages pushed by a MQTT publisher. + * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2. + * + * @param jssc JavaStreamingContext object + * @param brokerUrl Url of remote MQTT publisher + * @param topics Array of topic names to subscribe to + */ + def createPairedByteArrayStream( + jssc: JavaStreamingContext, + brokerUrl: String, + topics: Array[String] + ): JavaReceiverInputDStream[(String, Array[Byte])] = { + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] + createPairedByteArrayStream(jssc.ssc, brokerUrl, topics) + } + +/** +* Create an input stream that receives messages pushed by a MQTT publisher. + * + * @param jssc JavaStreamingContext object + * @param brokerUrl Url of remote MQTT publisher + * @param topics Array of topic names to subscribe to + * @param storageLevel RDD storage level. + */ def createPairedStream( - jssc: JavaStreamingContext, - brokerUrl: String, - topics: Array[String], - storageLevel: StorageLevel - ): JavaReceiverInputDStream[(String, String)] = { + jssc: JavaStreamingContext, + brokerUrl: String, + topics: Array[String], + storageLevel: StorageLevel + ): JavaReceiverInputDStream[(String, String)] = { implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] createPairedStream(jssc.ssc, brokerUrl, topics, storageLevel) } - /** - * Create an input stream that receives messages pushed by a MQTT publisher. - * @param jssc JavaStreamingContext object - * @param brokerUrl Url of remote MQTT publisher - * @param topics Array of topic names to subscribe to - * @param storageLevel RDD storage level. - * @param clientId ClientId to use for the mqtt connection - * @param username Username for authentication to the mqtt publisher - * @param password Password for authentication to the mqtt publisher - * @param cleanSession Sets the mqtt cleanSession parameter - * @param qos Quality of service to use for the topic subscription - * @param connectionTimeout Connection timeout for the mqtt connection - * @param keepAliveInterval Keepalive interal for the mqtt connection - * @param mqttVersion Version to use for the mqtt connection - */ +/** +* Create an input stream that receives messages pushed by a MQTT publisher. + * + * @param jssc JavaStreamingContext object + * @param brokerUrl Url of remote MQTT publisher + * @param topics Array of topic names to subscribe to + * @param storageLevel RDD storage level. + */ + def createPairedByteArrayStream( + jssc: JavaStreamingContext, + brokerUrl: String, + topics: Array[String], + storageLevel: StorageLevel + ): JavaReceiverInputDStream[(String, Array[Byte])] = { + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] + createPairedByteArrayStream(jssc.ssc, brokerUrl, topics, storageLevel) + } + +/** +* Create an input stream that receives messages pushed by a MQTT publisher. + * + * @param jssc JavaStreamingContext object + * @param brokerUrl Url of remote MQTT publisher + * @param topics Array of topic names to subscribe to + * @param storageLevel RDD storage level. + * @param clientId ClientId to use for the mqtt connection + * @param username Username for authentication to the mqtt publisher + * @param password Password for authentication to the mqtt publisher + * @param cleanSession Sets the mqtt cleanSession parameter + * @param qos Quality of service to use for the topic subscription + * @param connectionTimeout Connection timeout for the mqtt connection + * @param keepAliveInterval Keepalive interal for the mqtt connection + * @param mqttVersion Version to use for the mqtt connection + */ def createPairedStream( jssc: JavaStreamingContext, brokerUrl: String, @@ -317,20 +407,57 @@ object MQTTUtils { Option(connectionTimeout), Option(keepAliveInterval), Option(mqttVersion)) } - /** - * Create an input stream that receives messages pushed by a MQTT publisher. - * @param jssc JavaStreamingContext object - * @param brokerUrl Url of remote MQTT publisher - * @param topics Array of topic names to subscribe to - * @param clientId ClientId to use for the mqtt connection - * @param username Username for authentication to the mqtt publisher - * @param password Password for authentication to the mqtt publisher - * @param cleanSession Sets the mqtt cleanSession parameter - * @param qos Quality of service to use for the topic subscription - * @param connectionTimeout Connection timeout for the mqtt connection - * @param keepAliveInterval Keepalive interal for the mqtt connection - * @param mqttVersion Version to use for the mqtt connection - */ +/** +* Create an input stream that receives messages pushed by a MQTT publisher. + * + * @param jssc JavaStreamingContext object + * @param brokerUrl Url of remote MQTT publisher + * @param topics Array of topic names to subscribe to + * @param storageLevel RDD storage level. + * @param clientId ClientId to use for the mqtt connection + * @param username Username for authentication to the mqtt publisher + * @param password Password for authentication to the mqtt publisher + * @param cleanSession Sets the mqtt cleanSession parameter + * @param qos Quality of service to use for the topic subscription + * @param connectionTimeout Connection timeout for the mqtt connection + * @param keepAliveInterval Keepalive interal for the mqtt connection + * @param mqttVersion Version to use for the mqtt connection + */ + def createPairedByteArrayStream( + jssc: JavaStreamingContext, + brokerUrl: String, + topics: Array[String], + storageLevel: StorageLevel, + clientId: String, + username: String, + password: String, + cleanSession: Boolean, + qos: Int, + connectionTimeout: Int, + keepAliveInterval: Int, + mqttVersion: Int + ): JavaReceiverInputDStream[(String, Array[Byte])] = { + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] + createPairedByteArrayStream(jssc.ssc, brokerUrl, topics, storageLevel, Option(clientId), + Option(username), Option(password), Option(cleanSession), Option(qos), + Option(connectionTimeout), Option(keepAliveInterval), Option(mqttVersion)) + } + +/** +* Create an input stream that receives messages pushed by a MQTT publisher. + * + * @param jssc JavaStreamingContext object + * @param brokerUrl Url of remote MQTT publisher + * @param topics Array of topic names to subscribe to + * @param clientId ClientId to use for the mqtt connection + * @param username Username for authentication to the mqtt publisher + * @param password Password for authentication to the mqtt publisher + * @param cleanSession Sets the mqtt cleanSession parameter + * @param qos Quality of service to use for the topic subscription + * @param connectionTimeout Connection timeout for the mqtt connection + * @param keepAliveInterval Keepalive interal for the mqtt connection + * @param mqttVersion Version to use for the mqtt connection + */ def createPairedStream( jssc: JavaStreamingContext, brokerUrl: String, @@ -346,20 +473,56 @@ object MQTTUtils { ): JavaReceiverInputDStream[(String, String)] = { implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] createPairedStream(jssc.ssc, brokerUrl, topics, StorageLevel.MEMORY_AND_DISK_SER_2, - Option(clientId), Option(username), Option(password), Option(cleanSession), Option(qos), - Option(connectionTimeout), Option(keepAliveInterval), Option(mqttVersion)) + Option(clientId), Option(username), Option(password), Option(cleanSession), Option(qos), + Option(connectionTimeout), Option(keepAliveInterval), Option(mqttVersion)) } - /** - * Create an input stream that receives messages pushed by a MQTT publisher. - * @param jssc JavaStreamingContext object - * @param brokerUrl Url of remote MQTT publisher - * @param topics Array of topic names to subscribe to - * @param clientId ClientId to use for the mqtt connection - * @param username Username for authentication to the mqtt publisher - * @param password Password for authentication to the mqtt publisher - * @param cleanSession Sets the mqtt cleanSession parameter - */ +/** + * Create an input stream that receives messages pushed by a MQTT publisher. + * + * @param jssc JavaStreamingContext object + * @param brokerUrl Url of remote MQTT publisher + * @param topics Array of topic names to subscribe to + * @param clientId ClientId to use for the mqtt connection + * @param username Username for authentication to the mqtt publisher + * @param password Password for authentication to the mqtt publisher + * @param cleanSession Sets the mqtt cleanSession parameter + * @param qos Quality of service to use for the topic subscription + * @param connectionTimeout Connection timeout for the mqtt connection + * @param keepAliveInterval Keepalive interal for the mqtt connection + * @param mqttVersion Version to use for the mqtt connection + */ + def createPairedByteArrayStream( + jssc: JavaStreamingContext, + brokerUrl: String, + topics: Array[String], + clientId: String, + username: String, + password: String, + cleanSession: Boolean, + qos: Int, + connectionTimeout: Int, + keepAliveInterval: Int, + mqttVersion: Int + ): JavaReceiverInputDStream[(String, Array[Byte])] = { + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] + createPairedByteArrayStream(jssc.ssc, brokerUrl, topics, StorageLevel.MEMORY_AND_DISK_SER_2, + Option(clientId), Option(username), Option(password), Option(cleanSession), Option(qos), + Option(connectionTimeout), Option(keepAliveInterval), Option(mqttVersion)) + } + + +/** + * Create an input stream that receives messages pushed by a MQTT publisher. + * + * @param jssc JavaStreamingContext object + * @param brokerUrl Url of remote MQTT publisher + * @param topics Array of topic names to subscribe to + * @param clientId ClientId to use for the mqtt connection + * @param username Username for authentication to the mqtt publisher + * @param password Password for authentication to the mqtt publisher + * @param cleanSession Sets the mqtt cleanSession parameter + */ def createPairedStream( jssc: JavaStreamingContext, brokerUrl: String, @@ -368,12 +531,40 @@ object MQTTUtils { username: String, password: String, cleanSession: Boolean - ): JavaReceiverInputDStream[(String, String)] = { + ): JavaReceiverInputDStream[(String, String)] = { implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] createPairedStream(jssc.ssc, brokerUrl, topics, StorageLevel.MEMORY_AND_DISK_SER_2, - Option(clientId), Option(username), Option(password), Option(cleanSession), None, - None, None, None) + Option(clientId), Option(username), Option(password), Option(cleanSession), None, + None, None, None) } + + +/** + * Create an input stream that receives messages pushed by a MQTT publisher. + * + * @param jssc JavaStreamingContext object + * @param brokerUrl Url of remote MQTT publisher + * @param topics Array of topic names to subscribe to + * @param clientId ClientId to use for the mqtt connection + * @param username Username for authentication to the mqtt publisher + * @param password Password for authentication to the mqtt publisher + * @param cleanSession Sets the mqtt cleanSession parameter + */ + def createPairedByteArrayStream( + jssc: JavaStreamingContext, + brokerUrl: String, + topics: Array[String], + clientId: String, + username: String, + password: String, + cleanSession: Boolean + ): JavaReceiverInputDStream[(String, Array[Byte])] = { + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] + createPairedByteArrayStream(jssc.ssc, brokerUrl, topics, StorageLevel.MEMORY_AND_DISK_SER_2, + Option(clientId), Option(username), Option(password), Option(cleanSession), None, + None, None, None) + } + } /** @@ -398,4 +589,13 @@ private[mqtt] class MQTTUtilsPythonHelper { ): JavaDStream[(String, String)] = { MQTTUtils.createPairedStream(jssc, brokerUrl, topics, storageLevel) } + + def createPairedByteArrayStream( + jssc: JavaStreamingContext, + brokerUrl: String, + topics: Array[String], + storageLevel: StorageLevel + ): JavaDStream[(String, Array[Byte])] = { + MQTTUtils.createPairedByteArrayStream(jssc, brokerUrl, topics, storageLevel) + } } http://git-wip-us.apache.org/repos/asf/bahir/blob/e3d9e696/streaming-mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java ---------------------------------------------------------------------- diff --git a/streaming-mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java b/streaming-mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java index d320595..e30d187 100644 --- a/streaming-mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java +++ b/streaming-mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java @@ -49,8 +49,26 @@ public class JavaMQTTStreamSuite extends LocalJavaStreamingContext { brokerUrl, topics, StorageLevel.MEMORY_AND_DISK_SER_2(), "testid", "user", "password", true, 1, 10, 30, 3); JavaReceiverInputDStream<Tuple2<String, String>> test9 = MQTTUtils.createPairedStream(ssc, - brokerUrl, topics, "testid", "user", "password", true, 1, 10, 30, 3); + brokerUrl, topics, "testid", "user", "password", true, 1, + 10, 30, 3); JavaReceiverInputDStream<Tuple2<String, String>> test10 = MQTTUtils.createPairedStream(ssc, brokerUrl, topics, "testid", "user", "password", true); + JavaReceiverInputDStream<Tuple2<String, byte[]>> test11 = + MQTTUtils.createPairedByteArrayStream(ssc, brokerUrl, topics); + JavaReceiverInputDStream<Tuple2<String, byte[]>> test12 = + MQTTUtils.createPairedByteArrayStream(ssc, brokerUrl, topics, + StorageLevel.MEMORY_AND_DISK_SER_2()); + JavaReceiverInputDStream<Tuple2<String, byte[]>> test13 = + MQTTUtils.createPairedByteArrayStream(ssc, brokerUrl, topics, + StorageLevel.MEMORY_AND_DISK_SER_2(), "testid", "user", + "password", true, 1, 10, 30, 3); + JavaReceiverInputDStream<Tuple2<String, byte[]>> test14 = + MQTTUtils.createPairedByteArrayStream(ssc, brokerUrl, topics, + "testid", "user", "password", true, + 1, 10, 30, 3); + JavaReceiverInputDStream<Tuple2<String, byte[]>> test15 = + MQTTUtils.createPairedByteArrayStream(ssc, brokerUrl, topics, "testid", + "user", "password", true); + } } http://git-wip-us.apache.org/repos/asf/bahir/blob/e3d9e696/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala ---------------------------------------------------------------------- diff --git a/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala index f1d9a20..6ef551b 100644 --- a/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala +++ b/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala @@ -101,4 +101,30 @@ class MQTTStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter } ssc.stop() } + + test("mqtt input stream3") { + val sendMessage1 = "MQTT demo for spark streaming1" + val sendMessage2 = "MQTT demo for spark streaming2" + val receiveStream2 = MQTTUtils.createPairedByteArrayStream(ssc, + "tcp://" + mqttTestUtils.brokerUri, topics, StorageLevel.MEMORY_ONLY) + + @volatile var receiveMessage: List[String] = List() + receiveStream2.foreachRDD { rdd => + if (rdd.collect.length > 0) { + receiveMessage = receiveMessage ::: List(new String(rdd.first()._2)) + receiveMessage + } + } + + ssc.start() + + // Retry it because we don't know when the receiver will start. + eventually(timeout(10000 milliseconds), interval(100 milliseconds)) { + mqttTestUtils.publishData(topics(0), sendMessage1) + mqttTestUtils.publishData(topics(1), sendMessage2) + assert(receiveMessage.contains(sendMessage1)||receiveMessage.contains(sendMessage2)) + } + ssc.stop() + } + }
