Repository: bahir
Updated Branches:
  refs/heads/master eae02f29e -> 770b2916f


[BAHIR-104] Multi-topic MQTT DStream in Python is now a PairRDD.

Closes #55


Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/770b2916
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/770b2916
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/770b2916

Branch: refs/heads/master
Commit: 770b2916f0a7603b62ef997a0ea98b38c6da15c0
Parents: eae02f2
Author: Zubair Nabi <[email protected]>
Authored: Wed Nov 29 10:45:28 2017 +0500
Committer: Luciano Resende <[email protected]>
Committed: Wed Dec 20 16:44:12 2017 -0800

----------------------------------------------------------------------
 streaming-mqtt/README.md             | 15 +++++++++++++++
 streaming-mqtt/python-tests/tests.py | 29 +++++++++++++++++++++++++++++
 streaming-mqtt/python/mqtt.py        | 15 ++++++++++++---
 3 files changed, 56 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir/blob/770b2916/streaming-mqtt/README.md
----------------------------------------------------------------------
diff --git a/streaming-mqtt/README.md b/streaming-mqtt/README.md
index 644e5fb..05542de 100644
--- a/streaming-mqtt/README.md
+++ b/streaming-mqtt/README.md
@@ -72,3 +72,18 @@ this actor can be configured to handle failures, etc.
     JavaReceiverInputDStream<Tuple2<String, String>> lines = MQTTUtils.createPairedByteArrayStream(jssc, brokerUrl, topics);
 
 See end-to-end examples at [MQTT Examples](https://github.com/apache/bahir/tree/master/streaming-mqtt/examples)
+
+
+### Python API
+
+Create a DStream from a single topic.
+
+```Python
+       MQTTUtils.createStream(ssc, broker_url, topic)
+```
+
+Create a paired DStream of `(topic, message)` tuples from a list of topics.
+
+```Python
+       MQTTUtils.createPairedStream(ssc, broker_url, topics)
+```
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bahir/blob/770b2916/streaming-mqtt/python-tests/tests.py
----------------------------------------------------------------------
diff --git a/streaming-mqtt/python-tests/tests.py b/streaming-mqtt/python-tests/tests.py
index 749313f..978de77 100644
--- a/streaming-mqtt/python-tests/tests.py
+++ b/streaming-mqtt/python-tests/tests.py
@@ -83,6 +83,35 @@ class MQTTStreamTests(PySparkStreamingTestCase):
         # Retry it because we don't know when the receiver will start.
         self._retry_or_timeout(retry)
 
+    def _start_context_with_paired_stream(self, topics):
+        """Start the streaming context on a paired MQTT stream over ``topics``.
+
+        :param topics: topics passed to ``MQTTUtils.createPairedStream``.
+        :return: the set that the stream's ``foreachRDD`` callback keeps
+            filling with every collected record once the context is started.
+        """
+        broker_url = "tcp://" + self._MQTTTestUtils.brokerUri()
+        stream = MQTTUtils.createPairedStream(self.ssc, broker_url, topics)
+        # Keep a set because records can potentially be repeated.
+        result = set()
+
+        def collect_output(_, rdd):
+            # Fold every record of this batch into the shared result set.
+            result.update(rdd.collect())
+
+        stream.foreachRDD(collect_output)
+        self.ssc.start()
+        return result
+
+    def test_mqtt_pair_stream(self):
+        """Test the Python MQTT stream API with multiple topics."""
+        data_records = [
+            "random string 1",
+            "random string 2",
+            "random string 3",
+        ]
+        # One random topic per record.
+        topics = [self._randomTopic() for _ in data_records]
+        # Materialize the pairs: on Python 3 ``zip`` returns a one-shot
+        # iterator, which would be exhausted after the first retry and would
+        # never compare equal to the sorted list of received records.
+        topics_and_records = list(zip(topics, data_records))
+        result = self._start_context_with_paired_stream(topics)
+
+        def retry():
+            for topic, data_record in topics_and_records:
+                self._MQTTTestUtils.publishData(topic, data_record)
+            # Sort the received records as they might be out of order.
+            received = sorted(result, key=lambda pair: pair[1])
+            self.assertEqual(topics_and_records, received)
+
+        # Retry it because we don't know when the receiver will start.
+        self._retry_or_timeout(retry)
+
     def _retry_or_timeout(self, test_func):
         start_time = time.time()
         while True:

http://git-wip-us.apache.org/repos/asf/bahir/blob/770b2916/streaming-mqtt/python/mqtt.py
----------------------------------------------------------------------
diff --git a/streaming-mqtt/python/mqtt.py b/streaming-mqtt/python/mqtt.py
index da00394..ad71baf 100644
--- a/streaming-mqtt/python/mqtt.py
+++ b/streaming-mqtt/python/mqtt.py
@@ -18,7 +18,7 @@
 from py4j.protocol import Py4JJavaError
 
 from pyspark.storagelevel import StorageLevel
-from pyspark.serializers import UTF8Deserializer
+from pyspark.serializers import UTF8Deserializer, PairDeserializer
 from pyspark.streaming import DStream
 
 __all__ = ['MQTTUtils']
@@ -57,8 +57,17 @@ class MQTTUtils(object):
         """
         jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
         helper = MQTTUtils._get_helper(ssc._sc)
-        jstream = helper.createStream(ssc._jssc, brokerUrl, topics, jlevel)
-        return DStream(jstream, ssc, UTF8Deserializer())
+        topics_array = MQTTUtils._list_to_java_string_array(ssc._sc, topics)
+        jstream = helper.createPairedStream(ssc._jssc, brokerUrl, 
topics_array, jlevel)
+        return DStream(jstream, ssc, PairDeserializer(UTF8Deserializer(), 
UTF8Deserializer()))
+
+    @staticmethod
+    def _list_to_java_string_array(sc, list_):
+        """Copy ``list_`` into a newly allocated Java ``String`` array.
+
+        :param sc: context whose ``_gateway`` and ``_jvm`` are used to
+            allocate the array.
+        :param list_: iterable of strings; it is materialized once so that
+            sets, generators and other non-indexable iterables are accepted.
+        :return: the array created by ``sc._gateway.new_array``, holding the
+            items in iteration order.
+        """
+        items = list(list_)
+        arr = sc._gateway.new_array(sc._jvm.String, len(items))
+        for i, item in enumerate(items):
+            arr[i] = item
+        return arr
 
     @staticmethod
     def _get_helper(sc):

Reply via email to