Repository: spark Updated Branches: refs/heads/master 21a1e1bb8 -> 110fb8b24
[SPARK-2334] fix AttributeError when calling PipelinedRDD.id() The underlying JavaRDD for a PipelinedRDD is created lazily; its creation is delayed until _jrdd is accessed. The id of the JavaRDD is cached as `_id`, which saves an RPC call in py4j for later calls. closes #1276 Author: Davies Liu <[email protected]> Closes #2296 from davies/id and squashes the following commits: e197958 [Davies Liu] fix style 9721716 [Davies Liu] fix id of PipelineRDD Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/110fb8b2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/110fb8b2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/110fb8b2 Branch: refs/heads/master Commit: 110fb8b24d2454ad7c979c3934dbed87650f17b8 Parents: 21a1e1b Author: Davies Liu <[email protected]> Authored: Sat Sep 6 16:12:29 2014 -0700 Committer: Josh Rosen <[email protected]> Committed: Sat Sep 6 16:12:29 2014 -0700 ---------------------------------------------------------------------- python/pyspark/rdd.py | 6 ++++++ python/pyspark/sql.py | 9 +++++---- python/pyspark/tests.py | 9 +++++++++ 3 files changed, 20 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/110fb8b2/python/pyspark/rdd.py ---------------------------------------------------------------------- diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index aa90297..266090e 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -2075,6 +2075,7 @@ class PipelinedRDD(RDD): self.ctx = prev.ctx self.prev = prev self._jrdd_val = None + self._id = None self._jrdd_deserializer = self.ctx.serializer self._bypass_serializer = False self._partitionFunc = prev._partitionFunc if self.preservesPartitioning else None @@ -2105,6 +2106,11 @@ class PipelinedRDD(RDD): self._jrdd_val = python_rdd.asJavaRDD() return self._jrdd_val + def id(self): + if self._id is None: + self._id = self._jrdd.id() + return
self._id + def _is_pipelinable(self): return not (self.is_cached or self.is_checkpointed) http://git-wip-us.apache.org/repos/asf/spark/blob/110fb8b2/python/pyspark/sql.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py index 97a51b9..004d493 100644 --- a/python/pyspark/sql.py +++ b/python/pyspark/sql.py @@ -1525,7 +1525,7 @@ class SchemaRDD(RDD): self.sql_ctx = sql_ctx self._sc = sql_ctx._sc self._jschema_rdd = jschema_rdd - + self._id = None self.is_cached = False self.is_checkpointed = False self.ctx = self.sql_ctx._sc @@ -1543,9 +1543,10 @@ class SchemaRDD(RDD): self._lazy_jrdd = self._jschema_rdd.javaToPython() return self._lazy_jrdd - @property - def _id(self): - return self._jrdd.id() + def id(self): + if self._id is None: + self._id = self._jrdd.id() + return self._id def saveAsParquetFile(self, path): """Save the contents as a Parquet file, preserving the schema. http://git-wip-us.apache.org/repos/asf/spark/blob/110fb8b2/python/pyspark/tests.py ---------------------------------------------------------------------- diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 3e74799..2ade15b 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -281,6 +281,15 @@ class TestAddFile(PySparkTestCase): class TestRDDFunctions(PySparkTestCase): + def test_id(self): + rdd = self.sc.parallelize(range(10)) + id = rdd.id() + self.assertEqual(id, rdd.id()) + rdd2 = rdd.map(str).filter(bool) + id2 = rdd2.id() + self.assertEqual(id + 1, id2) + self.assertEqual(id2, rdd2.id()) + def test_failed_sparkcontext_creation(self): # Regression test for SPARK-1550 self.sc.stop() --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
