This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 7427ff46db45 [SPARK-53979][PYTHON][TESTS] Drop temporary functions in
Pandas UDF tests
7427ff46db45 is described below
commit 7427ff46db45e9ef5ad7bbb99dab465c3c470904
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Wed Oct 22 14:59:51 2025 +0800
[SPARK-53979][PYTHON][TESTS] Drop temporary functions in Pandas UDF tests
### What changes were proposed in this pull request?
Drop temporary functions in Pandas UDF tests
### Why are the changes needed?
For isolation of testing environments.
### Does this PR introduce _any_ user-facing change?
no, test-only
### How was this patch tested?
CI
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #52690 from zhengruifeng/with_temp_func_pd.
Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
.../sql/tests/pandas/test_pandas_grouped_map.py | 35 ++--
.../tests/pandas/test_pandas_udf_grouped_agg.py | 38 +++--
.../sql/tests/pandas/test_pandas_udf_scalar.py | 182 +++++++++++----------
.../sql/tests/pandas/test_pandas_udf_window.py | 6 +-
python/pyspark/sql/tests/test_udf.py | 3 +-
5 files changed, 140 insertions(+), 124 deletions(-)
diff --git a/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py
b/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py
index fb81cd772777..4c52303481fa 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py
@@ -208,25 +208,22 @@ class ApplyInPandasTestsMixin:
assert_frame_equal(expected, result)
def test_register_grouped_map_udf(self):
- with self.quiet():
- self.check_register_grouped_map_udf()
-
- def check_register_grouped_map_udf(self):
- foo_udf = pandas_udf(lambda x: x, "id long", PandasUDFType.GROUPED_MAP)
-
- with self.assertRaises(PySparkTypeError) as pe:
- self.spark.catalog.registerFunction("foo_udf", foo_udf)
-
- self.check_error(
- exception=pe.exception,
- errorClass="INVALID_UDF_EVAL_TYPE",
- messageParameters={
- "eval_type": "SQL_BATCHED_UDF, SQL_ARROW_BATCHED_UDF, "
- "SQL_SCALAR_PANDAS_UDF, SQL_SCALAR_ARROW_UDF, "
- "SQL_SCALAR_PANDAS_ITER_UDF, SQL_SCALAR_ARROW_ITER_UDF, "
- "SQL_GROUPED_AGG_PANDAS_UDF or SQL_GROUPED_AGG_ARROW_UDF"
- },
- )
+ with self.quiet(), self.temp_func("foo_udf"):
+ foo_udf = pandas_udf(lambda x: x, "id long",
PandasUDFType.GROUPED_MAP)
+
+ with self.assertRaises(PySparkTypeError) as pe:
+ self.spark.catalog.registerFunction("foo_udf", foo_udf)
+
+ self.check_error(
+ exception=pe.exception,
+ errorClass="INVALID_UDF_EVAL_TYPE",
+ messageParameters={
+ "eval_type": "SQL_BATCHED_UDF, SQL_ARROW_BATCHED_UDF, "
+ "SQL_SCALAR_PANDAS_UDF, SQL_SCALAR_ARROW_UDF, "
+ "SQL_SCALAR_PANDAS_ITER_UDF, SQL_SCALAR_ARROW_ITER_UDF, "
+ "SQL_GROUPED_AGG_PANDAS_UDF or SQL_GROUPED_AGG_ARROW_UDF"
+ },
+ )
def test_decorator(self):
df = self.data
diff --git a/python/pyspark/sql/tests/pandas/test_pandas_udf_grouped_agg.py
b/python/pyspark/sql/tests/pandas/test_pandas_udf_grouped_agg.py
index cfcbb96fcc36..3fd970061b30 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_udf_grouped_agg.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_udf_grouped_agg.py
@@ -522,17 +522,23 @@ class GroupedAggPandasUDFTestsMixin:
df.groupby(df.id).agg(mean_udf(df.v), mean(df.v)).collect()
def test_register_vectorized_udf_basic(self):
- sum_pandas_udf = pandas_udf(
- lambda v: v.sum(), "integer",
PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF
- )
+ with self.temp_func("sum_pandas_udf"):
+ sum_pandas_udf = pandas_udf(
+ lambda v: v.sum(), "integer",
PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF
+ )
- self.assertEqual(sum_pandas_udf.evalType,
PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF)
- group_agg_pandas_udf = self.spark.udf.register("sum_pandas_udf",
sum_pandas_udf)
- self.assertEqual(group_agg_pandas_udf.evalType,
PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF)
- q = "SELECT sum_pandas_udf(v1) FROM VALUES (3, 0), (2, 0), (1, 1)
tbl(v1, v2) GROUP BY v2"
- actual = sorted(map(lambda r: r[0], self.spark.sql(q).collect()))
- expected = [1, 5]
- self.assertEqual(actual, expected)
+ self.assertEqual(sum_pandas_udf.evalType,
PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF)
+ group_agg_pandas_udf = self.spark.udf.register("sum_pandas_udf",
sum_pandas_udf)
+ self.assertEqual(
+ group_agg_pandas_udf.evalType,
PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF
+ )
+ q = """
+ SELECT sum_pandas_udf(v1)
+ FROM VALUES (3, 0), (2, 0), (1, 1) tbl(v1, v2) GROUP BY v2
+ """
+ actual = sorted(map(lambda r: r[0], self.spark.sql(q).collect()))
+ expected = [1, 5]
+ self.assertEqual(actual, expected)
def test_grouped_with_empty_partition(self):
data = [Row(id=1, x=2), Row(id=1, x=3), Row(id=2, x=4)]
@@ -551,10 +557,10 @@ class GroupedAggPandasUDFTestsMixin:
return v.max()
df = self.spark.range(0, 100)
- self.spark.udf.register("max_udf", max_udf)
- with self.tempView("table"):
+ with self.tempView("table"), self.temp_func("max_udf"):
df.createTempView("table")
+ self.spark.udf.register("max_udf", max_udf)
agg1 = df.agg(max_udf(df["id"]))
agg2 = self.spark.sql("select max_udf(id) from table")
@@ -579,7 +585,7 @@ class GroupedAggPandasUDFTestsMixin:
df = self.data
weighted_mean = self.pandas_agg_weighted_mean_udf
- with self.tempView("v"):
+ with self.tempView("v"), self.temp_func("weighted_mean"):
df.createOrReplaceTempView("v")
self.spark.udf.register("weighted_mean", weighted_mean)
@@ -604,7 +610,7 @@ class GroupedAggPandasUDFTestsMixin:
df = self.data
weighted_mean = self.pandas_agg_weighted_mean_udf
- with self.tempView("v"):
+ with self.tempView("v"), self.temp_func("weighted_mean"):
df.createOrReplaceTempView("v")
self.spark.udf.register("weighted_mean", weighted_mean)
@@ -644,7 +650,7 @@ class GroupedAggPandasUDFTestsMixin:
return np.average(kwargs["v"], weights=kwargs["w"])
- with self.tempView("v"):
+ with self.tempView("v"), self.temp_func("weighted_mean"):
df.createOrReplaceTempView("v")
self.spark.udf.register("weighted_mean", weighted_mean)
@@ -684,7 +690,7 @@ class GroupedAggPandasUDFTestsMixin:
def biased_sum(v, w=None):
return v.sum() + (w.sum() if w is not None else 100)
- with self.tempView("v"):
+ with self.tempView("v"), self.temp_func("biased_sum"):
df.createOrReplaceTempView("v")
self.spark.udf.register("biased_sum", biased_sum)
diff --git a/python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py
b/python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py
index e614d9039b61..3c2ae56067ae 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py
@@ -293,13 +293,17 @@ class ScalarPandasUDFTestsMixin:
).asNondeterministic()
self.assertEqual(random_pandas_udf.deterministic, False)
self.assertEqual(random_pandas_udf.evalType,
PythonEvalType.SQL_SCALAR_PANDAS_UDF)
- nondeterministic_pandas_udf = self.spark.catalog.registerFunction(
- "randomPandasUDF", random_pandas_udf
- )
- self.assertEqual(nondeterministic_pandas_udf.deterministic, False)
- self.assertEqual(nondeterministic_pandas_udf.evalType,
PythonEvalType.SQL_SCALAR_PANDAS_UDF)
- [row] = self.spark.sql("SELECT randomPandasUDF(1)").collect()
- self.assertEqual(row[0], 7)
+
+ with self.temp_func("randomPandasUDF"):
+ nondeterministic_pandas_udf = self.spark.catalog.registerFunction(
+ "randomPandasUDF", random_pandas_udf
+ )
+ self.assertEqual(nondeterministic_pandas_udf.deterministic, False)
+ self.assertEqual(
+ nondeterministic_pandas_udf.evalType,
PythonEvalType.SQL_SCALAR_PANDAS_UDF
+ )
+ [row] = self.spark.sql("SELECT randomPandasUDF(1)").collect()
+ self.assertEqual(row[0], 7)
def random_iter_udf(it):
for i in it:
@@ -310,15 +314,17 @@ class ScalarPandasUDFTestsMixin:
).asNondeterministic()
self.assertEqual(random_pandas_iter_udf.deterministic, False)
self.assertEqual(random_pandas_iter_udf.evalType,
PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF)
- nondeterministic_pandas_iter_udf = self.spark.catalog.registerFunction(
- "randomPandasIterUDF", random_pandas_iter_udf
- )
- self.assertEqual(nondeterministic_pandas_iter_udf.deterministic, False)
- self.assertEqual(
- nondeterministic_pandas_iter_udf.evalType,
PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF
- )
- [row] = self.spark.sql("SELECT randomPandasIterUDF(1)").collect()
- self.assertEqual(row[0], 7)
+
+ with self.temp_func("randomPandasIterUDF"):
+ nondeterministic_pandas_iter_udf =
self.spark.catalog.registerFunction(
+ "randomPandasIterUDF", random_pandas_iter_udf
+ )
+ self.assertEqual(nondeterministic_pandas_iter_udf.deterministic,
False)
+ self.assertEqual(
+ nondeterministic_pandas_iter_udf.evalType,
PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF
+ )
+ [row] = self.spark.sql("SELECT randomPandasIterUDF(1)").collect()
+ self.assertEqual(row[0], 7)
def test_vectorized_udf_null_boolean(self):
data = [(True,), (True,), (None,), (False,)]
@@ -1397,14 +1403,16 @@ class ScalarPandasUDFTestsMixin:
for original_add in [scalar_original_add, iter_original_add]:
self.assertEqual(original_add.deterministic, True)
- new_add = self.spark.catalog.registerFunction("add1", original_add)
- res1 = df.select(new_add(col("a"), col("b")))
- res2 = self.spark.sql(
- "SELECT add1(t.a, t.b) FROM (SELECT id as a, id as b FROM
range(10)) t"
- )
- expected = df.select(expr("a + b"))
- self.assertEqual(expected.collect(), res1.collect())
- self.assertEqual(expected.collect(), res2.collect())
+
+ with self.temp_func("add1"):
+ new_add = self.spark.catalog.registerFunction("add1",
original_add)
+ res1 = df.select(new_add(col("a"), col("b")))
+ res2 = self.spark.sql(
+ "SELECT add1(t.a, t.b) FROM (SELECT id as a, id as b FROM
range(10)) t"
+ )
+ expected = df.select(expr("a + b"))
+ self.assertEqual(expected.collect(), res1.collect())
+ self.assertEqual(expected.collect(), res2.collect())
def test_scalar_iter_udf_init(self):
import numpy as np
@@ -1788,92 +1796,96 @@ class ScalarPandasUDFTestsMixin:
def test_udf(a, b):
return a + 10 * b
- self.spark.udf.register("test_udf", test_udf)
+ with self.temp_func("test_udf"):
+ self.spark.udf.register("test_udf", test_udf)
- for i, df in enumerate(
- [
- self.spark.range(2).select(test_udf(col("id"), b=col("id") *
10)),
- self.spark.range(2).select(test_udf(a=col("id"), b=col("id") *
10)),
- self.spark.range(2).select(test_udf(b=col("id") * 10,
a=col("id"))),
- self.spark.sql("SELECT test_udf(id, b => id * 10) FROM
range(2)"),
- self.spark.sql("SELECT test_udf(a => id, b => id * 10) FROM
range(2)"),
- self.spark.sql("SELECT test_udf(b => id * 10, a => id) FROM
range(2)"),
- ]
- ):
- with self.subTest(query_no=i):
- assertDataFrameEqual(df, [Row(0), Row(101)])
+ for i, df in enumerate(
+ [
+ self.spark.range(2).select(test_udf(col("id"), b=col("id")
* 10)),
+ self.spark.range(2).select(test_udf(a=col("id"),
b=col("id") * 10)),
+ self.spark.range(2).select(test_udf(b=col("id") * 10,
a=col("id"))),
+ self.spark.sql("SELECT test_udf(id, b => id * 10) FROM
range(2)"),
+ self.spark.sql("SELECT test_udf(a => id, b => id * 10)
FROM range(2)"),
+ self.spark.sql("SELECT test_udf(b => id * 10, a => id)
FROM range(2)"),
+ ]
+ ):
+ with self.subTest(query_no=i):
+ assertDataFrameEqual(df, [Row(0), Row(101)])
def test_named_arguments_negative(self):
@pandas_udf("int")
def test_udf(a, b):
return a + b
- self.spark.udf.register("test_udf", test_udf)
+ with self.temp_func("test_udf"):
+ self.spark.udf.register("test_udf", test_udf)
- with self.assertRaisesRegex(
- AnalysisException,
-
"DUPLICATE_ROUTINE_PARAMETER_ASSIGNMENT.DOUBLE_NAMED_ARGUMENT_REFERENCE",
- ):
- self.spark.sql("SELECT test_udf(a => id, a => id * 10) FROM
range(2)").show()
+ with self.assertRaisesRegex(
+ AnalysisException,
+
"DUPLICATE_ROUTINE_PARAMETER_ASSIGNMENT.DOUBLE_NAMED_ARGUMENT_REFERENCE",
+ ):
+ self.spark.sql("SELECT test_udf(a => id, a => id * 10) FROM
range(2)").show()
- with self.assertRaisesRegex(AnalysisException,
"UNEXPECTED_POSITIONAL_ARGUMENT"):
- self.spark.sql("SELECT test_udf(a => id, id * 10) FROM
range(2)").show()
+ with self.assertRaisesRegex(AnalysisException,
"UNEXPECTED_POSITIONAL_ARGUMENT"):
+ self.spark.sql("SELECT test_udf(a => id, id * 10) FROM
range(2)").show()
- with self.assertRaisesRegex(
- PythonException, r"test_udf\(\) got an unexpected keyword argument
'c'"
- ):
- self.spark.sql("SELECT test_udf(c => 'x') FROM range(2)").show()
+ with self.assertRaisesRegex(
+ PythonException, r"test_udf\(\) got an unexpected keyword
argument 'c'"
+ ):
+ self.spark.sql("SELECT test_udf(c => 'x') FROM
range(2)").show()
def test_kwargs(self):
@pandas_udf("int")
def test_udf(a, **kwargs):
return a + 10 * kwargs["b"]
- self.spark.udf.register("test_udf", test_udf)
+ with self.temp_func("test_udf"):
+ self.spark.udf.register("test_udf", test_udf)
- for i, df in enumerate(
- [
- self.spark.range(2).select(test_udf(a=col("id"), b=col("id") *
10)),
- self.spark.range(2).select(test_udf(b=col("id") * 10,
a=col("id"))),
- self.spark.sql("SELECT test_udf(a => id, b => id * 10) FROM
range(2)"),
- self.spark.sql("SELECT test_udf(b => id * 10, a => id) FROM
range(2)"),
- ]
- ):
- with self.subTest(query_no=i):
- assertDataFrameEqual(df, [Row(0), Row(101)])
+ for i, df in enumerate(
+ [
+ self.spark.range(2).select(test_udf(a=col("id"),
b=col("id") * 10)),
+ self.spark.range(2).select(test_udf(b=col("id") * 10,
a=col("id"))),
+ self.spark.sql("SELECT test_udf(a => id, b => id * 10)
FROM range(2)"),
+ self.spark.sql("SELECT test_udf(b => id * 10, a => id)
FROM range(2)"),
+ ]
+ ):
+ with self.subTest(query_no=i):
+ assertDataFrameEqual(df, [Row(0), Row(101)])
def test_named_arguments_and_defaults(self):
@pandas_udf("int")
def test_udf(a, b=0):
return a + 10 * b
- self.spark.udf.register("test_udf", test_udf)
+ with self.temp_func("test_udf"):
+ self.spark.udf.register("test_udf", test_udf)
- # without "b"
- for i, df in enumerate(
- [
- self.spark.range(2).select(test_udf(col("id"))),
- self.spark.range(2).select(test_udf(a=col("id"))),
- self.spark.sql("SELECT test_udf(id) FROM range(2)"),
- self.spark.sql("SELECT test_udf(a => id) FROM range(2)"),
- ]
- ):
- with self.subTest(with_b=False, query_no=i):
- assertDataFrameEqual(df, [Row(0), Row(1)])
+ # without "b"
+ for i, df in enumerate(
+ [
+ self.spark.range(2).select(test_udf(col("id"))),
+ self.spark.range(2).select(test_udf(a=col("id"))),
+ self.spark.sql("SELECT test_udf(id) FROM range(2)"),
+ self.spark.sql("SELECT test_udf(a => id) FROM range(2)"),
+ ]
+ ):
+ with self.subTest(with_b=False, query_no=i):
+ assertDataFrameEqual(df, [Row(0), Row(1)])
- # with "b"
- for i, df in enumerate(
- [
- self.spark.range(2).select(test_udf(col("id"), b=col("id") *
10)),
- self.spark.range(2).select(test_udf(a=col("id"), b=col("id") *
10)),
- self.spark.range(2).select(test_udf(b=col("id") * 10,
a=col("id"))),
- self.spark.sql("SELECT test_udf(id, b => id * 10) FROM
range(2)"),
- self.spark.sql("SELECT test_udf(a => id, b => id * 10) FROM
range(2)"),
- self.spark.sql("SELECT test_udf(b => id * 10, a => id) FROM
range(2)"),
- ]
- ):
- with self.subTest(with_b=True, query_no=i):
- assertDataFrameEqual(df, [Row(0), Row(101)])
+ # with "b"
+ for i, df in enumerate(
+ [
+ self.spark.range(2).select(test_udf(col("id"), b=col("id")
* 10)),
+ self.spark.range(2).select(test_udf(a=col("id"),
b=col("id") * 10)),
+ self.spark.range(2).select(test_udf(b=col("id") * 10,
a=col("id"))),
+ self.spark.sql("SELECT test_udf(id, b => id * 10) FROM
range(2)"),
+ self.spark.sql("SELECT test_udf(a => id, b => id * 10)
FROM range(2)"),
+ self.spark.sql("SELECT test_udf(b => id * 10, a => id)
FROM range(2)"),
+ ]
+ ):
+ with self.subTest(with_b=True, query_no=i):
+ assertDataFrameEqual(df, [Row(0), Row(101)])
def test_arrow_cast_enabled_numeric_to_decimal(self):
import numpy as np
diff --git a/python/pyspark/sql/tests/pandas/test_pandas_udf_window.py
b/python/pyspark/sql/tests/pandas/test_pandas_udf_window.py
index fbc2b32d1c69..547e237902b3 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_udf_window.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_udf_window.py
@@ -408,7 +408,7 @@ class WindowPandasUDFTestsMixin:
with self.subTest(bound=bound, query_no=i):
assertDataFrameEqual(windowed, df.withColumn("wm",
sf.mean(df.v).over(w)))
- with self.tempView("v"):
+ with self.tempView("v"), self.temp_func("weighted_mean"):
df.createOrReplaceTempView("v")
self.spark.udf.register("weighted_mean", weighted_mean)
@@ -436,7 +436,7 @@ class WindowPandasUDFTestsMixin:
df = self.data
weighted_mean = self.pandas_agg_weighted_mean_udf
- with self.tempView("v"):
+ with self.tempView("v"), self.temp_func("weighted_mean"):
df.createOrReplaceTempView("v")
self.spark.udf.register("weighted_mean", weighted_mean)
@@ -504,7 +504,7 @@ class WindowPandasUDFTestsMixin:
with self.subTest(bound=bound, query_no=i):
assertDataFrameEqual(windowed, df.withColumn("wm",
sf.mean(df.v).over(w)))
- with self.tempView("v"):
+ with self.tempView("v"), self.temp_func("weighted_mean"):
df.createOrReplaceTempView("v")
self.spark.udf.register("weighted_mean", weighted_mean)
diff --git a/python/pyspark/sql/tests/test_udf.py
b/python/pyspark/sql/tests/test_udf.py
index b1fb42ad11ec..8d792a54e346 100644
--- a/python/pyspark/sql/tests/test_udf.py
+++ b/python/pyspark/sql/tests/test_udf.py
@@ -216,7 +216,7 @@ class BaseUDFTestsMixin(object):
self.assertEqual(tuple(row), (2,))
def test_multiple_udfs(self):
- with self.temp_func("double_int"):
+ with self.temp_func("double_int", "add_int"):
self.spark.catalog.registerFunction("double_int", lambda x: x * 2,
IntegerType())
[row] = self.spark.sql("SELECT double_int(1),
double_int(2)").collect()
self.assertEqual(tuple(row), (2, 4))
@@ -224,6 +224,7 @@ class BaseUDFTestsMixin(object):
"SELECT double_int(double_int(1)), double_int(double_int(2) +
2)"
).collect()
self.assertEqual(tuple(row), (4, 12))
+
self.spark.catalog.registerFunction("add_int", lambda x, y: x + y,
IntegerType())
[row] = self.spark.sql(
"SELECT double_int(add_int(1, 2)), add_int(double_int(2), 1)"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]