This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8095c39aaa92 [SPARK-54869][PYTHON][TESTS] Apply the standard import of pyarrow compute in `test_arrow_udf_scalar`
8095c39aaa92 is described below

commit 8095c39aaa926c1e4215c195671c52123250f216
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Tue Dec 30 12:45:13 2025 +0800

    [SPARK-54869][PYTHON][TESTS] Apply the standard import of pyarrow compute in `test_arrow_udf_scalar`
    
    ### What changes were proposed in this pull request?
    Apply the standard import of pyarrow compute in `test_arrow_udf_scalar`
    ```
    import pyarrow.compute as pc
    ```
    
    ### Why are the changes needed?
    There appears to be a bug in cloudpickle when it handles nested lambdas combined with submodule references such as `pa.compute`.
    
    See https://github.com/apache/spark/pull/53607, which fixed a failure in the Python-only macOS 26 CI job.
    
    before: https://github.com/apache/spark/actions/runs/20495264978/job/58904792835
    
    after: https://github.com/apache/spark/actions/runs/20560816127/job/59051168728
    
    This PR applies the same change to more tests; a minimal sketch of the pattern change follows.
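    
    For illustration only, a minimal sketch of the before/after pattern, assuming the reported cloudpickle behavior (the names `tokenize_old` and `tokenize_new` are hypothetical, not taken from the test suite):
    ```
    import pyarrow as pa
    import pyarrow.compute as pc

    # Pattern that reportedly trips cloudpickle in some environments:
    # a lambda that resolves the submodule through the parent package.
    tokenize_old = lambda s: pa.compute.ascii_split_whitespace(s)

    # Pattern applied by this PR: bind the submodule to a local alias so the
    # pickled closure references the `pyarrow.compute` module directly.
    tokenize_new = lambda s: pc.ascii_split_whitespace(s)
    ```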
    
    ### Does this PR introduce _any_ user-facing change?
    no, test-only
    
    ### How was this patch tested?
    ci
    
    ### Was this patch authored or co-authored using generative AI tooling?
    no
    
    Closes #53640 from zhengruifeng/import_pc.
    
    Authored-by: Ruifeng Zheng <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 .../sql/tests/arrow/test_arrow_udf_scalar.py       | 69 ++++++++++++----------
 1 file changed, 38 insertions(+), 31 deletions(-)

diff --git a/python/pyspark/sql/tests/arrow/test_arrow_udf_scalar.py b/python/pyspark/sql/tests/arrow/test_arrow_udf_scalar.py
index 710e8322de21..524f337554e7 100644
--- a/python/pyspark/sql/tests/arrow/test_arrow_udf_scalar.py
+++ b/python/pyspark/sql/tests/arrow/test_arrow_udf_scalar.py
@@ -84,12 +84,12 @@ class ScalarArrowUDFTestsMixin:
         return random_udf.asNondeterministic()
 
     def test_arrow_udf_tokenize(self):
-        import pyarrow as pa
+        import pyarrow.compute as pc
 
         df = self.spark.createDataFrame([("hi boo",), ("bye boo",)], ["vals"])
 
         tokenize = arrow_udf(
-            lambda s: pa.compute.ascii_split_whitespace(s),
+            lambda s: pc.ascii_split_whitespace(s),
             ArrayType(StringType()),
         )
 
@@ -99,11 +99,12 @@ class ScalarArrowUDFTestsMixin:
 
     def test_arrow_udf_output_nested_arrays(self):
         import pyarrow as pa
+        import pyarrow.compute as pc
 
         df = self.spark.createDataFrame([("hi boo",), ("bye boo",)], ["vals"])
 
         tokenize = arrow_udf(
-            lambda s: pa.array([[v] for v in pa.compute.ascii_split_whitespace(s).to_pylist()]),
+            lambda s: pa.array([[v] for v in pc.ascii_split_whitespace(s).to_pylist()]),
             ArrayType(ArrayType(StringType())),
         )
 
@@ -499,6 +500,7 @@ class ScalarArrowUDFTestsMixin:
 
     def test_arrow_udf_input_variant(self):
         import pyarrow as pa
+        import pyarrow.compute as pc
 
         @arrow_udf("int")
         def scalar_f(v: pa.Array) -> pa.Array:
@@ -506,7 +508,7 @@ class ScalarArrowUDFTestsMixin:
             assert isinstance(v, pa.StructArray)
             assert isinstance(v.field("metadata"), pa.BinaryArray)
             assert isinstance(v.field("value"), pa.BinaryArray)
-            return pa.compute.binary_length(v.field("value"))
+            return pc.binary_length(v.field("value"))
 
         @arrow_udf("int")
         def iter_f(it: Iterator[pa.Array]) -> Iterator[pa.Array]:
@@ -515,7 +517,7 @@ class ScalarArrowUDFTestsMixin:
                 assert isinstance(v, pa.StructArray)
                 assert isinstance(v.field("metadata"), pa.BinaryArray)
                 assert isinstance(v.field("value"), pa.BinaryArray)
-                yield pa.compute.binary_length(v.field("value"))
+                yield pc.binary_length(v.field("value"))
 
         df = self.spark.range(0, 10).selectExpr("parse_json(cast(id as string)) v")
         expected = [Row(l=2) for i in range(10)]
@@ -703,10 +705,9 @@ class ScalarArrowUDFTestsMixin:
 
     def test_udf_register_arrow_udf_basic(self):
         import pyarrow as pa
+        import pyarrow.compute as pc
 
-        scalar_original_add = arrow_udf(
-            lambda x, y: pa.compute.add(x, y).cast(pa.int32()), IntegerType()
-        )
+        scalar_original_add = arrow_udf(lambda x, y: pc.add(x, y).cast(pa.int32()), IntegerType())
         self.assertEqual(scalar_original_add.evalType, PythonEvalType.SQL_SCALAR_ARROW_UDF)
         self.assertEqual(scalar_original_add.deterministic, True)
 
@@ -730,7 +731,7 @@ class ScalarArrowUDFTestsMixin:
         @arrow_udf(LongType())
         def scalar_iter_add(it: Iterator[Tuple[pa.Array, pa.Array]]) -> Iterator[pa.Array]:
             for a, b in it:
-                yield pa.compute.add(a, b)
+                yield pc.add(a, b)
 
         with self.temp_func("add1"):
             new_add = self.spark.udf.register("add1", scalar_iter_add)
@@ -745,10 +746,9 @@ class ScalarArrowUDFTestsMixin:
 
     def test_catalog_register_arrow_udf_basic(self):
         import pyarrow as pa
+        import pyarrow.compute as pc
 
-        scalar_original_add = arrow_udf(
-            lambda x, y: pa.compute.add(x, y).cast(pa.int32()), IntegerType()
-        )
+        scalar_original_add = arrow_udf(lambda x, y: pc.add(x, y).cast(pa.int32()), IntegerType())
         self.assertEqual(scalar_original_add.evalType, PythonEvalType.SQL_SCALAR_ARROW_UDF)
         self.assertEqual(scalar_original_add.deterministic, True)
 
@@ -772,7 +772,7 @@ class ScalarArrowUDFTestsMixin:
         @arrow_udf(LongType())
         def scalar_iter_add(it: Iterator[Tuple[pa.Array, pa.Array]]) -> Iterator[pa.Array]:
             for a, b in it:
-                yield pa.compute.add(a, b)
+                yield pc.add(a, b)
 
         with self.temp_func("add1"):
             new_add = self.spark.catalog.registerFunction("add1", scalar_iter_add)
@@ -786,10 +786,10 @@ class ScalarArrowUDFTestsMixin:
             self.assertEqual(expected.collect(), res4.collect())
 
     def test_udf_register_nondeterministic_arrow_udf(self):
-        import pyarrow as pa
+        import pyarrow.compute as pc
 
         random_arrow_udf = arrow_udf(
-            lambda x: pa.compute.add(x, random.randint(6, 6)), LongType()
+            lambda x: pc.add(x, random.randint(6, 6)), LongType()
         ).asNondeterministic()
         self.assertEqual(random_arrow_udf.deterministic, False)
         self.assertEqual(random_arrow_udf.evalType, PythonEvalType.SQL_SCALAR_ARROW_UDF)
@@ -805,10 +805,10 @@ class ScalarArrowUDFTestsMixin:
             self.assertEqual(row[0], 7)
 
     def test_catalog_register_nondeterministic_arrow_udf(self):
-        import pyarrow as pa
+        import pyarrow.compute as pc
 
         random_arrow_udf = arrow_udf(
-            lambda x: pa.compute.add(x, random.randint(6, 6)), LongType()
+            lambda x: pc.add(x, random.randint(6, 6)), LongType()
         ).asNondeterministic()
         self.assertEqual(random_arrow_udf.deterministic, False)
         self.assertEqual(random_arrow_udf.evalType, PythonEvalType.SQL_SCALAR_ARROW_UDF)
@@ -827,17 +827,17 @@ class ScalarArrowUDFTestsMixin:
 
     @unittest.skipIf(not have_numpy, numpy_requirement_message)
     def test_nondeterministic_arrow_udf(self):
-        import pyarrow as pa
+        import pyarrow.compute as pc
 
         # Test that nondeterministic UDFs are evaluated only once in chained UDF evaluations
         @arrow_udf("double")
         def scalar_plus_ten(v):
-            return pa.compute.add(v, 10)
+            return pc.add(v, 10)
 
         @arrow_udf("double", ArrowUDFType.SCALAR_ITER)
         def iter_plus_ten(it):
             for v in it:
-                yield pa.compute.add(v, 10)
+                yield pc.add(v, 10)
 
         for plus_ten in [scalar_plus_ten, iter_plus_ten]:
             random_udf = self.nondeterministic_arrow_udf
@@ -984,10 +984,11 @@ class ScalarArrowUDFTestsMixin:
 
     def test_arrow_udf_named_arguments(self):
         import pyarrow as pa
+        import pyarrow.compute as pc
 
         @arrow_udf("int")
         def test_udf(a, b):
-            return pa.compute.add(a, pa.compute.multiply(b, 10)).cast(pa.int32())
+            return pc.add(a, pc.multiply(b, 10)).cast(pa.int32())
 
         with self.temp_func("test_udf"):
             self.spark.udf.register("test_udf", test_udf)
@@ -1008,10 +1009,11 @@ class ScalarArrowUDFTestsMixin:
 
     def test_arrow_udf_named_arguments_negative(self):
         import pyarrow as pa
+        import pyarrow.compute as pc
 
         @arrow_udf("int")
         def test_udf(a, b):
-            return pa.compute.add(a, b).cast(pa.int32())
+            return pc.add(a, b).cast(pa.int32())
 
         with self.temp_func("test_udf"):
             self.spark.udf.register("test_udf", test_udf)
@@ -1032,10 +1034,11 @@ class ScalarArrowUDFTestsMixin:
 
     def test_arrow_udf_named_arguments_and_defaults(self):
         import pyarrow as pa
+        import pyarrow.compute as pc
 
         @arrow_udf("int")
         def test_udf(a, b=0):
-            return pa.compute.add(a, pa.compute.multiply(b, 10)).cast(pa.int32())
+            return pc.add(a, pc.multiply(b, 10)).cast(pa.int32())
 
         with self.temp_func("test_udf"):
             self.spark.udf.register("test_udf", test_udf)
@@ -1070,10 +1073,11 @@ class ScalarArrowUDFTestsMixin:
 
     def test_arrow_udf_kwargs(self):
         import pyarrow as pa
+        import pyarrow.compute as pc
 
         @arrow_udf("int")
         def test_udf(a, **kwargs):
-            return pa.compute.add(a, pa.compute.multiply(kwargs["b"], 10)).cast(pa.int32())
+            return pc.add(a, pc.multiply(kwargs["b"], 10)).cast(pa.int32())
 
         with self.temp_func("test_udf"):
             self.spark.udf.register("test_udf", test_udf)
@@ -1092,11 +1096,12 @@ class ScalarArrowUDFTestsMixin:
 
     def test_arrow_iter_udf_single_column(self):
         import pyarrow as pa
+        import pyarrow.compute as pc
 
         @arrow_udf(LongType())
         def add_one(it: Iterator[pa.Array]) -> Iterator[pa.Array]:
             for s in it:
-                yield pa.compute.add(s, 1)
+                yield pc.add(s, 1)
 
         df = self.spark.range(10)
         expected = df.select((F.col("id") + 1).alias("res")).collect()
@@ -1106,11 +1111,12 @@ class ScalarArrowUDFTestsMixin:
 
     def test_arrow_iter_udf_two_columns(self):
         import pyarrow as pa
+        import pyarrow.compute as pc
 
         @arrow_udf(LongType())
         def multiple(it: Iterator[Tuple[pa.Array, pa.Array]]) -> Iterator[pa.Array]:
             for a, b in it:
-                yield pa.compute.multiply(a, b)
+                yield pc.multiply(a, b)
 
         df = self.spark.range(10).select(
             F.col("id").alias("a"),
@@ -1124,11 +1130,12 @@ class ScalarArrowUDFTestsMixin:
 
     def test_arrow_iter_udf_three_columns(self):
         import pyarrow as pa
+        import pyarrow.compute as pc
 
         @arrow_udf(LongType())
         def multiple(it: Iterator[Tuple[pa.Array, pa.Array, pa.Array]]) -> Iterator[pa.Array]:
             for a, b, c in it:
-                yield pa.compute.multiply(pa.compute.multiply(a, b), c)
+                yield pc.multiply(pc.multiply(a, b), c)
 
         df = self.spark.range(10).select(
             F.col("id").alias("a"),
@@ -1142,21 +1149,21 @@ class ScalarArrowUDFTestsMixin:
         self.assertEqual(expected, result.collect())
 
     def test_return_type_coercion(self):
-        import pyarrow as pa
+        import pyarrow.compute as pc
 
         df = self.spark.range(10)
 
-        scalar_long = arrow_udf(lambda x: pa.compute.add(x, 1), LongType())
+        scalar_long = arrow_udf(lambda x: pc.add(x, 1), LongType())
         result1 = df.select(scalar_long("id").alias("res"))
         self.assertEqual(10, len(result1.collect()))
 
         # long -> int coercion
-        scalar_int1 = arrow_udf(lambda x: pa.compute.add(x, 1), IntegerType())
+        scalar_int1 = arrow_udf(lambda x: pc.add(x, 1), IntegerType())
         result2 = df.select(scalar_int1("id").alias("res"))
         self.assertEqual(10, len(result2.collect()))
 
         # long -> int coercion, overflow
-        scalar_int2 = arrow_udf(lambda x: pa.compute.add(x, 2147483647), IntegerType())
+        scalar_int2 = arrow_udf(lambda x: pc.add(x, 2147483647), IntegerType())
         result3 = df.select(scalar_int2("id").alias("res"))
         with self.assertRaises(Exception):
             # pyarrow.lib.ArrowInvalid:


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
