Repository: bahir Updated Branches: refs/heads/master 8d46b3961 -> 826545cb8
[BAHIR-89] Multi topic API support for streaming MQTT New API which accepts an array of MQTT topics as input and returns Tuple2<TopicName, Message> as output. It helps consume from multiple MQTT topics with efficient use of resources. Closes #37. Project: http://git-wip-us.apache.org/repos/asf/bahir/repo Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/826545cb Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/826545cb Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/826545cb Branch: refs/heads/master Commit: 826545cb8db4b89bbdb3927e53f555c0fa15771e Parents: 8d46b39 Author: Anntinu <[email protected]> Authored: Mon Feb 27 07:37:07 2017 +0530 Committer: Luciano Resende <[email protected]> Committed: Thu Mar 23 14:32:04 2017 -0700 ---------------------------------------------------------------------- .gitattributes | 13 -- .gitignore | 24 --- streaming-mqtt/README.md | 6 +- streaming-mqtt/python/mqtt.py | 17 ++ .../streaming/mqtt/MQTTPairedInputDStream.scala | 142 +++++++++++++++ .../apache/spark/streaming/mqtt/MQTTUtils.scala | 182 +++++++++++++++++++ .../streaming/mqtt/JavaMQTTStreamSuite.java | 15 +- .../spark/streaming/mqtt/MQTTStreamSuite.scala | 25 +++ 8 files changed, 385 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bahir/blob/826545cb/.gitattributes ---------------------------------------------------------------------- diff --git a/.gitattributes b/.gitattributes deleted file mode 100644 index a8edefd..0000000 --- a/.gitattributes +++ /dev/null @@ -1,13 +0,0 @@ -# Set the default behavior to have all files normalized to Unix-style -# line endings upon check-in. -* text=auto - -# Declare files that will always have CRLF line endings on checkout. -*.bat text eol=crlf - -# Denote all files that are truly binary and should not be modified. 
-*.dll binary -*.exp binary -*.lib binary -*.pdb binary -*.exe binary http://git-wip-us.apache.org/repos/asf/bahir/blob/826545cb/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore deleted file mode 100644 index fb6d3b7..0000000 --- a/.gitignore +++ /dev/null @@ -1,24 +0,0 @@ -# Mac -.DS_Store - -# Eclipse -.classpath -.project -.settings/ -target/ - -# Intellij -.idea/ -.idea_modules/ -*.iml -*.iws -*.class -*.log - -# Python -*.pyc - -# Others -.checkstyle -.fbExcludeFilterFile -dependency-reduced-pom.xml http://git-wip-us.apache.org/repos/asf/bahir/blob/826545cb/streaming-mqtt/README.md ---------------------------------------------------------------------- diff --git a/streaming-mqtt/README.md b/streaming-mqtt/README.md index 872375d..6b89136 100644 --- a/streaming-mqtt/README.md +++ b/streaming-mqtt/README.md @@ -32,6 +32,7 @@ This source uses the [Eclipse Paho Java Client](https://eclipse.org/paho/clients * `brokerUrl` A url MqttClient connects to. Set this as the url of the Mqtt Server. e.g. tcp://localhost:1883. * `storageLevel` By default it is used for storing incoming messages on disk. * `topic` Topic MqttClient subscribes to. + * `topics` List of topics MqttClient subscribes to. * `clientId` clientId, this client is assoicated with. Provide the same value to recover a stopped client. * `QoS` The maximum quality of service to subscribe each topic at. Messages published at a lower quality of service will be received at the published QoS. Messages published at a higher quality of service will be received using the QoS specified on the subscribe. * `username` Sets the user name to use for the connection to Mqtt Server. Do not set it, if server does not need this. Setting it empty will lead to errors. @@ -50,11 +51,13 @@ You need to extend `ActorReceiver` so as to store received data into Spark using this actor can be configured to handle failures, etc. 
val lines = MQTTUtils.createStream(ssc, brokerUrl, topic) + val lines = MQTTUtils.createPairedStream(ssc, brokerUrl, topic) Additional mqtt connection options can be provided: ```Scala val lines = MQTTUtils.createStream(ssc, brokerUrl, topic, storageLevel, clientId, username, password, cleanSession, qos, connectionTimeout, keepAliveInterval, mqttVersion) +val lines = MQTTUtils.createPairedStream(ssc, brokerUrl, topics, storageLevel, clientId, username, password, cleanSession, qos, connectionTimeout, keepAliveInterval, mqttVersion) ``` ### Java API @@ -63,5 +66,6 @@ You need to extend `JavaActorReceiver` so as to store received data into Spark u this actor can be configured to handle failures, etc. JavaDStream<String> lines = MQTTUtils.createStream(jssc, brokerUrl, topic); + JavaReceiverInputDStream<Tuple2<String, String>> lines = MQTTUtils.createPairedStream(jssc, brokerUrl, topics); -See end-to-end examples at [MQTT Examples](https://github.com/apache/bahir/tree/master/streaming-mqtt/examples) \ No newline at end of file +See end-to-end examples at [MQTT Examples](https://github.com/apache/bahir/tree/master/streaming-mqtt/examples) http://git-wip-us.apache.org/repos/asf/bahir/blob/826545cb/streaming-mqtt/python/mqtt.py ---------------------------------------------------------------------- diff --git a/streaming-mqtt/python/mqtt.py b/streaming-mqtt/python/mqtt.py index c55b704..da00394 100644 --- a/streaming-mqtt/python/mqtt.py +++ b/streaming-mqtt/python/mqtt.py @@ -44,6 +44,23 @@ class MQTTUtils(object): return DStream(jstream, ssc, UTF8Deserializer()) @staticmethod + def createPairedStream(ssc, brokerUrl, topics, + storageLevel=StorageLevel.MEMORY_AND_DISK_2): + """ + Create an input stream that pulls messages from a Mqtt Broker. + + :param ssc: StreamingContext object + :param brokerUrl: Url of remote mqtt publisher + :param topics: topic names to subscribe to + :param storageLevel: RDD storage level. 
+ :return: A DStream object + """ + jlevel = ssc._sc._getJavaStorageLevel(storageLevel) + helper = MQTTUtils._get_helper(ssc._sc) + jstream = helper.createStream(ssc._jssc, brokerUrl, topics, jlevel) + return DStream(jstream, ssc, UTF8Deserializer()) + + @staticmethod def _get_helper(sc): try: return sc._jvm.org.apache.spark.streaming.mqtt.MQTTUtilsPythonHelper() http://git-wip-us.apache.org/repos/asf/bahir/blob/826545cb/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTPairedInputDStream.scala ---------------------------------------------------------------------- diff --git a/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTPairedInputDStream.scala b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTPairedInputDStream.scala new file mode 100644 index 0000000..050777b --- /dev/null +++ b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTPairedInputDStream.scala @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming.mqtt + +import java.nio.charset.StandardCharsets + +import org.eclipse.paho.client.mqttv3._ +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.dstream._ +import org.apache.spark.streaming.receiver.Receiver + +/** + * Input stream that subscribe messages from a Mqtt Broker. + * Uses eclipse paho as MqttClient http://www.eclipse.org/paho/ + * @param brokerUrl Url of remote mqtt publisher + * @param topics topic name Array to subscribe to + * @param storageLevel RDD storage level. + * @param clientId ClientId to use for the mqtt connection + * @param username Username for authentication to the mqtt publisher + * @param password Password for authentication to the mqtt publisher + * @param cleanSession Sets the mqtt cleanSession parameter + * @param qos Quality of service to use for the topic subscription + * @param connectionTimeout Connection timeout for the mqtt connection + * @param keepAliveInterval Keepalive interal for the mqtt connection + * @param mqttVersion Version to use for the mqtt connection + */ +private[streaming] class MQTTPairedInputDStream( + _ssc: StreamingContext, + brokerUrl: String, + topics: Array[String], + storageLevel: StorageLevel, + clientId: Option[String] = None, + username: Option[String] = None, + password: Option[String] = None, + cleanSession: Option[Boolean] = None, + qos: Option[Int] = None, + connectionTimeout: Option[Int] = None, + keepAliveInterval: Option[Int] = None, + mqttVersion: Option[Int] = None) extends ReceiverInputDStream[(String, String)](_ssc) { + + private[streaming] override def name: String = s"MQTT stream [$id]" + + def getReceiver(): Receiver[(String, String)] = { + new MQTTPairReceiver(brokerUrl, topics, storageLevel, clientId, username, + password, cleanSession, qos, connectionTimeout, keepAliveInterval, mqttVersion) + } 
+} + +private[streaming] class MQTTPairReceiver( + brokerUrl: String, + topics: Array[String], + storageLevel: StorageLevel, + clientId: Option[String], + username: Option[String], + password: Option[String], + cleanSession: Option[Boolean], + qos: Option[Int], + connectionTimeout: Option[Int], + keepAliveInterval: Option[Int], + mqttVersion: Option[Int]) extends Receiver[(String, String)](storageLevel) { + + def onStop() { + + } + + def onStart() { + + // Set up persistence for messages + val persistence = new MemoryPersistence() + + // Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance + val client = new MqttClient(brokerUrl, clientId.getOrElse(MqttClient.generateClientId()), + persistence) + + // Initialize mqtt parameters + val mqttConnectionOptions = new MqttConnectOptions() + if (username.isDefined && password.isDefined) { + mqttConnectionOptions.setUserName(username.get) + mqttConnectionOptions.setPassword(password.get.toCharArray) + } + mqttConnectionOptions.setCleanSession(cleanSession.getOrElse(true)) + if (connectionTimeout.isDefined) { + mqttConnectionOptions.setConnectionTimeout(connectionTimeout.get) + } + if (keepAliveInterval.isDefined) { + mqttConnectionOptions.setKeepAliveInterval(keepAliveInterval.get) + } + if (mqttVersion.isDefined) { + mqttConnectionOptions.setMqttVersion(mqttVersion.get) + } + + // Callback automatically triggers as and when new message arrives on specified topic + val callback = new MqttCallback() { + + // Handles Mqtt message + override def messageArrived(topic: String, message: MqttMessage) { + store((topic, new String(message.getPayload(), StandardCharsets.UTF_8))) + } + + override def deliveryComplete(token: IMqttDeliveryToken) { + } + + override def connectionLost(cause: Throwable) { + restart("Connection lost ", cause) + } + } + + // Set up callback for MqttClient. 
This needs to happen before + // connecting or subscribing, otherwise messages may be lost + client.setCallback(callback) + + // Connect to MqttBroker + client.connect(mqttConnectionOptions) + + // Subscribe to Mqtt topic + var i = 0; + val qosArray = Array.ofDim[Int](topics.length); + for (i <- 0 to qosArray.length -1) { + qosArray(i) = qos.getOrElse(1); + } + client.subscribe(topics, qosArray) + + } +} http://git-wip-us.apache.org/repos/asf/bahir/blob/826545cb/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala ---------------------------------------------------------------------- diff --git a/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala index 7e2f5c7..0accb80 100644 --- a/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala +++ b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala @@ -199,7 +199,181 @@ object MQTTUtils { createStream(jssc.ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2, Option(clientId), Option(username), Option(password), Option(cleanSession), None, None, None, None) } + /** + * Create an input stream that receives messages pushed by a MQTT publisher. + * @param ssc StreamingContext object + * @param brokerUrl Url of remote MQTT publisher + * @param topics Array of topic names to subscribe to + * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2. + */ + def createPairedStream( + ssc: StreamingContext, + brokerUrl: String, + topics: Array[String], + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 + ): ReceiverInputDStream[(String, String)] = { + new MQTTPairedInputDStream(ssc, brokerUrl, topics, storageLevel) + } + + /** + * Create an input stream that receives messages pushed by a MQTT publisher. 
+ * @param ssc StreamingContext object + * @param brokerUrl Url of remote MQTT publisher + * @param topics Array of topic names to subscribe to + * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2. + * @param clientId ClientId to use for the mqtt connection + * @param username Username for authentication to the mqtt publisher + * @param password Password for authentication to the mqtt publisher + * @param cleanSession Sets the mqtt cleanSession parameter + * @param qos Quality of service to use for the topic subscription + * @param connectionTimeout Connection timeout for the mqtt connection + * @param keepAliveInterval Keepalive interal for the mqtt connection + * @param mqttVersion Version to use for the mqtt connection + */ + def createPairedStream( + ssc: StreamingContext, + brokerUrl: String, + topics: Array[String], + storageLevel: StorageLevel, + clientId: Option[String], + username: Option[String], + password: Option[String], + cleanSession: Option[Boolean], + qos: Option[Int], + connectionTimeout: Option[Int], + keepAliveInterval: Option[Int], + mqttVersion: Option[Int] + ): ReceiverInputDStream[(String, String)] = { + new MQTTPairedInputDStream(ssc, brokerUrl, topics, storageLevel, clientId, username, password, + cleanSession, qos, connectionTimeout, keepAliveInterval, mqttVersion) + } + + /** + * Create an input stream that receives messages pushed by a MQTT publisher. + * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2. 
+ * @param jssc JavaStreamingContext object + * @param brokerUrl Url of remote MQTT publisher + * @param topics Array of topic names to subscribe to + */ + def createPairedStream( + jssc: JavaStreamingContext, + brokerUrl: String, + topics: Array[String] + ): JavaReceiverInputDStream[(String, String)] = { + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] + createPairedStream(jssc.ssc, brokerUrl, topics) + } + + /** + * Create an input stream that receives messages pushed by a MQTT publisher. + * @param jssc JavaStreamingContext object + * @param brokerUrl Url of remote MQTT publisher + * @param topics Array of topic names to subscribe to + * @param storageLevel RDD storage level. + */ + def createPairedStream( + jssc: JavaStreamingContext, + brokerUrl: String, + topics: Array[String], + storageLevel: StorageLevel + ): JavaReceiverInputDStream[(String, String)] = { + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] + createPairedStream(jssc.ssc, brokerUrl, topics, storageLevel) + } + + /** + * Create an input stream that receives messages pushed by a MQTT publisher. + * @param jssc JavaStreamingContext object + * @param brokerUrl Url of remote MQTT publisher + * @param topics Array of topic names to subscribe to + * @param storageLevel RDD storage level. 
+ * @param clientId ClientId to use for the mqtt connection + * @param username Username for authentication to the mqtt publisher + * @param password Password for authentication to the mqtt publisher + * @param cleanSession Sets the mqtt cleanSession parameter + * @param qos Quality of service to use for the topic subscription + * @param connectionTimeout Connection timeout for the mqtt connection + * @param keepAliveInterval Keepalive interal for the mqtt connection + * @param mqttVersion Version to use for the mqtt connection + */ + def createPairedStream( + jssc: JavaStreamingContext, + brokerUrl: String, + topics: Array[String], + storageLevel: StorageLevel, + clientId: String, + username: String, + password: String, + cleanSession: Boolean, + qos: Int, + connectionTimeout: Int, + keepAliveInterval: Int, + mqttVersion: Int + ): JavaReceiverInputDStream[(String, String)] = { + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] + createPairedStream(jssc.ssc, brokerUrl, topics, storageLevel, Option(clientId), + Option(username), Option(password), Option(cleanSession), Option(qos), + Option(connectionTimeout), Option(keepAliveInterval), Option(mqttVersion)) + } + + /** + * Create an input stream that receives messages pushed by a MQTT publisher. 
+ * @param jssc JavaStreamingContext object + * @param brokerUrl Url of remote MQTT publisher + * @param topics Array of topic names to subscribe to + * @param clientId ClientId to use for the mqtt connection + * @param username Username for authentication to the mqtt publisher + * @param password Password for authentication to the mqtt publisher + * @param cleanSession Sets the mqtt cleanSession parameter + * @param qos Quality of service to use for the topic subscription + * @param connectionTimeout Connection timeout for the mqtt connection + * @param keepAliveInterval Keepalive interal for the mqtt connection + * @param mqttVersion Version to use for the mqtt connection + */ + def createPairedStream( + jssc: JavaStreamingContext, + brokerUrl: String, + topics: Array[String], + clientId: String, + username: String, + password: String, + cleanSession: Boolean, + qos: Int, + connectionTimeout: Int, + keepAliveInterval: Int, + mqttVersion: Int + ): JavaReceiverInputDStream[(String, String)] = { + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] + createPairedStream(jssc.ssc, brokerUrl, topics, StorageLevel.MEMORY_AND_DISK_SER_2, + Option(clientId), Option(username), Option(password), Option(cleanSession), Option(qos), + Option(connectionTimeout), Option(keepAliveInterval), Option(mqttVersion)) + } + + /** + * Create an input stream that receives messages pushed by a MQTT publisher. 
+ * @param jssc JavaStreamingContext object + * @param brokerUrl Url of remote MQTT publisher + * @param topics Array of topic names to subscribe to + * @param clientId ClientId to use for the mqtt connection + * @param username Username for authentication to the mqtt publisher + * @param password Password for authentication to the mqtt publisher + * @param cleanSession Sets the mqtt cleanSession parameter + */ + def createPairedStream( + jssc: JavaStreamingContext, + brokerUrl: String, + topics: Array[String], + clientId: String, + username: String, + password: String, + cleanSession: Boolean + ): JavaReceiverInputDStream[(String, String)] = { + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] + createPairedStream(jssc.ssc, brokerUrl, topics, StorageLevel.MEMORY_AND_DISK_SER_2, + Option(clientId), Option(username), Option(password), Option(cleanSession), None, + None, None, None) + } } /** @@ -216,4 +390,12 @@ private[mqtt] class MQTTUtilsPythonHelper { ): JavaDStream[String] = { MQTTUtils.createStream(jssc, brokerUrl, topic, storageLevel) } + def createPairedStream( + jssc: JavaStreamingContext, + brokerUrl: String, + topics: Array[String], + storageLevel: StorageLevel + ): JavaDStream[(String, String)] = { + MQTTUtils.createPairedStream(jssc, brokerUrl, topics, storageLevel) + } } http://git-wip-us.apache.org/repos/asf/bahir/blob/826545cb/streaming-mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java ---------------------------------------------------------------------- diff --git a/streaming-mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java b/streaming-mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java index 45332d9..d320595 100644 --- a/streaming-mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java +++ b/streaming-mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java @@ -18,16 +18,18 @@ package org.apache.spark.streaming.mqtt; 
import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.LocalJavaStreamingContext; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.junit.Test; -import org.apache.spark.streaming.LocalJavaStreamingContext; +import scala.Tuple2; public class JavaMQTTStreamSuite extends LocalJavaStreamingContext { @Test public void testMQTTStream() { String brokerUrl = "abc"; String topic = "def"; + String[] topics = {"def1","def2"}; // tests the API, does not actually test data receiving JavaReceiverInputDStream<String> test1 = MQTTUtils.createStream(ssc, brokerUrl, topic); @@ -39,5 +41,16 @@ public class JavaMQTTStreamSuite extends LocalJavaStreamingContext { "testid", "user", "password", true, 1, 10, 30, 3); JavaReceiverInputDStream<String> test5 = MQTTUtils.createStream(ssc, brokerUrl, topic, "testid", "user", "password", true); + JavaReceiverInputDStream<Tuple2<String, String>> test6 = MQTTUtils.createPairedStream(ssc, + brokerUrl, topics); + JavaReceiverInputDStream<Tuple2<String, String>> test7 = MQTTUtils.createPairedStream(ssc, + brokerUrl, topics, StorageLevel.MEMORY_AND_DISK_SER_2()); + JavaReceiverInputDStream<Tuple2<String, String>> test8 = MQTTUtils.createPairedStream(ssc, + brokerUrl, topics, StorageLevel.MEMORY_AND_DISK_SER_2(), "testid", "user", + "password", true, 1, 10, 30, 3); + JavaReceiverInputDStream<Tuple2<String, String>> test9 = MQTTUtils.createPairedStream(ssc, + brokerUrl, topics, "testid", "user", "password", true, 1, 10, 30, 3); + JavaReceiverInputDStream<Tuple2<String, String>> test10 = MQTTUtils.createPairedStream(ssc, + brokerUrl, topics, "testid", "user", "password", true); } } http://git-wip-us.apache.org/repos/asf/bahir/blob/826545cb/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala ---------------------------------------------------------------------- diff --git a/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala 
b/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala index fdcd18c..f1d9a20 100644 --- a/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala +++ b/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala @@ -33,6 +33,7 @@ class MQTTStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter private val master = "local[2]" private val framework = this.getClass.getSimpleName private val topic = "def" + private val topics = Array("def1", "def2") private var ssc: StreamingContext = _ private var mqttTestUtils: MQTTTestUtils = _ @@ -76,4 +77,28 @@ class MQTTStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter } ssc.stop() } + test("mqtt input stream2") { + val sendMessage1 = "MQTT demo for spark streaming1" + val sendMessage2 = "MQTT demo for spark streaming2" + val receiveStream2 = MQTTUtils.createPairedStream(ssc, "tcp://" + mqttTestUtils.brokerUri, + topics, StorageLevel.MEMORY_ONLY) + + @volatile var receiveMessage: List[String] = List() + receiveStream2.foreachRDD { rdd => + if (rdd.collect.length > 0) { + receiveMessage = receiveMessage ::: List(rdd.first()._2) + receiveMessage + } + } + + ssc.start() + + // Retry it because we don't know when the receiver will start. + eventually(timeout(10000 milliseconds), interval(100 milliseconds)) { + mqttTestUtils.publishData(topics(0), sendMessage1) + mqttTestUtils.publishData(topics(1), sendMessage2) + assert(receiveMessage.contains(sendMessage1)||receiveMessage.contains(sendMessage2)) + } + ssc.stop() + } }
