This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ab4b124bb03 [SPARK-51599][PS][CONNECT] Optimize `ps.read_excel` for 
large excel file
5ab4b124bb03 is described below

commit 5ab4b124bb03e0a03e66242f04dc801175ead6ad
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Tue Mar 25 18:48:01 2025 +0900

    [SPARK-51599][PS][CONNECT] Optimize `ps.read_excel` for large excel file
    
    ### What changes were proposed in this pull request?
    Optimize `ps.read_excel` for large excel file
    
    ### Why are the changes needed?
    for large excel files, existing implementation collect all data from the 
first two files to the python side, to infer the schema;
    in this PR, we infer the schema by the first 1k rows in the first excel 
files.
    
    ### Does this PR introduce _any_ user-facing change?
    no
    
    ### How was this patch tested?
    added ut
    
    ### Was this patch authored or co-authored using generative AI tooling?
    no
    
    Closes #50382 from zhengruifeng/ps_read_excel_opt.
    
    Authored-by: Ruifeng Zheng <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 python/pyspark/pandas/namespace.py                 | 160 +++++++++++----------
 .../pandas/tests/io/test_dataframe_spark_io.py     |  20 +++
 2 files changed, 107 insertions(+), 73 deletions(-)

diff --git a/python/pyspark/pandas/namespace.py 
b/python/pyspark/pandas/namespace.py
index a5c5a32037b8..2ab1260eff69 100644
--- a/python/pyspark/pandas/namespace.py
+++ b/python/pyspark/pandas/namespace.py
@@ -36,6 +36,7 @@ from collections.abc import Iterable
 from datetime import tzinfo
 from functools import reduce
 from io import BytesIO
+import pickle
 import json
 import warnings
 
@@ -83,6 +84,7 @@ from pyspark.pandas.utils import (
     validate_axis,
     log_advice,
 )
