This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 81be5fb02970 Revert "[SPARK-54218][PYTHON][SQL][TESTS] Add integrated tests for Scalar Pandas Iterator UDF"
81be5fb02970 is described below
commit 81be5fb02970a26268e52bcfa05c5768186f329e
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Fri Nov 7 14:28:56 2025 +0800
Revert "[SPARK-54218][PYTHON][SQL][TESTS] Add integrated tests for Scalar
Pandas Iterator UDF"
Revert https://github.com/apache/spark/pull/52916 to check whether it caused the test timeout issue.
Closes #52935 from zhengruifeng/revert_pandas_iter_test.
Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
.../apache/spark/sql/IntegratedUDFTestUtils.scala | 82 +---------------------
.../org/apache/spark/sql/SQLQueryTestSuite.scala | 19 +----
.../sql/streaming/continuous/ContinuousSuite.scala | 4 +-
3 files changed, 5 insertions(+), 100 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala
index 9b4e0343f94b..a5f3e72f47b7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala
@@ -249,7 +249,7 @@ object IntegratedUDFTestUtils extends SQLHelper {
binaryPythonDataSource
}
- private lazy val pandasScalarFunc: Array[Byte] = if (shouldTestPandasUDFs) {
+ private lazy val pandasFunc: Array[Byte] = if (shouldTestPandasUDFs) {
var binaryPandasFunc: Array[Byte] = null
withTempPath { path =>
Process(
@@ -272,29 +272,6 @@ object IntegratedUDFTestUtils extends SQLHelper {
throw new RuntimeException(s"Python executable [$pythonExec] and/or pyspark are unavailable.")
}
- private lazy val pandasScalarIterFunc: Array[Byte] = if (shouldTestPandasUDFs) {
- var binaryPandasFunc: Array[Byte] = null
- withTempPath { path =>
- Process(
- Seq(
- pythonExec,
- "-c",
- "from pyspark.sql.types import StringType; " +
- "from pyspark.serializers import CloudPickleSerializer; " +
- s"f = open('$path', 'wb');" +
- "f.write(CloudPickleSerializer().dumps((" +
- "lambda it: (x.apply(lambda v: None if v is None else str(v)) for
x in it), " +
- "StringType())))"),
- None,
- "PYTHONPATH" -> s"$pysparkPythonPath:$pythonPath").!!
- binaryPandasFunc = Files.readAllBytes(path.toPath)
- }
- assert(binaryPandasFunc != null)
- binaryPandasFunc
- } else {
- throw new RuntimeException(s"Python executable [$pythonExec] and/or pyspark are unavailable.")
- }
-
private lazy val pandasGroupedAggFunc: Array[Byte] = if (shouldTestPandasUDFs) {
var binaryPandasFunc: Array[Byte] = null
withTempPath { path =>
@@ -1403,7 +1380,7 @@ object IntegratedUDFTestUtils extends SQLHelper {
private[IntegratedUDFTestUtils] lazy val udf = new UserDefinedPythonFunction(
name = name,
func = SimplePythonFunction(
- command = pandasScalarFunc.toImmutableArraySeq,
+ command = pandasFunc.toImmutableArraySeq,
envVars = workerEnv.clone().asInstanceOf[java.util.Map[String, String]],
pythonIncludes = List.empty[String].asJava,
pythonExec = pythonExec,
@@ -1433,60 +1410,6 @@ object IntegratedUDFTestUtils extends SQLHelper {
val prettyName: String = "Scalar Pandas UDF"
}
- /**
- * A Scalar Iterator Pandas UDF that takes one column, casts into string, executes the
- * Python native function, and casts back to the type of input column.
- *
- * Virtually equivalent to:
- *
- * {{{
- * from pyspark.sql.functions import pandas_udf, PandasUDFType
- *
- * df = spark.range(3).toDF("col")
- * scalar_iter_udf = pandas_udf(
- * lambda it: map(lambda x: x.apply(lambda v: str(v)), it),
- * "string",
- * PandasUDFType.SCALAR_ITER)
- * casted_col = scalar_iter_udf(df.col.cast("string"))
- * casted_col.cast(df.schema["col"].dataType)
- * }}}
- */
- case class TestScalarIterPandasUDF(
- name: String,
- returnType: Option[DataType] = None) extends TestUDF {
- private[IntegratedUDFTestUtils] lazy val udf = new UserDefinedPythonFunction(
- name = name,
- func = SimplePythonFunction(
- command = pandasScalarIterFunc.toImmutableArraySeq,
- envVars = workerEnv.clone().asInstanceOf[java.util.Map[String, String]],
- pythonIncludes = List.empty[String].asJava,
- pythonExec = pythonExec,
- pythonVer = pythonVer,
- broadcastVars = List.empty[Broadcast[PythonBroadcast]].asJava,
- accumulator = null),
- dataType = StringType,
- pythonEvalType = PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF,
- udfDeterministic = true) {
-
- override def builder(e: Seq[Expression]): Expression = {
- assert(e.length == 1, "Defined UDF only has one column")
- val expr = e.head
- val rt = returnType.getOrElse {
- assert(expr.resolved, "column should be resolved to use the same type " +
- "as input. Try df(name) or df.col(name)")
- expr.dataType
- }
- val pythonUDF = new PythonUDFWithoutId(
- super.builder(Cast(expr, StringType) :: Nil).asInstanceOf[PythonUDF])
- Cast(pythonUDF, rt)
- }
- }
-
- def apply(exprs: Column*): Column = udf(exprs: _*)
-
- val prettyName: String = "Scalar Pandas Iterator UDF"
- }
-
/**
* A Grouped Aggregate Pandas UDF that takes one column, executes the
* Python native function calculating the count of the column using pandas.
@@ -1683,7 +1606,6 @@ object IntegratedUDFTestUtils extends SQLHelper {
def registerTestUDF(testUDF: TestUDF, session: classic.SparkSession): Unit = testUDF match {
case udf: TestPythonUDF => session.udf.registerPython(udf.name, udf.udf)
case udf: TestScalarPandasUDF => session.udf.registerPython(udf.name, udf.udf)
- case udf: TestScalarIterPandasUDF => session.udf.registerPython(udf.name, udf.udf)
case udf: TestGroupedAggPandasUDF => session.udf.registerPython(udf.name, udf.udf)
case udf: TestScalaUDF =>
val registry = session.sessionState.functionRegistry
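
For context, the removed pandasScalarIterFunc helper above shells out to Python and cloudpickles a (function, return type) pair into a temp file, which the Scala side reads back as the UDF command bytes. A minimal standalone sketch of that Python step follows; the output path here is hypothetical, everything else mirrors the removed code:

    # Roughly what the removed helper executed via Process(...): cloudpickle
    # the (udf_function, return_type) pair to a file that the Scala test
    # utility reads back as Array[Byte].
    from pyspark.serializers import CloudPickleSerializer
    from pyspark.sql.types import StringType

    # Iterator of pandas Series in, iterator of pandas Series out,
    # mirroring the removed lambda.
    func = lambda it: (x.apply(lambda v: None if v is None else str(v)) for x in it)

    with open("/tmp/pandas_iter_udf.bin", "wb") as f:  # hypothetical path
        f.write(CloudPickleSerializer().dumps((func, StringType())))
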
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
index f87785600f7c..a57c72f5fc15 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
@@ -120,9 +120,6 @@ import org.apache.spark.util.Utils
* - Scalar Pandas UDF test case with a Scalar Pandas UDF registered as the name 'udf'
* iff Python executable, pyspark, pandas and pyarrow are available.
*
- * - Scalar Iterator Pandas UDF test case with a Scalar Iterator Pandas UDF registered
- * as the name 'udf' iff Python executable, pyspark, pandas and pyarrow are available.
- *
* Therefore, UDF test cases should have single input and output files but executed by three
* different types of UDFs. See 'udf/udf-inner-join.sql' as an example.
*
@@ -196,12 +193,6 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper
s"pandas and/or pyarrow were not available in [$pythonExec].") {
/* Do nothing */
}
- case udfTestCase: SQLQueryTestSuite#UDFTest
- if udfTestCase.udf.isInstanceOf[TestScalarIterPandasUDF] && !shouldTestPandasUDFs =>
- ignore(s"${testCase.name} is skipped because pyspark," +
- s"pandas and/or pyarrow were not available in [$pythonExec].") {
- /* Do nothing */
- }
case udfTestCase: SQLQueryTestSuite#UDFTest
if udfTestCase.udf.isInstanceOf[TestGroupedAggPandasUDF] && !shouldTestPandasUDFs =>
@@ -406,10 +397,6 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper
if udfTestCase.udf.isInstanceOf[TestScalarPandasUDF] && shouldTestPandasUDFs =>
s"${testCase.name}${System.lineSeparator()}" +
s"Python: $pythonVer Pandas: $pandasVer PyArrow:
$pyarrowVer${System.lineSeparator()}"
- case udfTestCase: SQLQueryTestSuite#UDFTest
- if udfTestCase.udf.isInstanceOf[TestScalarIterPandasUDF] && shouldTestPandasUDFs =>
- s"${testCase.name}${System.lineSeparator()}" +
- s"Python: $pythonVer Pandas: $pandasVer PyArrow:
$pyarrowVer${System.lineSeparator()}"
case udfTestCase: SQLQueryTestSuite#UDFTest
if udfTestCase.udf.isInstanceOf[TestGroupedAggPandasUDF] && shouldTestPandasUDFs =>
@@ -459,14 +446,12 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper
// Create test cases of test types that depend on the input filename.
val newTestCases: Seq[TestCase] = if (file.getAbsolutePath.startsWith(
s"$inputFilePath${File.separator}udf${File.separator}postgreSQL")) {
- Seq(TestScalaUDF("udf"), TestPythonUDF("udf"),
- TestScalarPandasUDF("udf"), TestScalarIterPandasUDF("udf")).map { udf =>
+ Seq(TestScalaUDF("udf"), TestPythonUDF("udf"), TestScalarPandasUDF("udf")).map { udf =>
UDFPgSQLTestCase(
s"$testCaseName - ${udf.prettyName}", absPath, resultFile, udf)
}
} else if (file.getAbsolutePath.startsWith(s"$inputFilePath${File.separator}udf")) {
- Seq(TestScalaUDF("udf"), TestPythonUDF("udf"),
- TestScalarPandasUDF("udf"), TestScalarIterPandasUDF("udf")).map { udf =>
+ Seq(TestScalaUDF("udf"), TestPythonUDF("udf"), TestScalarPandasUDF("udf")).map { udf =>
UDFTestCase(
s"$testCaseName - ${udf.prettyName}", absPath, resultFile, udf)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
index 2dd17136081e..c70f21ae144b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
@@ -278,12 +278,10 @@ class ContinuousSuite extends ContinuousSuiteBase {
s"Result set ${results.toSet} are not a superset of $expected!")
}
- Seq(TestScalaUDF("udf"), TestPythonUDF("udf"),
- TestScalarPandasUDF("udf"), TestScalarIterPandasUDF("udf")).foreach { udf =>
+ Seq(TestScalaUDF("udf"), TestPythonUDF("udf"), TestScalarPandasUDF("udf")).foreach { udf =>
test(s"continuous mode with various UDFs - ${udf.prettyName}") {
assume(
shouldTestPandasUDFs && udf.isInstanceOf[TestScalarPandasUDF] ||
- shouldTestPandasUDFs && udf.isInstanceOf[TestScalarIterPandasUDF] ||
shouldTestPythonUDFs && udf.isInstanceOf[TestPythonUDF] ||
udf.isInstanceOf[TestScalaUDF])
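
For reference, the removed scaladoc describes the reverted Scalar Pandas Iterator UDF as virtually equivalent to the following PySpark snippet (assuming an active SparkSession bound to the name spark; PandasUDFType.SCALAR_ITER is the legacy way to declare the iterator variant):

    from pyspark.sql.functions import pandas_udf, PandasUDFType

    df = spark.range(3).toDF("col")
    scalar_iter_udf = pandas_udf(
        lambda it: map(lambda x: x.apply(lambda v: str(v)), it),
        "string",
        PandasUDFType.SCALAR_ITER)
    # Cast to string, run the UDF, then cast back to the input column's type.
    casted_col = scalar_iter_udf(df.col.cast("string"))
    casted_col.cast(df.schema["col"].dataType)
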
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]