This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 04125eb80e5 [SPARK-41971][CONNECT][PYTHON][FOLLOWUP] Fix to_pandas to support older Spark versions
04125eb80e5 is described below
commit 04125eb80e5c5602bfaa9a5512706e31e49ca4c4
Author: Takuya UESHIN <[email protected]>
AuthorDate: Wed May 31 09:18:19 2023 +0900
[SPARK-41971][CONNECT][PYTHON][FOLLOWUP] Fix to_pandas to support older Spark versions
### What changes were proposed in this pull request?
This is a follow-up of #40988.
Fix `to_pandas` to support older Spark versions.
For the server:
```sh
% ./sbin/start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.4.0
```
and for the client with this change applied, `toPandas` now succeeds (the struct comes back as a dict under the `legacy` handling):
```py
>>> spark.sql("values (1, struct('x' as x)) as t(a, b)").toPandas()
   a           b
0  1  {'x': 'x'}
```
### Why are the changes needed?
The config `spark.sql.execution.pandas.structHandlingMode`, introduced in #40988, does not exist in older Spark versions (`<3.5`), so `toPandas` fails against such servers:
```py
>>> spark.sql("values (1, struct('x' as x)) as t(a, b)").toPandas()
Traceback (most recent call last):
...
pyspark.errors.exceptions.connect.SparkConnectGrpcException: (java.util.NoSuchElementException) spark.sql.execution.pandas.structHandlingMode
```
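The follow-up avoids this by requesting the config through the `get_with_default` operation, so a server that does not know the key returns the supplied fallback instead of raising. A minimal sketch of the resulting behavior against a 3.4.0 server, assuming access to the underlying `SparkConnectClient` (the `client` handle below is illustrative; the helper itself is added by this patch):
```py
>>> client = spark._client  # illustrative: internal handle to the SparkConnectClient
>>> # A plain `get` of the unknown key is what raised the traceback above;
>>> # asking with a default yields the fallback instead:
>>> client.get_config_with_defaults(
...     ("spark.sql.execution.pandas.structHandlingMode", "legacy"),
... )
('legacy',)
```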
### Does this PR introduce _any_ user-facing change?
Yes: the newer Spark Connect client will now work with `Spark<3.5` servers.
### How was this patch tested?
Manually.
Closes #41390 from ueshin/issues/SPARK-41971/config_with_default.
Authored-by: Takuya UESHIN <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
python/pyspark/sql/connect/client/core.py | 20 +++++++++++++++++---
1 file changed, 17 insertions(+), 3 deletions(-)
diff --git a/python/pyspark/sql/connect/client/core.py b/python/pyspark/sql/connect/client/core.py
index a0f790b2992..8da649e7765 100644
--- a/python/pyspark/sql/connect/client/core.py
+++ b/python/pyspark/sql/connect/client/core.py
@@ -726,11 +726,14 @@ class SparkConnectClient(object):
 
         if len(pdf.columns) > 0:
             timezone: Optional[str] = None
+            if any(_has_type(f.dataType, TimestampType) for f in schema.fields):
+                (timezone,) = self.get_configs("spark.sql.session.timeZone")
+
             struct_in_pandas: Optional[str] = None
             error_on_duplicated_field_names: bool = False
-            if any(_has_type(f.dataType, (StructType, TimestampType)) for f in schema.fields):
-                timezone, struct_in_pandas = self.get_configs(
-                    "spark.sql.session.timeZone", "spark.sql.execution.pandas.structHandlingMode"
+            if any(_has_type(f.dataType, StructType) for f in schema.fields):
+                (struct_in_pandas,) = self.get_config_with_defaults(
+                    ("spark.sql.execution.pandas.structHandlingMode", "legacy"),
                 )
 
                 if struct_in_pandas == "legacy":
@@ -1108,6 +1111,17 @@ class SparkConnectClient(object):
         configs = dict(self.config(op).pairs)
         return tuple(configs.get(key) for key in keys)
 
+    def get_config_with_defaults(
+        self, *pairs: Tuple[str, Optional[str]]
+    ) -> Tuple[Optional[str], ...]:
+        op = pb2.ConfigRequest.Operation(
+            get_with_default=pb2.ConfigRequest.GetWithDefault(
+                pairs=[pb2.KeyValue(key=key, value=default) for key, default in pairs]
+            )
+        )
+        configs = dict(self.config(op).pairs)
+        return tuple(configs.get(key) for key, _ in pairs)
+
     def config(self, operation: pb2.ConfigRequest.Operation) -> ConfigResult:
         """
         Call the config RPC of Spark Connect.
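For context, the new helper differs from the existing `get_configs` only in the config operation it sends: a `Get` lists bare keys and fails server-side for unknown ones, while `GetWithDefault` carries `KeyValue` pairs whose values are the fallbacks. A sketch of the two request payloads, using the same `pb2` proto module the patch does (the `Get` construction is inferred from `get_configs` above, as it is not shown in this diff):
```py
import pyspark.sql.connect.proto as pb2

# `get`: a Spark<3.5 server raises java.util.NoSuchElementException for this key.
get_op = pb2.ConfigRequest.Operation(
    get=pb2.ConfigRequest.Get(keys=["spark.sql.execution.pandas.structHandlingMode"])
)

# `get_with_default`: the unknown key comes back paired with the default "legacy".
get_with_default_op = pb2.ConfigRequest.Operation(
    get_with_default=pb2.ConfigRequest.GetWithDefault(
        pairs=[
            pb2.KeyValue(key="spark.sql.execution.pandas.structHandlingMode", value="legacy")
        ]
    )
)
```
Either operation is then passed to `self.config(op)`, which performs the actual RPC.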