[SPARK-12353][STREAMING][PYSPARK] Fix countByValue inconsistent output in Python API

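The semantics of the Python countByValue differed from the Scala API: it
behaved more like a "count distinct values" operation, returning the number of
distinct values rather than a count for each value. This change makes
countByValue and countByValueAndWindow return (value, count) pairs, consistent
with the Scala/Java API.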
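To make the difference concrete, here is a minimal pure-Python sketch that
emulates a single micro-batch without Spark. The function names
count_distinct_old and count_by_value_new are hypothetical; they only mimic the
before and after pipelines from the patch on an in-memory list.

```python
from typing import Dict, Hashable, Iterable, List, Tuple


def count_distinct_old(batch: Iterable[Hashable]) -> int:
    # Hypothetical emulation of the old pipeline:
    # map(lambda x: (x, None)).reduceByKey(lambda x, y: None).count()
    # Reducing by key collapses duplicates, so count() sees one pair per distinct value.
    reduced: Dict[Hashable, None] = {}
    for x in batch:
        reduced[x] = None
    return len(reduced)


def count_by_value_new(batch: Iterable[Hashable]) -> List[Tuple[Hashable, int]]:
    # Hypothetical emulation of the new pipeline:
    # map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
    # Each value is paired with 1 and the 1s are summed per key.
    reduced: Dict[Hashable, int] = {}
    for x in batch:
        reduced[x] = reduced.get(x, 0) + 1
    # Sorted only so the output is deterministic for this example.
    return sorted(reduced.items())


batch = ["a", "b", "a", "c", "a"]
print(count_distinct_old(batch))  # 3
print(count_by_value_new(batch))  # [('a', 3), ('b', 1), ('c', 1)]
```

The old result is a single number per batch, while the new result keeps one
(value, count) pair per distinct value, which is what the Scala countByValue
produces.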

Author: jerryshao <[email protected]>

Closes #10350 from jerryshao/SPARK-12353.


Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/47e0e0ed
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/47e0e0ed
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/47e0e0ed

Branch: refs/heads/master
Commit: 47e0e0edd2974140e7ca88924681319487cfbc92
Parents: e2d30ef
Author: jerryshao <[email protected]>
Authored: Mon Dec 28 10:43:23 2015 +0000
Committer: Sean Owen <[email protected]>
Committed: Mon Dec 28 10:43:23 2015 +0000

----------------------------------------------------------------------
 streaming-mqtt/python/dstream.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir/blob/47e0e0ed/streaming-mqtt/python/dstream.py
----------------------------------------------------------------------
diff --git a/streaming-mqtt/python/dstream.py b/streaming-mqtt/python/dstream.py
index adc2651..86447f5 100644
--- a/streaming-mqtt/python/dstream.py
+++ b/streaming-mqtt/python/dstream.py
@@ -247,7 +247,7 @@ class DStream(object):
         Return a new DStream in which each RDD contains the counts of each
         distinct value in each RDD of this DStream.
         """
-        return self.map(lambda x: (x, None)).reduceByKey(lambda x, y: None).count()
+        return self.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x+y)
 
     def saveAsTextFiles(self, prefix, suffix=None):
         """
@@ -493,7 +493,7 @@ class DStream(object):
         keyed = self.map(lambda x: (x, 1))
         counted = keyed.reduceByKeyAndWindow(operator.add, operator.sub,
                                              windowDuration, slideDuration, numPartitions)
-        return counted.filter(lambda kv: kv[1] > 0).count()
+        return counted.filter(lambda kv: kv[1] > 0)
 
     def groupByKeyAndWindow(self, windowDuration, slideDuration, numPartitions=None):
         """
