[SPARK-12353][STREAMING][PYSPARK] Fix countByValue inconsistent output in Python API
The semantics of Python's countByValue differ from the Scala API: it behaves more like a countDistinctValue, returning only the number of distinct values in each RDD rather than a (value, count) pair for each distinct value. This change makes it consistent with the Scala/Java API. Author: jerryshao <[email protected]> Closes #10350 from jerryshao/SPARK-12353. Project: http://git-wip-us.apache.org/repos/asf/bahir/repo Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/47e0e0ed Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/47e0e0ed Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/47e0e0ed Branch: refs/heads/master Commit: 47e0e0edd2974140e7ca88924681319487cfbc92 Parents: e2d30ef Author: jerryshao <[email protected]> Authored: Mon Dec 28 10:43:23 2015 +0000 Committer: Sean Owen <[email protected]> Committed: Mon Dec 28 10:43:23 2015 +0000 ---------------------------------------------------------------------- streaming-mqtt/python/dstream.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bahir/blob/47e0e0ed/streaming-mqtt/python/dstream.py ---------------------------------------------------------------------- diff --git a/streaming-mqtt/python/dstream.py b/streaming-mqtt/python/dstream.py index adc2651..86447f5 100644 --- a/streaming-mqtt/python/dstream.py +++ b/streaming-mqtt/python/dstream.py @@ -247,7 +247,7 @@ class DStream(object): Return a new DStream in which each RDD contains the counts of each distinct value in each RDD of this DStream. 
""" - return self.map(lambda x: (x, None)).reduceByKey(lambda x, y: None).count() + return self.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x+y) def saveAsTextFiles(self, prefix, suffix=None): """ @@ -493,7 +493,7 @@ class DStream(object): keyed = self.map(lambda x: (x, 1)) counted = keyed.reduceByKeyAndWindow(operator.add, operator.sub, windowDuration, slideDuration, numPartitions) - return counted.filter(lambda kv: kv[1] > 0).count() + return counted.filter(lambda kv: kv[1] > 0) def groupByKeyAndWindow(self, windowDuration, slideDuration, numPartitions=None): """
