This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 264ff2ecd [CORE] Fix non-deterministic filter executed twice when push down to scan (#6296)
264ff2ecd is described below
commit 264ff2ecd28dfecd054b77c78c1b7f2c0db982e2
Author: Mingliang Zhu <[email protected]>
AuthorDate: Tue Jul 2 15:09:29 2024 +0800
[CORE] Fix non-deterministic filter executed twice when push down to scan (#6296)
Co-authored-by: wangguangxin.cn <[email protected]>
---
.../org/apache/gluten/execution/TestOperator.scala | 19 +++++++++++++++++++
.../execution/BasicPhysicalOperatorTransformer.scala | 2 +-
2 files changed, 20 insertions(+), 1 deletion(-)
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
index 9b47a519c..c010b9128 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
@@ -1892,4 +1892,23 @@ class TestOperator extends VeloxWholeStageTransformerSuite with AdaptiveSparkPla
}
}
}
+
+ test("fix non-deterministic filter executed twice when push down to scan") {
+ val df = sql("select * from lineitem where rand() <= 0.5")
+ // plan check
+ val plan = df.queryExecution.executedPlan
+ val scans = plan.collect { case scan: FileSourceScanExecTransformer =>
scan }
+ val filters = plan.collect { case filter: FilterExecTransformer => filter }
+ assert(scans.size == 1)
+ assert(filters.size == 1)
+ assert(scans(0).dataFilters.size == 1)
+ val remainingFilters = FilterHandler.getRemainingFilters(
+ scans(0).dataFilters,
+ splitConjunctivePredicates(filters(0).condition))
+ assert(remainingFilters.size == 0)
+
+ // result length check, table lineitem has 60,000 rows
+ val resultLength = df.collect().length
+ assert(resultLength > 25000 && resultLength < 35000)
+ }
}
diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
index 962ad6aca..0b792d52e 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
@@ -365,7 +365,7 @@ object FilterHandler extends PredicateHelper {
* the filter conditions not pushed down into Scan.
*/
def getRemainingFilters(scanFilters: Seq[Expression], filters:
Seq[Expression]): Seq[Expression] =
- (ExpressionSet(filters) -- ExpressionSet(scanFilters)).toSeq
+ (filters.toSet -- scanFilters.toSet).toSeq
// Separate and compare the filter conditions in Scan and Filter.
// Try to push down the remaining conditions in Filter into Scan.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]