This is an automated email from the ASF dual-hosted git repository.

yuanzhou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 52b717abda [VL] Add GlutenExtractPythonUDFsSuite (#9877)
52b717abda is described below

commit 52b717abdab284b70a042fef0293da4ff909404c
Author: Ankita Victor <anvi...@microsoft.com>
AuthorDate: Thu Jun 12 21:40:24 2025 +0530

    [VL] Add GlutenExtractPythonUDFsSuite (#9877)
    
    Add GlutenExtractPythonUDFsSuite for all Spark versions.
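    
    For reviewers skimming the diff: the suite reuses Spark's
    ExtractPythonUDFsSuite fixtures but asserts on Gluten's native operators
    instead of vanilla Spark's. A condensed sketch of the detection helper,
    lifted from the new file below (the negative type match stands in for
    Gluten's ColumnarArrowEvalPythonExec, which is not visible from the
    Spark test package):
    
        def collectColumnarArrowExec(plan: SparkPlan): Seq[EvalPythonExec] =
          plan.collect {
            // An EvalPythonExec that is neither of vanilla Spark's
            // implementations must be Gluten's columnar Arrow variant.
            case e: EvalPythonExec
                if !e.isInstanceOf[ArrowEvalPythonExec] &&
                  !e.isInstanceOf[BatchEvalPythonExec] =>
              e
          }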
---
 .../gluten/utils/velox/VeloxTestSettings.scala     |   8 +
 .../python/GlutenExtractPythonUDFsSuite.scala      | 167 +++++++++++++++++++++
 .../gluten/utils/velox/VeloxTestSettings.scala     |   8 +
 .../python/GlutenExtractPythonUDFsSuite.scala      | 167 +++++++++++++++++++++
 .../gluten/utils/velox/VeloxTestSettings.scala     |   8 +
 .../python/GlutenExtractPythonUDFsSuite.scala      | 167 +++++++++++++++++++++
 .../gluten/utils/velox/VeloxTestSettings.scala     |   8 +
 .../python/GlutenExtractPythonUDFsSuite.scala      | 167 +++++++++++++++++++++
 8 files changed, 700 insertions(+)
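
A note on the pushdown assertions that recur in each suite below: the
expectation of exactly two pushed filters follows from how Catalyst splits
the predicate (a reading of the test, not new behavior; batchedPythonUDF
comes from Spark's ExtractPythonUDFsSuite fixtures):

    val query = df.filter($"a" > 1 && batchedPythonUDF($"a"))
    // The Python UDF predicate cannot be translated into a data source
    // filter, so it stays above the scan in a BatchEvalPythonExec, while
    // the deterministic part is pushed down as two filters: IsNotNull(a)
    // and GreaterThan(a, 1). Hence pythonEvalNodes.length == 1 and
    // scanNodes.head.dataFilters.length == 2 in the assertions below.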

diff --git a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index c1d8b242ef..a277da3dba 100644
--- a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -823,6 +823,14 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("Nested Python UDF: push down deterministic FilterExec predicates")
     .exclude("Python UDF: no push down on non-deterministic")
     .exclude("Python UDF: push down on deterministic predicates after the first non-deterministic")
+  enableSuite[GlutenExtractPythonUDFsSuite]
+    // Replaced with tests that check for native operations
+    .exclude("Python UDF should not break column pruning/filter pushdown -- Parquet V1")
+    .exclude("Chained Scalar Pandas UDFs should be combined to a single physical node")
+    .exclude("Mixed Batched Python UDFs and Pandas UDF should be separate physical node")
+    .exclude("Independent Batched Python UDFs and Scalar Pandas UDFs should be combined separately")
+    .exclude("Dependent Batched Python UDFs and Scalar Pandas UDFs should not be combined")
+    .exclude("Python UDF should not break column pruning/filter pushdown -- Parquet V2")
 
   override def getSQLQueryTestSettings: SQLQueryTestSettings = VeloxSQLQueryTestSettings
 }
diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/python/GlutenExtractPythonUDFsSuite.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/python/GlutenExtractPythonUDFsSuite.scala
new file mode 100644
index 0000000000..1cd34bbf78
--- /dev/null
+++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/python/GlutenExtractPythonUDFsSuite.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.python
+
+import org.apache.gluten.execution.{BatchScanExecTransformer, FileSourceScanExecTransformer}
+
+import org.apache.spark.sql.GlutenSQLTestsBaseTrait
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.internal.SQLConf
+
+class GlutenExtractPythonUDFsSuite extends ExtractPythonUDFsSuite with GlutenSQLTestsBaseTrait {
+
+  import testImplicits._
+
+  def collectBatchExec(plan: SparkPlan): Seq[BatchEvalPythonExec] = plan.collect {
+    case b: BatchEvalPythonExec => b
+  }
+
+  def collectColumnarArrowExec(plan: SparkPlan): Seq[EvalPythonExec] = plan.collect {
+    // To check for ColumnarArrowEvalPythonExec
+    case b: EvalPythonExec
+        if !b.isInstanceOf[ArrowEvalPythonExec] && !b.isInstanceOf[BatchEvalPythonExec] =>
+      b
+  }
+
+  testGluten("Chained Scalar Pandas UDFs should be combined to a single physical node") {
+    val df = Seq(("Hello", 4)).toDF("a", "b")
+    val df2 = df
+      .withColumn("c", scalarPandasUDF(col("a")))
+      .withColumn("d", scalarPandasUDF(col("c")))
+    val arrowEvalNodes = collectColumnarArrowExec(df2.queryExecution.executedPlan)
+    assert(arrowEvalNodes.size == 1)
+  }
+
+  testGluten("Mixed Batched Python UDFs and Pandas UDF should be separate physical node") {
+    val df = Seq(("Hello", 4)).toDF("a", "b")
+    val df2 = df
+      .withColumn("c", batchedPythonUDF(col("a")))
+      .withColumn("d", scalarPandasUDF(col("b")))
+
+    val pythonEvalNodes = collectBatchExec(df2.queryExecution.executedPlan)
+    val arrowEvalNodes = collectColumnarArrowExec(df2.queryExecution.executedPlan)
+    assert(pythonEvalNodes.size == 1)
+    assert(arrowEvalNodes.size == 1)
+  }
+
+  testGluten(
+    "Independent Batched Python UDFs and Scalar Pandas UDFs should be combined separately") {
+    val df = Seq(("Hello", 4)).toDF("a", "b")
+    val df2 = df
+      .withColumn("c1", batchedPythonUDF(col("a")))
+      .withColumn("c2", batchedPythonUDF(col("c1")))
+      .withColumn("d1", scalarPandasUDF(col("a")))
+      .withColumn("d2", scalarPandasUDF(col("d1")))
+
+    val pythonEvalNodes = collectBatchExec(df2.queryExecution.executedPlan)
+    val arrowEvalNodes = collectColumnarArrowExec(df2.queryExecution.executedPlan)
+    assert(pythonEvalNodes.size == 1)
+    assert(arrowEvalNodes.size == 1)
+  }
+
+  testGluten("Dependent Batched Python UDFs and Scalar Pandas UDFs should not be combined") {
+    val df = Seq(("Hello", 4)).toDF("a", "b")
+    val df2 = df
+      .withColumn("c1", batchedPythonUDF(col("a")))
+      .withColumn("d1", scalarPandasUDF(col("c1")))
+      .withColumn("c2", batchedPythonUDF(col("d1")))
+      .withColumn("d2", scalarPandasUDF(col("c2")))
+
+    val pythonEvalNodes = collectBatchExec(df2.queryExecution.executedPlan)
+    val arrowEvalNodes = collectColumnarArrowExec(df2.queryExecution.executedPlan)
+    assert(pythonEvalNodes.size == 2)
+    assert(arrowEvalNodes.size == 2)
+  }
+
+  testGluten("Python UDF should not break column pruning/filter pushdown -- Parquet V2") {
+    withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") {
+      withTempPath {
+        f =>
+          spark.range(10).select($"id".as("a"), $"id".as("b")).write.parquet(f.getCanonicalPath)
+          val df = spark.read.parquet(f.getCanonicalPath)
+
+          withClue("column pruning") {
+            val query = df.filter(batchedPythonUDF($"a")).select($"a")
+
+            val pythonEvalNodes = collectBatchExec(query.queryExecution.executedPlan)
+            assert(pythonEvalNodes.length == 1)
+
+            val scanNodes = query.queryExecution.executedPlan.collect {
+              case scan: BatchScanExecTransformer => scan
+            }
+            assert(scanNodes.length == 1)
+            assert(scanNodes.head.output.map(_.name) == Seq("a"))
+          }
+
+          withClue("filter pushdown") {
+            val query = df.filter($"a" > 1 && batchedPythonUDF($"a"))
+            val pythonEvalNodes = collectBatchExec(query.queryExecution.executedPlan)
+            assert(pythonEvalNodes.length == 1)
+
+            val scanNodes = query.queryExecution.executedPlan.collect {
+              case scan: BatchScanExecTransformer => scan
+            }
+            assert(scanNodes.length == 1)
+            // $"a" is not null and $"a" > 1
+            val filters = scanNodes.head.scan.asInstanceOf[ParquetScan].pushedFilters
+            assert(filters.length == 2)
+            assert(filters.flatMap(_.references).distinct === Array("a"))
+          }
+      }
+    }
+  }
+
+  testGluten("Python UDF should not break column pruning/filter pushdown -- Parquet V1") {
+    withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
+      withTempPath {
+        f =>
+          spark.range(10).select($"id".as("a"), $"id".as("b")).write.parquet(f.getCanonicalPath)
+          val df = spark.read.parquet(f.getCanonicalPath)
+
+          withClue("column pruning") {
+            val query = df.filter(batchedPythonUDF($"a")).select($"a")
+
+            val pythonEvalNodes = collectBatchExec(query.queryExecution.executedPlan)
+            assert(pythonEvalNodes.length == 1)
+
+            val scanNodes = query.queryExecution.executedPlan.collect {
+              case scan: FileSourceScanExecTransformer => scan
+            }
+            assert(scanNodes.length == 1)
+            assert(scanNodes.head.output.map(_.name) == Seq("a"))
+          }
+
+          withClue("filter pushdown") {
+            val query = df.filter($"a" > 1 && batchedPythonUDF($"a"))
+            val pythonEvalNodes = collectBatchExec(query.queryExecution.executedPlan)
+            assert(pythonEvalNodes.length == 1)
+
+            val scanNodes = query.queryExecution.executedPlan.collect {
+              case scan: FileSourceScanExecTransformer => scan
+            }
+            assert(scanNodes.length == 1)
+            // $"a" is not null and $"a" > 1
+            assert(scanNodes.head.dataFilters.length == 2)
+            assert(
+              scanNodes.head.dataFilters.flatMap(_.references.map(_.name)).distinct == Seq("a"))
+          }
+      }
+    }
+  }
+}
diff --git a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 5d3f772966..718682cfcc 100644
--- a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -872,6 +872,14 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("Nested Python UDF: push down deterministic FilterExec predicates")
     .exclude("Python UDF: no push down on non-deterministic")
     .exclude("Python UDF: push down on deterministic predicates after the first non-deterministic")
+  enableSuite[GlutenExtractPythonUDFsSuite]
+    // Replaced with tests that check for native operations
+    .exclude("Python UDF should not break column pruning/filter pushdown -- Parquet V1")
+    .exclude("Chained Scalar Pandas UDFs should be combined to a single physical node")
+    .exclude("Mixed Batched Python UDFs and Pandas UDF should be separate physical node")
+    .exclude("Independent Batched Python UDFs and Scalar Pandas UDFs should be combined separately")
+    .exclude("Dependent Batched Python UDFs and Scalar Pandas UDFs should not be combined")
+    .exclude("Python UDF should not break column pruning/filter pushdown -- Parquet V2")
 
   override def getSQLQueryTestSettings: SQLQueryTestSettings = VeloxSQLQueryTestSettings
 }
diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/python/GlutenExtractPythonUDFsSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/python/GlutenExtractPythonUDFsSuite.scala
new file mode 100644
index 0000000000..1cd34bbf78
--- /dev/null
+++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/python/GlutenExtractPythonUDFsSuite.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.python
+
+import org.apache.gluten.execution.{BatchScanExecTransformer, FileSourceScanExecTransformer}
+
+import org.apache.spark.sql.GlutenSQLTestsBaseTrait
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.internal.SQLConf
+
+class GlutenExtractPythonUDFsSuite extends ExtractPythonUDFsSuite with GlutenSQLTestsBaseTrait {
+
+  import testImplicits._
+
+  def collectBatchExec(plan: SparkPlan): Seq[BatchEvalPythonExec] = plan.collect {
+    case b: BatchEvalPythonExec => b
+  }
+
+  def collectColumnarArrowExec(plan: SparkPlan): Seq[EvalPythonExec] = plan.collect {
+    // To check for ColumnarArrowEvalPythonExec
+    case b: EvalPythonExec
+        if !b.isInstanceOf[ArrowEvalPythonExec] && !b.isInstanceOf[BatchEvalPythonExec] =>
+      b
+  }
+
+  testGluten("Chained Scalar Pandas UDFs should be combined to a single physical node") {
+    val df = Seq(("Hello", 4)).toDF("a", "b")
+    val df2 = df
+      .withColumn("c", scalarPandasUDF(col("a")))
+      .withColumn("d", scalarPandasUDF(col("c")))
+    val arrowEvalNodes = collectColumnarArrowExec(df2.queryExecution.executedPlan)
+    assert(arrowEvalNodes.size == 1)
+  }
+
+  testGluten("Mixed Batched Python UDFs and Pandas UDF should be separate physical node") {
+    val df = Seq(("Hello", 4)).toDF("a", "b")
+    val df2 = df
+      .withColumn("c", batchedPythonUDF(col("a")))
+      .withColumn("d", scalarPandasUDF(col("b")))
+
+    val pythonEvalNodes = collectBatchExec(df2.queryExecution.executedPlan)
+    val arrowEvalNodes = collectColumnarArrowExec(df2.queryExecution.executedPlan)
+    assert(pythonEvalNodes.size == 1)
+    assert(arrowEvalNodes.size == 1)
+  }
+
+  testGluten(
+    "Independent Batched Python UDFs and Scalar Pandas UDFs should be combined separately") {
+    val df = Seq(("Hello", 4)).toDF("a", "b")
+    val df2 = df
+      .withColumn("c1", batchedPythonUDF(col("a")))
+      .withColumn("c2", batchedPythonUDF(col("c1")))
+      .withColumn("d1", scalarPandasUDF(col("a")))
+      .withColumn("d2", scalarPandasUDF(col("d1")))
+
+    val pythonEvalNodes = collectBatchExec(df2.queryExecution.executedPlan)
+    val arrowEvalNodes = collectColumnarArrowExec(df2.queryExecution.executedPlan)
+    assert(pythonEvalNodes.size == 1)
+    assert(arrowEvalNodes.size == 1)
+  }
+
+  testGluten("Dependent Batched Python UDFs and Scalar Pandas UDFs should not be combined") {
+    val df = Seq(("Hello", 4)).toDF("a", "b")
+    val df2 = df
+      .withColumn("c1", batchedPythonUDF(col("a")))
+      .withColumn("d1", scalarPandasUDF(col("c1")))
+      .withColumn("c2", batchedPythonUDF(col("d1")))
+      .withColumn("d2", scalarPandasUDF(col("c2")))
+
+    val pythonEvalNodes = collectBatchExec(df2.queryExecution.executedPlan)
+    val arrowEvalNodes = collectColumnarArrowExec(df2.queryExecution.executedPlan)
+    assert(pythonEvalNodes.size == 2)
+    assert(arrowEvalNodes.size == 2)
+  }
+
+  testGluten("Python UDF should not break column pruning/filter pushdown -- Parquet V2") {
+    withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") {
+      withTempPath {
+        f =>
+          spark.range(10).select($"id".as("a"), $"id".as("b")).write.parquet(f.getCanonicalPath)
+          val df = spark.read.parquet(f.getCanonicalPath)
+
+          withClue("column pruning") {
+            val query = df.filter(batchedPythonUDF($"a")).select($"a")
+
+            val pythonEvalNodes = collectBatchExec(query.queryExecution.executedPlan)
+            assert(pythonEvalNodes.length == 1)
+
+            val scanNodes = query.queryExecution.executedPlan.collect {
+              case scan: BatchScanExecTransformer => scan
+            }
+            assert(scanNodes.length == 1)
+            assert(scanNodes.head.output.map(_.name) == Seq("a"))
+          }
+
+          withClue("filter pushdown") {
+            val query = df.filter($"a" > 1 && batchedPythonUDF($"a"))
+            val pythonEvalNodes = collectBatchExec(query.queryExecution.executedPlan)
+            assert(pythonEvalNodes.length == 1)
+
+            val scanNodes = query.queryExecution.executedPlan.collect {
+              case scan: BatchScanExecTransformer => scan
+            }
+            assert(scanNodes.length == 1)
+            // $"a" is not null and $"a" > 1
+            val filters = scanNodes.head.scan.asInstanceOf[ParquetScan].pushedFilters
+            assert(filters.length == 2)
+            assert(filters.flatMap(_.references).distinct === Array("a"))
+          }
+      }
+    }
+  }
+
+  testGluten("Python UDF should not break column pruning/filter pushdown -- Parquet V1") {
+    withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
+      withTempPath {
+        f =>
+          spark.range(10).select($"id".as("a"), $"id".as("b")).write.parquet(f.getCanonicalPath)
+          val df = spark.read.parquet(f.getCanonicalPath)
+
+          withClue("column pruning") {
+            val query = df.filter(batchedPythonUDF($"a")).select($"a")
+
+            val pythonEvalNodes = collectBatchExec(query.queryExecution.executedPlan)
+            assert(pythonEvalNodes.length == 1)
+
+            val scanNodes = query.queryExecution.executedPlan.collect {
+              case scan: FileSourceScanExecTransformer => scan
+            }
+            assert(scanNodes.length == 1)
+            assert(scanNodes.head.output.map(_.name) == Seq("a"))
+          }
+
+          withClue("filter pushdown") {
+            val query = df.filter($"a" > 1 && batchedPythonUDF($"a"))
+            val pythonEvalNodes = collectBatchExec(query.queryExecution.executedPlan)
+            assert(pythonEvalNodes.length == 1)
+
+            val scanNodes = query.queryExecution.executedPlan.collect {
+              case scan: FileSourceScanExecTransformer => scan
+            }
+            assert(scanNodes.length == 1)
+            // $"a" is not null and $"a" > 1
+            assert(scanNodes.head.dataFilters.length == 2)
+            assert(
+              scanNodes.head.dataFilters.flatMap(_.references.map(_.name)).distinct == Seq("a"))
+          }
+      }
+    }
+  }
+}
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index e470e22ad3..77cbd2ad08 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -909,6 +909,14 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("Nested Python UDF: push down deterministic FilterExec predicates")
     .exclude("Python UDF: no push down on non-deterministic")
     .exclude("Python UDF: push down on deterministic predicates after the first non-deterministic")
+  enableSuite[GlutenExtractPythonUDFsSuite]
+    // Replaced with tests that check for native operations
+    .exclude("Python UDF should not break column pruning/filter pushdown -- Parquet V1")
+    .exclude("Chained Scalar Pandas UDFs should be combined to a single physical node")
+    .exclude("Mixed Batched Python UDFs and Pandas UDF should be separate physical node")
+    .exclude("Independent Batched Python UDFs and Scalar Pandas UDFs should be combined separately")
+    .exclude("Dependent Batched Python UDFs and Scalar Pandas UDFs should not be combined")
+    .exclude("Python UDF should not break column pruning/filter pushdown -- Parquet V2")
 
   override def getSQLQueryTestSettings: SQLQueryTestSettings = VeloxSQLQueryTestSettings
 }
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/python/GlutenExtractPythonUDFsSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/python/GlutenExtractPythonUDFsSuite.scala
new file mode 100644
index 0000000000..1cd34bbf78
--- /dev/null
+++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/python/GlutenExtractPythonUDFsSuite.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.python
+
+import org.apache.gluten.execution.{BatchScanExecTransformer, FileSourceScanExecTransformer}
+
+import org.apache.spark.sql.GlutenSQLTestsBaseTrait
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.internal.SQLConf
+
+class GlutenExtractPythonUDFsSuite extends ExtractPythonUDFsSuite with GlutenSQLTestsBaseTrait {
+
+  import testImplicits._
+
+  def collectBatchExec(plan: SparkPlan): Seq[BatchEvalPythonExec] = plan.collect {
+    case b: BatchEvalPythonExec => b
+  }
+
+  def collectColumnarArrowExec(plan: SparkPlan): Seq[EvalPythonExec] = plan.collect {
+    // To check for ColumnarArrowEvalPythonExec
+    case b: EvalPythonExec
+        if !b.isInstanceOf[ArrowEvalPythonExec] && !b.isInstanceOf[BatchEvalPythonExec] =>
+      b
+  }
+
+  testGluten("Chained Scalar Pandas UDFs should be combined to a single physical node") {
+    val df = Seq(("Hello", 4)).toDF("a", "b")
+    val df2 = df
+      .withColumn("c", scalarPandasUDF(col("a")))
+      .withColumn("d", scalarPandasUDF(col("c")))
+    val arrowEvalNodes = collectColumnarArrowExec(df2.queryExecution.executedPlan)
+    assert(arrowEvalNodes.size == 1)
+  }
+
+  testGluten("Mixed Batched Python UDFs and Pandas UDF should be separate physical node") {
+    val df = Seq(("Hello", 4)).toDF("a", "b")
+    val df2 = df
+      .withColumn("c", batchedPythonUDF(col("a")))
+      .withColumn("d", scalarPandasUDF(col("b")))
+
+    val pythonEvalNodes = collectBatchExec(df2.queryExecution.executedPlan)
+    val arrowEvalNodes = collectColumnarArrowExec(df2.queryExecution.executedPlan)
+    assert(pythonEvalNodes.size == 1)
+    assert(arrowEvalNodes.size == 1)
+  }
+
+  testGluten(
+    "Independent Batched Python UDFs and Scalar Pandas UDFs should be combined separately") {
+    val df = Seq(("Hello", 4)).toDF("a", "b")
+    val df2 = df
+      .withColumn("c1", batchedPythonUDF(col("a")))
+      .withColumn("c2", batchedPythonUDF(col("c1")))
+      .withColumn("d1", scalarPandasUDF(col("a")))
+      .withColumn("d2", scalarPandasUDF(col("d1")))
+
+    val pythonEvalNodes = collectBatchExec(df2.queryExecution.executedPlan)
+    val arrowEvalNodes = collectColumnarArrowExec(df2.queryExecution.executedPlan)
+    assert(pythonEvalNodes.size == 1)
+    assert(arrowEvalNodes.size == 1)
+  }
+
+  testGluten("Dependent Batched Python UDFs and Scalar Pandas UDFs should not be combined") {
+    val df = Seq(("Hello", 4)).toDF("a", "b")
+    val df2 = df
+      .withColumn("c1", batchedPythonUDF(col("a")))
+      .withColumn("d1", scalarPandasUDF(col("c1")))
+      .withColumn("c2", batchedPythonUDF(col("d1")))
+      .withColumn("d2", scalarPandasUDF(col("c2")))
+
+    val pythonEvalNodes = collectBatchExec(df2.queryExecution.executedPlan)
+    val arrowEvalNodes = collectColumnarArrowExec(df2.queryExecution.executedPlan)
+    assert(pythonEvalNodes.size == 2)
+    assert(arrowEvalNodes.size == 2)
+  }
+
+  testGluten("Python UDF should not break column pruning/filter pushdown -- Parquet V2") {
+    withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") {
+      withTempPath {
+        f =>
+          spark.range(10).select($"id".as("a"), $"id".as("b")).write.parquet(f.getCanonicalPath)
+          val df = spark.read.parquet(f.getCanonicalPath)
+
+          withClue("column pruning") {
+            val query = df.filter(batchedPythonUDF($"a")).select($"a")
+
+            val pythonEvalNodes = collectBatchExec(query.queryExecution.executedPlan)
+            assert(pythonEvalNodes.length == 1)
+
+            val scanNodes = query.queryExecution.executedPlan.collect {
+              case scan: BatchScanExecTransformer => scan
+            }
+            assert(scanNodes.length == 1)
+            assert(scanNodes.head.output.map(_.name) == Seq("a"))
+          }
+
+          withClue("filter pushdown") {
+            val query = df.filter($"a" > 1 && batchedPythonUDF($"a"))
+            val pythonEvalNodes = collectBatchExec(query.queryExecution.executedPlan)
+            assert(pythonEvalNodes.length == 1)
+
+            val scanNodes = query.queryExecution.executedPlan.collect {
+              case scan: BatchScanExecTransformer => scan
+            }
+            assert(scanNodes.length == 1)
+            // $"a" is not null and $"a" > 1
+            val filters = scanNodes.head.scan.asInstanceOf[ParquetScan].pushedFilters
+            assert(filters.length == 2)
+            assert(filters.flatMap(_.references).distinct === Array("a"))
+          }
+      }
+    }
+  }
+
+  testGluten("Python UDF should not break column pruning/filter pushdown -- Parquet V1") {
+    withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
+      withTempPath {
+        f =>
+          spark.range(10).select($"id".as("a"), $"id".as("b")).write.parquet(f.getCanonicalPath)
+          val df = spark.read.parquet(f.getCanonicalPath)
+
+          withClue("column pruning") {
+            val query = df.filter(batchedPythonUDF($"a")).select($"a")
+
+            val pythonEvalNodes = collectBatchExec(query.queryExecution.executedPlan)
+            assert(pythonEvalNodes.length == 1)
+
+            val scanNodes = query.queryExecution.executedPlan.collect {
+              case scan: FileSourceScanExecTransformer => scan
+            }
+            assert(scanNodes.length == 1)
+            assert(scanNodes.head.output.map(_.name) == Seq("a"))
+          }
+
+          withClue("filter pushdown") {
+            val query = df.filter($"a" > 1 && batchedPythonUDF($"a"))
+            val pythonEvalNodes = collectBatchExec(query.queryExecution.executedPlan)
+            assert(pythonEvalNodes.length == 1)
+
+            val scanNodes = query.queryExecution.executedPlan.collect {
+              case scan: FileSourceScanExecTransformer => scan
+            }
+            assert(scanNodes.length == 1)
+            // $"a" is not null and $"a" > 1
+            assert(scanNodes.head.dataFilters.length == 2)
+            assert(
+              scanNodes.head.dataFilters.flatMap(_.references.map(_.name)).distinct == Seq("a"))
+          }
+      }
+    }
+  }
+}
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 089c6f4c16..512d4b6bb9 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -949,6 +949,14 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("Nested Python UDF: push down deterministic FilterExec predicates")
     .exclude("Python UDF: no push down on non-deterministic")
     .exclude("Python UDF: push down on deterministic predicates after the first non-deterministic")
+  enableSuite[GlutenExtractPythonUDFsSuite]
+    // Replaced with tests that check for native operations
+    .exclude("Python UDF should not break column pruning/filter pushdown -- Parquet V1")
+    .exclude("Chained Scalar Pandas UDFs should be combined to a single physical node")
+    .exclude("Mixed Batched Python UDFs and Pandas UDF should be separate physical node")
+    .exclude("Independent Batched Python UDFs and Scalar Pandas UDFs should be combined separately")
+    .exclude("Dependent Batched Python UDFs and Scalar Pandas UDFs should not be combined")
+    .exclude("Python UDF should not break column pruning/filter pushdown -- Parquet V2")
 
   override def getSQLQueryTestSettings: SQLQueryTestSettings = VeloxSQLQueryTestSettings
 }
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/python/GlutenExtractPythonUDFsSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/python/GlutenExtractPythonUDFsSuite.scala
new file mode 100644
index 0000000000..1cd34bbf78
--- /dev/null
+++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/python/GlutenExtractPythonUDFsSuite.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.python
+
+import org.apache.gluten.execution.{BatchScanExecTransformer, FileSourceScanExecTransformer}
+
+import org.apache.spark.sql.GlutenSQLTestsBaseTrait
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.internal.SQLConf
+
+class GlutenExtractPythonUDFsSuite extends ExtractPythonUDFsSuite with GlutenSQLTestsBaseTrait {
+
+  import testImplicits._
+
+  def collectBatchExec(plan: SparkPlan): Seq[BatchEvalPythonExec] = plan.collect {
+    case b: BatchEvalPythonExec => b
+  }
+
+  def collectColumnarArrowExec(plan: SparkPlan): Seq[EvalPythonExec] = plan.collect {
+    // To check for ColumnarArrowEvalPythonExec
+    case b: EvalPythonExec
+        if !b.isInstanceOf[ArrowEvalPythonExec] && !b.isInstanceOf[BatchEvalPythonExec] =>
+      b
+  }
+
+  testGluten("Chained Scalar Pandas UDFs should be combined to a single physical node") {
+    val df = Seq(("Hello", 4)).toDF("a", "b")
+    val df2 = df
+      .withColumn("c", scalarPandasUDF(col("a")))
+      .withColumn("d", scalarPandasUDF(col("c")))
+    val arrowEvalNodes = collectColumnarArrowExec(df2.queryExecution.executedPlan)
+    assert(arrowEvalNodes.size == 1)
+  }
+
+  testGluten("Mixed Batched Python UDFs and Pandas UDF should be separate physical node") {
+    val df = Seq(("Hello", 4)).toDF("a", "b")
+    val df2 = df
+      .withColumn("c", batchedPythonUDF(col("a")))
+      .withColumn("d", scalarPandasUDF(col("b")))
+
+    val pythonEvalNodes = collectBatchExec(df2.queryExecution.executedPlan)
+    val arrowEvalNodes = collectColumnarArrowExec(df2.queryExecution.executedPlan)
+    assert(pythonEvalNodes.size == 1)
+    assert(arrowEvalNodes.size == 1)
+  }
+
+  testGluten(
+    "Independent Batched Python UDFs and Scalar Pandas UDFs should be combined separately") {
+    val df = Seq(("Hello", 4)).toDF("a", "b")
+    val df2 = df
+      .withColumn("c1", batchedPythonUDF(col("a")))
+      .withColumn("c2", batchedPythonUDF(col("c1")))
+      .withColumn("d1", scalarPandasUDF(col("a")))
+      .withColumn("d2", scalarPandasUDF(col("d1")))
+
+    val pythonEvalNodes = collectBatchExec(df2.queryExecution.executedPlan)
+    val arrowEvalNodes = collectColumnarArrowExec(df2.queryExecution.executedPlan)
+    assert(pythonEvalNodes.size == 1)
+    assert(arrowEvalNodes.size == 1)
+  }
+
+  testGluten("Dependent Batched Python UDFs and Scalar Pandas UDFs should not be combined") {
+    val df = Seq(("Hello", 4)).toDF("a", "b")
+    val df2 = df
+      .withColumn("c1", batchedPythonUDF(col("a")))
+      .withColumn("d1", scalarPandasUDF(col("c1")))
+      .withColumn("c2", batchedPythonUDF(col("d1")))
+      .withColumn("d2", scalarPandasUDF(col("c2")))
+
+    val pythonEvalNodes = collectBatchExec(df2.queryExecution.executedPlan)
+    val arrowEvalNodes = collectColumnarArrowExec(df2.queryExecution.executedPlan)
+    assert(pythonEvalNodes.size == 2)
+    assert(arrowEvalNodes.size == 2)
+  }
+
+  testGluten("Python UDF should not break column pruning/filter pushdown -- Parquet V2") {
+    withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") {
+      withTempPath {
+        f =>
+          spark.range(10).select($"id".as("a"), $"id".as("b")).write.parquet(f.getCanonicalPath)
+          val df = spark.read.parquet(f.getCanonicalPath)
+
+          withClue("column pruning") {
+            val query = df.filter(batchedPythonUDF($"a")).select($"a")
+
+            val pythonEvalNodes = collectBatchExec(query.queryExecution.executedPlan)
+            assert(pythonEvalNodes.length == 1)
+
+            val scanNodes = query.queryExecution.executedPlan.collect {
+              case scan: BatchScanExecTransformer => scan
+            }
+            assert(scanNodes.length == 1)
+            assert(scanNodes.head.output.map(_.name) == Seq("a"))
+          }
+
+          withClue("filter pushdown") {
+            val query = df.filter($"a" > 1 && batchedPythonUDF($"a"))
+            val pythonEvalNodes = collectBatchExec(query.queryExecution.executedPlan)
+            assert(pythonEvalNodes.length == 1)
+
+            val scanNodes = query.queryExecution.executedPlan.collect {
+              case scan: BatchScanExecTransformer => scan
+            }
+            assert(scanNodes.length == 1)
+            // $"a" is not null and $"a" > 1
+            val filters = scanNodes.head.scan.asInstanceOf[ParquetScan].pushedFilters
+            assert(filters.length == 2)
+            assert(filters.flatMap(_.references).distinct === Array("a"))
+          }
+      }
+    }
+  }
+
+  testGluten("Python UDF should not break column pruning/filter pushdown -- Parquet V1") {
+    withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
+      withTempPath {
+        f =>
+          spark.range(10).select($"id".as("a"), $"id".as("b")).write.parquet(f.getCanonicalPath)
+          val df = spark.read.parquet(f.getCanonicalPath)
+
+          withClue("column pruning") {
+            val query = df.filter(batchedPythonUDF($"a")).select($"a")
+
+            val pythonEvalNodes = collectBatchExec(query.queryExecution.executedPlan)
+            assert(pythonEvalNodes.length == 1)
+
+            val scanNodes = query.queryExecution.executedPlan.collect {
+              case scan: FileSourceScanExecTransformer => scan
+            }
+            assert(scanNodes.length == 1)
+            assert(scanNodes.head.output.map(_.name) == Seq("a"))
+          }
+
+          withClue("filter pushdown") {
+            val query = df.filter($"a" > 1 && batchedPythonUDF($"a"))
+            val pythonEvalNodes = collectBatchExec(query.queryExecution.executedPlan)
+            assert(pythonEvalNodes.length == 1)
+
+            val scanNodes = query.queryExecution.executedPlan.collect {
+              case scan: FileSourceScanExecTransformer => scan
+            }
+            assert(scanNodes.length == 1)
+            // $"a" is not null and $"a" > 1
+            assert(scanNodes.head.dataFilters.length == 2)
+            assert(
+              scanNodes.head.dataFilters.flatMap(_.references.map(_.name)).distinct == Seq("a"))
+          }
+      }
+    }
+  }
+}
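
To exercise one of the new suites in isolation, ScalaTest's REPL-oriented
runner should work from a console with the corresponding gluten-ut module's
test classpath loaded (a local convenience, not part of this change; it
assumes a working Velox backend build and configured Spark test session):

    // Runs the suite directly via ScalaTest's Suite.execute().
    (new org.apache.spark.sql.execution.python.GlutenExtractPythonUDFsSuite).execute()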


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@gluten.apache.org
For additional commands, e-mail: commits-h...@gluten.apache.org

