This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 786ccf9a10c [SPARK-41445][CONNECT] Implement DataFrameReader.parquet
786ccf9a10c is described below
commit 786ccf9a10c92f41bfb7b0eb022e3126918730ea
Author: Hyukjin Kwon <[email protected]>
AuthorDate: Thu Dec 8 00:41:23 2022 -0800
[SPARK-41445][CONNECT] Implement DataFrameReader.parquet
### What changes were proposed in this pull request?
This PR implements the `DataFrameReader.parquet` alias in Spark Connect.
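
As a usage sketch (illustrative only, not part of this change): `spark` below stands for an already-created Spark Connect session, and the data and paths are made up for the example.

    import tempfile

    # Assumes `spark` is an existing Spark Connect session.
    with tempfile.TemporaryDirectory() as d:
        # Write a small DataFrame out as Parquet ...
        spark.createDataFrame(
            [{"age": 100, "name": "Hyukjin Kwon"}]
        ).write.mode("overwrite").format("parquet").save(d)

        # ... and read it back through the new reader method. Extra reader
        # options such as mergeSchema are forwarded to the data source.
        spark.read.parquet(d, mergeSchema=True).show()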
### Why are the changes needed?
For API feature parity.
### Does this PR introduce _any_ user-facing change?
This PR adds a user-facing API but Spark Connect has not been released yet.
### How was this patch tested?
A unit test was added.
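
For reference, the new test module can be run locally with the PySpark test runner (a sketch; the exact invocation depends on the dev environment and assumes a source checkout with the Connect server artifacts built):

    python/run-tests --testnames 'pyspark.sql.tests.connect.test_connect_basic'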
Closes #38977 from HyukjinKwon/parquet-read.
Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
python/pyspark/sql/connect/readwriter.py | 57 ++++++++++++++++++++++
.../sql/tests/connect/test_connect_basic.py | 12 +++++
2 files changed, 69 insertions(+)
diff --git a/python/pyspark/sql/connect/readwriter.py b/python/pyspark/sql/connect/readwriter.py
index 64ee3973f5f..470417b6a28 100644
--- a/python/pyspark/sql/connect/readwriter.py
+++ b/python/pyspark/sql/connect/readwriter.py
@@ -273,6 +273,63 @@ class DataFrameReader(OptionUtils):
         )
         return self.load(path=path, format="json", schema=schema)
 
+    def parquet(self, path: str, **options: "OptionalPrimitiveType") -> "DataFrame":
+        """
+        Loads Parquet files, returning the result as a :class:`DataFrame`.
+
+        .. versionadded:: 3.4.0
+
+        Parameters
+        ----------
+        path : str
+
+        Other Parameters
+        ----------------
+        **options
+            For the extra options, refer to
+            `Data Source Option <https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#data-source-option>`_
+            for the version you use.
+
+            .. # noqa
+
+        Examples
+        --------
+        Write a DataFrame into a Parquet file and read it back.
+
+        >>> import tempfile
+        >>> with tempfile.TemporaryDirectory() as d:
+        ...     # Write a DataFrame into a Parquet file
+        ...     spark.createDataFrame(
+        ...         [{"age": 100, "name": "Hyukjin Kwon"}]
+        ...     ).write.mode("overwrite").format("parquet").save(d)
+        ...
+        ...     # Read the Parquet file as a DataFrame.
+        ...     spark.read.parquet(d).show()
+        +---+------------+
+        |age|        name|
+        +---+------------+
+        |100|Hyukjin Kwon|
+        +---+------------+
+        """
+        mergeSchema = options.get("mergeSchema", None)
+        pathGlobFilter = options.get("pathGlobFilter", None)
+        modifiedBefore = options.get("modifiedBefore", None)
+        modifiedAfter = options.get("modifiedAfter", None)
+        recursiveFileLookup = options.get("recursiveFileLookup", None)
+        datetimeRebaseMode = options.get("datetimeRebaseMode", None)
+        int96RebaseMode = options.get("int96RebaseMode", None)
+        self._set_opts(
+            mergeSchema=mergeSchema,
+            pathGlobFilter=pathGlobFilter,
+            recursiveFileLookup=recursiveFileLookup,
+            modifiedBefore=modifiedBefore,
+            modifiedAfter=modifiedAfter,
+            datetimeRebaseMode=datetimeRebaseMode,
+            int96RebaseMode=int96RebaseMode,
+        )
+
+        return self.load(path=path, format="parquet")
+
 
 class DataFrameWriter(OptionUtils):
     """
diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py b/python/pyspark/sql/tests/connect/test_connect_basic.py
index 3681a9980b9..ae3813b43ae 100644
--- a/python/pyspark/sql/tests/connect/test_connect_basic.py
+++ b/python/pyspark/sql/tests/connect/test_connect_basic.py
@@ -138,6 +138,18 @@ class SparkConnectTests(SparkConnectSQLTestCase):
                 self.spark.read.json(path=d, primitivesAsString=True).toPandas(),
             )
 
+    def test_parquet(self):
+        # SPARK-41445: Implement DataFrameReader.parquet
+        with tempfile.TemporaryDirectory() as d:
+            # Write a DataFrame into a Parquet file
+            self.spark.createDataFrame([{"age": 100, "name": "Hyukjin Kwon"}]).write.mode(
+                "overwrite"
+            ).format("parquet").save(d)
+            # Read the Parquet file as a DataFrame.
+            self.assert_eq(
+                self.connect.read.parquet(d).toPandas(), self.spark.read.parquet(d).toPandas()
+            )
+
     def test_join_condition_column_list_columns(self):
         left_connect_df = self.connect.read.table(self.tbl_name)
         right_connect_df = self.connect.read.table(self.tbl_name2)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]