This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 5ab4b124bb03 [SPARK-51599][PS][CONNECT] Optimize `ps.read_excel` for large excel file
5ab4b124bb03 is described below
commit 5ab4b124bb03e0a03e66242f04dc801175ead6ad
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Tue Mar 25 18:48:01 2025 +0900
[SPARK-51599][PS][CONNECT] Optimize `ps.read_excel` for large excel file
### What changes were proposed in this pull request?
Optimize `ps.read_excel` for large Excel files.
### Why are the changes needed?
For large Excel files, the existing implementation collects all data from the
first two files to the Python side in order to infer the schema;
in this PR, the schema is inferred from only the first 1,000 rows of the first Excel file.
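As an illustration only (not part of the patch), here is a minimal sketch of the sampling idea, assuming a local Spark session, an `.xlsx` path, and `openpyxl` installed; the function and parameter names are made up for the example:

```python
from io import BytesIO

import pandas as pd
from pyspark.sql import SparkSession


def infer_excel_schema_sample(path: str, sample_rows: int = 1000) -> pd.DataFrame:
    """Parse only a bounded prefix of the first Excel file under `path`."""
    spark = SparkSession.builder.getOrCreate()
    # 'binaryFile' is a built-in Spark data source; limit(1) keeps a single file.
    first = (
        spark.read.format("binaryFile")
        .load(path)
        .select("content")
        .limit(1)
        .head()
    )
    # Parse at most `sample_rows` rows; the dtypes of this small frame stand in
    # for the schema of the full dataset, which is parsed on executors later.
    return pd.read_excel(BytesIO(first[0]), nrows=sample_rows)
```

The patch itself runs this sampling step inside `mapInPandas` on an executor (returning the pickled sample) rather than collecting the raw bytes to the driver, as the diff below shows.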
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
Added a unit test.
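The new test (see the diff below) writes a 20,000-row frame to a single `.xlsx` file and checks parity with plain pandas, both with and without `nrows`. A hedged usage sketch along those lines, with an illustrative path and `openpyxl` assumed:

```python
import numpy as np
import pandas as pd
import pyspark.pandas as ps

path = "/tmp/large_file.xlsx"  # illustrative location
pd.DataFrame({"f": np.arange(20000, dtype=np.float64)}).to_excel(path)

# Only a bounded prefix of the first file is parsed up front to infer the
# schema; the full contents are parsed distributively afterwards.
print(ps.read_excel(path).shape)
print(ps.read_excel(path, nrows=10).shape)
```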
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #50382 from zhengruifeng/ps_read_excel_opt.
Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
python/pyspark/pandas/namespace.py | 160 +++++++++++----------
.../pandas/tests/io/test_dataframe_spark_io.py | 20 +++
2 files changed, 107 insertions(+), 73 deletions(-)
diff --git a/python/pyspark/pandas/namespace.py b/python/pyspark/pandas/namespace.py
index a5c5a32037b8..2ab1260eff69 100644
--- a/python/pyspark/pandas/namespace.py
+++ b/python/pyspark/pandas/namespace.py
@@ -36,6 +36,7 @@ from collections.abc import Iterable
from datetime import tzinfo
from functools import reduce
from io import BytesIO
+import pickle
import json
import warnings
@@ -83,6 +84,7 @@ from pyspark.pandas.utils import (
validate_axis,
log_advice,
)
+from pyspark.pandas.config import get_option
from pyspark.pandas.frame import DataFrame, _reduce_spark_multi
from pyspark.pandas.internal import (
InternalFrame,
@@ -1130,7 +1132,9 @@ def read_excel(
"""
def pd_read_excel(
- io_or_bin: Any, sn: Union[str, int, List[Union[str, int]], None]
+ io_or_bin: Any,
+ sn: Union[str, int, List[Union[str, int]], None],
+ nr: Optional[int] = None,
) -> pd.DataFrame:
return pd.read_excel(
io=BytesIO(io_or_bin) if isinstance(io_or_bin, (bytes, bytearray)) else io_or_bin,
@@ -1145,7 +1149,7 @@ def read_excel(
true_values=true_values,
false_values=false_values,
skiprows=skiprows,
- nrows=nrows,
+ nrows=nr,
na_values=na_values,
keep_default_na=keep_default_na,
verbose=verbose,
@@ -1157,18 +1161,9 @@ def read_excel(
**kwds,
)
- if isinstance(io, str):
- # 'binaryFile' format is available since Spark 3.0.0.
- binaries = default_session().read.format("binaryFile").load(io).select("content").head(2)
- io_or_bin = binaries[0][0]
- single_file = len(binaries) == 1
- else:
- io_or_bin = io
- single_file = True
-
- pdf_or_psers = pd_read_excel(io_or_bin, sn=sheet_name)
-
- if single_file:
+ if not isinstance(io, str):
+ # When io is not a path, always need to load all data to python side
+ pdf_or_psers = pd_read_excel(io, sn=sheet_name, nr=nrows)
if isinstance(pdf_or_psers, dict):
return {
sn: cast(Union[DataFrame, Series], from_pandas(pdf_or_pser))
@@ -1176,70 +1171,89 @@ def read_excel(
}
else:
return cast(Union[DataFrame, Series], from_pandas(pdf_or_psers))
- else:
- def read_excel_on_spark(
- pdf_or_pser: Union[pd.DataFrame, pd.Series],
- sn: Union[str, int, List[Union[str, int]], None],
- ) -> Union[DataFrame, Series]:
- if isinstance(pdf_or_pser, pd.Series):
- pdf = pdf_or_pser.to_frame()
- else:
- pdf = pdf_or_pser
+ spark = default_session()
+
+ # Collect the first #nr rows from the first file
+ nr = get_option("compute.max_rows", 1000)
+ if nrows is not None and nrows < nr:
+ nr = nrows
+
+ def sample_data(pdf: pd.DataFrame) -> pd.DataFrame:
+ raw_data = BytesIO(pdf.content[0])
+ pdf_or_dict = pd_read_excel(raw_data, sn=sheet_name, nr=nr)
+ return pd.DataFrame({"sampled": [pickle.dumps(pdf_or_dict)]})
+
+ # 'binaryFile' format is available since Spark 3.0.0.
+ sampled = (
+ spark.read.format("binaryFile")
+ .load(io)
+ .select("content")
+ .limit(1) # Read at most 1 file
+ .mapInPandas(func=lambda iterator: map(sample_data, iterator), schema="sampled BINARY")
+ .head()
+ )
+ sampled = pickle.loads(sampled[0])
+
+ def read_excel_on_spark(
+ pdf_or_pser: Union[pd.DataFrame, pd.Series],
+ sn: Union[str, int, List[Union[str, int]], None],
+ ) -> Union[DataFrame, Series]:
+ if isinstance(pdf_or_pser, pd.Series):
+ pdf = pdf_or_pser.to_frame()
+ else:
+ pdf = pdf_or_pser
- psdf = cast(DataFrame, from_pandas(pdf))
+ psdf = cast(DataFrame, from_pandas(pdf))
- raw_schema = psdf._internal.spark_frame.drop(*HIDDEN_COLUMNS).schema
- index_scol_names = psdf._internal.index_spark_column_names
- nullable_fields = []
- for field in raw_schema.fields:
- if field.name in index_scol_names:
- nullable_fields.append(field)
- else:
- nullable_fields.append(
- StructField(
- field.name,
- as_nullable_spark_type(field.dataType),
- nullable=True,
- metadata=field.metadata,
- )
+ raw_schema = psdf._internal.spark_frame.drop(*HIDDEN_COLUMNS).schema
+ index_scol_names = psdf._internal.index_spark_column_names
+ nullable_fields = []
+ for field in raw_schema.fields:
+ if field.name in index_scol_names:
+ nullable_fields.append(field)
+ else:
+ nullable_fields.append(
+ StructField(
+ field.name,
+ as_nullable_spark_type(field.dataType),
+ nullable=True,
+ metadata=field.metadata,
)
- nullable_schema = StructType(nullable_fields)
- return_schema = force_decimal_precision_scale(nullable_schema)
-
- return_data_fields: Optional[List[InternalField]] = None
- if psdf._internal.data_fields is not None:
- return_data_fields = [f.normalize_spark_type() for f in psdf._internal.data_fields]
-
- def output_func(pdf: pd.DataFrame) -> pd.DataFrame:
- pdf = pd.concat([pd_read_excel(bin, sn=sn) for bin in pdf[pdf.columns[0]]])
-
- reset_index = pdf.reset_index()
- for name, col in reset_index.items():
- dt = col.dtype
- if is_datetime64_dtype(dt) or isinstance(dt, pd.DatetimeTZDtype):
- continue
- reset_index[name] = col.replace({np.nan: None})
- pdf = reset_index
-
- # Just positionally map the column names to given schema's.
- return pdf.rename(columns=dict(zip(pdf.columns, return_schema.names)))
-
- sdf = (
- default_session()
- .read.format("binaryFile")
- .load(io)
- .select("content")
- .mapInPandas(lambda iterator: map(output_func, iterator), schema=return_schema)
- )
- return DataFrame(psdf._internal.with_new_sdf(sdf, data_fields=return_data_fields))
+ )
+ nullable_schema = StructType(nullable_fields)
+ return_schema = force_decimal_precision_scale(nullable_schema)
+
+ return_data_fields: Optional[List[InternalField]] = None
+ if psdf._internal.data_fields is not None:
+ return_data_fields = [f.normalize_spark_type() for f in psdf._internal.data_fields]
+
+ def output_func(pdf: pd.DataFrame) -> pd.DataFrame:
+ pdf = pd.concat([pd_read_excel(bin, sn=sn, nr=nrows) for bin in pdf[pdf.columns[0]]])
+
+ reset_index = pdf.reset_index()
+ for name, col in reset_index.items():
+ dt = col.dtype
+ if is_datetime64_dtype(dt) or isinstance(dt, pd.DatetimeTZDtype):
+ continue
+ reset_index[name] = col.replace({np.nan: None})
+ pdf = reset_index
+
+ # Just positionally map the column names to given schema's.
+ return pdf.rename(columns=dict(zip(pdf.columns, return_schema.names)))
+
+ sdf = (
+ spark.read.format("binaryFile")
+ .load(io)
+ .select("content")
+ .mapInPandas(lambda iterator: map(output_func, iterator), schema=return_schema)
+ )
+ return DataFrame(psdf._internal.with_new_sdf(sdf, data_fields=return_data_fields))
- if isinstance(pdf_or_psers, dict):
- return {
- sn: read_excel_on_spark(pdf_or_pser, sn) for sn, pdf_or_pser in pdf_or_psers.items()
- }
- else:
- return read_excel_on_spark(pdf_or_psers, sheet_name)
+ if isinstance(sampled, dict):
+ return {sn: read_excel_on_spark(pdf_or_pser, sn) for sn, pdf_or_pser in sampled.items()}
+ else:
+ return read_excel_on_spark(cast(Union[pd.DataFrame, pd.Series], sampled), sheet_name)
def read_html(
diff --git a/python/pyspark/pandas/tests/io/test_dataframe_spark_io.py b/python/pyspark/pandas/tests/io/test_dataframe_spark_io.py
index 74f152172e3d..0308d22b6a5c 100644
--- a/python/pyspark/pandas/tests/io/test_dataframe_spark_io.py
+++ b/python/pyspark/pandas/tests/io/test_dataframe_spark_io.py
@@ -344,6 +344,26 @@ class DataFrameSparkIOTestsMixin:
pd.concat([pdfs1["Sheet_name_2"], pdfs2["Sheet_name_2"]]).sort_index(),
)
+ def test_read_large_excel(self):
+ n = 20000
+ pdf = pd.DataFrame(
+ {
+ "i32": np.arange(n, dtype=np.int32) % 3,
+ "i64": np.arange(n, dtype=np.int64) % 5,
+ "f": np.arange(n, dtype=np.float64),
+ "bhello": np.random.choice(["hello", "yo", "people"],
size=n).astype("O"),
+ },
+ columns=["i32", "i64", "f", "bhello"],
+ index=np.random.rand(n),
+ )
+
+ with self.temp_dir() as tmp:
+ path = "{}/large_file.xlsx".format(tmp)
+ pdf.to_excel(path)
+
+ self.assert_eq(ps.read_excel(path), pd.read_excel(path))
+ self.assert_eq(ps.read_excel(path, nrows=10), pd.read_excel(path, nrows=10))
+
def test_read_orc(self):
with self.temp_dir() as tmp:
path = "{}/file1.orc".format(tmp)