This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d9c7908f348 [SPARK-41397][CONNECT][PYTHON] Implement part of string/binary functions
d9c7908f348 is described below
commit d9c7908f348fa7771182dca49fa032f6d1b689be
Author: Xinrong Meng <[email protected]>
AuthorDate: Wed Dec 7 12:06:57 2022 +0800
[SPARK-41397][CONNECT][PYTHON] Implement part of string/binary functions
### What changes were proposed in this pull request?
Implement the first half of string/binary functions. The rest of the
string/binary functions will be implemented in a separate PR for easier review.
### Why are the changes needed?
For API coverage on Spark Connect.
### Does this PR introduce _any_ user-facing change?
Yes. New functions are available on Spark Connect.
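For example, a minimal sketch of exercising one of the new functions through
a Spark Connect session (illustrative only; assumes a Connect server is
reachable and `spark` is a Connect-backed SparkSession):

    from pyspark.sql.connect import functions as CF

    df = spark.createDataFrame(["Spark", "PySpark"], "STRING")
    # upper/lower/trim and friends are now resolved by the Connect client.
    df.select(CF.upper("value"), CF.ltrim("value")).show()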
### How was this patch tested?
Unit tests.
Closes #38921 from xinrong-meng/connect_func_string.
Authored-by: Xinrong Meng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
python/pyspark/sql/connect/functions.py | 346 +++++++++++++++++++++
.../sql/tests/connect/test_connect_function.py | 61 ++++
2 files changed, 407 insertions(+)
diff --git a/python/pyspark/sql/connect/functions.py b/python/pyspark/sql/connect/functions.py
index b576a092f99..e57ffd10462 100644
--- a/python/pyspark/sql/connect/functions.py
+++ b/python/pyspark/sql/connect/functions.py
@@ -3208,3 +3208,349 @@ def variance(col: "ColumnOrName") -> Column:
+------------+
"""
return var_samp(col)
+
+
+# String/Binary functions
+
+
+def upper(col: "ColumnOrName") -> Column:
+ """
+ Converts a string expression to upper case.
+
+ .. versionadded:: 3.4.0
+
+ Parameters
+ ----------
+ col : :class:`~pyspark.sql.Column` or str
+ target column to work on.
+
+ Returns
+ -------
+ :class:`~pyspark.sql.Column`
+ upper case values.
+
+ Examples
+ --------
+ >>> df = spark.createDataFrame(["Spark", "PySpark", "Pandas API"],
"STRING")
+ >>> df.select(upper("value")).show()
+ +------------+
+ |upper(value)|
+ +------------+
+ |       SPARK|
+ |     PYSPARK|
+ |  PANDAS API|
+ +------------+
+ """
+ return _invoke_function_over_columns("upper", col)
+
+
+def lower(col: "ColumnOrName") -> Column:
+ """
+ Converts a string expression to lower case.
+
+ .. versionadded:: 3.4.0
+
+ Parameters
+ ----------
+ col : :class:`~pyspark.sql.Column` or str
+ target column to work on.
+
+ Returns
+ -------
+ :class:`~pyspark.sql.Column`
+ lower case values.
+
+ Examples
+ --------
+ >>> df = spark.createDataFrame(["Spark", "PySpark", "Pandas API"],
"STRING")
+ >>> df.select(lower("value")).show()
+ +------------+
+ |lower(value)|
+ +------------+
+ |       spark|
+ |     pyspark|
+ |  pandas api|
+ +------------+
+ """
+ return _invoke_function_over_columns("lower", col)
+
+
+def ascii(col: "ColumnOrName") -> Column:
+ """
+ Computes the numeric value of the first character of the string column.
+
+ .. versionadded:: 3.4.0
+
+ Parameters
+ ----------
+ col : :class:`~pyspark.sql.Column` or str
+ target column to work on.
+
+ Returns
+ -------
+ :class:`~pyspark.sql.Column`
+ numeric value.
+
+ Examples
+ --------
+ >>> df = spark.createDataFrame(["Spark", "PySpark", "Pandas API"],
"STRING")
+ >>> df.select(ascii("value")).show()
+ +------------+
+ |ascii(value)|
+ +------------+
+ |          83|
+ |          80|
+ |          80|
+ +------------+
+ """
+ return _invoke_function_over_columns("ascii", col)
+
+
+def base64(col: "ColumnOrName") -> Column:
+ """
+ Computes the BASE64 encoding of a binary column and returns it as a string column.
+
+ .. versionadded:: 3.4.0
+
+ Parameters
+ ----------
+ col : :class:`~pyspark.sql.Column` or str
+ target column to work on.
+
+ Returns
+ -------
+ :class:`~pyspark.sql.Column`
+ BASE64 encoding of the value, as a string.
+
+ Examples
+ --------
+ >>> df = spark.createDataFrame(["Spark", "PySpark", "Pandas API"],
"STRING")
+ >>> df.select(base64("value")).show()
+ +----------------+
+ |   base64(value)|
+ +----------------+
+ |        U3Bhcms=|
+ |    UHlTcGFyaw==|
+ |UGFuZGFzIEFQSQ==|
+ +----------------+
+ """
+ return _invoke_function_over_columns("base64", col)
+
+
+def unbase64(col: "ColumnOrName") -> Column:
+ """
+ Decodes a BASE64 encoded string column and returns it as a binary column.
+
+ .. versionadded:: 3.4.0
+
+ Parameters
+ ----------
+ col : :class:`~pyspark.sql.Column` or str
+ target column to work on.
+
+ Returns
+ -------
+ :class:`~pyspark.sql.Column`
+ decoded binary value.
+
+ Examples
+ --------
+ >>> df = spark.createDataFrame(["U3Bhcms=",
+ ... "UHlTcGFyaw==",
+ ... "UGFuZGFzIEFQSQ=="], "STRING")
+ >>> df.select(unbase64("value")).show()
+ +--------------------+
+ |     unbase64(value)|
+ +--------------------+
+ |    [53 70 61 72 6B]|
+ |[50 79 53 70 61 7...|
+ |[50 61 6E 64 61 7...|
+ +--------------------+
+ """
+ return _invoke_function_over_columns("unbase64", col)
+
+
+def ltrim(col: "ColumnOrName") -> Column:
+ """
+ Trim the spaces from the left end of the specified string value.
+
+ .. versionadded:: 3.4.0
+
+ Parameters
+ ----------
+ col : :class:`~pyspark.sql.Column` or str
+ target column to work on.
+
+ Returns
+ -------
+ :class:`~pyspark.sql.Column`
+ left trimmed values.
+
+ Examples
+ --------
+ >>> df = spark.createDataFrame([" Spark", "Spark ", " Spark"], "STRING")
+ >>> df.select(ltrim("value").alias("r")).withColumn("length",
length("r")).show()
+ +-------+------+
+ | r|length|
+ +-------+------+
+ | Spark| 5|
+ |Spark | 7|
+ | Spark| 5|
+ +-------+------+
+ """
+ return _invoke_function_over_columns("ltrim", col)
+
+
+def rtrim(col: "ColumnOrName") -> Column:
+ """
+ Trim the spaces from the right end of the specified string value.
+
+ .. versionadded:: 3.4.0
+
+ Parameters
+ ----------
+ col : :class:`~pyspark.sql.Column` or str
+ target column to work on.
+
+ Returns
+ -------
+ :class:`~pyspark.sql.Column`
+ right trimmed values.
+
+ Examples
+ --------
+ >>> df = spark.createDataFrame([" Spark", "Spark ", " Spark"], "STRING")
+ >>> df.select(rtrim("value").alias("r")).withColumn("length",
length("r")).show()
+ +--------+------+
+ | r|length|
+ +--------+------+
+ | Spark| 8|
+ | Spark| 5|
+ | Spark| 6|
+ +--------+------+
+ """
+ return _invoke_function_over_columns("rtrim", col)
+
+
+def trim(col: "ColumnOrName") -> Column:
+ """
+ Trim the spaces from both ends of the specified string column.
+
+ .. versionadded:: 3.4.0
+
+ Parameters
+ ----------
+ col : :class:`~pyspark.sql.Column` or str
+ target column to work on.
+
+ Returns
+ -------
+ :class:`~pyspark.sql.Column`
+ trimmed values from both sides.
+
+ Examples
+ --------
+ >>> df = spark.createDataFrame([" Spark", "Spark ", " Spark"], "STRING")
+ >>> df.select(trim("value").alias("r")).withColumn("length",
length("r")).show()
+ +-----+------+
+ | r|length|
+ +-----+------+
+ |Spark| 5|
+ |Spark| 5|
+ |Spark| 5|
+ +-----+------+
+ """
+ return _invoke_function_over_columns("trim", col)
+
+
+def concat_ws(sep: str, *cols: "ColumnOrName") -> Column:
+ """
+ Concatenates multiple input string columns together into a single string column,
+ using the given separator.
+
+ .. versionadded:: 3.4.0
+
+ Parameters
+ ----------
+ sep : str
+ separator to use between the words.
+ cols : :class:`~pyspark.sql.Column` or str
+ list of columns to work on.
+
+ Returns
+ -------
+ :class:`~pyspark.sql.Column`
+ string of concatenated words.
+
+ Examples
+ --------
+ >>> df = spark.createDataFrame([('abcd','123')], ['s', 'd'])
+ >>> df.select(concat_ws('-', df.s, df.d).alias('s')).collect()
+ [Row(s='abcd-123')]
+ """
+ return _invoke_function("concat_ws", lit(sep), *[_to_col(c) for c in cols])
+
+
+# TODO: enable with SPARK-41402
+# def decode(col: "ColumnOrName", charset: str) -> Column:
+# """
+# Computes the first argument into a string from a binary using the provided character set
+# (one of 'US-ASCII', 'ISO-8859-1', 'UTF-8', 'UTF-16BE', 'UTF-16LE', 'UTF-16').
+#
+# .. versionadded:: 3.4.0
+#
+# Parameters
+# ----------
+# col : :class:`~pyspark.sql.Column` or str
+# target column to work on.
+# charset : str
+# charset to use when decoding.
+#
+# Returns
+# -------
+# :class:`~pyspark.sql.Column`
+# the column for computed results.
+#
+# Examples
+# --------
+# >>> df = spark.createDataFrame([('abcd',)], ['a'])
+# >>> df.select(decode("a", "UTF-8")).show()
+# +----------------------+
+# |stringdecode(a, UTF-8)|
+# +----------------------+
+# |                  abcd|
+# +----------------------+
+# """
+# return _invoke_function("decode", _to_col(col), lit(charset))
+
+
+def encode(col: "ColumnOrName", charset: str) -> Column:
+ """
+ Computes the first argument into a binary from a string using the provided character set
+ (one of 'US-ASCII', 'ISO-8859-1', 'UTF-8', 'UTF-16BE', 'UTF-16LE', 'UTF-16').
+
+ .. versionadded:: 3.4.0
+
+ Parameters
+ ----------
+ col : :class:`~pyspark.sql.Column` or str
+ target column to work on.
+ charset : str
+ charset to use when encoding.
+
+ Returns
+ -------
+ :class:`~pyspark.sql.Column`
+ the column for computed results.
+
+ Examples
+ --------
+ >>> df = spark.createDataFrame([('abcd',)], ['c'])
+ >>> df.select(encode("c", "UTF-8")).show()
+ +----------------+
+ |encode(c, UTF-8)|
+ +----------------+
+ |   [61 62 63 64]|
+ +----------------+
+ """
+ return _invoke_function("encode", _to_col(col), lit(charset))
diff --git a/python/pyspark/sql/tests/connect/test_connect_function.py b/python/pyspark/sql/tests/connect/test_connect_function.py
index 2f1e4968942..22f33ce0530 100644
--- a/python/pyspark/sql/tests/connect/test_connect_function.py
+++ b/python/pyspark/sql/tests/connect/test_connect_function.py
@@ -413,6 +413,67 @@ class SparkConnectFunctionTests(SparkConnectFuncTestCase):
sdf.groupBy("a").agg(SF.percentile_approx(sdf.b, [0.1,
0.9])).toPandas(),
)
+ def test_string_functions(self):
+ from pyspark.sql import functions as SF
+ from pyspark.sql.connect import functions as CF
+
+ query = """
+ SELECT * FROM VALUES
+ ('   ab   ', 'ab   ', NULL), ('   ab', NULL, 'ab')
+ AS tab(a, b, c)
+ """
+ # +--------+-----+----+
+ # |       a|    b|   c|
+ # +--------+-----+----+
+ # |   ab   |ab   |null|
+ # |      ab| null|  ab|
+ # +--------+-----+----+
+
+ cdf = self.connect.sql(query)
+ sdf = self.spark.sql(query)
+
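+ # Run each unary string function through both the Connect client (CF)
+ # and regular PySpark (SF); the pandas results must be identical.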
+ for cfunc, sfunc in [
+ (CF.upper, SF.upper),
+ (CF.lower, SF.lower),
+ (CF.ascii, SF.ascii),
+ (CF.base64, SF.base64),
+ (CF.unbase64, SF.unbase64),
+ (CF.ltrim, SF.ltrim),
+ (CF.rtrim, SF.rtrim),
+ (CF.trim, SF.trim),
+ ]:
+ self.assert_eq(
+ cdf.select(cfunc("a"), cfunc(cdf.b)).toPandas(),
+ sdf.select(sfunc("a"), sfunc(sdf.b)).toPandas(),
+ )
+
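+ # concat_ws accepts Column objects as well as column-name strings ("c").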
+ self.assert_eq(
+ cdf.select(CF.concat_ws("-", cdf.a, "c")).toPandas(),
+ sdf.select(SF.concat_ws("-", sdf.a, "c")).toPandas(),
+ )
+
+ # Disable the test for "decode" because of inconsistent column names,
+ # as shown below
+ #
+ # >>> sdf.select(SF.decode("c", "UTF-8")).toPandas()
+ #   stringdecode(c, UTF-8)
+ # 0                   None
+ # 1                     ab
+ # >>> cdf.select(CF.decode("c", "UTF-8")).toPandas()
+ #   decode(c, UTF-8)
+ # 0             None
+ # 1               ab
+ #
+ # self.assert_eq(
+ # cdf.select(CF.decode("c", "UTF-8")).toPandas(),
+ # sdf.select(SF.decode("c", "UTF-8")).toPandas(),
+ # )
+
+ self.assert_eq(
+ cdf.select(CF.encode("c", "UTF-8")).toPandas(),
+ sdf.select(SF.encode("c", "UTF-8")).toPandas(),
+ )
+
if __name__ == "__main__":
from pyspark.sql.tests.connect.test_connect_function import * # noqa: F401
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]