Repository: bahir Updated Branches: refs/heads/master be1effaaf -> e79a960fa
[BAHIR-181] Add username and password in MQTTUtils for pyspark Closes #69 Project: http://git-wip-us.apache.org/repos/asf/bahir/repo Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/e79a960f Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/e79a960f Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/e79a960f Branch: refs/heads/master Commit: e79a960fa289ee6caefce43c37355a73d44b5220 Parents: be1effa Author: zhankeyu <ZisZ> Authored: Thu Nov 1 17:26:19 2018 +0800 Committer: Luciano Resende <[email protected]> Committed: Wed Nov 7 19:45:25 2018 -0800 ---------------------------------------------------------------------- streaming-mqtt/python/mqtt.py | 20 ++++++++++++ .../apache/spark/streaming/mqtt/MQTTUtils.scala | 33 ++++++++++++++++++++ 2 files changed, 53 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bahir/blob/e79a960f/streaming-mqtt/python/mqtt.py ---------------------------------------------------------------------- diff --git a/streaming-mqtt/python/mqtt.py b/streaming-mqtt/python/mqtt.py index ad71baf..68c34b7 100644 --- a/streaming-mqtt/python/mqtt.py +++ b/streaming-mqtt/python/mqtt.py @@ -44,6 +44,26 @@ class MQTTUtils(object): return DStream(jstream, ssc, UTF8Deserializer()) @staticmethod + def createStream(ssc, brokerUrl, topic, + username, password, + storageLevel=StorageLevel.MEMORY_AND_DISK_2): + """ + Create an input stream that pulls messages from a Mqtt Broker. + + :param ssc: StreamingContext object + :param brokerUrl: Url of remote mqtt publisher + :param topic: topic name to subscribe to + :param username: the vitual host name : username or username + :param password: the password of mqtt + :param storageLevel: RDD storage level. + :return: A DStream object + """ + jlevel = ssc._sc._getJavaStorageLevel(storageLevel) + helper = MQTTUtils._get_helper(ssc._sc) + jstream = helper.createStream(ssc._jssc, brokerUrl, topic, jlevel, username, password) + return DStream(jstream, ssc, UTF8Deserializer()) + + @staticmethod def createPairedStream(ssc, brokerUrl, topics, storageLevel=StorageLevel.MEMORY_AND_DISK_2): """ http://git-wip-us.apache.org/repos/asf/bahir/blob/e79a960f/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala ---------------------------------------------------------------------- diff --git a/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala index f42275f..ef0e99f 100644 --- a/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala +++ b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala @@ -148,6 +148,28 @@ object MQTTUtils { * @param jssc JavaStreamingContext object * @param brokerUrl Url of remote MQTT publisher * @param topic Topic name to subscribe to + * @param storageLevel RDD storage level. + * @param username Username for authentication to the mqtt publisher + * @param password Password for authentication to the mqtt publisher + */ + def createStream( + jssc: JavaStreamingContext, + brokerUrl: String, + topic: String, + storageLevel: StorageLevel, + username: String, + password: String + ): JavaReceiverInputDStream[String] = { + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] + createStream(jssc.ssc, brokerUrl, topic, storageLevel, None, Option(username), + Option(password), None, None, None, None, None) + } + + /** + * Create an input stream that receives messages pushed by a MQTT publisher. + * @param jssc JavaStreamingContext object + * @param brokerUrl Url of remote MQTT publisher + * @param topic Topic name to subscribe to * @param clientId ClientId to use for the mqtt connection * @param username Username for authentication to the mqtt publisher * @param password Password for authentication to the mqtt publisher @@ -598,4 +620,15 @@ private[mqtt] class MQTTUtilsPythonHelper { ): JavaDStream[(String, Array[Byte])] = { MQTTUtils.createPairedByteArrayStream(jssc, brokerUrl, topics, storageLevel) } + + def createStream( + jssc: JavaStreamingContext, + brokerUrl: String, + topic: String, + storageLevel: StorageLevel, + username: String, + password: String + ): JavaDStream[String] = { + MQTTUtils.createStream(jssc, brokerUrl, topic, storageLevel, username, password) + } }
