This is an automated email from the ASF dual-hosted git repository.

rui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new d00a12b8ca [VL] Enable BatchEvalPythonExecSuite (#9780)
d00a12b8ca is described below

commit d00a12b8cae83a234c558c0c64700ce13c7b596d
Author: Ankita Victor <[email protected]>
AuthorDate: Tue Jun 3 22:23:29 2025 +0530

    [VL] Enable BatchEvalPythonExecSuite (#9780)
---
 .../gluten/utils/velox/VeloxTestSettings.scala     |   7 ++
 .../python/GlutenBatchEvalPythonExecSuite.scala    | 102 +++++++++++++++++++++
 .../gluten/utils/velox/VeloxTestSettings.scala     |   7 ++
 .../python/GlutenBatchEvalPythonExecSuite.scala    | 102 +++++++++++++++++++++
 .../gluten/utils/velox/VeloxTestSettings.scala     |   7 ++
 .../python/GlutenBatchEvalPythonExecSuite.scala    | 102 +++++++++++++++++++++
 .../gluten/utils/velox/VeloxTestSettings.scala     |   7 ++
 .../python/GlutenBatchEvalPythonExecSuite.scala    | 102 +++++++++++++++++++++
 8 files changed, 436 insertions(+)

diff --git 
a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
 
b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 60d81f106f..c1d8b242ef 100644
--- 
a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ 
b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -35,6 +35,7 @@ import 
org.apache.spark.sql.execution.datasources.text.{GlutenTextV1Suite, Glute
 import org.apache.spark.sql.execution.datasources.v2.GlutenFileTableSuite
 import org.apache.spark.sql.execution.exchange.GlutenEnsureRequirementsSuite
 import org.apache.spark.sql.execution.joins.{GlutenBroadcastJoinSuite, 
GlutenExistenceJoinSuite, GlutenInnerJoinSuite, GlutenOuterJoinSuite}
+import org.apache.spark.sql.execution.python._
 import 
org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, 
GlutenSessionExtensionSuite}
 import org.apache.spark.sql.hive.execution.GlutenHiveSQLQuerySuite
 import org.apache.spark.sql.sources._
@@ -816,6 +817,12 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenCollapseProjectExecTransformerSuite]
   enableSuite[GlutenSparkSessionExtensionSuite]
   enableSuite[GlutenSQLCollectLimitExecSuite]
+  enableSuite[GlutenBatchEvalPythonExecSuite]
+    // Replaced with other tests that check for native operations
+    .exclude("Python UDF: push down deterministic FilterExec predicates")
+    .exclude("Nested Python UDF: push down deterministic FilterExec 
predicates")
+    .exclude("Python UDF: no push down on non-deterministic")
+    .exclude("Python UDF: push down on deterministic predicates after the 
first non-deterministic")
 
   override def getSQLQueryTestSettings: SQLQueryTestSettings = 
VeloxSQLQueryTestSettings
 }
diff --git 
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/python/GlutenBatchEvalPythonExecSuite.scala
 
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/python/GlutenBatchEvalPythonExecSuite.scala
new file mode 100644
index 0000000000..d364262be5
--- /dev/null
+++ 
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/python/GlutenBatchEvalPythonExecSuite.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.python
+
+import org.apache.gluten.execution.{ColumnarToRowExecBase, 
FilterExecTransformer, RowToColumnarExecBase, WholeStageTransformer}
+
+import org.apache.spark.sql.GlutenSQLTestsBaseTrait
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, 
GreaterThan, In}
+import org.apache.spark.sql.execution.{ColumnarInputAdapter, 
InputIteratorTransformer}
+
+class GlutenBatchEvalPythonExecSuite extends BatchEvalPythonExecSuite with 
GlutenSQLTestsBaseTrait {
+
+  import testImplicits._
+
+  testGluten("Python UDF: push down deterministic FilterExecTransformer 
predicates") {
+    val df = Seq(("Hello", 4))
+      .toDF("a", "b")
+      .where("dummyPythonUDF(b) and dummyPythonUDF(a) and a in (3, 4)")
+    val qualifiedPlanNodes = df.queryExecution.executedPlan.collect {
+      case f @ FilterExecTransformer(
+            And(_: AttributeReference, _: AttributeReference),
+            InputIteratorTransformer(ColumnarInputAdapter(r: 
RowToColumnarExecBase)))
+          if r.child.isInstanceOf[BatchEvalPythonExec] =>
+        f
+      case b @ BatchEvalPythonExec(_, _, c: ColumnarToRowExecBase) =>
+        c.child match {
+          case WholeStageTransformer(FilterExecTransformer(_: In, _), _) => b
+        }
+    }
+    assert(qualifiedPlanNodes.size == 2)
+  }
+
+  testGluten("Nested Python UDF: push down deterministic FilterExecTransformer 
predicates") {
+    val df = Seq(("Hello", 4))
+      .toDF("a", "b")
+      .where("dummyPythonUDF(a, dummyPythonUDF(a, b)) and a in (3, 4)")
+    val qualifiedPlanNodes = df.queryExecution.executedPlan.collect {
+      case f @ FilterExecTransformer(
+            _: AttributeReference,
+            InputIteratorTransformer(ColumnarInputAdapter(r: 
RowToColumnarExecBase)))
+          if r.child.isInstanceOf[BatchEvalPythonExec] =>
+        f
+      case b @ BatchEvalPythonExec(_, _, c: ColumnarToRowExecBase) =>
+        c.child match {
+          case WholeStageTransformer(FilterExecTransformer(_: In, _), _) => b
+        }
+    }
+    assert(qualifiedPlanNodes.size == 2)
+  }
+
+  testGluten("Python UDF: no push down on non-deterministic") {
+    val df = Seq(("Hello", 4))
+      .toDF("a", "b")
+      .where("b > 4 and dummyPythonUDF(a) and rand() > 0.3")
+    val qualifiedPlanNodes = df.queryExecution.executedPlan.collect {
+      case f @ FilterExecTransformer(
+            And(_: AttributeReference, _: GreaterThan),
+            InputIteratorTransformer(ColumnarInputAdapter(r: 
RowToColumnarExecBase)))
+          if r.child.isInstanceOf[BatchEvalPythonExec] =>
+        f
+      case b @ BatchEvalPythonExec(_, _, c: ColumnarToRowExecBase) =>
+        c.child match {
+          case WholeStageTransformer(_: FilterExecTransformer, _) => b
+        }
+    }
+    assert(qualifiedPlanNodes.size == 2)
+  }
+
+  testGluten(
+    "Python UDF: push down on deterministic predicates after the first 
non-deterministic") {
+    val df = Seq(("Hello", 4))
+      .toDF("a", "b")
+      .where("dummyPythonUDF(a) and rand() > 0.3 and b > 4")
+
+    val qualifiedPlanNodes = df.queryExecution.executedPlan.collect {
+      case f @ FilterExecTransformer(
+            And(_: AttributeReference, _: GreaterThan),
+            InputIteratorTransformer(ColumnarInputAdapter(r: 
RowToColumnarExecBase)))
+          if r.child.isInstanceOf[BatchEvalPythonExec] =>
+        f
+      case b @ BatchEvalPythonExec(_, _, c: ColumnarToRowExecBase) =>
+        c.child match {
+          case WholeStageTransformer(_: FilterExecTransformer, _) => b
+        }
+    }
+    assert(qualifiedPlanNodes.size == 2)
+  }
+}
diff --git 
a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
 
b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 676b562889..5d3f772966 100644
--- 
a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ 
b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -36,6 +36,7 @@ import 
org.apache.spark.sql.execution.datasources.text.{GlutenTextV1Suite, Glute
 import 
org.apache.spark.sql.execution.datasources.v2.{GlutenDataSourceV2StrategySuite, 
GlutenFileTableSuite, GlutenV2PredicateSuite}
 import org.apache.spark.sql.execution.exchange.GlutenEnsureRequirementsSuite
 import org.apache.spark.sql.execution.joins.{GlutenBroadcastJoinSuite, 
GlutenExistenceJoinSuite, GlutenInnerJoinSuite, GlutenOuterJoinSuite}
+import org.apache.spark.sql.execution.python._
 import 
org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, 
GlutenSessionExtensionSuite, TestFileSourceScanExecTransformer}
 import org.apache.spark.sql.gluten.GlutenFallbackSuite
 import org.apache.spark.sql.hive.execution.GlutenHiveSQLQuerySuite
@@ -865,6 +866,12 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenCollapseProjectExecTransformerSuite]
   enableSuite[GlutenSparkSessionExtensionSuite]
   enableSuite[GlutenSQLCollectLimitExecSuite]
