This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e3ba07c617 [SPARK-42012][CONNECT][PYTHON] Implement DataFrameReader.orc
5e3ba07c617 is described below

commit 5e3ba07c617f73c17534ce12ea9edde39175b0b4
Author: Sandeep Singh <[email protected]>
AuthorDate: Sat Jan 14 15:44:16 2023 -0800

    [SPARK-42012][CONNECT][PYTHON] Implement DataFrameReader.orc
    
    ### What changes were proposed in this pull request?
    This PR implements `DataFrameReader.orc` in Spark Connect.
    
    ### Why are the changes needed?
    For API feature parity.
    
    ### Does this PR introduce any user-facing change?
    This PR adds a user-facing API, but Spark Connect has not been released yet.
    
    ### How was this patch tested?
    A unit test was added.
    
    Closes #39567 from techaddict/SPARK-42012-orc.
    
    Authored-by: Sandeep Singh <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 python/pyspark/sql/connect/readwriter.py           | 23 ++++++++++++++++++++--
 python/pyspark/sql/readwriter.py                   |  3 +++
 .../sql/tests/connect/test_connect_basic.py        | 12 ++++++++++-
 3 files changed, 35 insertions(+), 3 deletions(-)

diff --git a/python/pyspark/sql/connect/readwriter.py b/python/pyspark/sql/connect/readwriter.py
index f172b4ecd39..c26dd828485 100644
--- a/python/pyspark/sql/connect/readwriter.py
+++ b/python/pyspark/sql/connect/readwriter.py
@@ -240,8 +240,27 @@ class DataFrameReader(OptionUtils):
     def csv(self, *args: Any, **kwargs: Any) -> None:
         raise NotImplementedError("csv() is not implemented.")
 
-    def orc(self, *args: Any, **kwargs: Any) -> None:
-        raise NotImplementedError("orc() is not implemented.")
+    def orc(
+        self,
+        path: PathOrPaths,
+        mergeSchema: Optional[bool] = None,
+        pathGlobFilter: Optional[Union[bool, str]] = None,
+        recursiveFileLookup: Optional[Union[bool, str]] = None,
+        modifiedBefore: Optional[Union[bool, str]] = None,
+        modifiedAfter: Optional[Union[bool, str]] = None,
+    ) -> "DataFrame":
+        self._set_opts(
+            mergeSchema=mergeSchema,
+            pathGlobFilter=pathGlobFilter,
+            modifiedBefore=modifiedBefore,
+            modifiedAfter=modifiedAfter,
+            recursiveFileLookup=recursiveFileLookup,
+        )
+        if isinstance(path, str):
+            path = [path]
+        return self.load(path=path, format="orc")
+
+    orc.__doc__ = PySparkDataFrameReader.orc.__doc__
 
     def jdbc(self, *args: Any, **kwargs: Any) -> None:
         raise NotImplementedError("jdbc() not supported for DataFrameWriter")
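
For context, a minimal usage sketch of the method added above (the remote
endpoint and the paths are assumptions, not part of this commit; any Spark
Connect server and existing ORC data would do):

    from pyspark.sql import SparkSession

    # Connect to a running Spark Connect server (endpoint is illustrative).
    spark = SparkSession.builder.remote("sc://localhost").getOrCreate()

    # A single path is normalized to a one-element list before delegating to
    # load(path=..., format="orc"); mergeSchema and the other keyword options
    # are forwarded through _set_opts().
    df_one = spark.read.orc("/tmp/data/part-0.orc")
    df_all = spark.read.orc(
        ["/tmp/data/part-0.orc", "/tmp/data/part-1.orc"], mergeSchema=True
    )
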
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index db6084b826f..aa27c559a0d 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -761,6 +761,9 @@ class DataFrameReader(OptionUtils):
 
         .. versionadded:: 1.5.0
 
+        .. versionchanged:: 3.4.0
+            Support Spark Connect.
+
         Parameters
         ----------
         path : str or list
diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py b/python/pyspark/sql/tests/connect/test_connect_basic.py
index 1e0288786e6..e0e3cc6d1e0 100644
--- a/python/pyspark/sql/tests/connect/test_connect_basic.py
+++ b/python/pyspark/sql/tests/connect/test_connect_basic.py
@@ -283,6 +283,16 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase):
                 self.spark.read.json(json_files).collect(),
             )
 
+    def test_orc(self):
+        # SPARK-42012: Implement DataFrameReader.orc
+        with tempfile.TemporaryDirectory() as d:
+            # Write a DataFrame into an ORC file
+            self.spark.createDataFrame(
+                [{"name": "Sandeep Singh"}, {"name": "Hyukjin Kwon"}]
+            ).write.mode("overwrite").format("orc").save(d)
+            # Read the ORC file back as a DataFrame.
+            self.assert_eq(self.connect.read.orc(d).toPandas(), self.spark.read.orc(d).toPandas())
+
     def test_join_condition_column_list_columns(self):
         left_connect_df = self.connect.read.table(self.tbl_name)
         right_connect_df = self.connect.read.table(self.tbl_name2)
@@ -2547,7 +2557,7 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase):
         # DataFrameWriterV2 is also not implemented yet
         df = self.connect.createDataFrame([(x, f"{x}") for x in range(100)], ["id", "name"])
 
-        for f in ("csv", "orc", "jdbc"):
+        for f in ("csv", "jdbc"):
             with self.assertRaises(NotImplementedError):
                 getattr(self.connect.read, f)()
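
As a companion to the round-trip test added above, a self-contained sketch
using classic (non-Connect) PySpark that exercises the same write/read path
locally (the master URL and row contents are illustrative):

    import tempfile

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[1]").getOrCreate()

    with tempfile.TemporaryDirectory() as d:
        # Write two rows as ORC, then read the directory back.
        spark.createDataFrame(
            [{"name": "Sandeep Singh"}, {"name": "Hyukjin Kwon"}]
        ).write.mode("overwrite").format("orc").save(d)
        assert spark.read.orc(d).count() == 2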
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
