This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 264ff2ecd [CORE] Fix non-deterministic filter executed twice when push down to scan (#6296)
264ff2ecd is described below

commit 264ff2ecd28dfecd054b77c78c1b7f2c0db982e2
Author: Mingliang Zhu <[email protected]>
AuthorDate: Tue Jul 2 15:09:29 2024 +0800

    [CORE] Fix non-deterministic filter executed twice when push down to scan (#6296)
    
    Co-authored-by: wangguangxin.cn <[email protected]>
---
 .../org/apache/gluten/execution/TestOperator.scala    | 19 +++++++++++++++++++
 .../execution/BasicPhysicalOperatorTransformer.scala  |  2 +-
 2 files changed, 20 insertions(+), 1 deletion(-)

diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
index 9b47a519c..c010b9128 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
@@ -1892,4 +1892,23 @@ class TestOperator extends VeloxWholeStageTransformerSuite with AdaptiveSparkPla
         }
     }
   }
+
+  test("fix non-deterministic filter executed twice when push down to scan") {
+    val df = sql("select * from lineitem where rand() <= 0.5")
+    // plan check
+    val plan = df.queryExecution.executedPlan
+    val scans = plan.collect { case scan: FileSourceScanExecTransformer => scan }
+    val filters = plan.collect { case filter: FilterExecTransformer => filter }
+    assert(scans.size == 1)
+    assert(filters.size == 1)
+    assert(scans(0).dataFilters.size == 1)
+    val remainingFilters = FilterHandler.getRemainingFilters(
+      scans(0).dataFilters,
+      splitConjunctivePredicates(filters(0).condition))
+    assert(remainingFilters.size == 0)
+
+    // result length check, table lineitem has 60,000 rows
+    val resultLength = df.collect().length
+    assert(resultLength > 25000 && resultLength < 35000)
+  }
 }
diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
index 962ad6aca..0b792d52e 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
@@ -365,7 +365,7 @@ object FilterHandler extends PredicateHelper {
    *   the filter conditions not pushed down into Scan.
    */
   def getRemainingFilters(scanFilters: Seq[Expression], filters: Seq[Expression]): Seq[Expression] =
-    (ExpressionSet(filters) -- ExpressionSet(scanFilters)).toSeq
+    (filters.toSet -- scanFilters.toSet).toSeq
 
   // Separate and compare the filter conditions in Scan and Filter.
   // Try to push down the remaining conditions in Filter into Scan.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to