+  enableSuite[GlutenBatchEvalPythonExecSuite]
+    // Replaced with other tests that check for native operations
+    .exclude("Python UDF: push down deterministic FilterExec predicates")
+    .exclude("Nested Python UDF: push down deterministic FilterExec 
predicates")
+    .exclude("Python UDF: no push down on non-deterministic")
+    .exclude("Python UDF: push down on deterministic predicates after the 
first non-deterministic")
 
   override def getSQLQueryTestSettings: SQLQueryTestSettings = 
VeloxSQLQueryTestSettings
 }
diff --git 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/python/GlutenBatchEvalPythonExecSuite.scala
 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/python/GlutenBatchEvalPythonExecSuite.scala
new file mode 100644
index 0000000000..d364262be5
--- /dev/null
+++ 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/python/GlutenBatchEvalPythonExecSuite.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.python
+
+import org.apache.gluten.execution.{ColumnarToRowExecBase, 
FilterExecTransformer, RowToColumnarExecBase, WholeStageTransformer}
+
+import org.apache.spark.sql.GlutenSQLTestsBaseTrait
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, 
GreaterThan, In}
+import org.apache.spark.sql.execution.{ColumnarInputAdapter, 
InputIteratorTransformer}
+
+class GlutenBatchEvalPythonExecSuite extends BatchEvalPythonExecSuite with 
GlutenSQLTestsBaseTrait {
+
+  import testImplicits._
+
+  testGluten("Python UDF: push down deterministic FilterExecTransformer 
predicates") {
+    val df = Seq(("Hello", 4))
+      .toDF("a", "b")
+      .where("dummyPythonUDF(b) and dummyPythonUDF(a) and a in (3, 4)")
+    val qualifiedPlanNodes = df.queryExecution.executedPlan.collect {
+      case f @ FilterExecTransformer(
+            And(_: AttributeReference, _: AttributeReference),
+            InputIteratorTransformer(ColumnarInputAdapter(r: 
RowToColumnarExecBase)))
+          if r.child.isInstanceOf[BatchEvalPythonExec] =>
+        f
+      case b @ BatchEvalPythonExec(_, _, c: ColumnarToRowExecBase) =>
+        c.child match {
+          case WholeStageTransformer(FilterExecTransformer(_: In, _), _) => b
+        }
+    }
+    assert(qualifiedPlanNodes.size == 2)
+  }
+
+  testGluten("Nested Python UDF: push down deterministic FilterExecTransformer 
predicates") {
+    val df = Seq(("Hello", 4))
+      .toDF("a", "b")
+      .where("dummyPythonUDF(a, dummyPythonUDF(a, b)) and a in (3, 4)")
+    val qualifiedPlanNodes = df.queryExecution.executedPlan.collect {
+      case f @ FilterExecTransformer(
+            _: AttributeReference,
+            InputIteratorTransformer(ColumnarInputAdapter(r: 
RowToColumnarExecBase)))
+          if r.child.isInstanceOf[BatchEvalPythonExec] =>
+        f
+      case b @ BatchEvalPythonExec(_, _, c: ColumnarToRowExecBase) =>
+        c.child match {
+          case WholeStageTransformer(FilterExecTransformer(_: In, _), _) => b
+        }
+    }
+    assert(qualifiedPlanNodes.size == 2)
+  }
+
+  testGluten("Python UDF: no push down on non-deterministic") {
+    val df = Seq(("Hello", 4))
+      .toDF("a", "b")
+      .where("b > 4 and dummyPythonUDF(a) and rand() > 0.3")
+    val qualifiedPlanNodes = df.queryExecution.executedPlan.collect {
+      case f @ FilterExecTransformer(
+            And(_: AttributeReference, _: GreaterThan),
+            InputIteratorTransformer(ColumnarInputAdapter(r: 
RowToColumnarExecBase)))
+          if r.child.isInstanceOf[BatchEvalPythonExec] =>
+        f
+      case b @ BatchEvalPythonExec(_, _, c: ColumnarToRowExecBase) =>
+        c.child match {
+          case WholeStageTransformer(_: FilterExecTransformer, _) => b
+        }
+    }
+    assert(qualifiedPlanNodes.size == 2)
+  }
+
+  testGluten(
+    "Python UDF: push down on deterministic predicates after the first 
non-deterministic") {
+    val df = Seq(("Hello", 4))
+      .toDF("a", "b")
+      .where("dummyPythonUDF(a) and rand() > 0.3 and b > 4")
+
+    val qualifiedPlanNodes = df.queryExecution.executedPlan.collect {
+      case f @ FilterExecTransformer(
+            And(_: AttributeReference, _: GreaterThan),
+            InputIteratorTransformer(ColumnarInputAdapter(r: 
RowToColumnarExecBase)))
+          if r.child.isInstanceOf[BatchEvalPythonExec] =>
+        f
+      case b @ BatchEvalPythonExec(_, _, c: ColumnarToRowExecBase) =>
+        c.child match {
+          case WholeStageTransformer(_: FilterExecTransformer, _) => b
+        }
+    }
+    assert(qualifiedPlanNodes.size == 2)
+  }
+}
diff --git 
a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
 