+from pyspark.pandas.config import get_option
 from pyspark.pandas.frame import DataFrame, _reduce_spark_multi
 from pyspark.pandas.internal import (
     InternalFrame,
@@ -1130,7 +1132,9 @@ def read_excel(
     """
 
     def pd_read_excel(
-        io_or_bin: Any, sn: Union[str, int, List[Union[str, int]], None]
+        io_or_bin: Any,
+        sn: Union[str, int, List[Union[str, int]], None],
+        nr: Optional[int] = None,
     ) -> pd.DataFrame:
         return pd.read_excel(
             io=BytesIO(io_or_bin) if isinstance(io_or_bin, (bytes, bytearray)) 
else io_or_bin,
@@ -1145,7 +1149,7 @@ def read_excel(
             true_values=true_values,
             false_values=false_values,
             skiprows=skiprows,
-            nrows=nrows,
+            nrows=nr,
             na_values=na_values,
             keep_default_na=keep_default_na,
             verbose=verbose,
@@ -1157,18 +1161,9 @@ def read_excel(
             **kwds,
         )
 
-    if isinstance(io, str):
-        # 'binaryFile' format is available since Spark 3.0.0.
-        binaries = 
default_session().read.format("binaryFile").load(io).select("content").head(2)
-        io_or_bin = binaries[0][0]
-        single_file = len(binaries) == 1
-    else:
-        io_or_bin = io
-        single_file = True
-
-    pdf_or_psers = pd_read_excel(io_or_bin, sn=sheet_name)
-
-    if single_file:
+    if not isinstance(io, str):
+        # When io is not a path, always need to load all data to python side
+        pdf_or_psers = pd_read_excel(io, sn=sheet_name, nr=nrows)
         if isinstance(pdf_or_psers, dict):
             return {
                 sn: cast(Union[DataFrame, Series], from_pandas(pdf_or_pser))
@@ -1176,70 +1171,89 @@ def read_excel(
             }
         else:
             return cast(Union[DataFrame, Series], from_pandas(pdf_or_psers))
-    else:
 
-        def read_excel_on_spark(
-            pdf_or_pser: Union[pd.DataFrame, pd.Series],
-            sn: Union[str, int, List[Union[str, int]], None],
-        ) -> Union[DataFrame, Series]:
-            if isinstance(pdf_or_pser, pd.Series):
-                pdf = pdf_or_pser.to_frame()
-            else:
-                pdf = pdf_or_pser
+    spark = default_session()
+
+    # Collect the first #nr rows from the first file
+    nr = get_option("compute.max_rows", 1000)
+    if nrows is not None and nrows < nr:
+        nr = nrows
+
+    def sample_data(pdf: pd.DataFrame) -> pd.DataFrame:
+        raw_data = BytesIO(pdf.content[0])
+        pdf_or_dict = pd_read_excel(raw_data, sn=sheet_name, nr=nr)
+        return pd.DataFrame({"sampled": [pickle.dumps(pdf_or_dict)]})
+
+    # 'binaryFile' format is available since Spark 3.0.0.
+    sampled = (
+        spark.read.format("binaryFile")
+        .load(io)
+        .select("content")
+        .limit(1)  # Read at most 1 file
+        .mapInPandas(func=lambda iterator: map(sample_data, iterator), 
schema="sampled BINARY")
+        .head()
+    )
+    sampled = pickle.loads(sampled[0])
+
+    def read_excel_on_spark(
+        pdf_or_pser: Union[pd.DataFrame, pd.Series],
+        sn: Union[str, int, List[Union[str, int]], None],
+    ) -> Union[DataFrame, Series]:
+        if isinstance(pdf_or_pser, pd.Series):
+            pdf = pdf_or_pser.to_frame()
+        else:
+            pdf = pdf_or_pser
 
-            psdf = cast(DataFrame, from_pandas(pdf))
+        psdf = cast(DataFrame, from_pandas(pdf))
 
-            raw_schema = 
psdf._internal.spark_frame.drop(*HIDDEN_COLUMNS).schema
-            index_scol_names = psdf._internal.index_spark_column_names
-            nullable_fields = []
-            for field in raw_schema.fields:
-                if field.name in index_scol_names:
-                    nullable_fields.append(field)
-                else:
-                    nullable_fields.append(
-                        StructField(
-                            field.name,
-                            as_nullable_spark_type(field.dataType),
-                            nullable=True,
-                            metadata=field.metadata,
-                        )
+        raw_schema = psdf._internal.spark_frame.drop(*HIDDEN_COLUMNS).schema
+        index_scol_names = psdf._internal.index_spark_column_names
+        nullable_fields = []
+        for field in raw_schema.fields:
+            if field.name in index_scol_names:
+                nullable_fields.append(field)
+            else:
+                nullable_fields.append(
+                    StructField(
+                        field.name,
+                        as_nullable_spark_type(field.dataType),
+                        nullable=True,
+                        metadata=field.metadata,
                     )
-            nullable_schema = StructType(nullable_fields)
-            return_schema = force_decimal_precision_scale(nullable_schema)
-
-            return_data_fields: Optional[List[InternalField]] = None
-            if psdf._internal.data_fields is not None:
-                return_data_fields = [f.normalize_spark_type() for f in 
psdf._internal.data_fields]
-
-            def output_func(pdf: pd.DataFrame) -> pd.DataFrame:
-                pdf = pd.concat([pd_read_excel(bin, sn=sn) for bin in 
pdf[pdf.columns[0]]])
-
-                reset_index = pdf.reset_index()
-                for name, col in reset_index.items():
-                    dt = col.dtype
-                    if is_datetime64_dtype(dt) or isinstance(dt, 
pd.DatetimeTZDtype):
-                        continue
-                    reset_index[name] = col.replace({np.nan: None})
-                pdf = reset_index
-
-                # Just positionally map the column names to given schema's.
-                return pdf.rename(columns=dict(zip(pdf.columns, 
return_schema.names)))
-
-            sdf = (
-                default_session()
-                .read.format("binaryFile")
-                .load(io)
-                .select("content")
-                .mapInPandas(lambda iterator: map(output_func, iterator), 
schema=return_schema)
-            )
-            return DataFrame(psdf._internal.with_new_sdf(sdf, 
data_fields=return_data_fields))
+                )
+        nullable_schema = StructType(nullable_fields)
+        return_schema = force_decimal_precision_scale(nullable_schema)
+
+        return_data_fields: Optional[List[InternalField]] = None
+        if psdf._internal.data_fields is not None:
+            return_data_fields = [f.normalize_spark_type() for f in 
psdf._internal.data_fields]
+
+        def output_func(pdf: pd.DataFrame) -> pd.DataFrame:
+            pdf = pd.concat([pd_read_excel(bin, sn=sn, nr=nrows) for bin in 
pdf[pdf.columns[0]]])
+
+            reset_index = pdf.reset_index()
+            for name, col in reset_index.items():
+                dt = col.dtype
+                if is_datetime64_dtype(dt) or isinstance(dt, 
pd.DatetimeTZDtype):
+                    continue
+                reset_index[name] = col.replace({np.nan: None})
+            pdf = reset_index
+
+            # Just positionally map the column names to given schema's.
+            return pdf.rename(columns=dict(zip(pdf.columns, 
return_schema.names)))
+
+        sdf = (
+            spark.read.format("binaryFile")
+            .load(io)
+            .select("content")
+            .mapInPandas(lambda iterator: map(output_func, iterator), 
schema=return_schema)
+        )
+        return DataFrame(psdf._internal.with_new_sdf(sdf, 
data_fields=return_data_fields))
 
-        if isinstance(pdf_or_psers, dict):
-            return {
-                sn: read_excel_on_spark(pdf_or_pser, sn) for sn, pdf_or_pser 
in pdf_or_psers.items()
-            }
-        else:
-            return read_excel_on_spark(pdf_or_psers, sheet_name)
+    if isinstance(sampled, dict):
+        return {sn: read_excel_on_spark(pdf_or_pser, sn) for sn, pdf_or_pser 
in sampled.items()}
+    else:
+        return read_excel_on_spark(cast(Union[pd.DataFrame, pd.Series], 
sampled), sheet_name)
 
 
 def read_html(
diff --git a/python/pyspark/pandas/tests/io/test_dataframe_spark_io.py 
b/python/pyspark/pandas/tests/io/test_dataframe_spark_io.py
index 74f152172e3d..0308d22b6a5c 100644
--- a/python/pyspark/pandas/tests/io/test_dataframe_spark_io.py
+++ b/python/pyspark/pandas/tests/io/test_dataframe_spark_io.py
@@ -344,6 +344,26 @@ class DataFrameSparkIOTestsMixin:
                     pd.concat([pdfs1["Sheet_name_2"], 
pdfs2["Sheet_name_2"]]).sort_index(),
                 )
 
+    def test_read_large_excel(self):
+        n = 20000
+        pdf = pd.DataFrame(
+            {
+                "i32": np.arange(n, dtype=np.int32) % 3,
+                "i64": np.arange(n, dtype=np.int64) % 5,
+                "f": np.arange(n, dtype=np.float64),
+                "bhello": np.random.choice(["hello", "yo", "people"], 
size=n).astype("O"),
+            },
+            columns=["i32", "i64", "f", "bhello"],
+            index=np.random.rand(n),
+        )
+
+        with self.temp_dir() as tmp:
+            path = "{}/large_file.xlsx".format(tmp)
+            pdf.to_excel(path)
+
+            self.assert_eq(ps.read_excel(path), pd.read_excel(path))
+            self.assert_eq(ps.read_excel(path, nrows=10), pd.read_excel(path, 
nrows=10))
+
     def test_read_orc(self):
         with self.temp_dir() as tmp:
             path = "{}/file1.orc".format(tmp)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to