This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new dc5fffc4b58 [SPARK-44486][PYTHON][CONNECT] Implement PyArrow `self_destruct` feature for `toPandas`
dc5fffc4b58 is described below
commit dc5fffc4b5833dbc0414d66994348ed56d155bbd
Author: Xinrong Meng <[email protected]>
AuthorDate: Tue Jul 25 09:43:52 2023 +0900
[SPARK-44486][PYTHON][CONNECT] Implement PyArrow `self_destruct` feature for `toPandas`
### What changes were proposed in this pull request?
Implement PyArrow's `self_destruct` feature for `toPandas` to reduce memory usage.

The Spark configuration `spark.sql.execution.arrow.pyspark.selfDestruct.enabled` can now be used to enable PyArrow's `self_destruct` feature in Spark Connect. This saves memory when creating a Pandas DataFrame via `toPandas` by freeing Arrow-allocated memory while the Pandas DataFrame is being built.
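For illustration, a minimal sketch of opting in from a Spark Connect client; the connection URL and the example DataFrame are placeholders, not part of this change:

```python
from pyspark.sql import SparkSession

# Assumes a Spark Connect server is reachable at sc://localhost; adjust as needed.
spark = SparkSession.builder.remote("sc://localhost").getOrCreate()

# Opt in to PyArrow's self_destruct path for toPandas conversions.
spark.conf.set("spark.sql.execution.arrow.pyspark.selfDestruct.enabled", "true")

df = spark.range(1_000_000).selectExpr("id", "id * 2 AS doubled")
# Arrow-allocated memory is released column by column while the Pandas
# DataFrame is built, instead of being held until the conversion finishes.
pdf = df.toPandas()
```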
### Why are the changes needed?
To reach parity with vanilla PySpark. This PR mirrors
https://github.com/apache/spark/pull/29818 for Spark Connect.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit test.
Closes #42079 from xinrong-meng/self_destruct.
Authored-by: Xinrong Meng <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
(cherry picked from commit 78b3345fa17cf54ecaaab7da27125a08233c9e94)
Signed-off-by: Hyukjin Kwon <[email protected]>
---
python/pyspark/sql/connect/client/core.py | 47 ++++++++++++++++++++--
.../pyspark/sql/tests/connect/test_parity_arrow.py | 15 +++++--
2 files changed, 55 insertions(+), 7 deletions(-)
diff --git a/python/pyspark/sql/connect/client/core.py b/python/pyspark/sql/connect/client/core.py
index 00f2a85d602..56236892122 100644
--- a/python/pyspark/sql/connect/client/core.py
+++ b/python/pyspark/sql/connect/client/core.py
@@ -757,14 +757,33 @@ class SparkConnectClient(object):
         logger.info(f"Executing plan {self._proto_to_string(plan)}")
         req = self._execute_plan_request_with_metadata()
         req.plan.CopyFrom(plan)
-        table, schema, metrics, observed_metrics, _ = self._execute_and_fetch(req)
+        (self_destruct_conf,) = self.get_config_with_defaults(
+            ("spark.sql.execution.arrow.pyspark.selfDestruct.enabled", "false"),
+        )
+        self_destruct = cast(str, self_destruct_conf).lower() == "true"
+        table, schema, metrics, observed_metrics, _ = self._execute_and_fetch(
+            req, self_destruct=self_destruct
+        )
         assert table is not None

         schema = schema or from_arrow_schema(table.schema, prefer_timestamp_ntz=True)
         assert schema is not None and isinstance(schema, StructType)

         # Rename columns to avoid duplicated column names.
-        pdf = table.rename_columns([f"col_{i}" for i in range(table.num_columns)]).to_pandas()
+        renamed_table = table.rename_columns([f"col_{i}" for i in range(table.num_columns)])
+        if self_destruct:
+            # Configure PyArrow to use as little memory as possible:
+            # self_destruct - free columns as they are converted
+            # split_blocks - create a separate Pandas block for each column
+            # use_threads - convert one column at a time
+            pandas_options = {
+                "self_destruct": True,
+                "split_blocks": True,
+                "use_threads": False,
+            }
+            pdf = renamed_table.to_pandas(**pandas_options)
+        else:
+            pdf = renamed_table.to_pandas()
         pdf.columns = schema.names

         if len(pdf.columns) > 0:
@@ -1108,7 +1127,7 @@ class SparkConnectClient(object):
             self._handle_error(error)

     def _execute_and_fetch(
-        self, req: pb2.ExecutePlanRequest
+        self, req: pb2.ExecutePlanRequest, self_destruct: bool = False
     ) -> Tuple[
         Optional["pa.Table"],
         Optional[StructType],
@@ -1144,7 +1163,27 @@ class SparkConnectClient(object):
                 )

         if len(batches) > 0:
-            table = pa.Table.from_batches(batches=batches)
+            if self_destruct:
+                results = []
+                for batch in batches:
+                    # self_destruct frees memory column-wise, but Arrow record batches are
+                    # oriented row-wise, so copies each column into its own allocation
+                    batch = pa.RecordBatch.from_arrays(
+                        [
+                            # This call actually reallocates the array
+                            pa.concat_arrays([array])
+                            for array in batch
+                        ],
+                        schema=batch.schema,
+                    )
+                    results.append(batch)
+                table = pa.Table.from_batches(batches=results)
+                # Ensure only the table has a reference to the batches, so that
+                # self_destruct (if enabled) is effective
+                del results
+                del batches
+            else:
+                table = pa.Table.from_batches(batches=batches)
             return table, schema, metrics, observed_metrics, properties
         else:
             return None, schema, metrics, observed_metrics, properties
diff --git a/python/pyspark/sql/tests/connect/test_parity_arrow.py b/python/pyspark/sql/tests/connect/test_parity_arrow.py
index e491305e867..5f76cafb192 100644
--- a/python/pyspark/sql/tests/connect/test_parity_arrow.py
+++ b/python/pyspark/sql/tests/connect/test_parity_arrow.py
@@ -19,11 +19,13 @@ import unittest
 from distutils.version import LooseVersion

 import pandas as pd
+
 from pyspark.sql.tests.test_arrow import ArrowTestsMixin
 from pyspark.testing.connectutils import ReusedConnectTestCase
+from pyspark.testing.pandasutils import PandasOnSparkTestUtils


-class ArrowParityTests(ArrowTestsMixin, ReusedConnectTestCase):
+class ArrowParityTests(ArrowTestsMixin, ReusedConnectTestCase, PandasOnSparkTestUtils):
     @unittest.skip("Spark Connect does not support Spark Context but the test depends on that.")
     def test_createDataFrame_empty_partition(self):
         super().test_createDataFrame_empty_partition()
@@ -56,9 +58,16 @@ class ArrowParityTests(ArrowTestsMixin, ReusedConnectTestCase):
     def test_no_partition_toPandas(self):
         super().test_no_partition_toPandas()

-    @unittest.skip("The test uses internal APIs.")
     def test_pandas_self_destruct(self):
-        super().test_pandas_self_destruct()
+        df = self.spark.range(100).select("id", "id", "id")
+
+        with self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": True}):
+            self_destruct_pdf = df.toPandas()
+
+        with self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": False}):
+            no_self_destruct_pdf = df.toPandas()
+
+        self.assert_eq(self_destruct_pdf, no_self_destruct_pdf)

     def test_propagates_spark_exception(self):
         self.check_propagates_spark_exception()
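As background for the `_execute_and_fetch` change above: `self_destruct` frees memory one column at a time, while the record batches arriving from the server are row-oriented allocations, so each column is first copied into its own allocation before the Arrow table is built. A self-contained sketch of that idea (the helper name and the toy batch are illustrative only, not part of the patch):

```python
import pyarrow as pa

def rebatch_column_wise(batches):
    # Copy every column of every record batch into its own allocation so that
    # Table.to_pandas(self_destruct=True) can release memory column by column.
    rebuilt = []
    for batch in batches:
        rebuilt.append(
            pa.RecordBatch.from_arrays(
                # pa.concat_arrays reallocates each column into a fresh buffer
                [pa.concat_arrays([array]) for array in batch],
                schema=batch.schema,
            )
        )
    return pa.Table.from_batches(batches=rebuilt)

batches = [pa.RecordBatch.from_arrays([pa.array(range(5))], names=["id"])]
table = rebatch_column_wise(batches)
del batches  # keep the table as the only owner of the rebuilt batches
pdf = table.to_pandas(self_destruct=True, split_blocks=True, use_threads=False)
```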
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]