b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 3f6ed85f87..e470e22ad3 100644
--- 
a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ 
b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -37,6 +37,7 @@ import 
org.apache.spark.sql.execution.datasources.text.{GlutenTextV1Suite, Glute
 import 
org.apache.spark.sql.execution.datasources.v2.{GlutenDataSourceV2StrategySuite, 
GlutenFileTableSuite, GlutenV2PredicateSuite}
 import org.apache.spark.sql.execution.exchange.GlutenEnsureRequirementsSuite
 import org.apache.spark.sql.execution.joins.{GlutenBroadcastJoinSuite, 
GlutenExistenceJoinSuite, GlutenInnerJoinSuite, GlutenOuterJoinSuite}
+import org.apache.spark.sql.execution.python._
 import 
org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, 
GlutenSessionExtensionSuite, TestFileSourceScanExecTransformer}
 import org.apache.spark.sql.gluten.GlutenFallbackSuite
 import org.apache.spark.sql.hive.execution.GlutenHiveSQLQuerySuite
@@ -902,6 +903,12 @@ class VeloxTestSettings extends BackendTestSettings {
     .excludeByPrefix("row index generation")
     .excludeByPrefix("invalid row index column type")
   enableSuite[GlutenSQLCollectLimitExecSuite]
+  enableSuite[GlutenBatchEvalPythonExecSuite]
+    // Replaced with other tests that check for native operations
+    .exclude("Python UDF: push down deterministic FilterExec predicates")
+    .exclude("Nested Python UDF: push down deterministic FilterExec 
predicates")
+    .exclude("Python UDF: no push down on non-deterministic")
+    .exclude("Python UDF: push down on deterministic predicates after the 
first non-deterministic")
 
   override def getSQLQueryTestSettings: SQLQueryTestSettings = 
VeloxSQLQueryTestSettings
 }
diff --git 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/python/GlutenBatchEvalPythonExecSuite.scala
 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/python/GlutenBatchEvalPythonExecSuite.scala
new file mode 100644
index 0000000000..d364262be5
--- /dev/null
+++ 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/python/GlutenBatchEvalPythonExecSuite.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.python
+
+import org.apache.gluten.execution.{ColumnarToRowExecBase, 
FilterExecTransformer, RowToColumnarExecBase, WholeStageTransformer}
+
+import org.apache.spark.sql.GlutenSQLTestsBaseTrait
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, 
GreaterThan, In}
+import org.apache.spark.sql.execution.{ColumnarInputAdapter, 
InputIteratorTransformer}
+
+class GlutenBatchEvalPythonExecSuite extends BatchEvalPythonExecSuite with 
GlutenSQLTestsBaseTrait {
+
+  import testImplicits._
+
+  testGluten("Python UDF: push down deterministic FilterExecTransformer 
predicates") {
+    val df = Seq(("Hello", 4))
+      .toDF("a", "b")
+      .where("dummyPythonUDF(b) and dummyPythonUDF(a) and a in (3, 4)")
+    val qualifiedPlanNodes = df.queryExecution.executedPlan.collect {
+      case f @ FilterExecTransformer(
+            And(_: AttributeReference, _: AttributeReference),
+            InputIteratorTransformer(ColumnarInputAdapter(r: 
RowToColumnarExecBase)))
+          if r.child.isInstanceOf[BatchEvalPythonExec] =>
+        f
+      case b @ BatchEvalPythonExec(_, _, c: ColumnarToRowExecBase) =>
+        c.child match {
+          case WholeStageTransformer(FilterExecTransformer(_: In, _), _) => b
+        }
+    }
+    assert(qualifiedPlanNodes.size == 2)
+  }
+
+  testGluten("Nested Python UDF: push down deterministic FilterExecTransformer 
predicates") {
+    val df = Seq(("Hello", 4))
+      .toDF("a", "b")
+      .where("dummyPythonUDF(a, dummyPythonUDF(a, b)) and a in (3, 4)")
+    val qualifiedPlanNodes = df.queryExecution.executedPlan.collect {
+      case f @ FilterExecTransformer(
+            _: AttributeReference,
+            InputIteratorTransformer(ColumnarInputAdapter(r: 
RowToColumnarExecBase)))
+          if r.child.isInstanceOf[BatchEvalPythonExec] =>
+        f
+      case b @ BatchEvalPythonExec(_, _, c: ColumnarToRowExecBase) =>
+        c.child match {
+          case WholeStageTransformer(FilterExecTransformer(_: In, _), _) => b
+        }
+    }
+    assert(qualifiedPlanNodes.size == 2)
+  }
+
+  testGluten("Python UDF: no push down on non-deterministic") {
+    val df = Seq(("Hello", 4))
+      .toDF("a", "b")
+      .where("b > 4 and dummyPythonUDF(a) and rand() > 0.3")
+    val qualifiedPlanNodes = df.queryExecution.executedPlan.collect {
+      case f @ FilterExecTransformer(
+            And(_: AttributeReference, _: GreaterThan),
+            InputIteratorTransformer(ColumnarInputAdapter(r: 
RowToColumnarExecBase)))
+          if r.child.isInstanceOf[BatchEvalPythonExec] =>
+        f
+      case b @ BatchEvalPythonExec(_, _, c: ColumnarToRowExecBase) =>
+        c.child match {
+          case WholeStageTransformer(_: FilterExecTransformer, _) => b
+        }
+    }
+    assert(qualifiedPlanNodes.size == 2)
+  }
+
+  testGluten(
+    "Python UDF: push down on deterministic predicates after the first 
non-deterministic") {
+    val df = Seq(("Hello", 4))
+      .toDF("a", "b")
+      .where("dummyPythonUDF(a) and rand() > 0.3 and b > 4")
+
+    val qualifiedPlanNodes = df.queryExecution.executedPlan.collect {
+      case f @ FilterExecTransformer(
+            And(_: AttributeReference, _: GreaterThan),
+            InputIteratorTransformer(ColumnarInputAdapter(r: 
RowToColumnarExecBase)))
+          if r.child.isInstanceOf[BatchEvalPythonExec] =>
+        f
+      case b @ BatchEvalPythonExec(_, _, c: ColumnarToRowExecBase) =>
+        c.child match {
+          case WholeStageTransformer(_: FilterExecTransformer, _) => b
+        }
+    }
+    assert(qualifiedPlanNodes.size == 2)
+  }
+}
diff --git 
a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
 
b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index dfdb4a6b10..089c6f4c16 100644
--- 
a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ 
b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -36,6 +36,7 @@ import 
org.apache.spark.sql.execution.datasources.text.{GlutenTextV1Suite, Glute
 import 
org.apache.spark.sql.execution.datasources.v2.{GlutenDataSourceV2StrategySuite, 
GlutenFileTableSuite, GlutenV2PredicateSuite}
 import org.apache.spark.sql.execution.exchange.GlutenEnsureRequirementsSuite
 import org.apache.spark.sql.execution.joins._
+import org.apache.spark.sql.execution.python._
 import 
org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, 
GlutenSessionExtensionSuite, TestFileSourceScanExecTransformer}
 import org.apache.spark.sql.gluten.GlutenFallbackSuite
 import org.apache.spark.sql.hive.execution.GlutenHiveSQLQuerySuite
@@ -942,6 +943,12 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenTableLocationSuite]
   enableSuite[GlutenRemoveRedundantWindowGroupLimitsSuite]
   enableSuite[GlutenSQLCollectLimitExecSuite]
