Repository: bahir
Updated Branches:
refs/heads/master 48e91fca5 -> 12f130846
[BAHIR-24] fix MQTT Python code, examples, add tests
Changes in this PR:
- remove unnecessary files from streaming-mqtt/python
- update all *.py files to reflect the modified project
  structure: pyspark.streaming.mqtt --> mqtt (see the sketch below)
- add test cases that were left out from the import and
  add a shell script to run them:
  - streaming-mqtt/python-tests/run-python-tests.sh
  - streaming-mqtt/python-tests/tests.py
- modify MQTTTestUtils.scala to limit the required disk storage space
- modify bin/run-example script to set up PYTHONPATH to run Python examples
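For reference, a minimal sketch (not part of this commit) of what the new import
path looks like from an example script, assuming streaming-mqtt/python is on
PYTHONPATH as arranged by bin/run-example for local runs; broker URL, topic and
app name are placeholder values:

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from mqtt import MQTTUtils  # was: from pyspark.streaming.mqtt import MQTTUtils

    sc = SparkContext(appName="MQTTImportSketch")
    ssc = StreamingContext(sc, 1)
    lines = MQTTUtils.createStream(ssc, "tcp://localhost:1883", "foo")
    lines.pprint()
    ssc.start()
    ssc.awaitTermination()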
Closes #10
Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/12f13084
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/12f13084
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/12f13084
Branch: refs/heads/master
Commit: 12f130846ef7523138e98e79bfd823f61acab3b3
Parents: 48e91fc
Author: Christian Kadner <[email protected]>
Authored: Fri Jul 15 17:49:40 2016 -0700
Committer: Luciano Resende <[email protected]>
Committed: Sat Jul 23 09:19:57 2016 -0700
----------------------------------------------------------------------
.gitignore | 3 +
bin/run-example | 25 +-
pom.xml | 2 +-
.../src/main/python/streaming/mqtt_wordcount.py | 34 +-
streaming-mqtt/python-tests/run-python-tests.sh | 79 +++
streaming-mqtt/python-tests/tests.py | 99 +++
streaming-mqtt/python/__init__.py | 22 -
streaming-mqtt/python/dstream.py | 643 -------------------
streaming-mqtt/python/mqtt.py | 27 +-
.../spark/streaming/mqtt/MQTTTestUtils.scala | 6 +
10 files changed, 247 insertions(+), 693 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bahir/blob/12f13084/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 1801b67..fb6d3b7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -15,6 +15,9 @@ target/
*.class
*.log
+# Python
+*.pyc
+
# Others
.checkstyle
.fbExcludeFilterFile
http://git-wip-us.apache.org/repos/asf/bahir/blob/12f13084/bin/run-example
----------------------------------------------------------------------
diff --git a/bin/run-example b/bin/run-example
index 6f3cb39..483d853 100755
--- a/bin/run-example
+++ b/bin/run-example
@@ -16,6 +16,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+set -o pipefail
# make sure Spark home is set and valid
if [ -z "${SPARK_HOME}" ]; then
@@ -61,8 +62,8 @@ DESCRIPTION
USAGE EXAMPLES
EOF
- grep -R "bin/run-example org.apache" --no-filename --include="*.scala"
--include="*.java" "${project_dir}" | tr '`' ' ' | sed 's/^ *\* * / /g' ; \
- grep -R -A1 "bin/run-example \\\\" --no-filename --include="*.scala"
--include="*.java" "${project_dir}" | tr '`' ' ' | sed 's/^ *\* * / /g' | sed
'/^--$/d' | sed 'N;s/\\\n *//g'
+ grep -R "bin/run-example org.apache" --no-filename
--include=*.{scala,java,py} "${project_dir}" | tr '`' ' ' | sed 's/^ *\* * /
/g' ; \
+ grep -R -A1 "bin/run-example \\\\" --no-filename
--include=*.{scala,java,py} "${project_dir}" | tr '`' ' ' | sed 's/^ *\* * /
/g' | sed '/^--$/d' | sed 'N;s/\\\n *//g'
exit 1
}
@@ -122,10 +123,22 @@ examples_jar="${module_tests_jar_path}"
#
streaming-akka/target/spark-streaming-akka_2.11-2.0.0-SNAPSHOT-tests.jar \
# localhost 9999
-# capture the full command line and echo it for transparency and debug purposes
-cmd="${SPARK_HOME}/bin/spark-submit \
- --packages ${spark_package} \
- --class ${example_class} ${examples_jar} ${example_args}"
+# for Python examples add all of the Bahir project's Python sources to PYTHONPATH, which in local
+# mode is easier than creating a zip file to be used with the --py-files option (TODO: BAHIR-35)
+# Note that --py-files with individual *.py files does not work if those modules are imported at the
+# top of the example script; rather, imports must be pushed down to after SparkContext initialization
+if [[ "$example_class" == *.py ]]; then
+ export PYTHONPATH="$( find "$project_dir" -path '*/python' -maxdepth 5 -type d | tr '\n' ':' )$PYTHONPATH"
+ cmd="${SPARK_HOME}/bin/spark-submit \
+ --packages ${spark_package} \
+ ${example_class} \
+ ${example_args}"
+else
+ cmd="${SPARK_HOME}/bin/spark-submit \
+ --packages ${spark_package} \
+ --class ${example_class} ${examples_jar} \
+ ${example_args}"
+fi
echo "---"
echo "Spark-Submit command: $cmd"
http://git-wip-us.apache.org/repos/asf/bahir/blob/12f13084/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index bbfa340..b9158a1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -95,7 +95,7 @@
<log4j.version>1.2.17</log4j.version>
<!-- Spark version -->
- <spark.version>2.0.0-SNAPSHOT</spark.version>
+ <spark.version>2.0.1-SNAPSHOT</spark.version>
<!-- Streaming Akka connector -->
<akka.group>com.typesafe.akka</akka.group>
http://git-wip-us.apache.org/repos/asf/bahir/blob/12f13084/streaming-mqtt/examples/src/main/python/streaming/mqtt_wordcount.py
----------------------------------------------------------------------
diff --git a/streaming-mqtt/examples/src/main/python/streaming/mqtt_wordcount.py b/streaming-mqtt/examples/src/main/python/streaming/mqtt_wordcount.py
index abf9c0e..19838dc 100644
--- a/streaming-mqtt/examples/src/main/python/streaming/mqtt_wordcount.py
+++ b/streaming-mqtt/examples/src/main/python/streaming/mqtt_wordcount.py
@@ -20,23 +20,35 @@
Usage: mqtt_wordcount.py <broker url> <topic>
 To run this in your local machine, you need to setup a MQTT broker and publisher first,
- Mosquitto is one of the open source MQTT Brokers, see
- http://mosquitto.org/
- Eclipse paho project provides number of clients and utilities for working with MQTT, see
- http://www.eclipse.org/paho/#getting-started
-
- and then run the example
- `$ bin/spark-submit --jars \
- external/mqtt-assembly/target/scala-*/spark-streaming-mqtt-assembly-*.jar \
- examples/src/main/python/streaming/mqtt_wordcount.py \
- tcp://localhost:1883 foo`
+ like Mosquitto (http://mosquitto.org/), an easy to install and use open source MQTT broker.
+ On Mac OS Mosquitto can be installed with Homebrew: `$ brew install mosquitto`.
+ On Ubuntu Mosquitto can be installed with the command `$ sudo apt-get install mosquitto`.
+
+ Alternatively, the Eclipse paho project provides a number of clients and utilities for
+ working with MQTT, see http://www.eclipse.org/paho/#getting-started
+
+ How to run this example locally:
+
+ (1) Start an MQTT message broker/server, e.g. Mosquitto:
+
+ `$ mosquitto -p 1883`
+
+ (2) Run the publisher:
+
+ `$ bin/run-example \
+ org.apache.spark.examples.streaming.mqtt.MQTTPublisher tcp://localhost:1883 foo`
+
+ (3) Run the example:
+
+ `$ bin/run-example \
+ streaming-mqtt/examples/src/main/python/streaming/mqtt_wordcount.py tcp://localhost:1883 foo`
"""
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
-from pyspark.streaming.mqtt import MQTTUtils
+from mqtt import MQTTUtils
if __name__ == "__main__":
if len(sys.argv) != 3:
http://git-wip-us.apache.org/repos/asf/bahir/blob/12f13084/streaming-mqtt/python-tests/run-python-tests.sh
----------------------------------------------------------------------
diff --git a/streaming-mqtt/python-tests/run-python-tests.sh b/streaming-mqtt/python-tests/run-python-tests.sh
new file mode 100755
index 0000000..557b164
--- /dev/null
+++ b/streaming-mqtt/python-tests/run-python-tests.sh
@@ -0,0 +1,79 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+set -o pipefail
+
+# make sure Spark home is set and valid
+if [ -z "${SPARK_HOME}" ]; then
+ echo "SPARK_HOME is not set" >&2
+ exit 1
+elif [ ! -d "${SPARK_HOME}" ]; then
+ echo "SPARK_HOME does not point to a valid directory" >&2
+ exit 1
+fi
+
+# pinpoint the module folder and project root folder
+bin_dir=$( dirname "$0" )
+module_dir=$( cd "${bin_dir}/.." && pwd -P )
+project_dir=$( cd "${module_dir}/.." && pwd -P )
+stdout_log="${module_dir}/target/python-tests-python-output.log"
+stderr_log="${module_dir}/target/python-tests-java-output.log"
+
+# use the module name to find the tests jar file that contains the test support classes
+module_name=${module_dir#"${project_dir}"/}
+module_tests_jar_path=$( find "${module_dir}/target" -name "*${module_name}*-tests.jar" -maxdepth 1 | head -1 )
+
+if [ -z "${module_tests_jar_path}" ] || [ ! -e "${module_tests_jar_path}" ]; then
+ echo "Could not find module tests jar file in ${module_dir}/target/" >&2
+ echo "Run \"mvn clean install\" and retry running this example" >&2
+ exit 1
+fi
+
+# use maven-help-plugin to determine project version and Scala version
+module_version=$( cd "${module_dir}" && mvn org.apache.maven.plugins:maven-help-plugin:2.2:evaluate -Dexpression=project.version | grep -v "INFO\|WARNING\|ERROR\|Downloading" | tail -1 )
+scala_version=$( cd "${module_dir}" && mvn org.apache.maven.plugins:maven-help-plugin:2.2:evaluate -Dexpression=scala.binary.version | grep -v "INFO\|WARNING\|ERROR\|Downloading" | tail -1 )
+
+# we are using spark-submit with --packages to run the tests and all necessary dependencies are
+# resolved by maven which requires running "mvn" or "mvn install" first
+spark_packages="org.apache.bahir:spark-${module_name}_${scala_version}:${module_version}"
+
+# find additional test-scoped dependencies and add them to the --packages list
+test_dependencies=$( cd "${project_dir}" && mvn dependency:tree -Dscope=test -Dtokens=standard -pl ${module_name} | grep "\[INFO\] +- [a-z].*:test" | grep -ivE "spark|bahir|scala|junit" | sed 's/\[INFO\] +- //; s/:jar//; s/:test//' )
+for td in ${test_dependencies}; do
+ spark_packages="${spark_packages},${td}"
+done
+
+# since we are running locally, we can use PYTHONPATH instead of --py-files (TODO: BAHIR-35)
+export PYTHONPATH="${module_dir}/python:${PYTHONPATH}"
+
+# run the tests via spark-submit and capture the output in two separate log files (stdout=Python,
+# stderr=Java) while only printing stdout to console
+"${SPARK_HOME}"/bin/spark-submit \
+ --master local[*] \
+ --driver-memory 512m \
+ --packages "${spark_packages}" \
+ --jars "${module_tests_jar_path}" \
+ "${module_dir}/python-tests/tests.py" \
+ 1> >( tee "${stdout_log}" | grep -w '[[:alpha:]=-]\{2,\}' ) \
+ 2> "${stderr_log}"
+
+# if the Python code doesn't get executed due to errors in SparkSubmit, the stdout log file will be
+# empty and nothing was logged to the console; in that case print the stderr log (Java output)
+if [ ! -s "${stdout_log}" ]; then
+ cat "${stderr_log}"
+ echo "Error during test execution"
+fi
http://git-wip-us.apache.org/repos/asf/bahir/blob/12f13084/streaming-mqtt/python-tests/tests.py
----------------------------------------------------------------------
diff --git a/streaming-mqtt/python-tests/tests.py b/streaming-mqtt/python-tests/tests.py
new file mode 100644
index 0000000..749313f
--- /dev/null
+++ b/streaming-mqtt/python-tests/tests.py
@@ -0,0 +1,99 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import sys
+import time
+import random
+
+if sys.version_info[:2] <= (2, 6):
+ try:
+ import unittest2 as unittest
+ except ImportError:
+ sys.stderr.write('Please install unittest2 to test with Python 2.6 or earlier')
+ sys.exit(1)
+else:
+ import unittest
+
+from pyspark.context import SparkConf, SparkContext, RDD
+from pyspark.streaming.context import StreamingContext
+from pyspark.streaming.tests import PySparkStreamingTestCase
+from mqtt import MQTTUtils
+
+class MQTTStreamTests(PySparkStreamingTestCase):
+ timeout = 20 # seconds
+ duration = 1
+
+ def setUp(self):
+ super(MQTTStreamTests, self).setUp()
+
+ MQTTTestUtilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
+     .loadClass("org.apache.spark.streaming.mqtt.MQTTTestUtils")
+ self._MQTTTestUtils = MQTTTestUtilsClz.newInstance()
+ self._MQTTTestUtils.setup()
+
+ def tearDown(self):
+ if self._MQTTTestUtils is not None:
+ self._MQTTTestUtils.teardown()
+ self._MQTTTestUtils = None
+
+ super(MQTTStreamTests, self).tearDown()
+
+ def _randomTopic(self):
+ return "topic-%d" % random.randint(0, 10000)
+
+ def _startContext(self, topic):
+ # Start the StreamingContext and also collect the result
+ stream = MQTTUtils.createStream(self.ssc, "tcp://" + self._MQTTTestUtils.brokerUri(), topic)
+ result = []
+
+ def getOutput(_, rdd):
+ for data in rdd.collect():
+ result.append(data)
+
+ stream.foreachRDD(getOutput)
+ self.ssc.start()
+ return result
+
+ def test_mqtt_stream(self):
+ """Test the Python MQTT stream API."""
+ sendData = "MQTT demo for spark streaming"
+ topic = self._randomTopic()
+ result = self._startContext(topic)
+
+ def retry():
+ self._MQTTTestUtils.publishData(topic, sendData)
+ # Because "publishData" sends duplicate messages, here we should
use > 0
+ self.assertTrue(len(result) > 0)
+ self.assertEqual(sendData, result[0])
+
+ # Retry it because we don't know when the receiver will start.
+ self._retry_or_timeout(retry)
+
+ def _retry_or_timeout(self, test_func):
+ start_time = time.time()
+ while True:
+ try:
+ test_func()
+ break
+ except:
+ if time.time() - start_time > self.timeout:
+ raise
+ time.sleep(0.01)
+
+
+if __name__ == "__main__":
+ unittest.main()
http://git-wip-us.apache.org/repos/asf/bahir/blob/12f13084/streaming-mqtt/python/__init__.py
----------------------------------------------------------------------
diff --git a/streaming-mqtt/python/__init__.py b/streaming-mqtt/python/__init__.py
deleted file mode 100644
index 66e8f8e..0000000
--- a/streaming-mqtt/python/__init__.py
+++ /dev/null
@@ -1,22 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from pyspark.streaming.context import StreamingContext
-from pyspark.streaming.dstream import DStream
-from pyspark.streaming.listener import StreamingListener
-
-__all__ = ['StreamingContext', 'DStream', 'StreamingListener']
http://git-wip-us.apache.org/repos/asf/bahir/blob/12f13084/streaming-mqtt/python/dstream.py
----------------------------------------------------------------------
diff --git a/streaming-mqtt/python/dstream.py b/streaming-mqtt/python/dstream.py
deleted file mode 100644
index 2056663..0000000
--- a/streaming-mqtt/python/dstream.py
+++ /dev/null
@@ -1,643 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-import sys
-import operator
-import time
-from itertools import chain
-from datetime import datetime
-
-if sys.version < "3":
- from itertools import imap as map, ifilter as filter
-
-from py4j.protocol import Py4JJavaError
-
-from pyspark import RDD
-from pyspark.storagelevel import StorageLevel
-from pyspark.streaming.util import rddToFileName, TransformFunction
-from pyspark.rdd import portable_hash
-from pyspark.resultiterable import ResultIterable
-
-__all__ = ["DStream"]
-
-
-class DStream(object):
- """
- A Discretized Stream (DStream), the basic abstraction in Spark Streaming,
- is a continuous sequence of RDDs (of the same type) representing a
- continuous stream of data (see L{RDD} in the Spark core documentation
- for more details on RDDs).
-
- DStreams can either be created from live data (such as, data from TCP
- sockets, Kafka, Flume, etc.) using a L{StreamingContext} or it can be
- generated by transforming existing DStreams using operations such as
- `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming
- program is running, each DStream periodically generates a RDD, either
- from live data or by transforming the RDD generated by a parent DStream.
-
- DStreams internally is characterized by a few basic properties:
- - A list of other DStreams that the DStream depends on
- - A time interval at which the DStream generates an RDD
- - A function that is used to generate an RDD after each time interval
- """
- def __init__(self, jdstream, ssc, jrdd_deserializer):
- self._jdstream = jdstream
- self._ssc = ssc
- self._sc = ssc._sc
- self._jrdd_deserializer = jrdd_deserializer
- self.is_cached = False
- self.is_checkpointed = False
-
- def context(self):
- """
- Return the StreamingContext associated with this DStream
- """
- return self._ssc
-
- def count(self):
- """
- Return a new DStream in which each RDD has a single element
- generated by counting each RDD of this DStream.
- """
- return self.mapPartitions(lambda i: [sum(1 for _ in i)]).reduce(operator.add)
-
- def filter(self, f):
- """
- Return a new DStream containing only the elements that satisfy
predicate.
- """
- def func(iterator):
- return filter(f, iterator)
- return self.mapPartitions(func, True)
-
- def flatMap(self, f, preservesPartitioning=False):
- """
- Return a new DStream by applying a function to all elements of
- this DStream, and then flattening the results
- """
- def func(s, iterator):
- return chain.from_iterable(map(f, iterator))
- return self.mapPartitionsWithIndex(func, preservesPartitioning)
-
- def map(self, f, preservesPartitioning=False):
- """
- Return a new DStream by applying a function to each element of DStream.
- """
- def func(iterator):
- return map(f, iterator)
- return self.mapPartitions(func, preservesPartitioning)
-
- def mapPartitions(self, f, preservesPartitioning=False):
- """
- Return a new DStream in which each RDD is generated by applying
- mapPartitions() to each RDDs of this DStream.
- """
- def func(s, iterator):
- return f(iterator)
- return self.mapPartitionsWithIndex(func, preservesPartitioning)
-
- def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
- """
- Return a new DStream in which each RDD is generated by applying
- mapPartitionsWithIndex() to each RDDs of this DStream.
- """
- return self.transform(lambda rdd: rdd.mapPartitionsWithIndex(f, preservesPartitioning))
-
- def reduce(self, func):
- """
- Return a new DStream in which each RDD has a single element
- generated by reducing each RDD of this DStream.
- """
- return self.map(lambda x: (None, x)).reduceByKey(func, 1).map(lambda x: x[1])
-
- def reduceByKey(self, func, numPartitions=None):
- """
- Return a new DStream by applying reduceByKey to each RDD.
- """
- if numPartitions is None:
- numPartitions = self._sc.defaultParallelism
- return self.combineByKey(lambda x: x, func, func, numPartitions)
-
- def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
- numPartitions=None):
- """
- Return a new DStream by applying combineByKey to each RDD.
- """
- if numPartitions is None:
- numPartitions = self._sc.defaultParallelism
-
- def func(rdd):
- return rdd.combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions)
- return self.transform(func)
-
- def partitionBy(self, numPartitions, partitionFunc=portable_hash):
- """
- Return a copy of the DStream in which each RDD are partitioned
- using the specified partitioner.
- """
- return self.transform(lambda rdd: rdd.partitionBy(numPartitions, partitionFunc))
-
- def foreachRDD(self, func):
- """
- Apply a function to each RDD in this DStream.
- """
- if func.__code__.co_argcount == 1:
- old_func = func
- func = lambda t, rdd: old_func(rdd)
- jfunc = TransformFunction(self._sc, func, self._jrdd_deserializer)
- api = self._ssc._jvm.PythonDStream
- api.callForeachRDD(self._jdstream, jfunc)
-
- def pprint(self, num=10):
- """
- Print the first num elements of each RDD generated in this DStream.
-
- @param num: the number of elements from the first will be printed.
- """
- def takeAndPrint(time, rdd):
- taken = rdd.take(num + 1)
- print("-------------------------------------------")
- print("Time: %s" % time)
- print("-------------------------------------------")
- for record in taken[:num]:
- print(record)
- if len(taken) > num:
- print("...")
- print("")
-
- self.foreachRDD(takeAndPrint)
-
- def mapValues(self, f):
- """
- Return a new DStream by applying a map function to the value of
- each key-value pairs in this DStream without changing the key.
- """
- map_values_fn = lambda kv: (kv[0], f(kv[1]))
- return self.map(map_values_fn, preservesPartitioning=True)
-
- def flatMapValues(self, f):
- """
- Return a new DStream by applying a flatmap function to the value
- of each key-value pairs in this DStream without changing the key.
- """
- flat_map_fn = lambda kv: ((kv[0], x) for x in f(kv[1]))
- return self.flatMap(flat_map_fn, preservesPartitioning=True)
-
- def glom(self):
- """
- Return a new DStream in which RDD is generated by applying glom()
- to RDD of this DStream.
- """
- def func(iterator):
- yield list(iterator)
- return self.mapPartitions(func)
-
- def cache(self):
- """
- Persist the RDDs of this DStream with the default storage level
- (C{MEMORY_ONLY}).
- """
- self.is_cached = True
- self.persist(StorageLevel.MEMORY_ONLY)
- return self
-
- def persist(self, storageLevel):
- """
- Persist the RDDs of this DStream with the given storage level
- """
- self.is_cached = True
- javaStorageLevel = self._sc._getJavaStorageLevel(storageLevel)
- self._jdstream.persist(javaStorageLevel)
- return self
-
- def checkpoint(self, interval):
- """
- Enable periodic checkpointing of RDDs of this DStream
-
- @param interval: time in seconds, after each period of that, generated
- RDD will be checkpointed
- """
- self.is_checkpointed = True
- self._jdstream.checkpoint(self._ssc._jduration(interval))
- return self
-
- def groupByKey(self, numPartitions=None):
- """
- Return a new DStream by applying groupByKey on each RDD.
- """
- if numPartitions is None:
- numPartitions = self._sc.defaultParallelism
- return self.transform(lambda rdd: rdd.groupByKey(numPartitions))
-
- def countByValue(self):
- """
- Return a new DStream in which each RDD contains the counts of each
- distinct value in each RDD of this DStream.
- """
- return self.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x+y)
-
- def saveAsTextFiles(self, prefix, suffix=None):
- """
- Save each RDD in this DStream as at text file, using string
- representation of elements.
- """
- def saveAsTextFile(t, rdd):
- path = rddToFileName(prefix, suffix, t)
- try:
- rdd.saveAsTextFile(path)
- except Py4JJavaError as e:
- # after recovered from checkpointing, the foreachRDD may
- # be called twice
- if 'FileAlreadyExistsException' not in str(e):
- raise
- return self.foreachRDD(saveAsTextFile)
-
- # TODO: uncomment this until we have ssc.pickleFileStream()
- # def saveAsPickleFiles(self, prefix, suffix=None):
- # """
- # Save each RDD in this DStream as at binary file, the elements are
- # serialized by pickle.
- # """
- # def saveAsPickleFile(t, rdd):
- # path = rddToFileName(prefix, suffix, t)
- # try:
- # rdd.saveAsPickleFile(path)
- # except Py4JJavaError as e:
- # # after recovered from checkpointing, the foreachRDD may
- # # be called twice
- # if 'FileAlreadyExistsException' not in str(e):
- # raise
- # return self.foreachRDD(saveAsPickleFile)
-
- def transform(self, func):
- """
- Return a new DStream in which each RDD is generated by applying a
function
- on each RDD of this DStream.
-
- `func` can have one argument of `rdd`, or have two arguments of
- (`time`, `rdd`)
- """
- if func.__code__.co_argcount == 1:
- oldfunc = func
- func = lambda t, rdd: oldfunc(rdd)
- assert func.__code__.co_argcount == 2, "func should take one or two arguments"
- return TransformedDStream(self, func)
-
- def transformWith(self, func, other, keepSerializer=False):
- """
- Return a new DStream in which each RDD is generated by applying a
function
- on each RDD of this DStream and 'other' DStream.
-
- `func` can have two arguments of (`rdd_a`, `rdd_b`) or have three
- arguments of (`time`, `rdd_a`, `rdd_b`)
- """
- if func.__code__.co_argcount == 2:
- oldfunc = func
- func = lambda t, a, b: oldfunc(a, b)
- assert func.__code__.co_argcount == 3, "func should take two or three arguments"
- jfunc = TransformFunction(self._sc, func, self._jrdd_deserializer, other._jrdd_deserializer)
- dstream = self._sc._jvm.PythonTransformed2DStream(self._jdstream.dstream(),
-     other._jdstream.dstream(), jfunc)
- jrdd_serializer = self._jrdd_deserializer if keepSerializer else self._sc.serializer
- return DStream(dstream.asJavaDStream(), self._ssc, jrdd_serializer)
-
- def repartition(self, numPartitions):
- """
- Return a new DStream with an increased or decreased level of
parallelism.
- """
- return self.transform(lambda rdd: rdd.repartition(numPartitions))
-
- @property
- def _slideDuration(self):
- """
- Return the slideDuration in seconds of this DStream
- """
- return self._jdstream.dstream().slideDuration().milliseconds() / 1000.0
-
- def union(self, other):
- """
- Return a new DStream by unifying data of another DStream with this
DStream.
-
- @param other: Another DStream having the same interval (i.e.,
slideDuration)
- as this DStream.
- """
- if self._slideDuration != other._slideDuration:
- raise ValueError("the two DStream should have same slide duration")
- return self.transformWith(lambda a, b: a.union(b), other, True)
-
- def cogroup(self, other, numPartitions=None):
- """
- Return a new DStream by applying 'cogroup' between RDDs of this
- DStream and `other` DStream.
-
- Hash partitioning is used to generate the RDDs with `numPartitions`
partitions.
- """
- if numPartitions is None:
- numPartitions = self._sc.defaultParallelism
- return self.transformWith(lambda a, b: a.cogroup(b, numPartitions), other)
-
- def join(self, other, numPartitions=None):
- """
- Return a new DStream by applying 'join' between RDDs of this DStream
and
- `other` DStream.
-
- Hash partitioning is used to generate the RDDs with `numPartitions`
- partitions.
- """
- if numPartitions is None:
- numPartitions = self._sc.defaultParallelism
- return self.transformWith(lambda a, b: a.join(b, numPartitions), other)
-
- def leftOuterJoin(self, other, numPartitions=None):
- """
- Return a new DStream by applying 'left outer join' between RDDs of
this DStream and
- `other` DStream.
-
- Hash partitioning is used to generate the RDDs with `numPartitions`
- partitions.
- """
- if numPartitions is None:
- numPartitions = self._sc.defaultParallelism
- return self.transformWith(lambda a, b: a.leftOuterJoin(b, numPartitions), other)
-
- def rightOuterJoin(self, other, numPartitions=None):
- """
- Return a new DStream by applying 'right outer join' between RDDs of
this DStream and
- `other` DStream.
-
- Hash partitioning is used to generate the RDDs with `numPartitions`
- partitions.
- """
- if numPartitions is None:
- numPartitions = self._sc.defaultParallelism
- return self.transformWith(lambda a, b: a.rightOuterJoin(b, numPartitions), other)
-
- def fullOuterJoin(self, other, numPartitions=None):
- """
- Return a new DStream by applying 'full outer join' between RDDs of
this DStream and
- `other` DStream.
-
- Hash partitioning is used to generate the RDDs with `numPartitions`
- partitions.
- """
- if numPartitions is None:
- numPartitions = self._sc.defaultParallelism
- return self.transformWith(lambda a, b: a.fullOuterJoin(b, numPartitions), other)
-
- def _jtime(self, timestamp):
- """ Convert datetime or unix_timestamp into Time
- """
- if isinstance(timestamp, datetime):
- timestamp = time.mktime(timestamp.timetuple())
- return self._sc._jvm.Time(long(timestamp * 1000))
-
- def slice(self, begin, end):
- """
- Return all the RDDs between 'begin' to 'end' (both included)
-
- `begin`, `end` could be datetime.datetime() or unix_timestamp
- """
- jrdds = self._jdstream.slice(self._jtime(begin), self._jtime(end))
- return [RDD(jrdd, self._sc, self._jrdd_deserializer) for jrdd in jrdds]
-
- def _validate_window_param(self, window, slide):
- duration = self._jdstream.dstream().slideDuration().milliseconds()
- if int(window * 1000) % duration != 0:
- raise ValueError("windowDuration must be multiple of the slide
duration (%d ms)"
- % duration)
- if slide and int(slide * 1000) % duration != 0:
- raise ValueError("slideDuration must be multiple of the slide
duration (%d ms)"
- % duration)
-
- def window(self, windowDuration, slideDuration=None):
- """
- Return a new DStream in which each RDD contains all the elements in
seen in a
- sliding window of time over this DStream.
-
- @param windowDuration: width of the window; must be a multiple of this
DStream's
- batching interval
- @param slideDuration: sliding interval of the window (i.e., the
interval after which
- the new DStream will generate RDDs); must be a
multiple of this
- DStream's batching interval
- """
- self._validate_window_param(windowDuration, slideDuration)
- d = self._ssc._jduration(windowDuration)
- if slideDuration is None:
- return DStream(self._jdstream.window(d), self._ssc, self._jrdd_deserializer)
- s = self._ssc._jduration(slideDuration)
- return DStream(self._jdstream.window(d, s), self._ssc, self._jrdd_deserializer)
-
- def reduceByWindow(self, reduceFunc, invReduceFunc, windowDuration, slideDuration):
- """
- Return a new DStream in which each RDD has a single element generated
by reducing all
- elements in a sliding window over this DStream.
-
- if `invReduceFunc` is not None, the reduction is done incrementally
- using the old window's reduced value :
-
- 1. reduce the new values that entered the window (e.g., adding new
counts)
-
- 2. "inverse reduce" the old values that left the window (e.g.,
subtracting old counts)
- This is more efficient than `invReduceFunc` is None.
-
- @param reduceFunc: associative and commutative reduce function
- @param invReduceFunc: inverse reduce function of `reduceFunc`
- @param windowDuration: width of the window; must be a multiple of this
DStream's
- batching interval
- @param slideDuration: sliding interval of the window (i.e., the
interval after which
- the new DStream will generate RDDs); must be a
multiple of this
- DStream's batching interval
- """
- keyed = self.map(lambda x: (1, x))
- reduced = keyed.reduceByKeyAndWindow(reduceFunc, invReduceFunc,
- windowDuration, slideDuration, 1)
- return reduced.map(lambda kv: kv[1])
-
- def countByWindow(self, windowDuration, slideDuration):
- """
- Return a new DStream in which each RDD has a single element generated
- by counting the number of elements in a window over this DStream.
- windowDuration and slideDuration are as defined in the window()
operation.
-
- This is equivalent to window(windowDuration, slideDuration).count(),
- but will be more efficient if window is large.
- """
- return self.map(lambda x: 1).reduceByWindow(operator.add, operator.sub,
- windowDuration, slideDuration)
-
- def countByValueAndWindow(self, windowDuration, slideDuration, numPartitions=None):
- """
- Return a new DStream in which each RDD contains the count of distinct
elements in
- RDDs in a sliding window over this DStream.
-
- @param windowDuration: width of the window; must be a multiple of this
DStream's
- batching interval
- @param slideDuration: sliding interval of the window (i.e., the
interval after which
- the new DStream will generate RDDs); must be a
multiple of this
- DStream's batching interval
- @param numPartitions: number of partitions of each RDD in the new
DStream.
- """
- keyed = self.map(lambda x: (x, 1))
- counted = keyed.reduceByKeyAndWindow(operator.add, operator.sub,
- windowDuration, slideDuration, numPartitions)
- return counted.filter(lambda kv: kv[1] > 0)
-
- def groupByKeyAndWindow(self, windowDuration, slideDuration,
numPartitions=None):
- """
- Return a new DStream by applying `groupByKey` over a sliding window.
- Similar to `DStream.groupByKey()`, but applies it over a sliding
window.
-
- @param windowDuration: width of the window; must be a multiple of this
DStream's
- batching interval
- @param slideDuration: sliding interval of the window (i.e., the
interval after which
- the new DStream will generate RDDs); must be a
multiple of this
- DStream's batching interval
- @param numPartitions: Number of partitions of each RDD in the new
DStream.
- """
- ls = self.mapValues(lambda x: [x])
- grouped = ls.reduceByKeyAndWindow(lambda a, b: a.extend(b) or a, lambda a, b: a[len(b):],
-     windowDuration, slideDuration, numPartitions)
- return grouped.mapValues(ResultIterable)
-
- def reduceByKeyAndWindow(self, func, invFunc, windowDuration, slideDuration=None,
- numPartitions=None, filterFunc=None):
- """
- Return a new DStream by applying incremental `reduceByKey` over a
sliding window.
-
- The reduced value of over a new window is calculated using the old
window's reduce value :
- 1. reduce the new values that entered the window (e.g., adding new
counts)
- 2. "inverse reduce" the old values that left the window (e.g.,
subtracting old counts)
-
- `invFunc` can be None, then it will reduce all the RDDs in window,
could be slower
- than having `invFunc`.
-
- @param func: associative and commutative reduce function
- @param invFunc: inverse function of `reduceFunc`
- @param windowDuration: width of the window; must be a multiple of this
DStream's
- batching interval
- @param slideDuration: sliding interval of the window (i.e., the
interval after which
- the new DStream will generate RDDs); must be a
multiple of this
- DStream's batching interval
- @param numPartitions: number of partitions of each RDD in the new
DStream.
- @param filterFunc: function to filter expired key-value pairs;
- only pairs that satisfy the function are retained
- set this to null if you do not want to filter
- """
- self._validate_window_param(windowDuration, slideDuration)
- if numPartitions is None:
- numPartitions = self._sc.defaultParallelism
-
- reduced = self.reduceByKey(func, numPartitions)
-
- if invFunc:
- def reduceFunc(t, a, b):
- b = b.reduceByKey(func, numPartitions)
- r = a.union(b).reduceByKey(func, numPartitions) if a else b
- if filterFunc:
- r = r.filter(filterFunc)
- return r
-
- def invReduceFunc(t, a, b):
- b = b.reduceByKey(func, numPartitions)
- joined = a.leftOuterJoin(b, numPartitions)
- return joined.mapValues(lambda kv: invFunc(kv[0], kv[1])
- if kv[1] is not None else kv[0])
-
- jreduceFunc = TransformFunction(self._sc, reduceFunc, reduced._jrdd_deserializer)
- jinvReduceFunc = TransformFunction(self._sc, invReduceFunc, reduced._jrdd_deserializer)
- if slideDuration is None:
- slideDuration = self._slideDuration
- dstream = self._sc._jvm.PythonReducedWindowedDStream(
- reduced._jdstream.dstream(),
- jreduceFunc, jinvReduceFunc,
- self._ssc._jduration(windowDuration),
- self._ssc._jduration(slideDuration))
- return DStream(dstream.asJavaDStream(), self._ssc, self._sc.serializer)
- else:
- return reduced.window(windowDuration, slideDuration).reduceByKey(func, numPartitions)
-
- def updateStateByKey(self, updateFunc, numPartitions=None, initialRDD=None):
- """
- Return a new "state" DStream where the state for each key is updated
by applying
- the given function on the previous state of the key and the new values
of the key.
-
- @param updateFunc: State update function. If this function returns
None, then
- corresponding state key-value pair will be
eliminated.
- """
- if numPartitions is None:
- numPartitions = self._sc.defaultParallelism
-
- if initialRDD and not isinstance(initialRDD, RDD):
- initialRDD = self._sc.parallelize(initialRDD)
-
- def reduceFunc(t, a, b):
- if a is None:
- g = b.groupByKey(numPartitions).mapValues(lambda vs: (list(vs), None))
- else:
- g = a.cogroup(b.partitionBy(numPartitions), numPartitions)
- g = g.mapValues(lambda ab: (list(ab[1]), list(ab[0])[0] if len(ab[0]) else None))
- state = g.mapValues(lambda vs_s: updateFunc(vs_s[0], vs_s[1]))
- return state.filter(lambda k_v: k_v[1] is not None)
-
- jreduceFunc = TransformFunction(self._sc, reduceFunc,
- self._sc.serializer, self._jrdd_deserializer)
- if initialRDD:
- initialRDD = initialRDD._reserialize(self._jrdd_deserializer)
- dstream = self._sc._jvm.PythonStateDStream(self._jdstream.dstream(), jreduceFunc,
- initialRDD._jrdd)
- else:
- dstream = self._sc._jvm.PythonStateDStream(self._jdstream.dstream(), jreduceFunc)
-
- return DStream(dstream.asJavaDStream(), self._ssc, self._sc.serializer)
-
-
-class TransformedDStream(DStream):
- """
- TransformedDStream is an DStream generated by an Python function
- transforming each RDD of an DStream to another RDDs.
-
- Multiple continuous transformations of DStream can be combined into
- one transformation.
- """
- def __init__(self, prev, func):
- self._ssc = prev._ssc
- self._sc = self._ssc._sc
- self._jrdd_deserializer = self._sc.serializer
- self.is_cached = False
- self.is_checkpointed = False
- self._jdstream_val = None
-
- # Using type() to avoid folding the functions and compacting the
DStreams which is not
- # not strictly a object of TransformedDStream.
- # Changed here is to avoid bug in KafkaTransformedDStream when calling
offsetRanges().
- if (type(prev) is TransformedDStream and
- not prev.is_cached and not prev.is_checkpointed):
- prev_func = prev.func
- self.func = lambda t, rdd: func(t, prev_func(t, rdd))
- self.prev = prev.prev
- else:
- self.prev = prev
- self.func = func
-
- @property
- def _jdstream(self):
- if self._jdstream_val is not None:
- return self._jdstream_val
-
- jfunc = TransformFunction(self._sc, self.func, self.prev._jrdd_deserializer)
- dstream = self._sc._jvm.PythonTransformedDStream(self.prev._jdstream.dstream(), jfunc)
- self._jdstream_val = dstream.asJavaDStream()
- return self._jdstream_val
http://git-wip-us.apache.org/repos/asf/bahir/blob/12f13084/streaming-mqtt/python/mqtt.py
----------------------------------------------------------------------
diff --git a/streaming-mqtt/python/mqtt.py b/streaming-mqtt/python/mqtt.py
index 8848a70..c55b704 100644
--- a/streaming-mqtt/python/mqtt.py
+++ b/streaming-mqtt/python/mqtt.py
@@ -38,19 +38,26 @@ class MQTTUtils(object):
:param storageLevel: RDD storage level.
:return: A DStream object
"""
+ jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
+ helper = MQTTUtils._get_helper(ssc._sc)
+ jstream = helper.createStream(ssc._jssc, brokerUrl, topic, jlevel)
+ return DStream(jstream, ssc, UTF8Deserializer())
+
+ @staticmethod
+ def _get_helper(sc):
try:
- helper = ssc._jvm.org.apache.spark.streaming.mqtt.MQTTUtilsPythonHelper()
+ return sc._jvm.org.apache.spark.streaming.mqtt.MQTTUtilsPythonHelper()
except TypeError as e:
if str(e) == "'JavaPackage' object is not callable":
- MQTTUtils._printErrorMsg(ssc.sparkContext)
+ MQTTUtils._printErrorMsg(sc)
raise
- jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
- jstream = helper.createStream(ssc._jssc, brokerUrl, topic, jlevel)
- return DStream(jstream, ssc, UTF8Deserializer())
-
@staticmethod
def _printErrorMsg(sc):
+ scalaVersionString = sc._jvm.scala.util.Properties.versionString()
+ import re
+ scalaVersion = re.sub(r'version (\d+\.\d+)\.\d+', r'\1', scalaVersionString)
+ sparkVersion = re.sub(r'(\d+\.\d+\.\d+).*', r'\1', sc.version)
print("""
________________________________________________________________________________________________
@@ -59,12 +66,12 @@
________________________________________________________________________________
 1. Include the MQTT library and its dependencies with in the spark-submit command as
- $ bin/spark-submit --packages org.apache.spark:spark-streaming-mqtt:%s ...
+ ${SPARK_HOME}/bin/spark-submit --packages org.apache.bahir:spark-streaming-mqtt_%s:%s ...
 2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
- Group Id = org.apache.spark, Artifact Id = spark-streaming-mqtt-assembly, Version = %s.
+ Group Id = org.apache.bahir, Artifact Id = spark-streaming-mqtt, Version = %s.
Then, include the jar in the spark-submit command as
- $ bin/spark-submit --jars <spark-streaming-mqtt-assembly.jar> ...
+ ${SPARK_HOME}/bin/spark-submit --jars <spark-streaming-mqtt.jar> ...
________________________________________________________________________________________________
-""" % (sc.version, sc.version))
+""" % (scalaVersion, sparkVersion, sparkVersion))
http://git-wip-us.apache.org/repos/asf/bahir/blob/12f13084/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala
----------------------------------------------------------------------
diff --git a/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala b/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala
index 80ab27b..3ce7511 100644
--- a/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala
+++ b/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala
@@ -23,6 +23,7 @@ import java.nio.charset.StandardCharsets
import scala.language.postfixOps
import org.apache.activemq.broker.{BrokerService, TransportConnector}
+import org.apache.activemq.usage.SystemUsage
import org.apache.commons.lang3.RandomUtils
import org.eclipse.paho.client.mqttv3._
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
@@ -41,6 +42,7 @@ private[mqtt] class MQTTTestUtils extends Logging {
private val brokerPort = findFreePort()
private var broker: BrokerService = _
+ private var systemUsage: SystemUsage = _
private var connector: TransportConnector = _
def brokerUri: String = {
@@ -50,6 +52,10 @@ private[mqtt] class MQTTTestUtils extends Logging {
def setup(): Unit = {
broker = new BrokerService()
broker.setDataDirectoryFile(Utils.createTempDir())
+ broker.getSystemUsage().setSendFailIfNoSpace(false)
+ systemUsage = broker.getSystemUsage()
+ systemUsage.getStoreUsage().setLimit(1024L * 1024 * 256); // 256 MB (default: 100 GB)
+ systemUsage.getTempUsage().setLimit(1024L * 1024 * 128); // 128 MB (default: 50 GB)
connector = new TransportConnector()
connector.setName("mqtt")
connector.setUri(new URI("mqtt://" + brokerUri))