Repository: bahir
Updated Branches:
  refs/heads/master 48e91fca5 -> 12f130846


[BAHIR-24] fix MQTT Python code, examples, add tests

Changes in this PR:

- remove unnecessary files from streaming-mqtt/python
- update all *.py files to reflect the modified
  project structure pyspark.streaming.mqtt --> mqtt
- add test cases that were left out from the import and
  add shell script to run them:
    - streaming-mqtt/python-tests/run-python-tests.sh
    - streaming-mqtt/python-tests/tests.py
- modify MQTTTestUtils.scala to limit the required disk storage space
- modify bin/run-example script to setup PYTHONPATH to run Python examples

Closes #10


Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/12f13084
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/12f13084
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/12f13084

Branch: refs/heads/master
Commit: 12f130846ef7523138e98e79bfd823f61acab3b3
Parents: 48e91fc
Author: Christian Kadner <[email protected]>
Authored: Fri Jul 15 17:49:40 2016 -0700
Committer: Luciano Resende <[email protected]>
Committed: Sat Jul 23 09:19:57 2016 -0700

----------------------------------------------------------------------
 .gitignore                                      |   3 +
 bin/run-example                                 |  25 +-
 pom.xml                                         |   2 +-
 .../src/main/python/streaming/mqtt_wordcount.py |  34 +-
 streaming-mqtt/python-tests/run-python-tests.sh |  79 +++
 streaming-mqtt/python-tests/tests.py            |  99 +++
 streaming-mqtt/python/__init__.py               |  22 -
 streaming-mqtt/python/dstream.py                | 643 -------------------
 streaming-mqtt/python/mqtt.py                   |  27 +-
 .../spark/streaming/mqtt/MQTTTestUtils.scala    |   6 +
 10 files changed, 247 insertions(+), 693 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir/blob/12f13084/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 1801b67..fb6d3b7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -15,6 +15,9 @@ target/
 *.class
 *.log
 
+# Python
+*.pyc
+
 # Others
 .checkstyle
 .fbExcludeFilterFile

http://git-wip-us.apache.org/repos/asf/bahir/blob/12f13084/bin/run-example
----------------------------------------------------------------------
diff --git a/bin/run-example b/bin/run-example
index 6f3cb39..483d853 100755
--- a/bin/run-example
+++ b/bin/run-example
@@ -16,6 +16,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+set -o pipefail
 
 # make sure Spark home is set and valid
 if [ -z "${SPARK_HOME}" ]; then
@@ -61,8 +62,8 @@ DESCRIPTION
 
 USAGE EXAMPLES
 EOF
-  grep -R "bin/run-example org.apache" --no-filename --include="*.scala" 
--include="*.java" "${project_dir}" | tr '`' ' ' | sed 's/^ *\* * /  /g' ; \
-  grep -R -A1 "bin/run-example \\\\"   --no-filename --include="*.scala" 
--include="*.java" "${project_dir}" | tr '`' ' ' | sed 's/^ *\* * /  /g' | sed 
'/^--$/d' | sed 'N;s/\\\n *//g'
+  grep -R "bin/run-example org.apache" --no-filename 
--include=*.{scala,java,py} "${project_dir}" | tr '`' ' ' | sed 's/^ *\* * /    
 /g' ; \
+  grep -R -A1 "bin/run-example \\\\"   --no-filename 
--include=*.{scala,java,py} "${project_dir}" | tr '`' ' ' | sed 's/^ *\* * /    
 /g' | sed '/^--$/d' | sed 'N;s/\\\n *//g'
   exit 1
 }
 
@@ -122,10 +123,22 @@ examples_jar="${module_tests_jar_path}"
 #      
streaming-akka/target/spark-streaming-akka_2.11-2.0.0-SNAPSHOT-tests.jar \
 #    localhost 9999
 
-# capture the full command line and echo it for transparency and debug purposes
-cmd="${SPARK_HOME}/bin/spark-submit \
-  --packages ${spark_package} \
-  --class ${example_class} ${examples_jar} ${example_args}"
+# for Python examples add all of the Bahir project's Python sources to 
PYTHONPATH, which in local
+# mode is easier than creating a zip file to be used with the --py-files 
option (TODO: BAHIR-35)
+# Note that --py-files with individual *.py files does not work if those 
modules are imported at top
+# of the example script but rather imports must be pushed down to after 
SparkContext initialization
+if [[ "$example_class" == *.py ]]; then
+  export PYTHONPATH="$( find "$project_dir" -path '*/python' -maxdepth 5 -type 
d | tr '\n' ':' )$PYTHONPATH"
+  cmd="${SPARK_HOME}/bin/spark-submit \
+    --packages ${spark_package} \
+    ${example_class} \
+    ${example_args}"
+else
+  cmd="${SPARK_HOME}/bin/spark-submit \
+    --packages ${spark_package} \
+    --class ${example_class} ${examples_jar} \
+    ${example_args}"
+fi
 
 echo "---"
 echo "Spark-Submit command: $cmd"

http://git-wip-us.apache.org/repos/asf/bahir/blob/12f13084/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index bbfa340..b9158a1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -95,7 +95,7 @@
     <log4j.version>1.2.17</log4j.version>
 
     <!-- Spark version -->
-    <spark.version>2.0.0-SNAPSHOT</spark.version>
+    <spark.version>2.0.1-SNAPSHOT</spark.version>
 
     <!-- Streaming Akka connector -->
     <akka.group>com.typesafe.akka</akka.group>

