This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new f8b15395cf3 [SPARK-39734][SPARK-36259][SPARK-39733][SPARK-37348][PYTHON] Functions Parity between PySpark and SQL f8b15395cf3 is described below commit f8b15395cf347b6c6c6a4a20077fdeb31bfabb24 Author: Ruifeng Zheng <ruife...@apache.org> AuthorDate: Wed Aug 10 19:33:47 2022 +0800 [SPARK-39734][SPARK-36259][SPARK-39733][SPARK-37348][PYTHON] Functions Parity between PySpark and SQL ### What changes were proposed in this pull request? Implement the missing functions in PySpark: - call_udf - localtimestamp - map_contains_key - pmod After this PR, all functions in `org.apache.spark.sql.functions` can be found in `pyspark.sql.functions` or have equivalents (e.g. `not` -> `~`) ### Why are the changes needed? for function parity ### Does this PR introduce _any_ user-facing change? yes, 4 new APIs added ### How was this patch tested? added doctests Closes #37449 from zhengruifeng/py_func_parity. Authored-by: Ruifeng Zheng <ruife...@apache.org> Signed-off-by: Ruifeng Zheng <ruife...@apache.org> --- .../source/reference/pyspark.sql/functions.rst | 4 + python/pyspark/sql/functions.py | 138 ++++++++++++++++++++- python/pyspark/sql/tests/test_functions.py | 7 +- 3 files changed, 142 insertions(+), 7 deletions(-) diff --git a/python/docs/source/reference/pyspark.sql/functions.rst b/python/docs/source/reference/pyspark.sql/functions.rst index ea495445426..a799bb8ad0a 100644 --- a/python/docs/source/reference/pyspark.sql/functions.rst +++ b/python/docs/source/reference/pyspark.sql/functions.rst @@ -84,6 +84,7 @@ Math Functions log10 log1p log2 + pmod pow rint round @@ -125,6 +126,7 @@ Datetime Functions quarter month last_day + localtimestamp minute months_between next_day @@ -188,6 +190,7 @@ Collection Functions flatten sequence array_repeat + map_contains_key map_keys map_values map_entries @@ -326,6 +329,7 @@ UDF .. 
autosummary:: :toctree: api/ + call_udf pandas_udf udf unwrap_udt diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index e73c70d8ca0..9dd81145243 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -1077,6 +1077,45 @@ def pow(col1: Union["ColumnOrName", float], col2: Union["ColumnOrName", float]) return _invoke_binary_math_function("pow", col1, col2) +def pmod(dividend: Union["ColumnOrName", float], divisor: Union["ColumnOrName", float]) -> Column: + """ + Returns the positive value of dividend mod divisor. + + .. versionadded:: 3.4.0 + + Parameters + ---------- + dividend : str, :class:`~pyspark.sql.Column` or float + the column that contains dividend, or the specified dividend value + divisor : str, :class:`~pyspark.sql.Column` or float + the column that contains divisor, or the specified divisor value + + Examples + -------- + >>> from pyspark.sql.functions import pmod + >>> df = spark.createDataFrame([ + ... (1.0, float('nan')), (float('nan'), 2.0), (10.0, 3.0), + ... (float('nan'), float('nan')), (-3.0, 4.0), (-10.0, 3.0), + ... (-5.0, -6.0), (7.0, -8.0), (1.0, 2.0)], + ... ("a", "b")) + >>> df.select(pmod("a", "b")).show() + +----------+ + |pmod(a, b)| + +----------+ + | NaN| + | NaN| + | 1.0| + | NaN| + | 1.0| + | 2.0| + | -5.0| + | 7.0| + | 1.0| + +----------+ + """ + return _invoke_binary_math_function("pmod", dividend, divisor) + + @since(1.6) def row_number() -> Column: """ @@ -1997,6 +2036,28 @@ def current_timestamp() -> Column: return _invoke_function("current_timestamp") +def localtimestamp() -> Column: + """ + Returns the current timestamp without time zone at the start of query evaluation + as a timestamp without time zone column. All calls of localtimestamp within the + same query return the same value. + + .. 
versionadded:: 3.4.0 + + Examples + -------- + >>> from pyspark.sql.functions import localtimestamp + >>> df = spark.range(0, 100) + >>> df.select(localtimestamp()).distinct().show() + +--------------------+ + | localtimestamp()| + +--------------------+ + |20...-...-... ...:...:...| + +--------------------+ + """ + return _invoke_function("localtimestamp") + + def date_format(date: "ColumnOrName", format: str) -> Column: """ Converts a date/timestamp/string to a value of string in the format specified by the date @@ -4441,6 +4502,39 @@ def flatten(col: "ColumnOrName") -> Column: return _invoke_function_over_columns("flatten", col) +def map_contains_key(col: "ColumnOrName", value: Any) -> Column: + """ + Returns true if the map contains the key. + + Parameters + ---------- + col : :class:`~pyspark.sql.Column` or str + name of column or expression + value : + a literal value + + .. versionadded:: 3.4.0 + + Examples + -------- + >>> from pyspark.sql.functions import map_contains_key + >>> df = spark.sql("SELECT map(1, 'a', 2, 'b') as data") + >>> df.select(map_contains_key("data", 1)).show() + +---------------------------------+ + |array_contains(map_keys(data), 1)| + +---------------------------------+ + | true| + +---------------------------------+ + >>> df.select(map_contains_key("data", -1)).show() + +----------------------------------+ + |array_contains(map_keys(data), -1)| + +----------------------------------+ + | false| + +----------------------------------+ + """ + return _invoke_function("map_contains_key", _to_java_column(col), value) + + def map_keys(col: "ColumnOrName") -> Column: """ Collection function: Returns an unordered array containing the keys of the map. @@ -5404,11 +5498,53 @@ def bucket(numBuckets: Union[Column, int], col: "ColumnOrName") -> Column: return _invoke_function("bucket", numBuckets, _to_java_column(col)) +def call_udf(udfName: str, *cols: "ColumnOrName") -> Column: + """ + Call an user-defined function. + + .. 
versionadded:: 3.4.0 + + Parameters + ---------- + udfName : str + name of the user defined function (UDF) + cols : :class:`~pyspark.sql.Column` or str + column names or :class:`~pyspark.sql.Column`\\s to be used in the UDF + + Examples + -------- + >>> from pyspark.sql.functions import call_udf, col + >>> from pyspark.sql.types import IntegerType, StringType + >>> df = spark.createDataFrame([(1, "a"),(2, "b"), (3, "c")],["id", "name"]) + >>> _ = spark.udf.register("intX2", lambda i: i * 2, IntegerType()) + >>> df.select(call_udf("intX2", "id")).show() + +---------+ + |intX2(id)| + +---------+ + | 2| + | 4| + | 6| + +---------+ + >>> _ = spark.udf.register("strX2", lambda s: s * 2, StringType()) + >>> df.select(call_udf("strX2", col("name"))).show() + +-----------+ + |strX2(name)| + +-----------+ + | aa| + | bb| + | cc| + +-----------+ + """ + sc = SparkContext._active_spark_context + assert sc is not None and sc._jvm is not None + return _invoke_function("call_udf", udfName, _to_seq(sc, cols, _to_java_column)) + + def unwrap_udt(col: "ColumnOrName") -> Column: """ Unwrap UDT data type column into its underlying type. - .. versionadded:: 3.4.0 + .. 
versionadded:: 3.4.0 """ return _invoke_function("unwrap_udt", _to_java_column(col)) diff --git a/python/pyspark/sql/tests/test_functions.py b/python/pyspark/sql/tests/test_functions.py index 5091fa711a8..44d95372d9d 100644 --- a/python/pyspark/sql/tests/test_functions.py +++ b/python/pyspark/sql/tests/test_functions.py @@ -100,12 +100,7 @@ class FunctionsTests(ReusedSQLTestCase): missing_in_py = jvm_fn_set.difference(py_fn_set) # Functions that we expect to be missing in python until they are added to pyspark - expected_missing_in_py = { - "call_udf", # TODO(SPARK-39734) - "localtimestamp", # TODO(SPARK-36259) - "map_contains_key", # TODO(SPARK-39733) - "pmod", # TODO(SPARK-37348) - } + expected_missing_in_py = set() self.assertEqual( expected_missing_in_py, missing_in_py, "Missing functions in pyspark not as expected" --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org