This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8095c39aaa92 [SPARK-54869][PYTHON][TESTS] Apply the standard import of pyarrow compute in `test_arrow_udf_scalar`
8095c39aaa92 is described below
commit 8095c39aaa926c1e4215c195671c52123250f216
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Tue Dec 30 12:45:13 2025 +0800
[SPARK-54869][PYTHON][TESTS] Apply the standard import of pyarrow compute in `test_arrow_udf_scalar`
### What changes were proposed in this pull request?
Apply the standard import of pyarrow compute in `test_arrow_udf_scalar`
```
import pyarrow.compute as pc
```
### Why are the changes needed?
There seems to be a bug in cloudpickle when it serializes nested lambdas that reference a submodule through its parent module attribute, e.g. `pa.compute`; importing the submodule directly avoids it. See https://github.com/apache/spark/pull/53607, which fixed a failure in the Python-only macOS 26 CI job.
before:
https://github.com/apache/spark/actions/runs/20495264978/job/58904792835
after:
https://github.com/apache/spark/actions/runs/20560816127/job/59051168728
This PR applies the same change to more tests.
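For reference, a minimal sketch of the rewrite (the lambda and `ascii_split_whitespace` come from the patched test; the variable names here are illustrative):
```
import pyarrow as pa
import pyarrow.compute as pc

# Pattern being replaced: the lambda reaches the compute submodule
# through the parent module attribute `pa.compute`.
tokenize_old = lambda s: pa.compute.ascii_split_whitespace(s)

# Standard pattern applied by this PR: bind the submodule to a local
# name at import time and reference it directly.
tokenize_new = lambda s: pc.ascii_split_whitespace(s)
```
Both lambdas compute the same result; only the way the submodule is referenced changes, which is what appears to matter to cloudpickle.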
### Does this PR introduce _any_ user-facing change?
no, test-only
### How was this patch tested?
ci
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #53640 from zhengruifeng/import_pc.
Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
.../sql/tests/arrow/test_arrow_udf_scalar.py | 69 ++++++++++++----------
1 file changed, 38 insertions(+), 31 deletions(-)
diff --git a/python/pyspark/sql/tests/arrow/test_arrow_udf_scalar.py b/python/pyspark/sql/tests/arrow/test_arrow_udf_scalar.py
index 710e8322de21..524f337554e7 100644
--- a/python/pyspark/sql/tests/arrow/test_arrow_udf_scalar.py
+++ b/python/pyspark/sql/tests/arrow/test_arrow_udf_scalar.py
@@ -84,12 +84,12 @@ class ScalarArrowUDFTestsMixin:
return random_udf.asNondeterministic()
def test_arrow_udf_tokenize(self):
- import pyarrow as pa
+ import pyarrow.compute as pc
df = self.spark.createDataFrame([("hi boo",), ("bye boo",)], ["vals"])
tokenize = arrow_udf(
- lambda s: pa.compute.ascii_split_whitespace(s),
+ lambda s: pc.ascii_split_whitespace(s),
ArrayType(StringType()),
)
@@ -99,11 +99,12 @@ class ScalarArrowUDFTestsMixin:
def test_arrow_udf_output_nested_arrays(self):
import pyarrow as pa
+ import pyarrow.compute as pc
df = self.spark.createDataFrame([("hi boo",), ("bye boo",)], ["vals"])
tokenize = arrow_udf(
- lambda s: pa.array([[v] for v in pa.compute.ascii_split_whitespace(s).to_pylist()]),
+ lambda s: pa.array([[v] for v in pc.ascii_split_whitespace(s).to_pylist()]),
ArrayType(ArrayType(StringType())),
)
@@ -499,6 +500,7 @@ class ScalarArrowUDFTestsMixin:
def test_arrow_udf_input_variant(self):
import pyarrow as pa
+ import pyarrow.compute as pc
@arrow_udf("int")
def scalar_f(v: pa.Array) -> pa.Array:
@@ -506,7 +508,7 @@ class ScalarArrowUDFTestsMixin:
assert isinstance(v, pa.StructArray)
assert isinstance(v.field("metadata"), pa.BinaryArray)
assert isinstance(v.field("value"), pa.BinaryArray)
- return pa.compute.binary_length(v.field("value"))
+ return pc.binary_length(v.field("value"))
@arrow_udf("int")
def iter_f(it: Iterator[pa.Array]) -> Iterator[pa.Array]:
@@ -515,7 +517,7 @@ class ScalarArrowUDFTestsMixin:
assert isinstance(v, pa.StructArray)
assert isinstance(v.field("metadata"), pa.BinaryArray)
assert isinstance(v.field("value"), pa.BinaryArray)
- yield pa.compute.binary_length(v.field("value"))
+ yield pc.binary_length(v.field("value"))
df = self.spark.range(0, 10).selectExpr("parse_json(cast(id as string)) v")
expected = [Row(l=2) for i in range(10)]
@@ -703,10 +705,9 @@ class ScalarArrowUDFTestsMixin:
def test_udf_register_arrow_udf_basic(self):
import pyarrow as pa
+ import pyarrow.compute as pc
- scalar_original_add = arrow_udf(
- lambda x, y: pa.compute.add(x, y).cast(pa.int32()), IntegerType()
- )
+ scalar_original_add = arrow_udf(lambda x, y: pc.add(x, y).cast(pa.int32()), IntegerType())
self.assertEqual(scalar_original_add.evalType, PythonEvalType.SQL_SCALAR_ARROW_UDF)
self.assertEqual(scalar_original_add.deterministic, True)
@@ -730,7 +731,7 @@ class ScalarArrowUDFTestsMixin:
@arrow_udf(LongType())
def scalar_iter_add(it: Iterator[Tuple[pa.Array, pa.Array]]) -> Iterator[pa.Array]:
for a, b in it:
- yield pa.compute.add(a, b)
+ yield pc.add(a, b)
with self.temp_func("add1"):
new_add = self.spark.udf.register("add1", scalar_iter_add)
@@ -745,10 +746,9 @@ class ScalarArrowUDFTestsMixin:
def test_catalog_register_arrow_udf_basic(self):
import pyarrow as pa
+ import pyarrow.compute as pc
- scalar_original_add = arrow_udf(
- lambda x, y: pa.compute.add(x, y).cast(pa.int32()), IntegerType()
- )
+ scalar_original_add = arrow_udf(lambda x, y: pc.add(x, y).cast(pa.int32()), IntegerType())
self.assertEqual(scalar_original_add.evalType, PythonEvalType.SQL_SCALAR_ARROW_UDF)
self.assertEqual(scalar_original_add.deterministic, True)
@@ -772,7 +772,7 @@ class ScalarArrowUDFTestsMixin:
@arrow_udf(LongType())
def scalar_iter_add(it: Iterator[Tuple[pa.Array, pa.Array]]) -> Iterator[pa.Array]:
for a, b in it:
- yield pa.compute.add(a, b)
+ yield pc.add(a, b)
with self.temp_func("add1"):
new_add = self.spark.catalog.registerFunction("add1", scalar_iter_add)
@@ -786,10 +786,10 @@ class ScalarArrowUDFTestsMixin:
self.assertEqual(expected.collect(), res4.collect())
def test_udf_register_nondeterministic_arrow_udf(self):
- import pyarrow as pa
+ import pyarrow.compute as pc
random_arrow_udf = arrow_udf(
- lambda x: pa.compute.add(x, random.randint(6, 6)), LongType()
+ lambda x: pc.add(x, random.randint(6, 6)), LongType()
).asNondeterministic()
self.assertEqual(random_arrow_udf.deterministic, False)
self.assertEqual(random_arrow_udf.evalType, PythonEvalType.SQL_SCALAR_ARROW_UDF)
@@ -805,10 +805,10 @@ class ScalarArrowUDFTestsMixin:
self.assertEqual(row[0], 7)
def test_catalog_register_nondeterministic_arrow_udf(self):
- import pyarrow as pa
+ import pyarrow.compute as pc
random_arrow_udf = arrow_udf(
- lambda x: pa.compute.add(x, random.randint(6, 6)), LongType()
+ lambda x: pc.add(x, random.randint(6, 6)), LongType()
).asNondeterministic()
self.assertEqual(random_arrow_udf.deterministic, False)
self.assertEqual(random_arrow_udf.evalType, PythonEvalType.SQL_SCALAR_ARROW_UDF)
@@ -827,17 +827,17 @@ class ScalarArrowUDFTestsMixin:
@unittest.skipIf(not have_numpy, numpy_requirement_message)
def test_nondeterministic_arrow_udf(self):
- import pyarrow as pa
+ import pyarrow.compute as pc
# Test that nondeterministic UDFs are evaluated only once in chained UDF evaluations
@arrow_udf("double")
def scalar_plus_ten(v):
- return pa.compute.add(v, 10)
+ return pc.add(v, 10)
@arrow_udf("double", ArrowUDFType.SCALAR_ITER)
def iter_plus_ten(it):
for v in it:
- yield pa.compute.add(v, 10)
+ yield pc.add(v, 10)
for plus_ten in [scalar_plus_ten, iter_plus_ten]:
random_udf = self.nondeterministic_arrow_udf
@@ -984,10 +984,11 @@ class ScalarArrowUDFTestsMixin:
def test_arrow_udf_named_arguments(self):
import pyarrow as pa
+ import pyarrow.compute as pc
@arrow_udf("int")
def test_udf(a, b):
- return pa.compute.add(a, pa.compute.multiply(b, 10)).cast(pa.int32())
+ return pc.add(a, pc.multiply(b, 10)).cast(pa.int32())
with self.temp_func("test_udf"):
self.spark.udf.register("test_udf", test_udf)
@@ -1008,10 +1009,11 @@ class ScalarArrowUDFTestsMixin:
def test_arrow_udf_named_arguments_negative(self):
import pyarrow as pa
+ import pyarrow.compute as pc
@arrow_udf("int")
def test_udf(a, b):
- return pa.compute.add(a, b).cast(pa.int32())
+ return pc.add(a, b).cast(pa.int32())
with self.temp_func("test_udf"):
self.spark.udf.register("test_udf", test_udf)
@@ -1032,10 +1034,11 @@ class ScalarArrowUDFTestsMixin:
def test_arrow_udf_named_arguments_and_defaults(self):
import pyarrow as pa
+ import pyarrow.compute as pc
@arrow_udf("int")
def test_udf(a, b=0):
- return pa.compute.add(a, pa.compute.multiply(b, 10)).cast(pa.int32())
+ return pc.add(a, pc.multiply(b, 10)).cast(pa.int32())
with self.temp_func("test_udf"):
self.spark.udf.register("test_udf", test_udf)
@@ -1070,10 +1073,11 @@ class ScalarArrowUDFTestsMixin:
def test_arrow_udf_kwargs(self):
import pyarrow as pa
+ import pyarrow.compute as pc
@arrow_udf("int")
def test_udf(a, **kwargs):
- return pa.compute.add(a, pa.compute.multiply(kwargs["b"], 10)).cast(pa.int32())
+ return pc.add(a, pc.multiply(kwargs["b"], 10)).cast(pa.int32())
with self.temp_func("test_udf"):
self.spark.udf.register("test_udf", test_udf)
@@ -1092,11 +1096,12 @@ class ScalarArrowUDFTestsMixin:
def test_arrow_iter_udf_single_column(self):
import pyarrow as pa
+ import pyarrow.compute as pc
@arrow_udf(LongType())
def add_one(it: Iterator[pa.Array]) -> Iterator[pa.Array]:
for s in it:
- yield pa.compute.add(s, 1)
+ yield pc.add(s, 1)
df = self.spark.range(10)
expected = df.select((F.col("id") + 1).alias("res")).collect()
@@ -1106,11 +1111,12 @@ class ScalarArrowUDFTestsMixin:
def test_arrow_iter_udf_two_columns(self):
import pyarrow as pa
+ import pyarrow.compute as pc
@arrow_udf(LongType())
def multiple(it: Iterator[Tuple[pa.Array, pa.Array]]) -> Iterator[pa.Array]:
for a, b in it:
- yield pa.compute.multiply(a, b)
+ yield pc.multiply(a, b)
df = self.spark.range(10).select(
F.col("id").alias("a"),
@@ -1124,11 +1130,12 @@ class ScalarArrowUDFTestsMixin:
def test_arrow_iter_udf_three_columns(self):
import pyarrow as pa
+ import pyarrow.compute as pc
@arrow_udf(LongType())
def multiple(it: Iterator[Tuple[pa.Array, pa.Array, pa.Array]]) -> Iterator[pa.Array]:
for a, b, c in it:
- yield pa.compute.multiply(pa.compute.multiply(a, b), c)
+ yield pc.multiply(pc.multiply(a, b), c)
df = self.spark.range(10).select(
F.col("id").alias("a"),
@@ -1142,21 +1149,21 @@ class ScalarArrowUDFTestsMixin:
self.assertEqual(expected, result.collect())
def test_return_type_coercion(self):
- import pyarrow as pa
+ import pyarrow.compute as pc
df = self.spark.range(10)
- scalar_long = arrow_udf(lambda x: pa.compute.add(x, 1), LongType())
+ scalar_long = arrow_udf(lambda x: pc.add(x, 1), LongType())
result1 = df.select(scalar_long("id").alias("res"))
self.assertEqual(10, len(result1.collect()))
# long -> int coercion
- scalar_int1 = arrow_udf(lambda x: pa.compute.add(x, 1), IntegerType())
+ scalar_int1 = arrow_udf(lambda x: pc.add(x, 1), IntegerType())
result2 = df.select(scalar_int1("id").alias("res"))
self.assertEqual(10, len(result2.collect()))
# long -> int coercion, overflow
- scalar_int2 = arrow_udf(lambda x: pa.compute.add(x, 2147483647), IntegerType())
+ scalar_int2 = arrow_udf(lambda x: pc.add(x, 2147483647), IntegerType())
result3 = df.select(scalar_int2("id").alias("res"))
with self.assertRaises(Exception):
# pyarrow.lib.ArrowInvalid:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]