http://git-wip-us.apache.org/repos/asf/bahir/blob/12f13084/streaming-mqtt/examples/src/main/python/streaming/mqtt_wordcount.py
----------------------------------------------------------------------
diff --git 
a/streaming-mqtt/examples/src/main/python/streaming/mqtt_wordcount.py 
b/streaming-mqtt/examples/src/main/python/streaming/mqtt_wordcount.py
index abf9c0e..19838dc 100644
--- a/streaming-mqtt/examples/src/main/python/streaming/mqtt_wordcount.py
+++ b/streaming-mqtt/examples/src/main/python/streaming/mqtt_wordcount.py
@@ -20,23 +20,35 @@
  Usage: mqtt_wordcount.py <broker url> <topic>
 
  To run this in your local machine, you need to setup a MQTT broker and 
publisher first,
- Mosquitto is one of the open source MQTT Brokers, see
- http://mosquitto.org/
- Eclipse paho project provides number of clients and utilities for working 
with MQTT, see
- http://www.eclipse.org/paho/#getting-started
-
- and then run the example
-    `$ bin/spark-submit --jars \
-      
external/mqtt-assembly/target/scala-*/spark-streaming-mqtt-assembly-*.jar \
-      examples/src/main/python/streaming/mqtt_wordcount.py \
-      tcp://localhost:1883 foo`
+ like Mosquitto (http://mosquitto.org/) an easy to use and install open source 
MQTT Broker.
+ On Mac OS Mosquitto can be installed with Homebrew `$ brew install mosquitto`.
+ On Ubuntu mosquitto can be installed with the command `$ sudo apt-get install 
mosquitto`.
+
+ Alternatively, the Eclipse paho project provides a number of clients and 
utilities for
+ working with MQTT, see http://www.eclipse.org/paho/#getting-started
+
+ How to run this example locally:
+
+ (1) Start an MQTT message broker/server, e.g. Mosquitto:
+
+    `$ mosquitto -p 1883`
+
+ (2) Run the publisher:
+
+    `$ bin/run-example \
+      org.apache.spark.examples.streaming.mqtt.MQTTPublisher 
tcp://localhost:1883 foo`
+
+ (3) Run the example:
+
+    `$ bin/run-example \
+      streaming-mqtt/examples/src/main/python/streaming/mqtt_wordcount.py 
tcp://localhost:1883 foo`
 """
 
 import sys
 
 from pyspark import SparkContext
 from pyspark.streaming import StreamingContext
-from pyspark.streaming.mqtt import MQTTUtils
+from mqtt import MQTTUtils
 
 if __name__ == "__main__":
     if len(sys.argv) != 3:

http://git-wip-us.apache.org/repos/asf/bahir/blob/12f13084/streaming-mqtt/python-tests/run-python-tests.sh
----------------------------------------------------------------------
diff --git a/streaming-mqtt/python-tests/run-python-tests.sh 
b/streaming-mqtt/python-tests/run-python-tests.sh
new file mode 100755
index 0000000..557b164
--- /dev/null
+++ b/streaming-mqtt/python-tests/run-python-tests.sh
@@ -0,0 +1,79 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+set -o pipefail
+
+# make sure Spark home is set and valid
+if [ -z "${SPARK_HOME}" ]; then
+  echo "SPARK_HOME is not set" >&2
+  exit 1
+elif [ ! -d "${SPARK_HOME}" ]; then
+  echo "SPARK_HOME does not point to a valid directory" >&2
+  exit 1
+fi
+
+# pinpoint the module folder and project root folder
+bin_dir=$( dirname "$0" )
+module_dir=$( cd "${bin_dir}/.." && pwd -P )
+project_dir=$( cd "${module_dir}/.." && pwd -P )
+stdout_log="${module_dir}/target/python-tests-python-output.log"
+stderr_log="${module_dir}/target/python-tests-java-output.log"
+
+# use the module name to find the tests jar file that contains the example to 
run
+module_name=${module_dir#"${project_dir}"/}
+module_tests_jar_path=$( find "${module_dir}/target" -name 
"*${module_name}*-tests.jar" -maxdepth 1 | head -1 )
+
+if [ -z "${module_tests_jar_path}" ] || [ ! -e "${module_tests_jar_path}" ]; 
then
+  echo "Could not find module tests jar file in ${module_dir}/target/" >&2
+  echo "Run \"mvn clean install\" and retry running this example" >&2
+  exit 1
+fi
+
+# use maven-help-plugin to determine project version and Scala version
+module_version=$( cd "${module_dir}" && mvn 
org.apache.maven.plugins:maven-help-plugin:2.2:evaluate 
-Dexpression=project.version | grep -v "INFO\|WARNING\|ERROR\|Downloading" | 
tail -1 )
+scala_version=$( cd "${module_dir}" && mvn 
org.apache.maven.plugins:maven-help-plugin:2.2:evaluate 
-Dexpression=scala.binary.version | grep -v "INFO\|WARNING\|ERROR\|Downloading" 
| tail -1 )
+
+# we are using spark-submit with --packages to run the tests and all necessary 
dependencies are
+# resolved by maven which requires running "mvn" or "mvn install" first
+spark_packages="org.apache.bahir:spark-${module_name}_${scala_version}:${module_version}"
+
+# find additional test-scoped dependencies and add them to the --packages list
+test_dependencies=$( cd "${project_dir}" && mvn dependency:tree -Dscope=test 
-Dtokens=standard -pl ${module_name} | grep "\[INFO\] +- [a-z].*:test" | grep 
-ivE "spark|bahir|scala|junit" | sed 's/\[INFO\] +- //; s/:jar//; s/:test//' )
+for td in ${test_dependencies}; do
+  spark_packages="${spark_packages},${td}"
+done
+
+# since we are running locally, we can use PYTHONPATH instead of --py-files 
(TODO: BAHIR-35)
+export PYTHONPATH="${module_dir}/python:${PYTHONPATH}"
+
+# run the tests via spark-submit and capture the output in two separate log 
files (stdout=Python,
+# stderr=Java) while only printing stdout to console
+"${SPARK_HOME}"/bin/spark-submit \
+    --master local[*] \
+    --driver-memory 512m \
+    --packages "${spark_packages}" \
+    --jars "${module_tests_jar_path}" \
+    "${module_dir}/python-tests/tests.py" \
+    1> >( tee "${stdout_log}" | grep -w '[[:alpha:]=-]\{2,\}' ) \
+    2> "${stderr_log}"
+
+# if the Python code doesn't get executed due to errors in SparkSubmit the 
stdout log file will be
+# empty and nothing was logged to the console, then let's print the stderr log 
(Java output)
+if [ ! -s "${stdout_log}" ]; then
+  cat "${stderr_log}"
+  echo "Error during test execution"
+fi

http://git-wip-us.apache.org/repos/asf/bahir/blob/12f13084/streaming-mqtt/python-tests/tests.py
----------------------------------------------------------------------
diff --git a/streaming-mqtt/python-tests/tests.py 
b/streaming-mqtt/python-tests/tests.py
new file mode 100644
index 0000000..749313f
--- /dev/null
+++ b/streaming-mqtt/python-tests/tests.py
@@ -0,0 +1,99 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import sys
+import time
+import random
+
+if sys.version_info[:2] <= (2, 6):
+    try:
+        import unittest2 as unittest
+    except ImportError:
+        sys.stderr.write('Please install unittest2 to test with Python 2.6 or 
earlier')
+        sys.exit(1)
+else:
+    import unittest
+
+from pyspark.context import SparkConf, SparkContext, RDD
+from pyspark.streaming.context import StreamingContext
+from pyspark.streaming.tests import PySparkStreamingTestCase
+from mqtt import MQTTUtils
+
+class MQTTStreamTests(PySparkStreamingTestCase):
+    timeout = 20  # seconds
+    duration = 1
+
+    def setUp(self):
+        super(MQTTStreamTests, self).setUp()
+
+        MQTTTestUtilsClz = 
self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
+            .loadClass("org.apache.spark.streaming.mqtt.MQTTTestUtils")
+        self._MQTTTestUtils = MQTTTestUtilsClz.newInstance()
+        self._MQTTTestUtils.setup()
+
+    def tearDown(self):
+        if self._MQTTTestUtils is not None:
+            self._MQTTTestUtils.teardown()
+            self._MQTTTestUtils = None
+
+        super(MQTTStreamTests, self).tearDown()
+
+    def _randomTopic(self):
+        return "topic-%d" % random.randint(0, 10000)
+
+    def _startContext(self, topic):
+        # Start the StreamingContext and also collect the result
+        stream = MQTTUtils.createStream(self.ssc, "tcp://" + 
self._MQTTTestUtils.brokerUri(), topic)
+        result = []
+
+        def getOutput(_, rdd):
+            for data in rdd.collect():
+                result.append(data)
+
+        stream.foreachRDD(getOutput)
+        self.ssc.start()
+        return result
+
+    def test_mqtt_stream(self):
+        """Test the Python MQTT stream API."""
+        sendData = "MQTT demo for spark streaming"
+        topic = self._randomTopic()
+        result = self._startContext(topic)
+
+        def retry():
+            self._MQTTTestUtils.publishData(topic, sendData)
+            # Because "publishData" sends duplicate messages, here we should 
use > 0
+            self.assertTrue(len(result) > 0)
+            self.assertEqual(sendData, result[0])
+
+        # Retry it because we don't know when the receiver will start.
+        self._retry_or_timeout(retry)
+
+    def _retry_or_timeout(self, test_func):
+        start_time = time.time()
+        while True:
+            try:
+                test_func()
+                break
+            except:
+                if time.time() - start_time > self.timeout:
+                    raise
+                time.sleep(0.01)
+
+
+if __name__ == "__main__":
+    unittest.main()

http://git-wip-us.apache.org/repos/asf/bahir/blob/12f13084/streaming-mqtt/python/__init__.py
----------------------------------------------------------------------
diff --git a/streaming-mqtt/python/__init__.py 
b/streaming-mqtt/python/__init__.py
deleted file mode 100644
index 66e8f8e..0000000
--- a/streaming-mqtt/python/__init__.py
+++ /dev/null
@@ -1,22 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from pyspark.streaming.context import StreamingContext
-from pyspark.streaming.dstream import DStream
-from pyspark.streaming.listener import StreamingListener
-
-__all__ = ['StreamingContext', 'DStream', 'StreamingListener']

http://git-wip-us.apache.org/repos/asf/bahir/blob/12f13084/streaming-mqtt/python/dstream.py
----------------------------------------------------------------------
diff --git a/streaming-mqtt/python/dstream.py b/streaming-mqtt/python/dstream.py
deleted file mode 100644
index 2056663..0000000
--- a/streaming-mqtt/python/dstream.py
+++ /dev/null
@@ -1,643 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-import sys
-import operator
-import time
-from itertools import chain
-from datetime import datetime
-
-if sys.version < "3":
-    from itertools import imap as map, ifilter as filter
-
-from py4j.protocol import Py4JJavaError
-
-from pyspark import RDD
-from pyspark.storagelevel import StorageLevel
-from pyspark.streaming.util import rddToFileName, TransformFunction
-from pyspark.rdd import portable_hash
-from pyspark.resultiterable import ResultIterable
-
-__all__ = ["DStream"]
-
-
-class DStream(object):
-    """
-    A Discretized Stream (DStream), the basic abstraction in Spark Streaming,
-    is a continuous sequence of RDDs (of the same type) representing a
-    continuous stream of data (see L{RDD} in the Spark core documentation
-    for more details on RDDs).
-
-    DStreams can either be created from live data (such as, data from TCP
-    sockets, Kafka, Flume, etc.) using a L{StreamingContext} or it can be
-    generated by transforming existing DStreams using operations such as
-    `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming
-    program is running, each DStream periodically generates a RDD, either
-    from live data or by transforming the RDD generated by a parent DStream.
-
-    DStreams internally is characterized by a few basic properties:
-     - A list of other DStreams that the DStream depends on
-     - A time interval at which the DStream generates an RDD
-     - A function that is used to generate an RDD after each time interval
-    """
-    def __init__(self, jdstream, ssc, jrdd_deserializer):
-        self._jdstream = jdstream
-        self._ssc = ssc
-        self._sc = ssc._sc
-        self._jrdd_deserializer = jrdd_deserializer
-        self.is_cached = False
-        self.is_checkpointed = False
-
-    def context(self):
-        """
-        Return the StreamingContext associated with this DStream
-        """
-        return self._ssc
-
-    def count(self):
-        """
-        Return a new DStream in which each RDD has a single element
-        generated by counting each RDD of this DStream.
-        """
-        return self.mapPartitions(lambda i: [sum(1 for _ in 
i)]).reduce(operator.add)
-
-    def filter(self, f):
-        """
-        Return a new DStream containing only the elements that satisfy 
predicate.
-        """
-        def func(iterator):
-            return filter(f, iterator)
-        return self.mapPartitions(func, True)
-
-    def flatMap(self, f, preservesPartitioning=False):
-        """
-        Return a new DStream by applying a function to all elements of
-        this DStream, and then flattening the results
-        """
-        def func(s, iterator):
-            return chain.from_iterable(map(f, iterator))
-        return self.mapPartitionsWithIndex(func, preservesPartitioning)
-
-    def map(self, f, preservesPartitioning=False):
-        """
-        Return a new DStream by applying a function to each element of DStream.
-        """
-        def func(iterator):
-            return map(f, iterator)
-        return self.mapPartitions(func, preservesPartitioning)
-
-    def mapPartitions(self, f, preservesPartitioning=False):
-        """
-        Return a new DStream in which each RDD is generated by applying
-        mapPartitions() to each RDDs of this DStream.
-        """
-        def func(s, iterator):
-            return f(iterator)
-        return self.mapPartitionsWithIndex(func, preservesPartitioning)
-
-    def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
-        """
-        Return a new DStream in which each RDD is generated by applying
-        mapPartitionsWithIndex() to each RDDs of this DStream.
-        """
-        return self.transform(lambda rdd: rdd.mapPartitionsWithIndex(f, 
preservesPartitioning))
-
-    def reduce(self, func):
-        """
-        Return a new DStream in which each RDD has a single element
-        generated by reducing each RDD of this DStream.
-        """
-        return self.map(lambda x: (None, x)).reduceByKey(func, 1).map(lambda 
x: x[1])
-
-    def reduceByKey(self, func, numPartitions=None):
-        """
-        Return a new DStream by applying reduceByKey to each RDD.
-        """
-        if numPartitions is None:
-            numPartitions = self._sc.defaultParallelism
-        return self.combineByKey(lambda x: x, func, func, numPartitions)
-
-    def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
-                     numPartitions=None):
-        """
-        Return a new DStream by applying combineByKey to each RDD.
-        """
-        if numPartitions is None:
-            numPartitions = self._sc.defaultParallelism
-
-        def func(rdd):
-            return rdd.combineByKey(createCombiner, mergeValue, 
mergeCombiners, numPartitions)
-        return self.transform(func)
-
-    def partitionBy(self, numPartitions, partitionFunc=portable_hash):
-        """
-        Return a copy of the DStream in which each RDD are partitioned
-        using the specified partitioner.
-        """
-        return self.transform(lambda rdd: rdd.partitionBy(numPartitions, 
partitionFunc))
-
-    def foreachRDD(self, func):
-        """
-        Apply a function to each RDD in this DStream.
-        """
-        if func.__code__.co_argcount == 1:
-            old_func = func
-            func = lambda t, rdd: old_func(rdd)
-        jfunc = TransformFunction(self._sc, func, self._jrdd_deserializer)
-        api = self._ssc._jvm.PythonDStream
-        api.callForeachRDD(self._jdstream, jfunc)
-
-    def pprint(self, num=10):
-        """
-        Print the first num elements of each RDD generated in this DStream.
-
-        @param num: the number of elements from the first will be printed.
-        """
-        def takeAndPrint(time, rdd):
-            taken = rdd.take(num + 1)
-            print("-------------------------------------------")
-            print("Time: %s" % time)
-            print("-------------------------------------------")
-            for record in taken[:num]:
-                print(record)
-            if len(taken) > num:
-                print("...")
-            print("")
-
-        self.foreachRDD(takeAndPrint)
-
-    def mapValues(self, f):
-        """
-        Return a new DStream by applying a map function to the value of
-        each key-value pairs in this DStream without changing the key.
-        """
-        map_values_fn = lambda kv: (kv[0], f(kv[1]))
-        return self.map(map_values_fn, preservesPartitioning=True)
-
-    def flatMapValues(self, f):
-        """
-        Return a new DStream by applying a flatmap function to the value
-        of each key-value pairs in this DStream without changing the key.
-        """
-        flat_map_fn = lambda kv: ((kv[0], x) for x in f(kv[1]))
-        return self.flatMap(flat_map_fn, preservesPartitioning=True)
-
-    def glom(self):
-        """
-        Return a new DStream in which RDD is generated by applying glom()
-        to RDD of this DStream.
-        """
-        def func(iterator):
-            yield list(iterator)
-        return self.mapPartitions(func)
-
-    def cache(self):
-        """
-        Persist the RDDs of this DStream with the default storage level
-        (C{MEMORY_ONLY}).
-        """
-        self.is_cached = True
-        self.persist(StorageLevel.MEMORY_ONLY)
-        return self
-
-    def persist(self, storageLevel):
-        """
-        Persist the RDDs of this DStream with the given storage level
-        """
-        self.is_cached = True
-        javaStorageLevel = self._sc._getJavaStorageLevel(storageLevel)
-        self._jdstream.persist(javaStorageLevel)
-        return self
-
-    def checkpoint(self, interval):
-        """
-        Enable periodic checkpointing of RDDs of this DStream
-
-        @param interval: time in seconds, after each period of that, generated
-                         RDD will be checkpointed
-        """
-        self.is_checkpointed = True
-        self._jdstream.checkpoint(self._ssc._jduration(interval))
-        return self
-
-    def groupByKey(self, numPartitions=None):
-        """
-        Return a new DStream by applying groupByKey on each RDD.
-        """
-        if numPartitions is None:
-            numPartitions = self._sc.defaultParallelism
-        return self.transform(lambda rdd: rdd.groupByKey(numPartitions))
-
-    def countByValue(self):
-        """
-        Return a new DStream in which each RDD contains the counts of each
-        distinct value in each RDD of this DStream.
-        """
-        return self.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x+y)
-
-    def saveAsTextFiles(self, prefix, suffix=None):
-        """
-        Save each RDD in this DStream as at text file, using string
-        representation of elements.
-        """
-        def saveAsTextFile(t, rdd):
-            path = rddToFileName(prefix, suffix, t)
-            try:
-                rdd.saveAsTextFile(path)
-            except Py4JJavaError as e:
-                # after recovered from checkpointing, the foreachRDD may
-                # be called twice
-                if 'FileAlreadyExistsException' not in str(e):
-                    raise
-        return self.foreachRDD(saveAsTextFile)
-
-    # TODO: uncomment this until we have ssc.pickleFileStream()
-    # def saveAsPickleFiles(self, prefix, suffix=None):
-    #     """
-    #     Save each RDD in this DStream as at binary file, the elements are
-    #     serialized by pickle.
-    #     """
-    #     def saveAsPickleFile(t, rdd):
-    #         path = rddToFileName(prefix, suffix, t)
-    #         try:
-    #             rdd.saveAsPickleFile(path)
-    #         except Py4JJavaError as e:
-    #             # after recovered from checkpointing, the foreachRDD may
-    #             # be called twice
-    #             if 'FileAlreadyExistsException' not in str(e):
-    #                 raise
-    #     return self.foreachRDD(saveAsPickleFile)
-
-    def transform(self, func):
-        """
-        Return a new DStream in which each RDD is generated by applying a 
function
-        on each RDD of this DStream.
-
-        `func` can have one argument of `rdd`, or have two arguments of
-        (`time`, `rdd`)
-        """
-        if func.__code__.co_argcount == 1:
-            oldfunc = func
-            func = lambda t, rdd: oldfunc(rdd)
-        assert func.__code__.co_argcount == 2, "func should take one or two 
arguments"
-        return TransformedDStream(self, func)
-
-    def transformWith(self, func, other, keepSerializer=False):
-        """
-        Return a new DStream in which each RDD is generated by applying a 
function
-        on each RDD of this DStream and 'other' DStream.
-
-        `func` can have two arguments of (`rdd_a`, `rdd_b`) or have three
-        arguments of (`time`, `rdd_a`, `rdd_b`)
-        """
-        if func.__code__.co_argcount == 2:
-            oldfunc = func
-            func = lambda t, a, b: oldfunc(a, b)
-        assert func.__code__.co_argcount == 3, "func should take two or three 
arguments"
-        jfunc = TransformFunction(self._sc, func, self._jrdd_deserializer, 
other._jrdd_deserializer)
-        dstream = 
self._sc._jvm.PythonTransformed2DStream(self._jdstream.dstream(),
-                                                          
other._jdstream.dstream(), jfunc)
-        jrdd_serializer = self._jrdd_deserializer if keepSerializer else 
self._sc.serializer
-        return DStream(dstream.asJavaDStream(), self._ssc, jrdd_serializer)
-
-    def repartition(self, numPartitions):
-        """
-        Return a new DStream with an increased or decreased level of 
parallelism.
-        """
-        return self.transform(lambda rdd: rdd.repartition(numPartitions))
-
-    @property
-    def _slideDuration(self):
-        """
-        Return the slideDuration in seconds of this DStream
-        """
-        return self._jdstream.dstream().slideDuration().milliseconds() / 1000.0
-
-    def union(self, other):
-        """
-        Return a new DStream by unifying data of another DStream with this 
DStream.
-
-        @param other: Another DStream having the same interval (i.e., 
slideDuration)
-                     as this DStream.
-        """
-        if self._slideDuration != other._slideDuration:
-            raise ValueError("the two DStream should have same slide duration")
-        return self.transformWith(lambda a, b: a.union(b), other, True)
-
-    def cogroup(self, other, numPartitions=None):
-        """
-        Return a new DStream by applying 'cogroup' between RDDs of this
-        DStream and `other` DStream.
-
-        Hash partitioning is used to generate the RDDs with `numPartitions` 
partitions.
-        """
-        if numPartitions is None:
-            numPartitions = self._sc.defaultParallelism
-        return self.transformWith(lambda a, b: a.cogroup(b, numPartitions), 
other)
-
-    def join(self, other, numPartitions=None):
-        """
-        Return a new DStream by applying 'join' between RDDs of this DStream 
and
-        `other` DStream.
-
-        Hash partitioning is used to generate the RDDs with `numPartitions`
-        partitions.
-        """
-        if numPartitions is None:
-            numPartitions = self._sc.defaultParallelism
-        return self.transformWith(lambda a, b: a.join(b, numPartitions), other)
-
-    def leftOuterJoin(self, other, numPartitions=None):
-        """
-        Return a new DStream by applying 'left outer join' between RDDs of 
this DStream and
-        `other` DStream.
-
-        Hash partitioning is used to generate the RDDs with `numPartitions`
-        partitions.
-        """
-        if numPartitions is None:
-            numPartitions = self._sc.defaultParallelism
-        return self.transformWith(lambda a, b: a.leftOuterJoin(b, 
numPartitions), other)
-
-    def rightOuterJoin(self, other, numPartitions=None):
-        """
-        Return a new DStream by applying 'right outer join' between RDDs of 
this DStream and
-        `other` DStream.
-
-        Hash partitioning is used to generate the RDDs with `numPartitions`
-        partitions.
-        """
-        if numPartitions is None:
-            numPartitions = self._sc.defaultParallelism
-        return self.transformWith(lambda a, b: a.rightOuterJoin(b, 
numPartitions), other)
-
-    def fullOuterJoin(self, other, numPartitions=None):
-        """
-        Return a new DStream by applying 'full outer join' between RDDs of 
this DStream and
-        `other` DStream.
-
-        Hash partitioning is used to generate the RDDs with `numPartitions`
-        partitions.
-        """
-        if numPartitions is None:
-            numPartitions = self._sc.defaultParallelism
-        return self.transformWith(lambda a, b: a.fullOuterJoin(b, 
numPartitions), other)
-
-    def _jtime(self, timestamp):
-        """ Convert datetime or unix_timestamp into Time
-        """
-        if isinstance(timestamp, datetime):
-            timestamp = time.mktime(timestamp.timetuple())
-        return self._sc._jvm.Time(long(timestamp * 1000))
-
-    def slice(self, begin, end):
-        """
-        Return all the RDDs between 'begin' to 'end' (both included)
-
-        `begin`, `end` could be datetime.datetime() or unix_timestamp
-        """
-        jrdds = self._jdstream.slice(self._jtime(begin), self._jtime(end))
-        return [RDD(jrdd, self._sc, self._jrdd_deserializer) for jrdd in jrdds]
-
-    def _validate_window_param(self, window, slide):
-        duration = self._jdstream.dstream().slideDuration().milliseconds()
-        if int(window * 1000) % duration != 0:
-            raise ValueError("windowDuration must be multiple of the slide 
duration (%d ms)"
-                             % duration)
-        if slide and int(slide * 1000) % duration != 0:
-            raise ValueError("slideDuration must be multiple of the slide 
duration (%d ms)"
-                             % duration)
-
-    def window(self, windowDuration, slideDuration=None):
-        """
-        Return a new DStream in which each RDD contains all the elements in 
seen in a
-        sliding window of time over this DStream.
-
-        @param windowDuration: width of the window; must be a multiple of this 
DStream's
-                              batching interval
-        @param slideDuration:  sliding interval of the window (i.e., the 
interval after which
-                              the new DStream will generate RDDs); must be a 
multiple of this
-                              DStream's batching interval
-        """
-        self._validate_window_param(windowDuration, slideDuration)
-        d = self._ssc._jduration(windowDuration)
-        if slideDuration is None:
-            return DStream(self._jdstream.window(d), self._ssc, 
self._jrdd_deserializer)
-        s = self._ssc._jduration(slideDuration)
-        return DStream(self._jdstream.window(d, s), self._ssc, 
self._jrdd_deserializer)
-
-    def reduceByWindow(self, reduceFunc, invReduceFunc, windowDuration, 
slideDuration):
-        """
-        Return a new DStream in which each RDD has a single element generated 
by reducing all
-        elements in a sliding window over this DStream.
-
-        if `invReduceFunc` is not None, the reduction is done incrementally
-        using the old window's reduced value :
-
-        1. reduce the new values that entered the window (e.g., adding new 
counts)
-
-        2. "inverse reduce" the old values that left the window (e.g., 
subtracting old counts)
-        This is more efficient than `invReduceFunc` is None.
-
-        @param reduceFunc:     associative and commutative reduce function
-        @param invReduceFunc:  inverse reduce function of `reduceFunc`
-        @param windowDuration: width of the window; must be a multiple of this 
DStream's
-                               batching interval
-        @param slideDuration:  sliding interval of the window (i.e., the 
interval after which
-                               the new DStream will generate RDDs); must be a 
multiple of this
-                               DStream's batching interval
-        """
-        keyed = self.map(lambda x: (1, x))
-        reduced = keyed.reduceByKeyAndWindow(reduceFunc, invReduceFunc,
-                                             windowDuration, slideDuration, 1)
-        return reduced.map(lambda kv: kv[1])
-
-    def countByWindow(self, windowDuration, slideDuration):
-        """
-        Return a new DStream in which each RDD has a single element generated
-        by counting the number of elements in a window over this DStream.
-        windowDuration and slideDuration are as defined in the window() 
operation.
-
-        This is equivalent to window(windowDuration, slideDuration).count(),
-        but will be more efficient if window is large.
-        """
-        return self.map(lambda x: 1).reduceByWindow(operator.add, operator.sub,
-                                                    windowDuration, 
slideDuration)
-
-    def countByValueAndWindow(self, windowDuration, slideDuration, 
numPartitions=None):
-        """
-        Return a new DStream in which each RDD contains the count of distinct 
elements in
-        RDDs in a sliding window over this DStream.
-
-        @param windowDuration: width of the window; must be a multiple of this 
DStream's
-                              batching interval
-        @param slideDuration:  sliding interval of the window (i.e., the 
interval after which
-                              the new DStream will generate RDDs); must be a 
multiple of this
-                              DStream's batching interval
-        @param numPartitions:  number of partitions of each RDD in the new 
DStream.
-        """
-        keyed = self.map(lambda x: (x, 1))
-        counted = keyed.reduceByKeyAndWindow(operator.add, operator.sub,
-                                             windowDuration, slideDuration, 
numPartitions)
-        return counted.filter(lambda kv: kv[1] > 0)
-
-    def groupByKeyAndWindow(self, windowDuration, slideDuration, 
numPartitions=None):
-        """
-        Return a new DStream by applying `groupByKey` over a sliding window.
-        Similar to `DStream.groupByKey()`, but applies it over a sliding 
window.
-
-        @param windowDuration: width of the window; must be a multiple of this 
DStream's
-                              batching interval
-        @param slideDuration:  sliding interval of the window (i.e., the 
interval after which
-                              the new DStream will generate RDDs); must be a 
multiple of this
-                              DStream's batching interval
-        @param numPartitions:  Number of partitions of each RDD in the new 
DStream.
-        """
-        ls = self.mapValues(lambda x: [x])
-        grouped = ls.reduceByKeyAndWindow(lambda a, b: a.extend(b) or a, 
lambda a, b: a[len(b):],
-                                          windowDuration, slideDuration, 
numPartitions)
-        return grouped.mapValues(ResultIterable)
-
-    def reduceByKeyAndWindow(self, func, invFunc, windowDuration, 
slideDuration=None,
-                             numPartitions=None, filterFunc=None):
-        """
-        Return a new DStream by applying incremental `reduceByKey` over a 
sliding window.
-
-        The reduced value of over a new window is calculated using the old 
window's reduce value :
-         1. reduce the new values that entered the window (e.g., adding new 
counts)
-         2. "inverse reduce" the old values that left the window (e.g., 
subtracting old counts)
-
-        `invFunc` can be None, then it will reduce all the RDDs in window, 
could be slower
-        than having `invFunc`.
-
-        @param func:           associative and commutative reduce function
-        @param invFunc:        inverse function of `reduceFunc`
-        @param windowDuration: width of the window; must be a multiple of this 
DStream's
-                              batching interval
-        @param slideDuration:  sliding interval of the window (i.e., the 
interval after which
-                              the new DStream will generate RDDs); must be a 
multiple of this
-                              DStream's batching interval
-        @param numPartitions:  number of partitions of each RDD in the new 
DStream.
-        @param filterFunc:     function to filter expired key-value pairs;
-                              only pairs that satisfy the function are retained
-                              set this to null if you do not want to filter
-        """
-        self._validate_window_param(windowDuration, slideDuration)
-        if numPartitions is None:
-            numPartitions = self._sc.defaultParallelism
-
-        reduced = self.reduceByKey(func, numPartitions)
-
-        if invFunc:
-            def reduceFunc(t, a, b):
-                b = b.reduceByKey(func, numPartitions)
-                r = a.union(b).reduceByKey(func, numPartitions) if a else b
-                if filterFunc:
-                    r = r.filter(filterFunc)
-                return r
-
-            def invReduceFunc(t, a, b):
-                b = b.reduceByKey(func, numPartitions)
-                joined = a.leftOuterJoin(b, numPartitions)
-                return joined.mapValues(lambda kv: invFunc(kv[0], kv[1])
-                                        if kv[1] is not None else kv[0])
-
-            jreduceFunc = TransformFunction(self._sc, reduceFunc, 
reduced._jrdd_deserializer)
-            jinvReduceFunc = TransformFunction(self._sc, invReduceFunc, 
reduced._jrdd_deserializer)
-            if slideDuration is None:
-                slideDuration = self._slideDuration
-            dstream = self._sc._jvm.PythonReducedWindowedDStream(
-                reduced._jdstream.dstream(),
-                jreduceFunc, jinvReduceFunc,
-                self._ssc._jduration(windowDuration),
-                self._ssc._jduration(slideDuration))
-            return DStream(dstream.asJavaDStream(), self._ssc, 
self._sc.serializer)
-        else:
-            return reduced.window(windowDuration, 
slideDuration).reduceByKey(func, numPartitions)
-
-    def updateStateByKey(self, updateFunc, numPartitions=None, 
initialRDD=None):
-        """
-        Return a new "state" DStream where the state for each key is updated 
by applying
-        the given function on the previous state of the key and the new values 
of the key.
-
-        @param updateFunc: State update function. If this function returns 
None, then
-                           corresponding state key-value pair will be 
eliminated.
-        """
-        if numPartitions is None:
-            numPartitions = self._sc.defaultParallelism
-
-        if initialRDD and not isinstance(initialRDD, RDD):
-            initialRDD = self._sc.parallelize(initialRDD)
-
-        def reduceFunc(t, a, b):
-            if a is None:
-                g = b.groupByKey(numPartitions).mapValues(lambda vs: 
(list(vs), None))
-            else:
-                g = a.cogroup(b.partitionBy(numPartitions), numPartitions)
-                g = g.mapValues(lambda ab: (list(ab[1]), list(ab[0])[0] if 
len(ab[0]) else None))
-            state = g.mapValues(lambda vs_s: updateFunc(vs_s[0], vs_s[1]))
-            return state.filter(lambda k_v: k_v[1] is not None)
-
-        jreduceFunc = TransformFunction(self._sc, reduceFunc,
-                                        self._sc.serializer, 
self._jrdd_deserializer)
-        if initialRDD:
-            initialRDD = initialRDD._reserialize(self._jrdd_deserializer)
-            dstream = 
self._sc._jvm.PythonStateDStream(self._jdstream.dstream(), jreduceFunc,
-                                                       initialRDD._jrdd)
-        else:
-            dstream = 
self._sc._jvm.PythonStateDStream(self._jdstream.dstream(), jreduceFunc)
-
-        return DStream(dstream.asJavaDStream(), self._ssc, self._sc.serializer)
-
-
-class TransformedDStream(DStream):
-    """
-    TransformedDStream is an DStream generated by an Python function
-    transforming each RDD of an DStream to another RDDs.
-
-    Multiple continuous transformations of DStream can be combined into
-    one transformation.
-    """
-    def __init__(self, prev, func):
-        self._ssc = prev._ssc
-        self._sc = self._ssc._sc
-        self._jrdd_deserializer = self._sc.serializer
-        self.is_cached = False
-        self.is_checkpointed = False
-        self._jdstream_val = None
-
-        # Using type() to avoid folding the functions and compacting the 
DStreams which is not
-        # not strictly a object of TransformedDStream.
-        # Changed here is to avoid bug in KafkaTransformedDStream when calling 
offsetRanges().
-        if (type(prev) is TransformedDStream and
-                not prev.is_cached and not prev.is_checkpointed):
-            prev_func = prev.func
-            self.func = lambda t, rdd: func(t, prev_func(t, rdd))
-            self.prev = prev.prev
-        else:
-            self.prev = prev
-            self.func = func
-
-    @property
-    def _jdstream(self):
-        if self._jdstream_val is not None:
-            return self._jdstream_val
-
-        jfunc = TransformFunction(self._sc, self.func, 
self.prev._jrdd_deserializer)
-        dstream = 
self._sc._jvm.PythonTransformedDStream(self.prev._jdstream.dstream(), jfunc)
-        self._jdstream_val = dstream.asJavaDStream()
-        return self._jdstream_val

http://git-wip-us.apache.org/repos/asf/bahir/blob/12f13084/streaming-mqtt/python/mqtt.py
----------------------------------------------------------------------
diff --git a/streaming-mqtt/python/mqtt.py b/streaming-mqtt/python/mqtt.py
index 8848a70..c55b704 100644
--- a/streaming-mqtt/python/mqtt.py
+++ b/streaming-mqtt/python/mqtt.py
@@ -38,19 +38,26 @@ class MQTTUtils(object):
         :param storageLevel:  RDD storage level.
         :return: A DStream object
         """
+        jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
+        helper = MQTTUtils._get_helper(ssc._sc)
+        jstream = helper.createStream(ssc._jssc, brokerUrl, topic, jlevel)
+        return DStream(jstream, ssc, UTF8Deserializer())
+
+    @staticmethod
+    def _get_helper(sc):
         try:
-            helper = ssc._jvm.org.apache.spark.streaming.mqtt.MQTTUtilsPythonHelper()
+            return sc._jvm.org.apache.spark.streaming.mqtt.MQTTUtilsPythonHelper()
         except TypeError as e:
             if str(e) == "'JavaPackage' object is not callable":
-                MQTTUtils._printErrorMsg(ssc.sparkContext)
+                MQTTUtils._printErrorMsg(sc)
             raise
 
-        jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
-        jstream = helper.createStream(ssc._jssc, brokerUrl, topic, jlevel)
-        return DStream(jstream, ssc, UTF8Deserializer())
-
     @staticmethod
     def _printErrorMsg(sc):
+        scalaVersionString = sc._jvm.scala.util.Properties.versionString()
+        import re
+        scalaVersion = re.sub(r'version (\d+\.\d+)\.\d+', r'\1', scalaVersionString)
+        sparkVersion = re.sub(r'(\d+\.\d+\.\d+).*', r'\1', sc.version)
         print("""
 ________________________________________________________________________________________________
 
@@ -59,12 +66,12 @@ ________________________________________________________________________________
   1. Include the MQTT library and its dependencies with in the
      spark-submit command as
 
-     $ bin/spark-submit --packages org.apache.spark:spark-streaming-mqtt:%s ...
+     ${SPARK_HOME}/bin/spark-submit --packages org.apache.bahir:spark-streaming-mqtt_%s:%s ...
 
   2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
-     Group Id = org.apache.spark, Artifact Id = spark-streaming-mqtt-assembly, Version = %s.
+     Group Id = org.apache.bahir, Artifact Id = spark-streaming-mqtt, Version = %s.
      Then, include the jar in the spark-submit command as
 
-     $ bin/spark-submit --jars <spark-streaming-mqtt-assembly.jar> ...
+     ${SPARK_HOME}/bin/spark-submit --jars <spark-streaming-mqtt.jar> ...
 
 ________________________________________________________________________________________________
-""" % (sc.version, sc.version))
+""" % (scalaVersion, sparkVersion, sparkVersion))

http://git-wip-us.apache.org/repos/asf/bahir/blob/12f13084/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala
----------------------------------------------------------------------
diff --git a/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala b/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala
index 80ab27b..3ce7511 100644
--- a/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala
+++ b/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala
@@ -23,6 +23,7 @@ import java.nio.charset.StandardCharsets
 import scala.language.postfixOps
 
 import org.apache.activemq.broker.{BrokerService, TransportConnector}
+import org.apache.activemq.usage.SystemUsage
 import org.apache.commons.lang3.RandomUtils
 import org.eclipse.paho.client.mqttv3._
 import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
@@ -41,6 +42,7 @@ private[mqtt] class MQTTTestUtils extends Logging {
   private val brokerPort = findFreePort()
 
   private var broker: BrokerService = _
+  private var systemUsage: SystemUsage = _
   private var connector: TransportConnector = _
 
   def brokerUri: String = {
@@ -50,6 +52,10 @@ private[mqtt] class MQTTTestUtils extends Logging {
   def setup(): Unit = {
     broker = new BrokerService()
     broker.setDataDirectoryFile(Utils.createTempDir())
+    broker.getSystemUsage().setSendFailIfNoSpace(false)
+    systemUsage = broker.getSystemUsage()
+    systemUsage.getStoreUsage().setLimit(1024L * 1024 * 256);  // 256 MB (default: 100 GB)
+    systemUsage.getTempUsage().setLimit(1024L * 1024 * 128);   // 128 MB (default: 50 GB)
     connector = new TransportConnector()
     connector.setName("mqtt")
     connector.setUri(new URI("mqtt://" + brokerUri))

Reply via email to