This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new ed3a9b1aa929 [SPARK-49691][PYTHON][CONNECT] Function `substring`
should accept column names
ed3a9b1aa929 is described below
commit ed3a9b1aa92957015592b399167a960b68b73beb
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Wed Sep 18 09:28:09 2024 -0700
[SPARK-49691][PYTHON][CONNECT] Function `substring` should accept column
names
### What changes were proposed in this pull request?
Function `substring` should accept column names
### Why are the changes needed?
Bug fix:
```
In [1]: >>> import pyspark.sql.functions as sf
...: >>> df = spark.createDataFrame([('Spark', 2, 3)], ['s', 'p',
'l'])
...: >>> df.select('*', sf.substring('s', 'p', 'l')).show()
```
works in PySpark Classic, but fails in Connect with:
```
NumberFormatException Traceback (most recent call last)
Cell In[2], line 1
----> 1 df.select('*', sf.substring('s', 'p', 'l')).show()
File ~/Dev/spark/python/pyspark/sql/connect/dataframe.py:1170, in
DataFrame.show(self, n, truncate, vertical)
1169 def show(self, n: int = 20, truncate: Union[bool, int] = True,
vertical: bool = False) -> None:
-> 1170 print(self._show_string(n, truncate, vertical))
File ~/Dev/spark/python/pyspark/sql/connect/dataframe.py:927, in
DataFrame._show_string(self, n, truncate, vertical)
910 except ValueError:
911 raise PySparkTypeError(
912 errorClass="NOT_BOOL",
913 messageParameters={
(...)
916 },
917 )
919 table, _ = DataFrame(
920 plan.ShowString(
921 child=self._plan,
922 num_rows=n,
923 truncate=_truncate,
924 vertical=vertical,
925 ),
926 session=self._session,
--> 927 )._to_table()
928 return table[0][0].as_py()
File ~/Dev/spark/python/pyspark/sql/connect/dataframe.py:1844, in
DataFrame._to_table(self)
1842 def _to_table(self) -> Tuple["pa.Table", Optional[StructType]]:
1843 query = self._plan.to_proto(self._session.client)
-> 1844 table, schema, self._execution_info =
self._session.client.to_table(
1845 query, self._plan.observations
1846 )
1847 assert table is not None
1848 return (table, schema)
File ~/Dev/spark/python/pyspark/sql/connect/client/core.py:892, in
SparkConnectClient.to_table(self, plan, observations)
890 req = self._execute_plan_request_with_metadata()
891 req.plan.CopyFrom(plan)
--> 892 table, schema, metrics, observed_metrics, _ =
self._execute_and_fetch(req, observations)
894 # Create a query execution object.
895 ei = ExecutionInfo(metrics, observed_metrics)
File ~/Dev/spark/python/pyspark/sql/connect/client/core.py:1517, in
SparkConnectClient._execute_and_fetch(self, req, observations, self_destruct)
1514 properties: Dict[str, Any] = {}
1516 with Progress(handlers=self._progress_handlers,
operation_id=req.operation_id) as progress:
-> 1517 for response in self._execute_and_fetch_as_iterator(
1518 req, observations, progress=progress
1519 ):
1520 if isinstance(response, StructType):
1521 schema = response
File ~/Dev/spark/python/pyspark/sql/connect/client/core.py:1494, in
SparkConnectClient._execute_and_fetch_as_iterator(self, req, observations,
progress)
1492 raise kb
1493 except Exception as error:
-> 1494 self._handle_error(error)
File ~/Dev/spark/python/pyspark/sql/connect/client/core.py:1764, in
SparkConnectClient._handle_error(self, error)
1762 self.thread_local.inside_error_handling = True
1763 if isinstance(error, grpc.RpcError):
-> 1764 self._handle_rpc_error(error)
1765 elif isinstance(error, ValueError):
1766 if "Cannot invoke RPC" in str(error) and "closed" in str(error):
File ~/Dev/spark/python/pyspark/sql/connect/client/core.py:1840, in
SparkConnectClient._handle_rpc_error(self, rpc_error)
1837 if info.metadata["errorClass"] ==
"INVALID_HANDLE.SESSION_CHANGED":
1838 self._closed = True
-> 1840 raise convert_exception(
1841 info,
1842 status.message,
1843 self._fetch_enriched_error(info),
1844 self._display_server_stack_trace(),
1845 ) from None
1847 raise SparkConnectGrpcException(status.message) from None
1848 else:
NumberFormatException: [CAST_INVALID_INPUT] The value 'p' of the type
"STRING" cannot be cast to "INT" because it is malformed. Correct the value as
per the syntax, or change its target type. Use `try_cast` to tolerate malformed
input and return NULL instead. SQLSTATE: 22018
...
```
### Does this PR introduce _any_ user-facing change?
Yes, the function `substring` in Connect can now properly handle column names.
### How was this patch tested?
new doctests
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #48135 from zhengruifeng/py_substring_fix.
Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
python/pyspark/sql/connect/functions/builtin.py | 10 +++-
python/pyspark/sql/functions/builtin.py | 63 +++++++++++++++++++++----
2 files changed, 62 insertions(+), 11 deletions(-)
diff --git a/python/pyspark/sql/connect/functions/builtin.py
b/python/pyspark/sql/connect/functions/builtin.py
index 031e7c22542d..2870d9c408b6 100644
--- a/python/pyspark/sql/connect/functions/builtin.py
+++ b/python/pyspark/sql/connect/functions/builtin.py
@@ -2488,8 +2488,14 @@ def sentences(
sentences.__doc__ = pysparkfuncs.sentences.__doc__
-def substring(str: "ColumnOrName", pos: int, len: int) -> Column:
- return _invoke_function("substring", _to_col(str), lit(pos), lit(len))
+def substring(
+ str: "ColumnOrName",
+ pos: Union["ColumnOrName", int],
+ len: Union["ColumnOrName", int],
+) -> Column:
+ _pos = lit(pos) if isinstance(pos, int) else _to_col(pos)
+ _len = lit(len) if isinstance(len, int) else _to_col(len)
+ return _invoke_function("substring", _to_col(str), _pos, _len)
substring.__doc__ = pysparkfuncs.substring.__doc__
diff --git a/python/pyspark/sql/functions/builtin.py
b/python/pyspark/sql/functions/builtin.py
index 781bf3d9f83a..c0730b193bc7 100644
--- a/python/pyspark/sql/functions/builtin.py
+++ b/python/pyspark/sql/functions/builtin.py
@@ -11309,7 +11309,9 @@ def sentences(
@_try_remote_functions
def substring(
- str: "ColumnOrName", pos: Union["ColumnOrName", int], len:
Union["ColumnOrName", int]
+ str: "ColumnOrName",
+ pos: Union["ColumnOrName", int],
+ len: Union["ColumnOrName", int],
) -> Column:
"""
Substring starts at `pos` and is of length `len` when str is String type or
@@ -11348,16 +11350,59 @@ def substring(
Examples
--------
+ Example 1: Using literal integers as arguments
+
+ >>> import pyspark.sql.functions as sf
>>> df = spark.createDataFrame([('abcd',)], ['s',])
- >>> df.select(substring(df.s, 1, 2).alias('s')).collect()
- [Row(s='ab')]
+ >>> df.select('*', sf.substring(df.s, 1, 2)).show()
+ +----+------------------+
+ | s|substring(s, 1, 2)|
+ +----+------------------+
+ |abcd| ab|
+ +----+------------------+
+
+ Example 2: Using columns as arguments
+
+ >>> import pyspark.sql.functions as sf
+ >>> df = spark.createDataFrame([('Spark', 2, 3)], ['s', 'p', 'l'])
+ >>> df.select('*', sf.substring(df.s, 2, df.l)).show()
+ +-----+---+---+------------------+
+ | s| p| l|substring(s, 2, l)|
+ +-----+---+---+------------------+
+ |Spark| 2| 3| par|
+ +-----+---+---+------------------+
+
+ >>> df.select('*', sf.substring(df.s, df.p, 3)).show()
+ +-----+---+---+------------------+
+ | s| p| l|substring(s, p, 3)|
+ +-----+---+---+------------------+
+ |Spark| 2| 3| par|
+ +-----+---+---+------------------+
+
+ >>> df.select('*', sf.substring(df.s, df.p, df.l)).show()
+ +-----+---+---+------------------+
+ | s| p| l|substring(s, p, l)|
+ +-----+---+---+------------------+
+ |Spark| 2| 3| par|
+ +-----+---+---+------------------+
+
+ Example 3: Using column names as arguments
+
+ >>> import pyspark.sql.functions as sf
>>> df = spark.createDataFrame([('Spark', 2, 3)], ['s', 'p', 'l'])
- >>> df.select(substring(df.s, 2, df.l).alias('s')).collect()
- [Row(s='par')]
- >>> df.select(substring(df.s, df.p, 3).alias('s')).collect()
- [Row(s='par')]
- >>> df.select(substring(df.s, df.p, df.l).alias('s')).collect()
- [Row(s='par')]
+ >>> df.select('*', sf.substring(df.s, 2, 'l')).show()
+ +-----+---+---+------------------+
+ | s| p| l|substring(s, 2, l)|
+ +-----+---+---+------------------+
+ |Spark| 2| 3| par|
+ +-----+---+---+------------------+
+
+ >>> df.select('*', sf.substring('s', 'p', 'l')).show()
+ +-----+---+---+------------------+
+ | s| p| l|substring(s, p, l)|
+ +-----+---+---+------------------+
+ |Spark| 2| 3| par|
+ +-----+---+---+------------------+
"""
pos = _enum_to_value(pos)
pos = lit(pos) if isinstance(pos, int) else pos
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]