Repository: bahir
Updated Branches:
  refs/heads/master eae02f29e -> 770b2916f
[BAHIR-104] Multi-topic MQTT DStream in Python is now a PairRDD.

Closes #55

Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/770b2916
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/770b2916
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/770b2916

Branch: refs/heads/master
Commit: 770b2916f0a7603b62ef997a0ea98b38c6da15c0
Parents: eae02f2
Author: Zubair Nabi <[email protected]>
Authored: Wed Nov 29 10:45:28 2017 +0500
Committer: Luciano Resende <[email protected]>
Committed: Wed Dec 20 16:44:12 2017 -0800

----------------------------------------------------------------------
 streaming-mqtt/README.md             | 15 +++++++++++++++
 streaming-mqtt/python-tests/tests.py | 29 +++++++++++++++++++++++++++++
 streaming-mqtt/python/mqtt.py        | 15 ++++++++++++---
 3 files changed, 56 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/bahir/blob/770b2916/streaming-mqtt/README.md
----------------------------------------------------------------------
diff --git a/streaming-mqtt/README.md b/streaming-mqtt/README.md
index 644e5fb..05542de 100644
--- a/streaming-mqtt/README.md
+++ b/streaming-mqtt/README.md
@@ -72,3 +72,18 @@ this actor can be configured to handle failures, etc.
     JavaReceiverInputDStream<Tuple2<String, String>> lines = MQTTUtils.createPairedByteArrayStream(jssc, brokerUrl, topics);
 
 See end-to-end examples at [MQTT Examples](https://github.com/apache/bahir/tree/master/streaming-mqtt/examples)
+
+
+### Python API
+
+Create a DStream from a single topic.
+
+```Python
+    MQTTUtils.createStream(ssc, broker_url, topic)
+```
+
+Create a DStream from a list of topics.
+
+```Python
+    MQTTUtils.createPairedStream(ssc, broker_url, topics)
+```
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bahir/blob/770b2916/streaming-mqtt/python-tests/tests.py
----------------------------------------------------------------------
diff --git a/streaming-mqtt/python-tests/tests.py b/streaming-mqtt/python-tests/tests.py
index 749313f..978de77 100644
--- a/streaming-mqtt/python-tests/tests.py
+++ b/streaming-mqtt/python-tests/tests.py
@@ -83,6 +83,35 @@ class MQTTStreamTests(PySparkStreamingTestCase):
         # Retry it because we don't know when the receiver will start.
         self._retry_or_timeout(retry)
 
+    def _start_context_with_paired_stream(self, topics):
+        stream = MQTTUtils.createPairedStream(self.ssc, "tcp://" + self._MQTTTestUtils.brokerUri(), topics)
+        # Keep a set because records can potentially be repeated.
+        result = set()
+
+        def getOutput(_, rdd):
+            for data in rdd.collect():
+                result.add(data)
+
+        stream.foreachRDD(getOutput)
+        self.ssc.start()
+        return result
+
+    def test_mqtt_pair_stream(self):
+        """Test the Python MQTT stream API with multiple topics."""
+        data_records = ["random string 1", "random string 2", "random string 3"]
+        topics = [self._randomTopic(), self._randomTopic(), self._randomTopic()]
+        topics_and_records = zip(topics, data_records)
+        result = self._start_context_with_paired_stream(topics)
+
+        def retry():
+            for topic, data_record in topics_and_records:
+                self._MQTTTestUtils.publishData(topic, data_record)
+            # Sort the received records as they might be out of order.
+            self.assertEqual(topics_and_records, sorted(result, key=lambda x: x[1]))
+
+        # Retry it because we don't know when the receiver will start.
+        self._retry_or_timeout(retry)
+
     def _retry_or_timeout(self, test_func):
         start_time = time.time()
         while True:

http://git-wip-us.apache.org/repos/asf/bahir/blob/770b2916/streaming-mqtt/python/mqtt.py
----------------------------------------------------------------------
diff --git a/streaming-mqtt/python/mqtt.py b/streaming-mqtt/python/mqtt.py
index da00394..ad71baf 100644
--- a/streaming-mqtt/python/mqtt.py
+++ b/streaming-mqtt/python/mqtt.py
@@ -18,7 +18,7 @@
 from py4j.protocol import Py4JJavaError
 
 from pyspark.storagelevel import StorageLevel
-from pyspark.serializers import UTF8Deserializer
+from pyspark.serializers import UTF8Deserializer, PairDeserializer
 from pyspark.streaming import DStream
 
 __all__ = ['MQTTUtils']
@@ -57,8 +57,17 @@ class MQTTUtils(object):
         """
         jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
         helper = MQTTUtils._get_helper(ssc._sc)
-        jstream = helper.createStream(ssc._jssc, brokerUrl, topics, jlevel)
-        return DStream(jstream, ssc, UTF8Deserializer())
+        topics_array = MQTTUtils._list_to_java_string_array(ssc._sc, topics)
+        jstream = helper.createPairedStream(ssc._jssc, brokerUrl, topics_array, jlevel)
+        return DStream(jstream, ssc, PairDeserializer(UTF8Deserializer(), UTF8Deserializer()))
+
+    @staticmethod
+    def _list_to_java_string_array(sc, list_):
+        len_arr = len(list_)
+        arr = sc._gateway.new_array(sc._jvm.String, len_arr)
+        for i in range(len_arr):
+            arr[i] = list_[i]
+        return arr
 
     @staticmethod
     def _get_helper(sc):
