Repository: spark
Updated Branches:
refs/heads/master db436e36c -> 8df4dad49
[SPARK-2871] [PySpark] add approx API for RDD
RDD.countApprox(self, timeout, confidence=0.95)
:: Experimental ::
Approximate version of count() that returns a potentially incomplete
result within a timeout, even if not all tasks have finished.
>>> rdd = sc.parallelize(range(1000), 10)
>>> rdd.countApprox(1000, 1.0)
1000
RDD.sumApprox(self, timeout, confidence=0.95)
:: Experimental ::
Approximate operation to return the sum within a timeout,
or when the confidence is met.
>>> rdd = sc.parallelize(range(1000), 10)
>>> r = sum(xrange(1000))
>>> (rdd.sumApprox(1000) - r) / r < 0.05
True
RDD.meanApprox(self, timeout, confidence=0.95)
:: Experimental ::
Approximate operation to return the mean within a timeout,
or when the confidence is met.
>>> rdd = sc.parallelize(range(1000), 10)
>>> r = sum(xrange(1000)) / 1000.0
>>> (rdd.meanApprox(1000) - r) / r < 0.05
True
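Both sumApprox and meanApprox return a BoundedFloat (added in this patch): a
float subclass that also carries the confidence level and the low/high bounds
of the estimate. A minimal sketch of reading those fields, assuming a running
pyspark shell with `sc` bound to a SparkContext (the exact bounds depend on
how many tasks finish before the timeout):

    rdd = sc.parallelize(xrange(1000), 10)
    s = rdd.sumApprox(1000)             # BoundedFloat, subclasses float
    print s + 1.0                       # usable wherever a float is
    print s.confidence, s.low, s.high   # confidence level and interval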
Author: Davies Liu <[email protected]>
Closes #2095 from davies/approx and squashes the following commits:
e8c252b [Davies Liu] add approx API for RDD
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8df4dad4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8df4dad4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8df4dad4
Branch: refs/heads/master
Commit: 8df4dad4951ca6e687df1288331909878922a55f
Parents: db436e3
Author: Davies Liu <[email protected]>
Authored: Sat Aug 23 19:33:34 2014 -0700
Committer: Josh Rosen <[email protected]>
Committed: Sat Aug 23 19:33:34 2014 -0700
----------------------------------------------------------------------
.../org/apache/spark/api/python/PythonRDD.scala | 17 ++++
python/pyspark/rdd.py | 81 ++++++++++++++++++++
2 files changed, 98 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/8df4dad4/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 7470238..ae80103 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -749,6 +749,23 @@ private[spark] object PythonRDD extends Logging {
}
}
}
+
+ /**
+ * Convert an RDD of serialized Python objects to an RDD of objects that is
+ * usable by PySpark.
+ */
+ def pythonToJava(pyRDD: JavaRDD[Array[Byte]], batched: Boolean): JavaRDD[Any] = {
+ pyRDD.rdd.mapPartitions { iter =>
+ val unpickle = new Unpickler
+ iter.flatMap { row =>
+ val obj = unpickle.loads(row)
+ if (batched) {
+ obj.asInstanceOf[JArrayList[_]]
+ } else {
+ Seq(obj)
+ }
+ }
+ }.toJavaRDD()
+ }
}
private
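On the Scala side, pythonToJava unpickles every serialized row and, when the
RDD was written with a BatchedSerializer, flattens each batch (a JArrayList)
into individual objects. A rough pure-Python equivalent of that flattening
logic, for illustration only (the real conversion runs on the JVM via
Pyrolite's Unpickler, and the function name below is hypothetical):

    import cPickle as pickle

    def python_to_objects(serialized_rows, batched):
        # Each row is a pickled bytestring produced by PySpark's serializer.
        for row in serialized_rows:
            obj = pickle.loads(row)
            if batched:
                for item in obj:        # a batch is a list; flatten it
                    yield item
            else:
                yield obj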
http://git-wip-us.apache.org/repos/asf/spark/blob/8df4dad4/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index bdd8bc82..9f88340 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -131,6 +131,22 @@ class _JavaStackTrace(object):
self._context._jsc.setCallSite(None)
+class BoundedFloat(float):
+ """
+ Bounded value generated by an approximate job, carrying a confidence
+ level plus low and high bounds.
+
+ >>> BoundedFloat(100.0, 0.95, 95.0, 105.0)
+ 100.0
+ """
+ def __new__(cls, mean, confidence, low, high):
+ obj = float.__new__(cls, mean)
+ obj.confidence = confidence
+ obj.low = low
+ obj.high = high
+ return obj
+
+
class MaxHeapQ(object):
"""
@@ -1792,6 +1808,71 @@ class RDD(object):
# keys in the pairs. This could be an expensive operation, since those
# hashes aren't retained.
+ def _is_pickled(self):
+ """ Return whether this RDD is serialized by Pickle or not. """
+ der = self._jrdd_deserializer
+ if isinstance(der, PickleSerializer):
+ return True
+ if isinstance(der, BatchedSerializer) and isinstance(der.serializer, PickleSerializer):
+ return True
+ return False
+
+ def _to_jrdd(self):
+ """ Return a JavaRDD of Object by unpickling
+
+ It will convert each Python object into a Java object via Pyrolite,
+ whether or not the RDD is serialized in batches.
+ """
+ if not self._is_pickled():
+ self = self._reserialize(BatchedSerializer(PickleSerializer(), 1024))
+ batched = isinstance(self._jrdd_deserializer, BatchedSerializer)
+ return self.ctx._jvm.PythonRDD.pythonToJava(self._jrdd, batched)
+
+ def countApprox(self, timeout, confidence=0.95):
+ """
+ :: Experimental ::
+ Approximate version of count() that returns a potentially incomplete
+ result within a timeout, even if not all tasks have finished.
+
+ >>> rdd = sc.parallelize(range(1000), 10)
+ >>> rdd.countApprox(1000, 1.0)
+ 1000
+ """
+ drdd = self.mapPartitions(lambda it: [float(sum(1 for i in it))])
+ return int(drdd.sumApprox(timeout, confidence))
+
+ def sumApprox(self, timeout, confidence=0.95):
+ """
+ :: Experimental ::
+ Approximate operation to return the sum within a timeout,
+ or when the confidence is met.
+
+ >>> rdd = sc.parallelize(range(1000), 10)
+ >>> r = sum(xrange(1000))
+ >>> (rdd.sumApprox(1000) - r) / r < 0.05
+ True
+ """
+ jrdd = self.mapPartitions(lambda it: [float(sum(it))])._to_jrdd()
+ jdrdd = self.ctx._jvm.JavaDoubleRDD.fromRDD(jrdd.rdd())
+ r = jdrdd.sumApprox(timeout, confidence).getFinalValue()
+ return BoundedFloat(r.mean(), r.confidence(), r.low(), r.high())
+
+ def meanApprox(self, timeout, confidence=0.95):
+ """
+ :: Experimental ::
+ Approximate operation to return the mean within a timeout,
+ or when the confidence is met.
+
+ >>> rdd = sc.parallelize(range(1000), 10)
+ >>> r = sum(xrange(1000)) / 1000.0
+ >>> (rdd.meanApprox(1000) - r) / r < 0.05
+ True
+ """
+ jrdd = self.map(float)._to_jrdd()
+ jdrdd = self.ctx._jvm.JavaDoubleRDD.fromRDD(jrdd.rdd())
+ r = jdrdd.meanApprox(timeout, confidence).getFinalValue()
+ return BoundedFloat(r.mean(), r.confidence(), r.low(), r.high())
+
class PipelinedRDD(RDD):
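Taken together, the patch lets the three approximate actions be exercised end
to end. A hedged usage sketch, again assuming a pyspark shell with `sc`
available; with a generous timeout (in milliseconds) and confidence 1.0 the
answers are exact, while a tight timeout on a slow RDD may come back early
with a partial estimate and wide bounds:

    import time

    rdd = sc.parallelize(xrange(1000), 10)
    print rdd.countApprox(1000, 1.0)        # 1000: all tasks finish in time

    s = rdd.sumApprox(1000)
    print s, s.low, s.high, s.confidence    # estimate plus its interval

    slow = rdd.map(lambda x: (time.sleep(0.01), x)[1])
    print slow.sumApprox(10)                # 10 ms budget: may be partial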
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]