Repository: spark Updated Branches: refs/heads/branch-1.6 96e32db5c -> 84dab7260
[SPARK-13082][PYSPARK] Backport the fix of 'read.json(rdd)' in #10559 to branch-1.6 SPARK-13082 was actually fixed by #10559. However, that is a big PR and was not backported to 1.6. This PR backports only the fix for 'read.json(rdd)' to branch-1.6. Author: Shixiong Zhu <[email protected]> Closes #10988 from zsxwing/json-rdd. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/84dab726 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/84dab726 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/84dab726 Branch: refs/heads/branch-1.6 Commit: 84dab7260e9a33586ad4002cd826a5ae7c8c4141 Parents: 96e32db Author: Shixiong Zhu <[email protected]> Authored: Fri Jan 29 13:53:11 2016 -0800 Committer: Shixiong Zhu <[email protected]> Committed: Fri Jan 29 13:53:11 2016 -0800 ---------------------------------------------------------------------- python/pyspark/sql/readwriter.py | 12 +++++++++++- python/pyspark/sql/tests.py | 6 +++--- 2 files changed, 14 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/84dab726/python/pyspark/sql/readwriter.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index a3d7eca..97da3d9 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -177,7 +177,17 @@ class DataFrameReader(object): elif type(path) == list: return self._df(self._jreader.json(self._sqlContext._sc._jvm.PythonUtils.toSeq(path))) elif isinstance(path, RDD): - return self._df(self._jreader.json(path._jrdd)) + def func(iterator): + for x in iterator: + if not isinstance(x, basestring): + x = unicode(x) + if isinstance(x, unicode): + x = x.encode("utf-8") + yield x + keyed = path.mapPartitions(func) + keyed._bypass_serializer = True + jrdd = 
keyed._jrdd.map(self._sqlContext._jvm.BytesToString()) + return self._df(self._jreader.json(jrdd)) else: raise TypeError("path can be only string or RDD") http://git-wip-us.apache.org/repos/asf/spark/blob/84dab726/python/pyspark/sql/tests.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 8be9d92..6b06984 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -326,7 +326,7 @@ class SQLTests(ReusedPySparkTestCase): def test_basic_functions(self): rdd = self.sc.parallelize(['{"foo":"bar"}', '{"foo":"baz"}']) - df = self.sqlCtx.jsonRDD(rdd) + df = self.sqlCtx.read.json(rdd) df.count() df.collect() df.schema @@ -345,7 +345,7 @@ class SQLTests(ReusedPySparkTestCase): df.collect() def test_apply_schema_to_row(self): - df = self.sqlCtx.jsonRDD(self.sc.parallelize(["""{"a":2}"""])) + df = self.sqlCtx.read.json(self.sc.parallelize(["""{"a":2}"""])) df2 = self.sqlCtx.createDataFrame(df.map(lambda x: x), df.schema) self.assertEqual(df.collect(), df2.collect()) @@ -821,7 +821,7 @@ class SQLTests(ReusedPySparkTestCase): def test_help_command(self): # Regression test for SPARK-5464 rdd = self.sc.parallelize(['{"foo":"bar"}', '{"foo":"baz"}']) - df = self.sqlCtx.jsonRDD(rdd) + df = self.sqlCtx.read.json(rdd) # render_doc() reproduces the help() exception without printing output pydoc.render_doc(df) pydoc.render_doc(df.foo) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
