This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new dfd5237  [SPARK-36783][SQL] ScanOperation should not push Filter 
through nondeterministic Project
dfd5237 is described below

commit dfd5237c0cd6e3024032b371f0182d2af691af7d
Author: Wenchen Fan <[email protected]>
AuthorDate: Fri Sep 17 10:51:15 2021 +0800

    [SPARK-36783][SQL] ScanOperation should not push Filter through 
nondeterministic Project
    
    ### What changes were proposed in this pull request?
    
    `ScanOperation` collects adjacent Projects and Filters. The caller side 
always assume that the collected Filters should run before collected Projects, 
which means `ScanOperation` effectively pushes Filter through Project.
    
    Following `PushPredicateThroughNonJoin`, we should not push Filter through 
nondeterministic Project. This PR fixes `ScanOperation` to follow this rule.
    
    ### Why are the changes needed?
    
    Fix a bug that violates the semantic of nondeterministic expressions.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Most likely no change, but in some cases, this is a correctness bug fix 
which changes the query result.
    
    ### How was this patch tested?
    
    existing tests
    
    Closes #34023 from cloud-fan/scan.
    
    Authored-by: Wenchen Fan <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../spark/sql/catalyst/planning/patterns.scala     | 14 ++++++++------
 .../sql/catalyst/planning/ScanOperationSuite.scala | 22 +++++-----------------
 2 files changed, 13 insertions(+), 23 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index afc4ab7..2a983ee 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -144,13 +144,15 @@ object ScanOperation extends OperationHelper with 
PredicateHelper {
       case Filter(condition, child) =>
         collectProjectsAndFilters(child) match {
           case Some((fields, filters, other, aliases)) =>
-            // Follow CombineFilters and only keep going if 1) the collected 
Filters
-            // and this filter are all deterministic or 2) if this filter is 
the first
-            // collected filter and doesn't have common non-deterministic 
expressions
-            // with lower Project.
+            // When collecting projects and filters, we effectively push down 
filters through
+            // projects. We need to meet the following conditions to do so:
+            //   1) no Project collected so far or the collected Projects are 
all deterministic
+            //   2) the collected filters and this filter are all 
deterministic, or this is the
+            //      first collected filter.
+            val canCombineFilters = fields.forall(_.forall(_.deterministic)) 
&& {
+              filters.isEmpty || (filters.forall(_.deterministic) && 
condition.deterministic)
+            }
             val substitutedCondition = substitute(aliases)(condition)
-            val canCombineFilters = (filters.nonEmpty && 
filters.forall(_.deterministic) &&
-              substitutedCondition.deterministic) || filters.isEmpty
             if (canCombineFilters && 
!hasCommonNonDeterministic(Seq(condition), aliases)) {
               Some((fields, filters ++ 
splitConjunctivePredicates(substitutedCondition),
                 other, aliases))
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/ScanOperationSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/ScanOperationSuite.scala
index 1290f77..b1baecc 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/ScanOperationSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/ScanOperationSuite.scala
@@ -71,31 +71,19 @@ class ScanOperationSuite extends SparkFunSuite {
     }
   }
 
-  test("Filter which has the same non-deterministic expression with its child 
Project") {
-    val filter1 = Filter(EqualTo(colR, Literal(1)), Project(Seq(colA, aliasR), 
relation))
+  test("Filter with non-deterministic Project") {
+    val filter1 = Filter(EqualTo(colA, Literal(1)), Project(Seq(colA, aliasR), 
relation))
     assert(ScanOperation.unapply(filter1).isEmpty)
   }
 
-  test("Deterministic filter with a child Project with a non-deterministic 
expression") {
-    val filter2 = Filter(EqualTo(colA, Literal(1)), Project(Seq(colA, aliasR), 
relation))
-    filter2 match {
-      case ScanOperation(projects, filters, _: LocalRelation) =>
-        assert(projects.size === 2)
-        assert(projects(0) === colA)
-        assert(projects(1) === aliasR)
-        assert(filters.size === 1)
-      case _ => assert(false)
-    }
-  }
-
-  test("Filter which has different non-deterministic expressions with its 
child Project") {
+  test("Non-deterministic Filter with deterministic Project") {
     val filter3 = Filter(EqualTo(MonotonicallyIncreasingID(), Literal(1)),
-      Project(Seq(colA, aliasR), relation))
+      Project(Seq(colA, colB), relation))
     filter3 match {
       case ScanOperation(projects, filters, _: LocalRelation) =>
         assert(projects.size === 2)
         assert(projects(0) === colA)
-        assert(projects(1) === aliasR)
+        assert(projects(1) === colB)
         assert(filters.size === 1)
       case _ => assert(false)
     }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to