This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 81be5fb02970 Revert "[SPARK-54218][PYTHON][SQL][TESTS] Add integrated 
tests for Scalar Pandas Iterator UDF"
81be5fb02970 is described below

commit 81be5fb02970a26268e52bcfa05c5768186f329e
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Fri Nov 7 14:28:56 2025 +0800

    Revert "[SPARK-54218][PYTHON][SQL][TESTS] Add integrated tests for Scalar 
Pandas Iterator UDF"
    
    revert https://github.com/apache/spark/pull/52916 to check the test timeout 
issue
    
    Closes #52935 from zhengruifeng/revert_pandas_iter_test.
    
    Authored-by: Ruifeng Zheng <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 .../apache/spark/sql/IntegratedUDFTestUtils.scala  | 82 +---------------------
 .../org/apache/spark/sql/SQLQueryTestSuite.scala   | 19 +----
 .../sql/streaming/continuous/ContinuousSuite.scala |  4 +-
 3 files changed, 5 insertions(+), 100 deletions(-)

diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala
index 9b4e0343f94b..a5f3e72f47b7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala
@@ -249,7 +249,7 @@ object IntegratedUDFTestUtils extends SQLHelper {
     binaryPythonDataSource
   }
 
-  private lazy val pandasScalarFunc: Array[Byte] = if (shouldTestPandasUDFs) {
+  private lazy val pandasFunc: Array[Byte] = if (shouldTestPandasUDFs) {
     var binaryPandasFunc: Array[Byte] = null
     withTempPath { path =>
       Process(
@@ -272,29 +272,6 @@ object IntegratedUDFTestUtils extends SQLHelper {
     throw new RuntimeException(s"Python executable [$pythonExec] and/or 
pyspark are unavailable.")
   }
 
-  private lazy val pandasScalarIterFunc: Array[Byte] = if 
(shouldTestPandasUDFs) {
-    var binaryPandasFunc: Array[Byte] = null
-    withTempPath { path =>
-      Process(
-        Seq(
-          pythonExec,
-          "-c",
-          "from pyspark.sql.types import StringType; " +
-            "from pyspark.serializers import CloudPickleSerializer; " +
-            s"f = open('$path', 'wb');" +
-            "f.write(CloudPickleSerializer().dumps((" +
-            "lambda it: (x.apply(lambda v: None if v is None else str(v)) for 
x in it), " +
-            "StringType())))"),
-        None,
-        "PYTHONPATH" -> s"$pysparkPythonPath:$pythonPath").!!
-      binaryPandasFunc = Files.readAllBytes(path.toPath)
-    }
-    assert(binaryPandasFunc != null)
-    binaryPandasFunc
-  } else {
-    throw new RuntimeException(s"Python executable [$pythonExec] and/or 
pyspark are unavailable.")
-  }
-
   private lazy val pandasGroupedAggFunc: Array[Byte] = if 
(shouldTestPandasUDFs) {
     var binaryPandasFunc: Array[Byte] = null
     withTempPath { path =>
@@ -1403,7 +1380,7 @@ object IntegratedUDFTestUtils extends SQLHelper {
     private[IntegratedUDFTestUtils] lazy val udf = new 
UserDefinedPythonFunction(
       name = name,
       func = SimplePythonFunction(
-        command = pandasScalarFunc.toImmutableArraySeq,
+        command = pandasFunc.toImmutableArraySeq,
         envVars = workerEnv.clone().asInstanceOf[java.util.Map[String, 
String]],
         pythonIncludes = List.empty[String].asJava,
         pythonExec = pythonExec,
@@ -1433,60 +1410,6 @@ object IntegratedUDFTestUtils extends SQLHelper {
     val prettyName: String = "Scalar Pandas UDF"
   }
 
-  /**
-   * A Scalar Iterator Pandas UDF that takes one column, casts into string, 
executes the
-   * Python native function, and casts back to the type of input column.
-   *
-   * Virtually equivalent to:
-   *
-   * {{{
-   *   from pyspark.sql.functions import pandas_udf, PandasUDFType
-   *
-   *   df = spark.range(3).toDF("col")
-   *   scalar_iter_udf = pandas_udf(
-   *       lambda it: map(lambda x: x.apply(lambda v: str(v)), it),
-   *       "string",
-   *       PandasUDFType.SCALAR_ITER)
-   *   casted_col = scalar_iter_udf(df.col.cast("string"))
-   *   casted_col.cast(df.schema["col"].dataType)
-   * }}}
-   */
-  case class TestScalarIterPandasUDF(
-      name: String,
-      returnType: Option[DataType] = None) extends TestUDF {
-    private[IntegratedUDFTestUtils] lazy val udf = new 
UserDefinedPythonFunction(
-      name = name,
-      func = SimplePythonFunction(
-        command = pandasScalarIterFunc.toImmutableArraySeq,
-        envVars = workerEnv.clone().asInstanceOf[java.util.Map[String, 
String]],
-        pythonIncludes = List.empty[String].asJava,
-        pythonExec = pythonExec,
-        pythonVer = pythonVer,
-        broadcastVars = List.empty[Broadcast[PythonBroadcast]].asJava,
-        accumulator = null),
-      dataType = StringType,
-      pythonEvalType = PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF,
-      udfDeterministic = true) {
-
-      override def builder(e: Seq[Expression]): Expression = {
-        assert(e.length == 1, "Defined UDF only has one column")
-        val expr = e.head
-        val rt = returnType.getOrElse {
-          assert(expr.resolved, "column should be resolved to use the same 
type " +
-            "as input. Try df(name) or df.col(name)")
-          expr.dataType
-        }
-        val pythonUDF = new PythonUDFWithoutId(
-          super.builder(Cast(expr, StringType) :: Nil).asInstanceOf[PythonUDF])
-        Cast(pythonUDF, rt)
-      }
-    }
-
-    def apply(exprs: Column*): Column = udf(exprs: _*)
-
-    val prettyName: String = "Scalar Pandas Iterator UDF"
-  }
-
   /**
    * A Grouped Aggregate Pandas UDF that takes one column, executes the
    * Python native function calculating the count of the column using pandas.
@@ -1683,7 +1606,6 @@ object IntegratedUDFTestUtils extends SQLHelper {
   def registerTestUDF(testUDF: TestUDF, session: classic.SparkSession): Unit = 
testUDF match {
     case udf: TestPythonUDF => session.udf.registerPython(udf.name, udf.udf)
     case udf: TestScalarPandasUDF => session.udf.registerPython(udf.name, 
udf.udf)
-    case udf: TestScalarIterPandasUDF => session.udf.registerPython(udf.name, 
udf.udf)
     case udf: TestGroupedAggPandasUDF => session.udf.registerPython(udf.name, 
udf.udf)
     case udf: TestScalaUDF =>
       val registry = session.sessionState.functionRegistry
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
index f87785600f7c..a57c72f5fc15 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
@@ -120,9 +120,6 @@ import org.apache.spark.util.Utils
  *  - Scalar Pandas UDF test case with a Scalar Pandas UDF registered as the 
name 'udf'
  *    iff Python executable, pyspark, pandas and pyarrow are available.
  *
- *  - Scalar Iterator Pandas UDF test case with a Scalar Iterator Pandas UDF 
registered
- *    as the name 'udf' iff Python executable, pyspark, pandas and pyarrow are 
available.
- *
  * Therefore, UDF test cases should have single input and output files but 
executed by three
  * different types of UDFs. See 'udf/udf-inner-join.sql' as an example.
  *
@@ -196,12 +193,6 @@ class SQLQueryTestSuite extends QueryTest with 
SharedSparkSession with SQLHelper
           s"pandas and/or pyarrow were not available in [$pythonExec].") {
           /* Do nothing */
         }
-      case udfTestCase: SQLQueryTestSuite#UDFTest
-        if udfTestCase.udf.isInstanceOf[TestScalarIterPandasUDF] && 
!shouldTestPandasUDFs =>
-        ignore(s"${testCase.name} is skipped because pyspark," +
-          s"pandas and/or pyarrow were not available in [$pythonExec].") {
-          /* Do nothing */
-        }
       case udfTestCase: SQLQueryTestSuite#UDFTest
           if udfTestCase.udf.isInstanceOf[TestGroupedAggPandasUDF] &&
             !shouldTestPandasUDFs =>
@@ -406,10 +397,6 @@ class SQLQueryTestSuite extends QueryTest with 
SharedSparkSession with SQLHelper
           if udfTestCase.udf.isInstanceOf[TestScalarPandasUDF] && 
shouldTestPandasUDFs =>
         s"${testCase.name}${System.lineSeparator()}" +
           s"Python: $pythonVer Pandas: $pandasVer PyArrow: 
$pyarrowVer${System.lineSeparator()}"
-      case udfTestCase: SQLQueryTestSuite#UDFTest
-          if udfTestCase.udf.isInstanceOf[TestScalarIterPandasUDF] && 
shouldTestPandasUDFs =>
-        s"${testCase.name}${System.lineSeparator()}" +
-          s"Python: $pythonVer Pandas: $pandasVer PyArrow: 
$pyarrowVer${System.lineSeparator()}"
       case udfTestCase: SQLQueryTestSuite#UDFTest
           if udfTestCase.udf.isInstanceOf[TestGroupedAggPandasUDF] &&
             shouldTestPandasUDFs =>
@@ -459,14 +446,12 @@ class SQLQueryTestSuite extends QueryTest with 
SharedSparkSession with SQLHelper
       // Create test cases of test types that depend on the input filename.
       val newTestCases: Seq[TestCase] = if (file.getAbsolutePath.startsWith(
         s"$inputFilePath${File.separator}udf${File.separator}postgreSQL")) {
-        Seq(TestScalaUDF("udf"), TestPythonUDF("udf"),
-          TestScalarPandasUDF("udf"), TestScalarIterPandasUDF("udf")).map { 
udf =>
+        Seq(TestScalaUDF("udf"), TestPythonUDF("udf"), 
TestScalarPandasUDF("udf")).map { udf =>
           UDFPgSQLTestCase(
             s"$testCaseName - ${udf.prettyName}", absPath, resultFile, udf)
         }
       } else if 
(file.getAbsolutePath.startsWith(s"$inputFilePath${File.separator}udf")) {
-        Seq(TestScalaUDF("udf"), TestPythonUDF("udf"),
-          TestScalarPandasUDF("udf"), TestScalarIterPandasUDF("udf")).map { 
udf =>
+        Seq(TestScalaUDF("udf"), TestPythonUDF("udf"), 
TestScalarPandasUDF("udf")).map { udf =>
           UDFTestCase(
             s"$testCaseName - ${udf.prettyName}", absPath, resultFile, udf)
         }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
index 2dd17136081e..c70f21ae144b 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
@@ -278,12 +278,10 @@ class ContinuousSuite extends ContinuousSuiteBase {
       s"Result set ${results.toSet} are not a superset of $expected!")
   }
 
-  Seq(TestScalaUDF("udf"), TestPythonUDF("udf"),
-    TestScalarPandasUDF("udf"), TestScalarIterPandasUDF("udf")).foreach { udf 
=>
+  Seq(TestScalaUDF("udf"), TestPythonUDF("udf"), 
TestScalarPandasUDF("udf")).foreach { udf =>
     test(s"continuous mode with various UDFs - ${udf.prettyName}") {
       assume(
         shouldTestPandasUDFs && udf.isInstanceOf[TestScalarPandasUDF] ||
-        shouldTestPandasUDFs && udf.isInstanceOf[TestScalarIterPandasUDF] ||
         shouldTestPythonUDFs && udf.isInstanceOf[TestPythonUDF] ||
         udf.isInstanceOf[TestScalaUDF])
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to