Repository: spark
Updated Branches: refs/heads/master 2409af9dc -> af7f2f109
Spark-1163, Added missing Python RDD functions

Author: prabinb <[email protected]>

Closes #92 from prabinb/python-api-rdd and squashes the following commits:

51129ca [prabinb] Added missing Python RDD functions Added __repr__ function to StorageLevel class. Added doctest for RDD.getStorageLevel().

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/af7f2f10
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/af7f2f10
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/af7f2f10

Branch: refs/heads/master
Commit: af7f2f10902c7b42e08797f7467dd06e4803594c
Parents: 2409af9
Author: prabinb <[email protected]>
Authored: Tue Mar 11 23:57:05 2014 -0700
Committer: Patrick Wendell <[email protected]>
Committed: Tue Mar 11 23:57:05 2014 -0700

----------------------------------------------------------------------
python/pyspark/rdd.py | 42 +++++++++++++++++++++++++++++++++++++
python/pyspark/storagelevel.py | 4 ++++
2 files changed, 46 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/af7f2f10/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 39916d2..0f28dbd 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -36,6 +36,7 @@ from pyspark.join import python_join, python_left_outer_join, \ python_right_outer_join, python_cogroup from pyspark.statcounter import StatCounter from pyspark.rddsampler import RDDSampler +from pyspark.storagelevel import StorageLevel from py4j.java_collections import ListConverter, MapConverter @@ -1119,6 +1120,47 @@ class RDD(object): other._jrdd_deserializer) return RDD(pairRDD, self.ctx, deserializer) + def name(self): + """ + Return the name of this RDD. 
+ """ + name_ = self._jrdd.name() + if not name_: + return None + return name_.encode('utf-8') + + def setName(self, name): + """ + Assign a name to this RDD. + >>> rdd1 = sc.parallelize([1,2]) + >>> rdd1.setName('RDD1') + >>> rdd1.name() + 'RDD1' + """ + self._jrdd.setName(name) + + def toDebugString(self): + """ + A description of this RDD and its recursive dependencies for debugging. + """ + debug_string = self._jrdd.toDebugString() + if not debug_string: + return None + return debug_string.encode('utf-8') + + def getStorageLevel(self): + """ + Get the RDD's current storage level. + >>> rdd1 = sc.parallelize([1,2]) + >>> rdd1.getStorageLevel() + StorageLevel(False, False, False, 1) + """ + java_storage_level = self._jrdd.getStorageLevel() + storage_level = StorageLevel(java_storage_level.useDisk(), + java_storage_level.useMemory(), + java_storage_level.deserialized(), + java_storage_level.replication()) + return storage_level # TODO: `lookup` is disabled because we can't make direct comparisons based # on the key; we need to compare the hash of the key to the hash of the http://git-wip-us.apache.org/repos/asf/spark/blob/af7f2f10/python/pyspark/storagelevel.py ---------------------------------------------------------------------- diff --git a/python/pyspark/storagelevel.py b/python/pyspark/storagelevel.py index b31f476..c3e3a44 100644 --- a/python/pyspark/storagelevel.py +++ b/python/pyspark/storagelevel.py @@ -31,6 +31,10 @@ class StorageLevel: self.deserialized = deserialized self.replication = replication + def __repr__(self): + return "StorageLevel(%s, %s, %s, %s)" % ( + self.useDisk, self.useMemory, self.deserialized, self.replication) + StorageLevel.DISK_ONLY = StorageLevel(True, False, False) StorageLevel.DISK_ONLY_2 = StorageLevel(True, False, False, 2) StorageLevel.MEMORY_ONLY = StorageLevel(False, True, True)
