Repository: spark
Updated Branches:
  refs/heads/master db436e36c -> 8df4dad49


[SPARK-2871] [PySpark] add approx API for RDD

RDD.countApprox(self, timeout, confidence=0.95)

        :: Experimental ::
        Approximate version of count() that returns a potentially incomplete
        result within a timeout, even if not all tasks have finished.

        >>> rdd = sc.parallelize(range(1000), 10)
        >>> rdd.countApprox(1000, 1.0)
        1000

RDD.sumApprox(self, timeout, confidence=0.95)

        :: Experimental ::
        Approximate operation to return the sum within a timeout
        or meet the confidence.

        >>> rdd = sc.parallelize(range(1000), 10)
        >>> r = sum(xrange(1000))
        >>> (rdd.sumApprox(1000) - r) / r < 0.05
        True

RDD.meanApprox(self, timeout, confidence=0.95)

        :: Experimental ::
        Approximate operation to return the mean within a timeout
        or meet the confidence.

        >>> rdd = sc.parallelize(range(1000), 10)
        >>> r = sum(xrange(1000)) / 1000.0
        >>> (rdd.meanApprox(1000) - r) / r < 0.05
        True

Author: Davies Liu <[email protected]>

Closes #2095 from davies/approx and squashes the following commits:

e8c252b [Davies Liu] add approx API for RDD


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8df4dad4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8df4dad4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8df4dad4

Branch: refs/heads/master
Commit: 8df4dad4951ca6e687df1288331909878922a55f
Parents: db436e3
Author: Davies Liu <[email protected]>
Authored: Sat Aug 23 19:33:34 2014 -0700
Committer: Josh Rosen <[email protected]>
Committed: Sat Aug 23 19:33:34 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/api/python/PythonRDD.scala | 17 ++++
 python/pyspark/rdd.py                           | 81 ++++++++++++++++++++
 2 files changed, 98 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8df4dad4/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 7470238..ae80103 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -749,6 +749,23 @@ private[spark] object PythonRDD extends Logging {
       }
     }
   }
+
+  /**
+    * Convert an RDD of serialized Python objects to an RDD of objects that is 
usable by PySpark.
+    */
+  def pythonToJava(pyRDD: JavaRDD[Array[Byte]], batched: Boolean): 
JavaRDD[Any] = {
+    pyRDD.rdd.mapPartitions { iter =>
+      val unpickle = new Unpickler
+      iter.flatMap { row =>
+        val obj = unpickle.loads(row)
+        if (batched) {
+          obj.asInstanceOf[JArrayList[_]]
+        } else {
+          Seq(obj)
+        }
+      }
+    }.toJavaRDD()
+  }
 }
 
 private

http://git-wip-us.apache.org/repos/asf/spark/blob/8df4dad4/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index bdd8bc82..9f88340 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -131,6 +131,22 @@ class _JavaStackTrace(object):
             self._context._jsc.setCallSite(None)
 
 
+class BoundedFloat(float):
+    """
+    A bounded value generated by an approximate job, with a confidence,
+    a low bound and a high bound.
+
+    >>> BoundedFloat(100.0, 0.95, 95.0, 105.0)
+    100.0
+    """
+    def __new__(cls, mean, confidence, low, high):
+        obj = float.__new__(cls, mean)
+        obj.confidence = confidence
+        obj.low = low
+        obj.high = high
+        return obj
+
+
 class MaxHeapQ(object):
 
     """
@@ -1792,6 +1808,71 @@ class RDD(object):
     # keys in the pairs.  This could be an expensive operation, since those
     # hashes aren't retained.
 
+    def _is_pickled(self):
+        """ Return whether this RDD is serialized by Pickle or not. """
+        der = self._jrdd_deserializer
+        if isinstance(der, PickleSerializer):
+            return True
+        if isinstance(der, BatchedSerializer) and isinstance(der.serializer, 
PickleSerializer):
+            return True
+        return False
+
+    def _to_jrdd(self):
+        """ Return a JavaRDD of Object by unpickling
+
+        It will convert each Python object into a Java object by Pyrolite, 
whether the
+        RDD is serialized in batch or not.
+        """
+        if not self._is_pickled():
+            self = self._reserialize(BatchedSerializer(PickleSerializer(), 
1024))
+        batched = isinstance(self._jrdd_deserializer, BatchedSerializer)
+        return self.ctx._jvm.PythonRDD.pythonToJava(self._jrdd, batched)
+
+    def countApprox(self, timeout, confidence=0.95):
+        """
+        :: Experimental ::
+        Approximate version of count() that returns a potentially incomplete
+        result within a timeout, even if not all tasks have finished.
+
+        >>> rdd = sc.parallelize(range(1000), 10)
+        >>> rdd.countApprox(1000, 1.0)
+        1000
+        """
+        drdd = self.mapPartitions(lambda it: [float(sum(1 for i in it))])
+        return int(drdd.sumApprox(timeout, confidence))
+
+    def sumApprox(self, timeout, confidence=0.95):
+        """
+        :: Experimental ::
+        Approximate operation to return the sum within a timeout
+        or meet the confidence.
+
+        >>> rdd = sc.parallelize(range(1000), 10)
+        >>> r = sum(xrange(1000))
+        >>> (rdd.sumApprox(1000) - r) / r < 0.05
+        True
+        """
+        jrdd = self.mapPartitions(lambda it: [float(sum(it))])._to_jrdd()
+        jdrdd = self.ctx._jvm.JavaDoubleRDD.fromRDD(jrdd.rdd())
+        r = jdrdd.sumApprox(timeout, confidence).getFinalValue()
+        return BoundedFloat(r.mean(), r.confidence(), r.low(), r.high())
+
+    def meanApprox(self, timeout, confidence=0.95):
+        """
+        :: Experimental ::
+        Approximate operation to return the mean within a timeout
+        or meet the confidence.
+
+        >>> rdd = sc.parallelize(range(1000), 10)
+        >>> r = sum(xrange(1000)) / 1000.0
+        >>> (rdd.meanApprox(1000) - r) / r < 0.05
+        True
+        """
+        jrdd = self.map(float)._to_jrdd()
+        jdrdd = self.ctx._jvm.JavaDoubleRDD.fromRDD(jrdd.rdd())
+        r = jdrdd.meanApprox(timeout, confidence).getFinalValue()
+        return BoundedFloat(r.mean(), r.confidence(), r.low(), r.high())
+
 
 class PipelinedRDD(RDD):
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to