Repository: spark Updated Branches: refs/heads/branch-1.4 eda1ff4ee -> 5118abb4e
[SPARK-11812][PYSPARK] invFunc=None works properly with python's reduceByKeyAndWindow invFunc is optional and can be None. Instead of invFunc (the parameter) invReduceFunc (a local function) was checked for trueness (that is, not None, in this context). A local function is never None, thus the case of invFunc=None (a common one when inverse reduction is not defined) was treated incorrectly, resulting in loss of data. In addition, the docstring used wrong parameter names, also fixed. Author: David Tolpin <[email protected]> Closes #9775 from dtolpin/master. (cherry picked from commit 599a8c6e2bf7da70b20ef3046f5ce099dfd637f8) Signed-off-by: Tathagata Das <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5118abb4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5118abb4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5118abb4 Branch: refs/heads/branch-1.4 Commit: 5118abb4eefb70d5290069caafdff23247f01721 Parents: eda1ff4 Author: David Tolpin <[email protected]> Authored: Thu Nov 19 13:57:23 2015 -0800 Committer: Tathagata Das <[email protected]> Committed: Thu Nov 19 13:58:47 2015 -0800 ---------------------------------------------------------------------- python/pyspark/streaming/dstream.py | 6 +++--- python/pyspark/streaming/tests.py | 11 +++++++++++ 2 files changed, 14 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/5118abb4/python/pyspark/streaming/dstream.py ---------------------------------------------------------------------- diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py index ff09798..58ce91b 100644 --- a/python/pyspark/streaming/dstream.py +++ b/python/pyspark/streaming/dstream.py @@ -524,8 +524,8 @@ class DStream(object): `invFunc` can be None, then it will reduce all the RDDs in window, could be slower than having `invFunc`. - @param reduceFunc: associative reduce function - @param invReduceFunc: inverse function of `reduceFunc` + @param func: associative reduce function + @param invFunc: inverse function of `reduceFunc` @param windowDuration: width of the window; must be a multiple of this DStream's batching interval @param slideDuration: sliding interval of the window (i.e., the interval after which @@ -556,7 +556,7 @@ class DStream(object): if kv[1] is not None else kv[0]) jreduceFunc = TransformFunction(self._sc, reduceFunc, reduced._jrdd_deserializer) - if invReduceFunc: + if invFunc: jinvReduceFunc = TransformFunction(self._sc, invReduceFunc, reduced._jrdd_deserializer) else: jinvReduceFunc = None http://git-wip-us.apache.org/repos/asf/spark/blob/5118abb4/python/pyspark/streaming/tests.py ---------------------------------------------------------------------- diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 46cb18b..ab1cc3f 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -433,6 +433,17 @@ class WindowFunctionTests(PySparkStreamingTestCase): self.assertRaises(ValueError, lambda: d1.reduceByKeyAndWindow(None, None, 0.1, 0.1)) self.assertRaises(ValueError, lambda: d1.reduceByKeyAndWindow(None, None, 1, 0.1)) + def test_reduce_by_key_and_window_with_none_invFunc(self): + input = [range(1), range(2), range(3), range(4), range(5), range(6)] + + def func(dstream): + return dstream.map(lambda x: (x, 1))\ + .reduceByKeyAndWindow(operator.add, None, 5, 1)\ + .filter(lambda kv: kv[1] > 0).count() + + expected = [[2], [4], [6], [6], [6], [6]] + self._test_func(input, func, expected) + class StreamingContextTests(PySparkStreamingTestCase): --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
