This is an automated email from the ASF dual-hosted git repository.
rui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new d00a12b8ca [VL] Enable BatchEvalPythonExecSuite (#9780)
d00a12b8ca is described below
commit d00a12b8cae83a234c558c0c64700ce13c7b596d
Author: Ankita Victor <[email protected]>
AuthorDate: Tue Jun 3 22:23:29 2025 +0530
[VL] Enable BatchEvalPythonExecSuite (#9780)
---
.../gluten/utils/velox/VeloxTestSettings.scala | 7 ++
.../python/GlutenBatchEvalPythonExecSuite.scala | 102 +++++++++++++++++++++
.../gluten/utils/velox/VeloxTestSettings.scala | 7 ++
.../python/GlutenBatchEvalPythonExecSuite.scala | 102 +++++++++++++++++++++
.../gluten/utils/velox/VeloxTestSettings.scala | 7 ++
.../python/GlutenBatchEvalPythonExecSuite.scala | 102 +++++++++++++++++++++
.../gluten/utils/velox/VeloxTestSettings.scala | 7 ++
.../python/GlutenBatchEvalPythonExecSuite.scala | 102 +++++++++++++++++++++
8 files changed, 436 insertions(+)
diff --git
a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 60d81f106f..c1d8b242ef 100644
---
a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++
b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -35,6 +35,7 @@ import
org.apache.spark.sql.execution.datasources.text.{GlutenTextV1Suite, Glute
import org.apache.spark.sql.execution.datasources.v2.GlutenFileTableSuite
import org.apache.spark.sql.execution.exchange.GlutenEnsureRequirementsSuite
import org.apache.spark.sql.execution.joins.{GlutenBroadcastJoinSuite,
GlutenExistenceJoinSuite, GlutenInnerJoinSuite, GlutenOuterJoinSuite}
+import org.apache.spark.sql.execution.python._
import
org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite,
GlutenSessionExtensionSuite}
import org.apache.spark.sql.hive.execution.GlutenHiveSQLQuerySuite
import org.apache.spark.sql.sources._
@@ -816,6 +817,12 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenCollapseProjectExecTransformerSuite]
enableSuite[GlutenSparkSessionExtensionSuite]
enableSuite[GlutenSQLCollectLimitExecSuite]
+  enableSuite[GlutenBatchEvalPythonExecSuite]
+    // The four inherited tests named below are excluded; they are replaced with other tests
+    // that check for native operations. Each name is kept on a single line because a string
+    // literal cannot span lines (presumably the name must also match the inherited test
+    // verbatim -- confirm against BackendTestSettings).
+    .exclude("Python UDF: push down deterministic FilterExec predicates")
+    .exclude("Nested Python UDF: push down deterministic FilterExec predicates")
+    .exclude("Python UDF: no push down on non-deterministic")
+    .exclude("Python UDF: push down on deterministic predicates after the first non-deterministic")
override def getSQLQueryTestSettings: SQLQueryTestSettings =
VeloxSQLQueryTestSettings
}
diff --git
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/python/GlutenBatchEvalPythonExecSuite.scala
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/python/GlutenBatchEvalPythonExecSuite.scala
new file mode 100644
index 0000000000..d364262be5
--- /dev/null
+++
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/python/GlutenBatchEvalPythonExecSuite.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.python
+
+import org.apache.gluten.execution.{ColumnarToRowExecBase,
FilterExecTransformer, RowToColumnarExecBase, WholeStageTransformer}
+
+import org.apache.spark.sql.GlutenSQLTestsBaseTrait
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference,
GreaterThan, In}
+import org.apache.spark.sql.execution.{ColumnarInputAdapter,
InputIteratorTransformer}
+
+/**
+ * Runs [[BatchEvalPythonExecSuite]] with Gluten enabled and adds plan-shape tests that look for
+ * Gluten's native operators around [[BatchEvalPythonExec]].
+ *
+ * Every test below filters a one-row DataFrame with a predicate that calls `dummyPythonUDF`
+ * (not defined in this file; presumably registered by the parent suite), collects the plan nodes
+ * matching two shapes from the executed plan, and asserts that exactly two nodes were found:
+ *   - a [[FilterExecTransformer]] with the expected condition whose input chain is
+ *     `InputIteratorTransformer -> ColumnarInputAdapter -> RowToColumnarExecBase` directly over a
+ *     [[BatchEvalPythonExec]];
+ *   - a [[BatchEvalPythonExec]] whose child is a [[ColumnarToRowExecBase]] over a
+ *     [[WholeStageTransformer]] rooted at a [[FilterExecTransformer]].
+ *
+ * The second shape is tested in a pattern guard (`PartialFunction.cond`), so a node whose child
+ * has another shape is simply not collected and the size assertion reports the mismatch, rather
+ * than a nested single-case `match` aborting the test with `scala.MatchError`.
+ */
+class GlutenBatchEvalPythonExecSuite extends BatchEvalPythonExecSuite with GlutenSQLTestsBaseTrait {
+
+  import testImplicits._
+
+  testGluten("Python UDF: push down deterministic FilterExecTransformer predicates") {
+    val df = Seq(("Hello", 4))
+      .toDF("a", "b")
+      .where("dummyPythonUDF(b) and dummyPythonUDF(a) and a in (3, 4)")
+    val qualifiedPlanNodes = df.queryExecution.executedPlan.collect {
+      // Native filter over the Python evaluation: a conjunction of two attribute references.
+      case f @ FilterExecTransformer(
+            And(_: AttributeReference, _: AttributeReference),
+            InputIteratorTransformer(ColumnarInputAdapter(r: RowToColumnarExecBase)))
+          if r.child.isInstanceOf[BatchEvalPythonExec] =>
+        f
+      // Python evaluation fed by a native stage whose root filter condition is an `In`.
+      case b @ BatchEvalPythonExec(_, _, c: ColumnarToRowExecBase)
+          if PartialFunction.cond(c.child) {
+            case WholeStageTransformer(FilterExecTransformer(_: In, _), _) => true
+          } =>
+        b
+    }
+    assert(qualifiedPlanNodes.size == 2)
+  }
+
+  testGluten("Nested Python UDF: push down deterministic FilterExecTransformer predicates") {
+    val df = Seq(("Hello", 4))
+      .toDF("a", "b")
+      .where("dummyPythonUDF(a, dummyPythonUDF(a, b)) and a in (3, 4)")
+    val qualifiedPlanNodes = df.queryExecution.executedPlan.collect {
+      // Native filter over the Python evaluation: the condition is a single attribute reference.
+      case f @ FilterExecTransformer(
+            _: AttributeReference,
+            InputIteratorTransformer(ColumnarInputAdapter(r: RowToColumnarExecBase)))
+          if r.child.isInstanceOf[BatchEvalPythonExec] =>
+        f
+      // Python evaluation fed by a native stage whose root filter condition is an `In`.
+      case b @ BatchEvalPythonExec(_, _, c: ColumnarToRowExecBase)
+          if PartialFunction.cond(c.child) {
+            case WholeStageTransformer(FilterExecTransformer(_: In, _), _) => true
+          } =>
+        b
+    }
+    assert(qualifiedPlanNodes.size == 2)
+  }
+
+  testGluten("Python UDF: no push down on non-deterministic") {
+    val df = Seq(("Hello", 4))
+      .toDF("a", "b")
+      .where("b > 4 and dummyPythonUDF(a) and rand() > 0.3")
+    val qualifiedPlanNodes = df.queryExecution.executedPlan.collect {
+      // Native filter over the Python evaluation: an attribute reference AND a `>` comparison.
+      case f @ FilterExecTransformer(
+            And(_: AttributeReference, _: GreaterThan),
+            InputIteratorTransformer(ColumnarInputAdapter(r: RowToColumnarExecBase)))
+          if r.child.isInstanceOf[BatchEvalPythonExec] =>
+        f
+      // Python evaluation fed by a native stage rooted at any FilterExecTransformer.
+      case b @ BatchEvalPythonExec(_, _, c: ColumnarToRowExecBase)
+          if PartialFunction.cond(c.child) {
+            case WholeStageTransformer(_: FilterExecTransformer, _) => true
+          } =>
+        b
+    }
+    assert(qualifiedPlanNodes.size == 2)
+  }
+
+  testGluten(
+    "Python UDF: push down on deterministic predicates after the first non-deterministic") {
+    val df = Seq(("Hello", 4))
+      .toDF("a", "b")
+      .where("dummyPythonUDF(a) and rand() > 0.3 and b > 4")
+
+    val qualifiedPlanNodes = df.queryExecution.executedPlan.collect {
+      // Native filter over the Python evaluation: an attribute reference AND a `>` comparison.
+      case f @ FilterExecTransformer(
+            And(_: AttributeReference, _: GreaterThan),
+            InputIteratorTransformer(ColumnarInputAdapter(r: RowToColumnarExecBase)))
+          if r.child.isInstanceOf[BatchEvalPythonExec] =>
+        f
+      // Python evaluation fed by a native stage rooted at any FilterExecTransformer.
+      case b @ BatchEvalPythonExec(_, _, c: ColumnarToRowExecBase)
+          if PartialFunction.cond(c.child) {
+            case WholeStageTransformer(_: FilterExecTransformer, _) => true
+          } =>
+        b
+    }
+    assert(qualifiedPlanNodes.size == 2)
+  }
+}
diff --git
a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 676b562889..5d3f772966 100644
---
a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++
b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -36,6 +36,7 @@ import
org.apache.spark.sql.execution.datasources.text.{GlutenTextV1Suite, Glute
import
org.apache.spark.sql.execution.datasources.v2.{GlutenDataSourceV2StrategySuite,
GlutenFileTableSuite, GlutenV2PredicateSuite}
import org.apache.spark.sql.execution.exchange.GlutenEnsureRequirementsSuite
import org.apache.spark.sql.execution.joins.{GlutenBroadcastJoinSuite,
GlutenExistenceJoinSuite, GlutenInnerJoinSuite, GlutenOuterJoinSuite}
+import org.apache.spark.sql.execution.python._
import
org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite,
GlutenSessionExtensionSuite, TestFileSourceScanExecTransformer}
import org.apache.spark.sql.gluten.GlutenFallbackSuite
import org.apache.spark.sql.hive.execution.GlutenHiveSQLQuerySuite
@@ -865,6 +866,12 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenCollapseProjectExecTransformerSuite]
enableSuite[GlutenSparkSessionExtensionSuite]
enableSuite[GlutenSQLCollectLimitExecSuite]
+  enableSuite[GlutenBatchEvalPythonExecSuite]
+    // The four inherited tests named below are excluded; they are replaced with other tests
+    // that check for native operations. Each name is kept on a single line because a string
+    // literal cannot span lines (presumably the name must also match the inherited test
+    // verbatim -- confirm against BackendTestSettings).
+    .exclude("Python UDF: push down deterministic FilterExec predicates")
+    .exclude("Nested Python UDF: push down deterministic FilterExec predicates")
+    .exclude("Python UDF: no push down on non-deterministic")
+    .exclude("Python UDF: push down on deterministic predicates after the first non-deterministic")
override def getSQLQueryTestSettings: SQLQueryTestSettings =
VeloxSQLQueryTestSettings
}
diff --git
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/python/GlutenBatchEvalPythonExecSuite.scala
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/python/GlutenBatchEvalPythonExecSuite.scala
new file mode 100644
index 0000000000..d364262be5
--- /dev/null
+++
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/python/GlutenBatchEvalPythonExecSuite.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.python
+
+import org.apache.gluten.execution.{ColumnarToRowExecBase,
FilterExecTransformer, RowToColumnarExecBase, WholeStageTransformer}
+
+import org.apache.spark.sql.GlutenSQLTestsBaseTrait
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference,
GreaterThan, In}
+import org.apache.spark.sql.execution.{ColumnarInputAdapter,
InputIteratorTransformer}
+
+/**
+ * Runs [[BatchEvalPythonExecSuite]] with Gluten enabled and adds plan-shape tests that look for
+ * Gluten's native operators around [[BatchEvalPythonExec]].
+ *
+ * Every test below filters a one-row DataFrame with a predicate that calls `dummyPythonUDF`
+ * (not defined in this file; presumably registered by the parent suite), collects the plan nodes
+ * matching two shapes from the executed plan, and asserts that exactly two nodes were found:
+ *   - a [[FilterExecTransformer]] with the expected condition whose input chain is
+ *     `InputIteratorTransformer -> ColumnarInputAdapter -> RowToColumnarExecBase` directly over a
+ *     [[BatchEvalPythonExec]];
+ *   - a [[BatchEvalPythonExec]] whose child is a [[ColumnarToRowExecBase]] over a
+ *     [[WholeStageTransformer]] rooted at a [[FilterExecTransformer]].
+ *
+ * The second shape is tested in a pattern guard (`PartialFunction.cond`), so a node whose child
+ * has another shape is simply not collected and the size assertion reports the mismatch, rather
+ * than a nested single-case `match` aborting the test with `scala.MatchError`.
+ */
+class GlutenBatchEvalPythonExecSuite extends BatchEvalPythonExecSuite with GlutenSQLTestsBaseTrait {
+
+  import testImplicits._
+
+  testGluten("Python UDF: push down deterministic FilterExecTransformer predicates") {
+    val df = Seq(("Hello", 4))
+      .toDF("a", "b")
+      .where("dummyPythonUDF(b) and dummyPythonUDF(a) and a in (3, 4)")
+    val qualifiedPlanNodes = df.queryExecution.executedPlan.collect {
+      // Native filter over the Python evaluation: a conjunction of two attribute references.
+      case f @ FilterExecTransformer(
+            And(_: AttributeReference, _: AttributeReference),
+            InputIteratorTransformer(ColumnarInputAdapter(r: RowToColumnarExecBase)))
+          if r.child.isInstanceOf[BatchEvalPythonExec] =>
+        f
+      // Python evaluation fed by a native stage whose root filter condition is an `In`.
+      case b @ BatchEvalPythonExec(_, _, c: ColumnarToRowExecBase)
+          if PartialFunction.cond(c.child) {
+            case WholeStageTransformer(FilterExecTransformer(_: In, _), _) => true
+          } =>
+        b
+    }
+    assert(qualifiedPlanNodes.size == 2)
+  }
+
+  testGluten("Nested Python UDF: push down deterministic FilterExecTransformer predicates") {
+    val df = Seq(("Hello", 4))
+      .toDF("a", "b")
+      .where("dummyPythonUDF(a, dummyPythonUDF(a, b)) and a in (3, 4)")
+    val qualifiedPlanNodes = df.queryExecution.executedPlan.collect {
+      // Native filter over the Python evaluation: the condition is a single attribute reference.
+      case f @ FilterExecTransformer(
+            _: AttributeReference,
+            InputIteratorTransformer(ColumnarInputAdapter(r: RowToColumnarExecBase)))
+          if r.child.isInstanceOf[BatchEvalPythonExec] =>
+        f
+      // Python evaluation fed by a native stage whose root filter condition is an `In`.
+      case b @ BatchEvalPythonExec(_, _, c: ColumnarToRowExecBase)
+          if PartialFunction.cond(c.child) {
+            case WholeStageTransformer(FilterExecTransformer(_: In, _), _) => true
+          } =>
+        b
+    }
+    assert(qualifiedPlanNodes.size == 2)
+  }
+
+  testGluten("Python UDF: no push down on non-deterministic") {
+    val df = Seq(("Hello", 4))
+      .toDF("a", "b")
+      .where("b > 4 and dummyPythonUDF(a) and rand() > 0.3")
+    val qualifiedPlanNodes = df.queryExecution.executedPlan.collect {
+      // Native filter over the Python evaluation: an attribute reference AND a `>` comparison.
+      case f @ FilterExecTransformer(
+            And(_: AttributeReference, _: GreaterThan),
+            InputIteratorTransformer(ColumnarInputAdapter(r: RowToColumnarExecBase)))
+          if r.child.isInstanceOf[BatchEvalPythonExec] =>
+        f
+      // Python evaluation fed by a native stage rooted at any FilterExecTransformer.
+      case b @ BatchEvalPythonExec(_, _, c: ColumnarToRowExecBase)
+          if PartialFunction.cond(c.child) {
+            case WholeStageTransformer(_: FilterExecTransformer, _) => true
+          } =>
+        b
+    }
+    assert(qualifiedPlanNodes.size == 2)
+  }
+
+  testGluten(
+    "Python UDF: push down on deterministic predicates after the first non-deterministic") {
+    val df = Seq(("Hello", 4))
+      .toDF("a", "b")
+      .where("dummyPythonUDF(a) and rand() > 0.3 and b > 4")
+
+    val qualifiedPlanNodes = df.queryExecution.executedPlan.collect {
+      // Native filter over the Python evaluation: an attribute reference AND a `>` comparison.
+      case f @ FilterExecTransformer(
+            And(_: AttributeReference, _: GreaterThan),
+            InputIteratorTransformer(ColumnarInputAdapter(r: RowToColumnarExecBase)))
+          if r.child.isInstanceOf[BatchEvalPythonExec] =>
+        f
+      // Python evaluation fed by a native stage rooted at any FilterExecTransformer.
+      case b @ BatchEvalPythonExec(_, _, c: ColumnarToRowExecBase)
+          if PartialFunction.cond(c.child) {
+            case WholeStageTransformer(_: FilterExecTransformer, _) => true
+          } =>
+        b
+    }
+    assert(qualifiedPlanNodes.size == 2)
+  }
+}
diff --git
a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 3f6ed85f87..e470e22ad3 100644
---
a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++
b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -37,6 +37,7 @@ import
org.apache.spark.sql.execution.datasources.text.{GlutenTextV1Suite, Glute
import
org.apache.spark.sql.execution.datasources.v2.{GlutenDataSourceV2StrategySuite,
GlutenFileTableSuite, GlutenV2PredicateSuite}
import org.apache.spark.sql.execution.exchange.GlutenEnsureRequirementsSuite
import org.apache.spark.sql.execution.joins.{GlutenBroadcastJoinSuite,
GlutenExistenceJoinSuite, GlutenInnerJoinSuite, GlutenOuterJoinSuite}
+import org.apache.spark.sql.execution.python._
import
org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite,
GlutenSessionExtensionSuite, TestFileSourceScanExecTransformer}
import org.apache.spark.sql.gluten.GlutenFallbackSuite
import org.apache.spark.sql.hive.execution.GlutenHiveSQLQuerySuite
@@ -902,6 +903,12 @@ class VeloxTestSettings extends BackendTestSettings {
.excludeByPrefix("row index generation")
.excludeByPrefix("invalid row index column type")
enableSuite[GlutenSQLCollectLimitExecSuite]
+  enableSuite[GlutenBatchEvalPythonExecSuite]
+    // The four inherited tests named below are excluded; they are replaced with other tests
+    // that check for native operations. Each name is kept on a single line because a string
+    // literal cannot span lines (presumably the name must also match the inherited test
+    // verbatim -- confirm against BackendTestSettings).
+    .exclude("Python UDF: push down deterministic FilterExec predicates")
+    .exclude("Nested Python UDF: push down deterministic FilterExec predicates")
+    .exclude("Python UDF: no push down on non-deterministic")
+    .exclude("Python UDF: push down on deterministic predicates after the first non-deterministic")
override def getSQLQueryTestSettings: SQLQueryTestSettings =
VeloxSQLQueryTestSettings
}
diff --git
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/python/GlutenBatchEvalPythonExecSuite.scala
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/python/GlutenBatchEvalPythonExecSuite.scala
new file mode 100644
index 0000000000..d364262be5
--- /dev/null
+++
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/python/GlutenBatchEvalPythonExecSuite.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.python
+
+import org.apache.gluten.execution.{ColumnarToRowExecBase,
FilterExecTransformer, RowToColumnarExecBase, WholeStageTransformer}
+
+import org.apache.spark.sql.GlutenSQLTestsBaseTrait
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference,
GreaterThan, In}
+import org.apache.spark.sql.execution.{ColumnarInputAdapter,
InputIteratorTransformer}
+
+/**
+ * Runs [[BatchEvalPythonExecSuite]] with Gluten enabled and adds plan-shape tests that look for
+ * Gluten's native operators around [[BatchEvalPythonExec]].
+ *
+ * Every test below filters a one-row DataFrame with a predicate that calls `dummyPythonUDF`
+ * (not defined in this file; presumably registered by the parent suite), collects the plan nodes
+ * matching two shapes from the executed plan, and asserts that exactly two nodes were found:
+ *   - a [[FilterExecTransformer]] with the expected condition whose input chain is
+ *     `InputIteratorTransformer -> ColumnarInputAdapter -> RowToColumnarExecBase` directly over a
+ *     [[BatchEvalPythonExec]];
+ *   - a [[BatchEvalPythonExec]] whose child is a [[ColumnarToRowExecBase]] over a
+ *     [[WholeStageTransformer]] rooted at a [[FilterExecTransformer]].
+ *
+ * The second shape is tested in a pattern guard (`PartialFunction.cond`), so a node whose child
+ * has another shape is simply not collected and the size assertion reports the mismatch, rather
+ * than a nested single-case `match` aborting the test with `scala.MatchError`.
+ */
+class GlutenBatchEvalPythonExecSuite extends BatchEvalPythonExecSuite with GlutenSQLTestsBaseTrait {
+
+  import testImplicits._
+
+  testGluten("Python UDF: push down deterministic FilterExecTransformer predicates") {
+    val df = Seq(("Hello", 4))
+      .toDF("a", "b")
+      .where("dummyPythonUDF(b) and dummyPythonUDF(a) and a in (3, 4)")
+    val qualifiedPlanNodes = df.queryExecution.executedPlan.collect {
+      // Native filter over the Python evaluation: a conjunction of two attribute references.
+      case f @ FilterExecTransformer(
+            And(_: AttributeReference, _: AttributeReference),
+            InputIteratorTransformer(ColumnarInputAdapter(r: RowToColumnarExecBase)))
+          if r.child.isInstanceOf[BatchEvalPythonExec] =>
+        f
+      // Python evaluation fed by a native stage whose root filter condition is an `In`.
+      case b @ BatchEvalPythonExec(_, _, c: ColumnarToRowExecBase)
+          if PartialFunction.cond(c.child) {
+            case WholeStageTransformer(FilterExecTransformer(_: In, _), _) => true
+          } =>
+        b
+    }
+    assert(qualifiedPlanNodes.size == 2)
+  }
+
+  testGluten("Nested Python UDF: push down deterministic FilterExecTransformer predicates") {
+    val df = Seq(("Hello", 4))
+      .toDF("a", "b")
+      .where("dummyPythonUDF(a, dummyPythonUDF(a, b)) and a in (3, 4)")
+    val qualifiedPlanNodes = df.queryExecution.executedPlan.collect {
+      // Native filter over the Python evaluation: the condition is a single attribute reference.
+      case f @ FilterExecTransformer(
+            _: AttributeReference,
+            InputIteratorTransformer(ColumnarInputAdapter(r: RowToColumnarExecBase)))
+          if r.child.isInstanceOf[BatchEvalPythonExec] =>
+        f
+      // Python evaluation fed by a native stage whose root filter condition is an `In`.
+      case b @ BatchEvalPythonExec(_, _, c: ColumnarToRowExecBase)
+          if PartialFunction.cond(c.child) {
+            case WholeStageTransformer(FilterExecTransformer(_: In, _), _) => true
+          } =>
+        b
+    }
+    assert(qualifiedPlanNodes.size == 2)
+  }
+
+  testGluten("Python UDF: no push down on non-deterministic") {
+    val df = Seq(("Hello", 4))
+      .toDF("a", "b")
+      .where("b > 4 and dummyPythonUDF(a) and rand() > 0.3")
+    val qualifiedPlanNodes = df.queryExecution.executedPlan.collect {
+      // Native filter over the Python evaluation: an attribute reference AND a `>` comparison.
+      case f @ FilterExecTransformer(
+            And(_: AttributeReference, _: GreaterThan),
+            InputIteratorTransformer(ColumnarInputAdapter(r: RowToColumnarExecBase)))
+          if r.child.isInstanceOf[BatchEvalPythonExec] =>
+        f
+      // Python evaluation fed by a native stage rooted at any FilterExecTransformer.
+      case b @ BatchEvalPythonExec(_, _, c: ColumnarToRowExecBase)
+          if PartialFunction.cond(c.child) {
+            case WholeStageTransformer(_: FilterExecTransformer, _) => true
+          } =>
+        b
+    }
+    assert(qualifiedPlanNodes.size == 2)
+  }
+
+  testGluten(
+    "Python UDF: push down on deterministic predicates after the first non-deterministic") {
+    val df = Seq(("Hello", 4))
+      .toDF("a", "b")
+      .where("dummyPythonUDF(a) and rand() > 0.3 and b > 4")
+
+    val qualifiedPlanNodes = df.queryExecution.executedPlan.collect {
+      // Native filter over the Python evaluation: an attribute reference AND a `>` comparison.
+      case f @ FilterExecTransformer(
+            And(_: AttributeReference, _: GreaterThan),
+            InputIteratorTransformer(ColumnarInputAdapter(r: RowToColumnarExecBase)))
+          if r.child.isInstanceOf[BatchEvalPythonExec] =>
+        f
+      // Python evaluation fed by a native stage rooted at any FilterExecTransformer.
+      case b @ BatchEvalPythonExec(_, _, c: ColumnarToRowExecBase)
+          if PartialFunction.cond(c.child) {
+            case WholeStageTransformer(_: FilterExecTransformer, _) => true
+          } =>
+        b
+    }
+    assert(qualifiedPlanNodes.size == 2)
+  }
+}
diff --git
a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index dfdb4a6b10..089c6f4c16 100644
---
a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++
b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -36,6 +36,7 @@ import
org.apache.spark.sql.execution.datasources.text.{GlutenTextV1Suite, Glute
import
org.apache.spark.sql.execution.datasources.v2.{GlutenDataSourceV2StrategySuite,
GlutenFileTableSuite, GlutenV2PredicateSuite}
import org.apache.spark.sql.execution.exchange.GlutenEnsureRequirementsSuite
import org.apache.spark.sql.execution.joins._
+import org.apache.spark.sql.execution.python._
import
org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite,
GlutenSessionExtensionSuite, TestFileSourceScanExecTransformer}
import org.apache.spark.sql.gluten.GlutenFallbackSuite
import org.apache.spark.sql.hive.execution.GlutenHiveSQLQuerySuite
@@ -942,6 +943,12 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenTableLocationSuite]
enableSuite[GlutenRemoveRedundantWindowGroupLimitsSuite]
enableSuite[GlutenSQLCollectLimitExecSuite]
+  enableSuite[GlutenBatchEvalPythonExecSuite]
+    // The four inherited tests named below are excluded; they are replaced with other tests
+    // that check for native operations. Each name is kept on a single line because a string
+    // literal cannot span lines (presumably the name must also match the inherited test
+    // verbatim -- confirm against BackendTestSettings).
+    .exclude("Python UDF: push down deterministic FilterExec predicates")
+    .exclude("Nested Python UDF: push down deterministic FilterExec predicates")
+    .exclude("Python UDF: no push down on non-deterministic")
+    .exclude("Python UDF: push down on deterministic predicates after the first non-deterministic")
override def getSQLQueryTestSettings: SQLQueryTestSettings =
VeloxSQLQueryTestSettings
}
diff --git
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/python/GlutenBatchEvalPythonExecSuite.scala
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/python/GlutenBatchEvalPythonExecSuite.scala
new file mode 100644
index 0000000000..d364262be5
--- /dev/null
+++
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/python/GlutenBatchEvalPythonExecSuite.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.python
+
+import org.apache.gluten.execution.{ColumnarToRowExecBase,
FilterExecTransformer, RowToColumnarExecBase, WholeStageTransformer}
+
+import org.apache.spark.sql.GlutenSQLTestsBaseTrait
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference,
GreaterThan, In}
+import org.apache.spark.sql.execution.{ColumnarInputAdapter,
InputIteratorTransformer}
+
+/**
+ * Runs [[BatchEvalPythonExecSuite]] with Gluten enabled and adds plan-shape tests that look for
+ * Gluten's native operators around [[BatchEvalPythonExec]].
+ *
+ * Every test below filters a one-row DataFrame with a predicate that calls `dummyPythonUDF`
+ * (not defined in this file; presumably registered by the parent suite), collects the plan nodes
+ * matching two shapes from the executed plan, and asserts that exactly two nodes were found:
+ *   - a [[FilterExecTransformer]] with the expected condition whose input chain is
+ *     `InputIteratorTransformer -> ColumnarInputAdapter -> RowToColumnarExecBase` directly over a
+ *     [[BatchEvalPythonExec]];
+ *   - a [[BatchEvalPythonExec]] whose child is a [[ColumnarToRowExecBase]] over a
+ *     [[WholeStageTransformer]] rooted at a [[FilterExecTransformer]].
+ *
+ * The second shape is tested in a pattern guard (`PartialFunction.cond`), so a node whose child
+ * has another shape is simply not collected and the size assertion reports the mismatch, rather
+ * than a nested single-case `match` aborting the test with `scala.MatchError`.
+ */
+class GlutenBatchEvalPythonExecSuite extends BatchEvalPythonExecSuite with GlutenSQLTestsBaseTrait {
+
+  import testImplicits._
+
+  testGluten("Python UDF: push down deterministic FilterExecTransformer predicates") {
+    val df = Seq(("Hello", 4))
+      .toDF("a", "b")
+      .where("dummyPythonUDF(b) and dummyPythonUDF(a) and a in (3, 4)")
+    val qualifiedPlanNodes = df.queryExecution.executedPlan.collect {
+      // Native filter over the Python evaluation: a conjunction of two attribute references.
+      case f @ FilterExecTransformer(
+            And(_: AttributeReference, _: AttributeReference),
+            InputIteratorTransformer(ColumnarInputAdapter(r: RowToColumnarExecBase)))
+          if r.child.isInstanceOf[BatchEvalPythonExec] =>
+        f
+      // Python evaluation fed by a native stage whose root filter condition is an `In`.
+      case b @ BatchEvalPythonExec(_, _, c: ColumnarToRowExecBase)
+          if PartialFunction.cond(c.child) {
+            case WholeStageTransformer(FilterExecTransformer(_: In, _), _) => true
+          } =>
+        b
+    }
+    assert(qualifiedPlanNodes.size == 2)
+  }
+
+  testGluten("Nested Python UDF: push down deterministic FilterExecTransformer predicates") {
+    val df = Seq(("Hello", 4))
+      .toDF("a", "b")
+      .where("dummyPythonUDF(a, dummyPythonUDF(a, b)) and a in (3, 4)")
+    val qualifiedPlanNodes = df.queryExecution.executedPlan.collect {
+      // Native filter over the Python evaluation: the condition is a single attribute reference.
+      case f @ FilterExecTransformer(
+            _: AttributeReference,
+            InputIteratorTransformer(ColumnarInputAdapter(r: RowToColumnarExecBase)))
+          if r.child.isInstanceOf[BatchEvalPythonExec] =>
+        f
+      // Python evaluation fed by a native stage whose root filter condition is an `In`.
+      case b @ BatchEvalPythonExec(_, _, c: ColumnarToRowExecBase)
+          if PartialFunction.cond(c.child) {
+            case WholeStageTransformer(FilterExecTransformer(_: In, _), _) => true
+          } =>
+        b
+    }
+    assert(qualifiedPlanNodes.size == 2)
+  }
+
+  testGluten("Python UDF: no push down on non-deterministic") {
+    val df = Seq(("Hello", 4))
+      .toDF("a", "b")
+      .where("b > 4 and dummyPythonUDF(a) and rand() > 0.3")
+    val qualifiedPlanNodes = df.queryExecution.executedPlan.collect {
+      // Native filter over the Python evaluation: an attribute reference AND a `>` comparison.
+      case f @ FilterExecTransformer(
+            And(_: AttributeReference, _: GreaterThan),
+            InputIteratorTransformer(ColumnarInputAdapter(r: RowToColumnarExecBase)))
+          if r.child.isInstanceOf[BatchEvalPythonExec] =>
+        f
+      // Python evaluation fed by a native stage rooted at any FilterExecTransformer.
+      case b @ BatchEvalPythonExec(_, _, c: ColumnarToRowExecBase)
+          if PartialFunction.cond(c.child) {
+            case WholeStageTransformer(_: FilterExecTransformer, _) => true
+          } =>
+        b
+    }
+    assert(qualifiedPlanNodes.size == 2)
+  }
+
+  testGluten(
+    "Python UDF: push down on deterministic predicates after the first non-deterministic") {
+    val df = Seq(("Hello", 4))
+      .toDF("a", "b")
+      .where("dummyPythonUDF(a) and rand() > 0.3 and b > 4")
+
+    val qualifiedPlanNodes = df.queryExecution.executedPlan.collect {
+      // Native filter over the Python evaluation: an attribute reference AND a `>` comparison.
+      case f @ FilterExecTransformer(
+            And(_: AttributeReference, _: GreaterThan),
+            InputIteratorTransformer(ColumnarInputAdapter(r: RowToColumnarExecBase)))
+          if r.child.isInstanceOf[BatchEvalPythonExec] =>
+        f
+      // Python evaluation fed by a native stage rooted at any FilterExecTransformer.
+      case b @ BatchEvalPythonExec(_, _, c: ColumnarToRowExecBase)
+          if PartialFunction.cond(c.child) {
+            case WholeStageTransformer(_: FilterExecTransformer, _) => true
+          } =>
+        b
+    }
+    assert(qualifiedPlanNodes.size == 2)
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]