This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 5e3ba07c617 [SPARK-42012][CONNECT][PYTHON] Implement
DataFrameReader.orc
5e3ba07c617 is described below
commit 5e3ba07c617f73c17534ce12ea9edde39175b0b4
Author: Sandeep Singh <[email protected]>
AuthorDate: Sat Jan 14 15:44:16 2023 -0800
[SPARK-42012][CONNECT][PYTHON] Implement DataFrameReader.orc
### What changes were proposed in this pull request?
This PR implements `DataFrameReader.orc` alias in Spark Connect.
### Why are the changes needed?
For API feature parity.
### Does this PR introduce any user-facing change?
This PR adds a user-facing API but Spark Connect has not been released yet.
### How was this patch tested?
Unittest was added.
Closes #39567 from techaddict/SPARK-42012-orc.
Authored-by: Sandeep Singh <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
python/pyspark/sql/connect/readwriter.py | 23 ++++++++++++++++++++--
python/pyspark/sql/readwriter.py | 3 +++
.../sql/tests/connect/test_connect_basic.py | 12 ++++++++++-
3 files changed, 35 insertions(+), 3 deletions(-)
diff --git a/python/pyspark/sql/connect/readwriter.py
b/python/pyspark/sql/connect/readwriter.py
index f172b4ecd39..c26dd828485 100644
--- a/python/pyspark/sql/connect/readwriter.py
+++ b/python/pyspark/sql/connect/readwriter.py
@@ -240,8 +240,27 @@ class DataFrameReader(OptionUtils):
def csv(self, *args: Any, **kwargs: Any) -> None:
raise NotImplementedError("csv() is not implemented.")
- def orc(self, *args: Any, **kwargs: Any) -> None:
- raise NotImplementedError("orc() is not implemented.")
+ def orc(
+ self,
+ path: PathOrPaths,
+ mergeSchema: Optional[bool] = None,
+ pathGlobFilter: Optional[Union[bool, str]] = None,
+ recursiveFileLookup: Optional[Union[bool, str]] = None,
+ modifiedBefore: Optional[Union[bool, str]] = None,
+ modifiedAfter: Optional[Union[bool, str]] = None,
+ ) -> "DataFrame":
+ self._set_opts(
+ mergeSchema=mergeSchema,
+ pathGlobFilter=pathGlobFilter,
+ modifiedBefore=modifiedBefore,
+ modifiedAfter=modifiedAfter,
+ recursiveFileLookup=recursiveFileLookup,
+ )
+ if isinstance(path, str):
+ path = [path]
+ return self.load(path=path, format="orc")
+
+ orc.__doc__ = PySparkDataFrameReader.orc.__doc__
def jdbc(self, *args: Any, **kwargs: Any) -> None:
raise NotImplementedError("jdbc() not supported for DataFrameWriter")
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index db6084b826f..aa27c559a0d 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -761,6 +761,9 @@ class DataFrameReader(OptionUtils):
.. versionadded:: 1.5.0
+ .. versionchanged:: 3.4.0
+ Support Spark Connect.
+
Parameters
----------
path : str or list
diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py
b/python/pyspark/sql/tests/connect/test_connect_basic.py
index 1e0288786e6..e0e3cc6d1e0 100644
--- a/python/pyspark/sql/tests/connect/test_connect_basic.py
+++ b/python/pyspark/sql/tests/connect/test_connect_basic.py
@@ -283,6 +283,16 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase):
self.spark.read.json(json_files).collect(),
)
+ def test_orc(self):
+ # SPARK-42012: Implement DataFrameReader.orc
+ with tempfile.TemporaryDirectory() as d:
+ # Write a DataFrame into an ORC file
+ self.spark.createDataFrame(
+ [{"name": "Sandeep Singh"}, {"name": "Hyukjin Kwon"}]
+ ).write.mode("overwrite").format("orc").save(d)
+ # Read the ORC file back as a DataFrame.
+ self.assert_eq(self.connect.read.orc(d).toPandas(), self.spark.read.orc(d).toPandas())
+
def test_join_condition_column_list_columns(self):
left_connect_df = self.connect.read.table(self.tbl_name)
right_connect_df = self.connect.read.table(self.tbl_name2)
@@ -2547,7 +2557,7 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase):
# DataFrameWriterV2 is also not implemented yet
df = self.connect.createDataFrame([(x, f"{x}") for x in range(100)],
["id", "name"])
- for f in ("csv", "orc", "jdbc"):
+ for f in ("csv", "jdbc"):
with self.assertRaises(NotImplementedError):
getattr(self.connect.read, f)()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]