Repository: spark
Updated Branches:
  refs/heads/master ceaec9383 -> 1fdfe6935


[SPARK-22112][PYSPARK] Supports RDD of strings as input in spark.read.csv in 
PySpark

## What changes were proposed in this pull request?
We added a method to the Scala API for creating a `DataFrame` from a 
`Dataset[String]` storing CSV in 
[SPARK-15463](https://issues.apache.org/jira/browse/SPARK-15463), but PySpark 
has no `Dataset` type to support this feature. Therefore, this patch adds an API to 
create a `DataFrame` from an `RDD[String]` storing CSV, which is also consistent 
with PySpark's `spark.read.json`.

For example:
```
>>> rdd = sc.textFile('python/test_support/sql/ages.csv')
>>> df2 = spark.read.csv(rdd)
>>> df2.dtypes
[('_c0', 'string'), ('_c1', 'string')]
```
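
As a rough illustration of the RDD branch, the sketch below mirrors in plain Python 3 what the patch does to each element before handing it to the JVM: non-string values are stringified and every row is encoded as UTF-8 bytes. It also mimics the type dispatch on `path`. The names `to_utf8_rows` and `classify_path` are hypothetical and not part of PySpark, and the `mapPartitions` attribute check is only a stand-in for `isinstance(path, RDD)` so the sketch runs without Spark.

```python
def to_utf8_rows(rows):
    """Yield each row as UTF-8 bytes (hypothetical stand-in for the patch's `func`)."""
    for row in rows:
        if not isinstance(row, str):
            row = str(row)          # non-string elements are stringified first
        yield row.encode("utf-8")   # every row leaves as bytes


def classify_path(path):
    """Return which branch a `path` argument would take (hypothetical helper)."""
    if isinstance(path, str):
        path = [path]               # a single path is wrapped into a list
    if type(path) == list:          # same exact-type check as in the patch
        return "paths"
    if hasattr(path, "mapPartitions"):  # assumption: stand-in for isinstance(path, RDD)
        return "rdd"
    raise TypeError("path can be only string, list or RDD")
```

Note that, under this dispatch, a tuple of paths falls through to the `TypeError` branch, since only an exact `list` is accepted.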
## How was this patch tested?
Added unit test cases.

Author: goldmedal <liugs...@gmail.com>

Closes #19339 from goldmedal/SPARK-22112.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1fdfe693
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1fdfe693
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1fdfe693

Branch: refs/heads/master
Commit: 1fdfe69352e4d4714c1f06d61d7ad475ce3a7f1f
Parents: ceaec93
Author: goldmedal <liugs...@gmail.com>
Authored: Wed Sep 27 11:19:45 2017 +0900
Committer: hyukjinkwon <gurwls...@gmail.com>
Committed: Wed Sep 27 11:19:45 2017 +0900

----------------------------------------------------------------------
 python/pyspark/sql/readwriter.py | 31 +++++++++++++++++++++++++++++--
 1 file changed, 29 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1fdfe693/python/pyspark/sql/readwriter.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index cb847a0..f309291 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -335,7 +335,8 @@ class DataFrameReader(OptionUtils):
         ``inferSchema`` is enabled. To avoid going through the entire data once, disable
         ``inferSchema`` option or specify the schema explicitly using ``schema``.
 
-        :param path: string, or list of strings, for input path(s).
+        :param path: string, or list of strings, for input path(s),
+                     or RDD of Strings storing CSV rows.
         :param schema: an optional :class:`pyspark.sql.types.StructType` for the input schema
                        or a DDL-formatted string (For example ``col0 INT, col1 DOUBLE``).
         :param sep: sets the single character as a separator for each field and value.
@@ -408,6 +409,10 @@ class DataFrameReader(OptionUtils):
         >>> df = spark.read.csv('python/test_support/sql/ages.csv')
         >>> df.dtypes
         [('_c0', 'string'), ('_c1', 'string')]
+        >>> rdd = sc.textFile('python/test_support/sql/ages.csv')
+        >>> df2 = spark.read.csv(rdd)
+        >>> df2.dtypes
+        [('_c0', 'string'), ('_c1', 'string')]
         """
         self._set_opts(
             schema=schema, sep=sep, encoding=encoding, quote=quote, escape=escape, comment=comment,
@@ -420,7 +425,29 @@ class DataFrameReader(OptionUtils):
             columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine)
         if isinstance(path, basestring):
             path = [path]
-        return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
+        if type(path) == list:
+            return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
+        elif isinstance(path, RDD):
+            def func(iterator):
+                for x in iterator:
+                    if not isinstance(x, basestring):
+                        x = unicode(x)
+                    if isinstance(x, unicode):
+                        x = x.encode("utf-8")
+                    yield x
+            keyed = path.mapPartitions(func)
+            keyed._bypass_serializer = True
+            jrdd = keyed._jrdd.map(self._spark._jvm.BytesToString())
+            # see SPARK-22112
+            # There aren't any jvm api for creating a dataframe from rdd storing csv.
+            # We can do it through creating a jvm dataset firstly and using the jvm api
+            # for creating a dataframe from dataset storing csv.
+            jdataset = self._spark._ssql_ctx.createDataset(
+                jrdd.rdd(),
+                self._spark._jvm.Encoders.STRING())
+            return self._df(self._jreader.csv(jdataset))
+        else:
+            raise TypeError("path can be only string, list or RDD")
 
     @since(1.5)
     def orc(self, path):

