This is an automated email from the ASF dual-hosted git repository.
sarutak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new b66d392689ff [SPARK-57284][PYTHON][SQL] Add Scala/Python bindings for
vector functions
b66d392689ff is described below
commit b66d392689ffa66cdea7df94a145c3f7068e4a60
Author: Kousuke Saruta <[email protected]>
AuthorDate: Sun Jun 7 01:38:33 2026 +0900
[SPARK-57284][PYTHON][SQL] Add Scala/Python bindings for vector functions
### What changes were proposed in this pull request?
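This PR adds the following Scala/Python bindings for the vector functions that
were added in SPARK-55030 (#53924), SPARK-55593 (#54368) and SPARK-55031
(#54011): vector_cosine_similarity, vector_inner_product, vector_l2_distance,
vector_norm, vector_normalize, vector_avg and vector_sum.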
### Why are the changes needed?
To improve parity between the built-in SQL functions and the Scala/Python function APIs.
### Does this PR introduce _any_ user-facing change?
Yes, new built-in functions are introduced.
### How was this patch tested?
New tests were added to test_functions.py and MiscFunctionsSuite.scala.
### Was this patch authored or co-authored using generative AI tooling?
Kiro CLI / Claude.
Closes #56322 from sarutak/pyspark-vector-functions.
Authored-by: Kousuke Saruta <[email protected]>
Signed-off-by: Kousuke Saruta <[email protected]>
---
.../source/reference/pyspark.sql/functions.rst | 14 ++
python/pyspark/sql/connect/functions/builtin.py | 58 ++++++
python/pyspark/sql/functions/__init__.py | 8 +
python/pyspark/sql/functions/builtin.py | 224 +++++++++++++++++++++
python/pyspark/sql/tests/test_functions.py | 41 ++++
.../scala/org/apache/spark/sql/functions.scala | 72 +++++++
.../org/apache/spark/sql/MiscFunctionsSuite.scala | 34 ++++
7 files changed, 451 insertions(+)
diff --git a/python/docs/source/reference/pyspark.sql/functions.rst
b/python/docs/source/reference/pyspark.sql/functions.rst
index 06f2c47e24bd..4d91d15a87d0 100644
--- a/python/docs/source/reference/pyspark.sql/functions.rst
+++ b/python/docs/source/reference/pyspark.sql/functions.rst
@@ -725,6 +725,20 @@ Geospatial ST Functions
st_srid
+Vector Functions
+----------------
+.. autosummary::
+ :toctree: api/
+
+ vector_cosine_similarity
+ vector_inner_product
+ vector_l2_distance
+ vector_norm
+ vector_normalize
+ vector_avg
+ vector_sum
+
+
UDF, UDTF and UDT
-----------------
.. autosummary::
diff --git a/python/pyspark/sql/connect/functions/builtin.py
b/python/pyspark/sql/connect/functions/builtin.py
index c1109df1c41a..7c7edef02f0d 100644
--- a/python/pyspark/sql/connect/functions/builtin.py
+++ b/python/pyspark/sql/connect/functions/builtin.py
@@ -5570,6 +5570,64 @@ def call_function(funcName: str, *cols: "ColumnOrName")
-> Column:
call_function.__doc__ = pysparkfuncs.call_function.__doc__
+# ---------------------- Vector Functions ----------------------
+
+
+def vector_cosine_similarity(left: "ColumnOrName", right: "ColumnOrName") -> Column:
+    return _invoke_function_over_columns("vector_cosine_similarity", left, right)
+
+
+vector_cosine_similarity.__doc__ = pysparkfuncs.vector_cosine_similarity.__doc__
+
+
+def vector_inner_product(left: "ColumnOrName", right: "ColumnOrName") -> Column:
+    return _invoke_function_over_columns("vector_inner_product", left, right)
+
+
+vector_inner_product.__doc__ = pysparkfuncs.vector_inner_product.__doc__
+
+
+def vector_l2_distance(left: "ColumnOrName", right: "ColumnOrName") -> Column:
+    return _invoke_function_over_columns("vector_l2_distance", left, right)
+
+
+vector_l2_distance.__doc__ = pysparkfuncs.vector_l2_distance.__doc__
+
+
+def vector_norm(vector: "ColumnOrName", degree: Optional["ColumnOrName"] = None) -> Column:
+    # An unset degree is left out of the call entirely so the function's default applies.
+    cols = [vector] if degree is None else [vector, degree]
+    return _invoke_function_over_columns("vector_norm", *cols)
+
+
+vector_norm.__doc__ = pysparkfuncs.vector_norm.__doc__
+
+
+def vector_normalize(vector: "ColumnOrName", degree: Optional["ColumnOrName"] = None) -> Column:
+    # Same convention as vector_norm: an unset degree is omitted from the call.
+    cols = [vector] if degree is None else [vector, degree]
+    return _invoke_function_over_columns("vector_normalize", *cols)
+
+
+vector_normalize.__doc__ = pysparkfuncs.vector_normalize.__doc__
+
+
+def vector_avg(col: "ColumnOrName") -> Column:
+    return _invoke_function_over_columns("vector_avg", col)
+
+
+vector_avg.__doc__ = pysparkfuncs.vector_avg.__doc__
+
+
+def vector_sum(col: "ColumnOrName") -> Column:
+    return _invoke_function_over_columns("vector_sum", col)
+
+
+vector_sum.__doc__ = pysparkfuncs.vector_sum.__doc__
+
+
def _test() -> None:
import sys
import os
diff --git a/python/pyspark/sql/functions/__init__.py
b/python/pyspark/sql/functions/__init__.py
index b90b5a26ecb0..9a9b18a29278 100644
--- a/python/pyspark/sql/functions/__init__.py
+++ b/python/pyspark/sql/functions/__init__.py
@@ -584,6 +584,14 @@ __all__ = [ # noqa: F405
"st_geomfromwkb",
"st_setsrid",
"st_srid",
+ # Vector Functions
+ "vector_cosine_similarity",
+ "vector_inner_product",
+ "vector_l2_distance",
+ "vector_norm",
+ "vector_normalize",
+ "vector_avg",
+ "vector_sum",
# Call Functions
"call_udf",
"pandas_udf",
diff --git a/python/pyspark/sql/functions/builtin.py
b/python/pyspark/sql/functions/builtin.py
index 841d422f2026..d0dfb2278565 100644
--- a/python/pyspark/sql/functions/builtin.py
+++ b/python/pyspark/sql/functions/builtin.py
@@ -30980,6 +30980,230 @@ def arrow_udtf(
return _create_pyarrow_udtf(cls=cls, returnType=returnType)
+# ---------------------- Vector Functions ----------------------
+
+
+@_try_remote_functions
+def vector_cosine_similarity(left: "ColumnOrName", right: "ColumnOrName") -> Column:
+    """Computes the cosine similarity of two float vectors.
+    Both vectors are required to have the same dimension.
+
+    .. versionadded:: 4.3.0
+
+    Parameters
+    ----------
+    left : :class:`~pyspark.sql.Column` or column name
+        the first vector column.
+    right : :class:`~pyspark.sql.Column` or column name
+        the second vector column.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        the cosine similarity, as a float value.
+
+    Examples
+    --------
+    >>> from pyspark.sql import functions as sf
+    >>> from pyspark.sql.types import ArrayType, FloatType, StructType, StructField
+    >>> schema = StructType([
+    ...     StructField('a', ArrayType(FloatType())),
+    ...     StructField('b', ArrayType(FloatType())),
+    ... ])
+    >>> df = spark.createDataFrame([([1.0, 2.0, 3.0], [4.0, 5.0, 6.0])], schema)
+    >>> df.select(sf.vector_cosine_similarity('a', 'b')).first()[0]
+    0.974631...
+    """
+    return _invoke_function_over_columns("vector_cosine_similarity", left, right)
+
+
+@_try_remote_functions
+def vector_inner_product(left: "ColumnOrName", right: "ColumnOrName") -> Column:
+    """Computes the inner (dot) product of two float vectors.
+    Both vectors are required to have the same dimension.
+
+    .. versionadded:: 4.3.0
+
+    Parameters
+    ----------
+    left : :class:`~pyspark.sql.Column` or column name
+        the first vector column.
+    right : :class:`~pyspark.sql.Column` or column name
+        the second vector column.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        the inner product, as a float value.
+
+    Examples
+    --------
+    >>> from pyspark.sql import functions as sf
+    >>> from pyspark.sql.types import ArrayType, FloatType, StructType, StructField
+    >>> schema = StructType([
+    ...     StructField('a', ArrayType(FloatType())),
+    ...     StructField('b', ArrayType(FloatType())),
+    ... ])
+    >>> df = spark.createDataFrame([([1.0, 2.0, 3.0], [4.0, 5.0, 6.0])], schema)
+    >>> df.select(sf.vector_inner_product('a', 'b')).first()[0]
+    32.0
+    """
+    return _invoke_function_over_columns("vector_inner_product", left, right)
+
+
+@_try_remote_functions
+def vector_l2_distance(left: "ColumnOrName", right: "ColumnOrName") -> Column:
+    """Computes the Euclidean (L2) distance between two float vectors.
+    Both vectors are required to have the same dimension.
+
+    .. versionadded:: 4.3.0
+
+    Parameters
+    ----------
+    left : :class:`~pyspark.sql.Column` or column name
+        the first vector column.
+    right : :class:`~pyspark.sql.Column` or column name
+        the second vector column.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        the L2 distance, as a float value.
+
+    Examples
+    --------
+    >>> from pyspark.sql import functions as sf
+    >>> from pyspark.sql.types import ArrayType, FloatType, StructType, StructField
+    >>> schema = StructType([
+    ...     StructField('a', ArrayType(FloatType())),
+    ...     StructField('b', ArrayType(FloatType())),
+    ... ])
+    >>> df = spark.createDataFrame([([1.0, 2.0, 3.0], [4.0, 5.0, 6.0])], schema)
+    >>> df.select(sf.vector_l2_distance('a', 'b')).first()[0]
+    5.196152...
+    """
+    return _invoke_function_over_columns("vector_l2_distance", left, right)
+
+
+@_try_remote_functions
+def vector_norm(vector: "ColumnOrName", degree: Optional["ColumnOrName"] = None) -> Column:
+    """Returns the Lp norm of a float vector using the specified degree.
+    Degree defaults to 2.0 (Euclidean norm) if unspecified.
+
+    .. versionadded:: 4.3.0
+
+    Parameters
+    ----------
+    vector : :class:`~pyspark.sql.Column` or column name
+        input vector column.
+    degree : :class:`~pyspark.sql.Column` or column name, optional
+        norm degree (1.0 for L1, 2.0 for L2, ``float('inf')`` for the infinity norm).
+        This must be a column or column name, not a plain Python number: wrap literal
+        values with :func:`lit`, e.g. ``sf.lit(2.0).cast('float')`` as in the examples
+        below. Defaults to 2.0 when omitted.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        the Lp norm as a float value.
+
+    Examples
+    --------
+    >>> from pyspark.sql import functions as sf
+    >>> from pyspark.sql.types import ArrayType, FloatType, StructType, StructField
+    >>> schema = StructType([StructField('v', ArrayType(FloatType()))])
+    >>> df = spark.createDataFrame([([3.0, 4.0],)], schema)
+
+    Example 1: Euclidean norm with an explicit degree
+
+    >>> df.select(sf.vector_norm('v', sf.lit(2.0).cast('float'))).first()[0]
+    5.0
+
+    Example 2: Omitting the degree uses the default of 2.0
+
+    >>> df.select(sf.vector_norm('v')).first()[0]
+    5.0
+    """
+    if degree is None:
+        # Omit the argument entirely so the function's default degree applies.
+        return _invoke_function_over_columns("vector_norm", vector)
+    return _invoke_function_over_columns("vector_norm", vector, degree)
+
+
+@_try_remote_functions
+def vector_normalize(vector: "ColumnOrName", degree: Optional["ColumnOrName"] = None) -> Column:
+    """Normalizes a float vector to unit length using the specified norm degree.
+    Degree defaults to 2.0 (Euclidean norm) if unspecified.
+
+    .. versionadded:: 4.3.0
+
+    Parameters
+    ----------
+    vector : :class:`~pyspark.sql.Column` or column name
+        input vector column.
+    degree : :class:`~pyspark.sql.Column` or column name, optional
+        norm degree (1.0 for L1, 2.0 for L2, ``float('inf')`` for the infinity norm).
+        This must be a column or column name, not a plain Python number: wrap literal
+        values with :func:`lit`, e.g. ``sf.lit(2.0).cast('float')`` as in the examples
+        below. Defaults to 2.0 when omitted.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        the normalized vector as an array of floats.
+
+    Examples
+    --------
+    >>> from pyspark.sql import functions as sf
+    >>> from pyspark.sql.types import ArrayType, FloatType, StructType, StructField
+    >>> schema = StructType([StructField('v', ArrayType(FloatType()))])
+    >>> df = spark.createDataFrame([([3.0, 4.0],)], schema)
+
+    Example 1: Normalize with an explicit degree
+
+    >>> df.select(sf.vector_normalize('v', sf.lit(2.0).cast('float'))).first()[0]
+    [0.6..., 0.8...]
+
+    Example 2: Omitting the degree uses the default of 2.0
+
+    >>> df.select(sf.vector_normalize('v')).first()[0]
+    [0.6..., 0.8...]
+    """
+    if degree is None:
+        # Omit the argument entirely so the function's default degree applies.
+        return _invoke_function_over_columns("vector_normalize", vector)
+    return _invoke_function_over_columns("vector_normalize", vector, degree)
+
+
+@_try_remote_functions
+def vector_avg(col: "ColumnOrName") -> Column:
+    """Aggregate function: computes the element-wise mean of the float vectors in a group.
+    Every vector in the group must have the same dimension.
+
+    .. versionadded:: 4.3.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or column name
+        the vector column to aggregate.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        the element-wise average, as an array of floats.
+
+    Examples
+    --------
+    >>> from pyspark.sql import functions as sf
+    >>> from pyspark.sql.types import ArrayType, FloatType, StructType, StructField
+    >>> schema = StructType([StructField('v', ArrayType(FloatType()))])
+    >>> df = spark.createDataFrame([([1.0, 2.0],), ([3.0, 4.0],)], schema)
+    >>> df.select(sf.vector_avg('v')).first()[0]
+    [2.0, 3.0]
+    """
+    return _invoke_function_over_columns("vector_avg", col)
+
+
+@_try_remote_functions
+def vector_sum(col: "ColumnOrName") -> Column:
+    """Aggregate function: computes the element-wise sum of the float vectors in a group.
+    Every vector in the group must have the same dimension.
+
+    .. versionadded:: 4.3.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or column name
+        the vector column to aggregate.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        the element-wise sum, as an array of floats.
+
+    Examples
+    --------
+    >>> from pyspark.sql import functions as sf
+    >>> from pyspark.sql.types import ArrayType, FloatType, StructType, StructField
+    >>> schema = StructType([StructField('v', ArrayType(FloatType()))])
+    >>> df = spark.createDataFrame([([1.0, 2.0],), ([3.0, 4.0],)], schema)
+    >>> df.select(sf.vector_sum('v')).first()[0]
+    [4.0, 6.0]
+    """
+    return _invoke_function_over_columns("vector_sum", col)
+
+
def _test() -> None:
import doctest
from pyspark.sql import SparkSession
diff --git a/python/pyspark/sql/tests/test_functions.py
b/python/pyspark/sql/tests/test_functions.py
index b26163a667c2..f0484cb98c83 100644
--- a/python/pyspark/sql/tests/test_functions.py
+++ b/python/pyspark/sql/tests/test_functions.py
@@ -973,6 +973,47 @@ class FunctionsTestsMixin:
actual_with_threshold = df.select(F.levenshtein(df.l, df.r,
2).alias("b"))
assertDataFrameEqual([Row(b=-1)], actual_with_threshold)
+    def test_vector_functions(self):
+        from pyspark.sql.types import ArrayType, FloatType, StructType, StructField
+
+        def assert_vector_almost_equal(actual, expected, places=4):
+            # Check the length too, so an extra or missing element fails the test.
+            self.assertEqual(len(actual), len(expected))
+            for a, e in zip(actual, expected):
+                self.assertAlmostEqual(a, e, places=places)
+
+        schema = StructType(
+            [
+                StructField("a", ArrayType(FloatType())),
+                StructField("b", ArrayType(FloatType())),
+            ]
+        )
+        df = self.spark.createDataFrame([([1.0, 2.0, 3.0], [4.0, 5.0, 6.0])], schema)
+
+        # Similarity/distance functions
+        self.assertAlmostEqual(
+            df.select(F.vector_cosine_similarity("a", "b")).first()[0], 0.9746318, places=4
+        )
+        self.assertAlmostEqual(
+            df.select(F.vector_inner_product("a", "b")).first()[0], 32.0, places=1
+        )
+        self.assertAlmostEqual(
+            df.select(F.vector_l2_distance("a", "b")).first()[0], 5.196152, places=4
+        )
+
+        # Norm/normalize functions with an explicit degree
+        schema2 = StructType([StructField("v", ArrayType(FloatType()))])
+        df2 = self.spark.createDataFrame([([3.0, 4.0],)], schema2)
+        self.assertAlmostEqual(
+            df2.select(F.vector_norm("v", F.lit(2.0).cast("float"))).first()[0], 5.0, places=1
+        )
+        result = df2.select(F.vector_normalize("v", F.lit(2.0).cast("float"))).first()[0]
+        assert_vector_almost_equal(result, [0.6, 0.8])
+
+        # Norm/normalize functions with the default degree (2.0); this exercises the
+        # one-argument code path that omits ``degree`` from the call.
+        self.assertAlmostEqual(df2.select(F.vector_norm("v")).first()[0], 5.0, places=4)
+        assert_vector_almost_equal(df2.select(F.vector_normalize("v")).first()[0], [0.6, 0.8])
+
+        # Aggregate functions
+        df3 = self.spark.createDataFrame([([1.0, 2.0],), ([3.0, 4.0],)], schema2)
+        assert_vector_almost_equal(df3.select(F.vector_avg("v")).first()[0], [2.0, 3.0])
+        assert_vector_almost_equal(df3.select(F.vector_sum("v")).first()[0], [4.0, 6.0])
def test_between_function(self):
df = self.spark.createDataFrame(
[Row(a=1, b=2, c=3), Row(a=2, b=1, c=3), Row(a=4, b=1, c=4)]
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/functions.scala
b/sql/api/src/main/scala/org/apache/spark/sql/functions.scala
index 7afeedb4439c..115e2c572753 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/functions.scala
@@ -11901,6 +11901,78 @@ object functions {
*/
def unwrap_udt(column: Column): Column = Column.internalFn("unwrap_udt",
column)
+  // ---------------------- Vector Functions ----------------------
+
+  /**
+   * Returns the cosine similarity between two float vectors. The vectors must have the same
+   * dimension.
+   *
+   * @group vector_funcs
+   * @since 4.3.0
+   */
+  def vector_cosine_similarity(left: Column, right: Column): Column =
+    Column.fn("vector_cosine_similarity", left, right)
+
+  /**
+   * Returns the inner product (dot product) between two float vectors. The vectors must have
+   * the same dimension.
+   *
+   * @group vector_funcs
+   * @since 4.3.0
+   */
+  def vector_inner_product(left: Column, right: Column): Column =
+    Column.fn("vector_inner_product", left, right)
+
+  /**
+   * Returns the Euclidean (L2) distance between two float vectors. The vectors must have the
+   * same dimension.
+   *
+   * @group vector_funcs
+   * @since 4.3.0
+   */
+  def vector_l2_distance(left: Column, right: Column): Column =
+    Column.fn("vector_l2_distance", left, right)
+
+  /**
+   * Returns the Lp norm of a float vector using the given degree (for example 1.0 for the L1
+   * norm or 2.0 for the Euclidean norm). Use the single-argument overload to rely on the
+   * default degree of 2.0.
+   *
+   * @group vector_funcs
+   * @since 4.3.0
+   */
+  def vector_norm(vector: Column, degree: Column): Column =
+    Column.fn("vector_norm", vector, degree)
+
+  /**
+   * Returns the Lp norm of a float vector using degree 2.0 (Euclidean norm).
+   *
+   * @group vector_funcs
+   * @since 4.3.0
+   */
+  def vector_norm(vector: Column): Column =
+    Column.fn("vector_norm", vector)
+
+  /**
+   * Normalizes a float vector to unit length under the Lp norm of the given degree (for example
+   * 1.0 for L1 or 2.0 for Euclidean). Use the single-argument overload to rely on the default
+   * degree of 2.0.
+   *
+   * @group vector_funcs
+   * @since 4.3.0
+   */
+  def vector_normalize(vector: Column, degree: Column): Column =
+    Column.fn("vector_normalize", vector, degree)
+
+  /**
+   * Normalizes a float vector to unit length using degree 2.0 (Euclidean norm).
+   *
+   * @group vector_funcs
+   * @since 4.3.0
+   */
+  def vector_normalize(vector: Column): Column =
+    Column.fn("vector_normalize", vector)
+
+  /**
+   * Aggregate function: returns the element-wise mean of float vectors in a group. All vectors
+   * must have the same dimension.
+   *
+   * @group vector_funcs
+   * @since 4.3.0
+   */
+  def vector_avg(col: Column): Column = Column.fn("vector_avg", col)
+
+  /**
+   * Aggregate function: returns the element-wise sum of float vectors in a group. All vectors
+   * must have the same dimension.
+   *
+   * @group vector_funcs
+   * @since 4.3.0
+   */
+  def vector_sum(col: Column): Column = Column.fn("vector_sum", col)
+
// scalastyle:off
// TODO(SPARK-45970): Use @static annotation so Java can access to those
// API in the same way. Once we land this fix, should deprecate
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/MiscFunctionsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/MiscFunctionsSuite.scala
index fb5d2650327e..cc4aceaed75e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/MiscFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/MiscFunctionsSuite.scala
@@ -285,6 +285,40 @@ class MiscFunctionsSuite extends SharedSparkSession {
assert(df.selectExpr("random(1)").collect() != null)
assert(df.select(random(lit(1))).collect() != null)
}
+
+  test("vector functions") {
+    import org.apache.spark.sql.types.{ArrayType, FloatType, StructField, StructType}
+
+    // Asserts equal length and element-wise closeness, so extra/missing elements are caught.
+    def assertVectorApprox(
+        actual: scala.collection.Seq[Float],
+        expected: Seq[Float],
+        tol: Float): Unit = {
+      assert(actual.length == expected.length)
+      actual.zip(expected).foreach { case (a, e) => assert(math.abs(a - e) < tol) }
+    }
+
+    val schema = StructType(Seq(
+      StructField("a", ArrayType(FloatType)),
+      StructField("b", ArrayType(FloatType))))
+    val df = spark.createDataFrame(
+      java.util.Arrays.asList(Row(Array(1.0f, 2.0f, 3.0f), Array(4.0f, 5.0f, 6.0f))), schema)
+
+    // Similarity/distance
+    val cos = df.select(vector_cosine_similarity($"a", $"b")).first().getFloat(0)
+    assert(math.abs(cos - 0.9746319f) < 0.0001f)
+    val dot = df.select(vector_inner_product($"a", $"b")).first().getFloat(0)
+    assert(math.abs(dot - 32.0f) < 0.01f)
+    val l2 = df.select(vector_l2_distance($"a", $"b")).first().getFloat(0)
+    assert(math.abs(l2 - 5.196152f) < 0.001f)
+
+    // Norm/normalize with an explicit degree
+    val schema2 = StructType(Seq(StructField("v", ArrayType(FloatType))))
+    val df2 = spark.createDataFrame(
+      java.util.Arrays.asList(Row(Array(3.0f, 4.0f))), schema2)
+    val norm = df2.select(vector_norm($"v", lit(2.0f))).first().getFloat(0)
+    assert(math.abs(norm - 5.0f) < 0.01f)
+    val normalized = df2.select(vector_normalize($"v", lit(2.0f))).first().getSeq[Float](0)
+    assertVectorApprox(normalized, Seq(0.6f, 0.8f), 0.0001f)
+
+    // Norm/normalize with the default degree (single-argument overloads)
+    val defaultNorm = df2.select(vector_norm($"v")).first().getFloat(0)
+    assert(math.abs(defaultNorm - 5.0f) < 0.01f)
+    val defaultNormalized = df2.select(vector_normalize($"v")).first().getSeq[Float](0)
+    assertVectorApprox(defaultNormalized, Seq(0.6f, 0.8f), 0.0001f)
+
+    // Aggregate (locals named to avoid shadowing functions.avg / functions.sum)
+    val df3 = spark.createDataFrame(
+      java.util.Arrays.asList(Row(Array(1.0f, 2.0f)), Row(Array(3.0f, 4.0f))), schema2)
+    val avgVec = df3.select(vector_avg($"v")).first().getSeq[Float](0)
+    assertVectorApprox(avgVec, Seq(2.0f, 3.0f), 0.01f)
+    val sumVec = df3.select(vector_sum($"v")).first().getSeq[Float](0)
+    assertVectorApprox(sumVec, Seq(4.0f, 6.0f), 0.01f)
+  }
}
object ReflectClass {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]