+  enableSuite[GlutenBatchEvalPythonExecSuite]
+    // Replaced with other tests that check for native operations
+    .exclude("Python UDF: push down deterministic FilterExec predicates")
+    .exclude("Nested Python UDF: push down deterministic FilterExec 
predicates")
+    .exclude("Python UDF: no push down on non-deterministic")
+    .exclude("Python UDF: push down on deterministic predicates after the 
first non-deterministic")
 
   override def getSQLQueryTestSettings: SQLQueryTestSettings = 
VeloxSQLQueryTestSettings
 }
diff --git 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/python/GlutenBatchEvalPythonExecSuite.scala
 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/python/GlutenBatchEvalPythonExecSuite.scala
new file mode 100644
index 0000000000..d364262be5
--- /dev/null
+++ 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/python/GlutenBatchEvalPythonExecSuite.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.python
+
+import org.apache.gluten.execution.{ColumnarToRowExecBase, 
FilterExecTransformer, RowToColumnarExecBase, WholeStageTransformer}
+
+import org.apache.spark.sql.GlutenSQLTestsBaseTrait
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, 
GreaterThan, In}
+import org.apache.spark.sql.execution.{ColumnarInputAdapter, 
InputIteratorTransformer}
+
+class GlutenBatchEvalPythonExecSuite extends BatchEvalPythonExecSuite with 
GlutenSQLTestsBaseTrait {
+
+  import testImplicits._
+
+  testGluten("Python UDF: push down deterministic FilterExecTransformer 
predicates") {
+    val df = Seq(("Hello", 4))
+      .toDF("a", "b")
+      .where("dummyPythonUDF(b) and dummyPythonUDF(a) and a in (3, 4)")
+    val qualifiedPlanNodes = df.queryExecution.executedPlan.collect {
+      case f @ FilterExecTransformer(
+            And(_: AttributeReference, _: AttributeReference),
+            InputIteratorTransformer(ColumnarInputAdapter(r: 
RowToColumnarExecBase)))
+          if r.child.isInstanceOf[BatchEvalPythonExec] =>
+        f
+      case b @ BatchEvalPythonExec(_, _, c: ColumnarToRowExecBase) =>
+        c.child match {
+          case WholeStageTransformer(FilterExecTransformer(_: In, _), _) => b
+        }
+    }
+    assert(qualifiedPlanNodes.size == 2)
+  }
+
+  testGluten("Nested Python UDF: push down deterministic FilterExecTransformer 
predicates") {
+    val df = Seq(("Hello", 4))
+      .toDF("a", "b")
+      .where("dummyPythonUDF(a, dummyPythonUDF(a, b)) and a in (3, 4)")
+    val qualifiedPlanNodes = df.queryExecution.executedPlan.collect {
+      case f @ FilterExecTransformer(
+            _: AttributeReference,
+            InputIteratorTransformer(ColumnarInputAdapter(r: 
RowToColumnarExecBase)))
+          if r.child.isInstanceOf[BatchEvalPythonExec] =>
+        f
+      case b @ BatchEvalPythonExec(_, _, c: ColumnarToRowExecBase) =>
+        c.child match {
+          case WholeStageTransformer(FilterExecTransformer(_: In, _), _) => b
+        }
+    }
+    assert(qualifiedPlanNodes.size == 2)
+  }
+
+  testGluten("Python UDF: no push down on non-deterministic") {
+    val df = Seq(("Hello", 4))
+      .toDF("a", "b")
+      .where("b > 4 and dummyPythonUDF(a) and rand() > 0.3")
+    val qualifiedPlanNodes = df.queryExecution.executedPlan.collect {
+      case f @ FilterExecTransformer(
+            And(_: AttributeReference, _: GreaterThan),
+            InputIteratorTransformer(ColumnarInputAdapter(r: 
RowToColumnarExecBase)))
+          if r.child.isInstanceOf[BatchEvalPythonExec] =>
+        f
+      case b @ BatchEvalPythonExec(_, _, c: ColumnarToRowExecBase) =>
+        c.child match {
+          case WholeStageTransformer(_: FilterExecTransformer, _) => b
+        }
+    }
+    assert(qualifiedPlanNodes.size == 2)
+  }
+
+  testGluten(
+    "Python UDF: push down on deterministic predicates after the first 
non-deterministic") {
+    val df = Seq(("Hello", 4))
+      .toDF("a", "b")
+      .where("dummyPythonUDF(a) and rand() > 0.3 and b > 4")
+
+    val qualifiedPlanNodes = df.queryExecution.executedPlan.collect {
+      case f @ FilterExecTransformer(
+            And(_: AttributeReference, _: GreaterThan),
+            InputIteratorTransformer(ColumnarInputAdapter(r: 
RowToColumnarExecBase)))
+          if r.child.isInstanceOf[BatchEvalPythonExec] =>
+        f
+      case b @ BatchEvalPythonExec(_, _, c: ColumnarToRowExecBase) =>
+        c.child match {
+          case WholeStageTransformer(_: FilterExecTransformer, _) => b
+        }
+    }
+    assert(qualifiedPlanNodes.size == 2)
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to