This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 1339afe [SPARK-36783][SQL] ScanOperation should not push Filter
through nondeterministic Project
1339afe is described below
commit 1339afe3b70bb94cd067873a99bb3e760a04f82a
Author: Wenchen Fan <[email protected]>
AuthorDate: Fri Sep 17 10:51:15 2021 +0800
[SPARK-36783][SQL] ScanOperation should not push Filter through
nondeterministic Project
### What changes were proposed in this pull request?
`ScanOperation` collects adjacent Projects and Filters. The caller side
always assumes that the collected Filters should run before the collected
Projects, which means `ScanOperation` effectively pushes Filter through Project.
Following `PushPredicateThroughNonJoin`, we should not push Filter through
nondeterministic Project. This PR fixes `ScanOperation` to follow this rule.
### Why are the changes needed?
Fix a bug that violates the semantic of nondeterministic expressions.
### Does this PR introduce _any_ user-facing change?
Most likely no change, but in some cases, this is a correctness bug fix
which changes the query result.
### How was this patch tested?
Existing tests.
Closes #34023 from cloud-fan/scan.
Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit dfd5237c0cd6e3024032b371f0182d2af691af7d)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../spark/sql/catalyst/planning/patterns.scala | 14 ++++++++------
.../sql/catalyst/planning/ScanOperationSuite.scala | 22 +++++-----------------
2 files changed, 13 insertions(+), 23 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index c22a874..fac4d2a 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -144,13 +144,15 @@ object ScanOperation extends OperationHelper with
PredicateHelper {
case Filter(condition, child) =>
collectProjectsAndFilters(child) match {
case Some((fields, filters, other, aliases)) =>
- // Follow CombineFilters and only keep going if 1) the collected
Filters
- // and this filter are all deterministic or 2) if this filter is
the first
- // collected filter and doesn't have common non-deterministic
expressions
- // with lower Project.
+ // When collecting projects and filters, we effectively push down
filters through
+ // projects. We need to meet the following conditions to do so:
+ // 1) no Project collected so far or the collected Projects are
all deterministic
+ // 2) the collected filters and this filter are all
deterministic, or this is the
+ // first collected filter.
+ val canCombineFilters = fields.forall(_.forall(_.deterministic))
&& {
+ filters.isEmpty || (filters.forall(_.deterministic) &&
condition.deterministic)
+ }
val substitutedCondition = substitute(aliases)(condition)
- val canCombineFilters = (filters.nonEmpty &&
filters.forall(_.deterministic) &&
- substitutedCondition.deterministic) || filters.isEmpty
if (canCombineFilters &&
!hasCommonNonDeterministic(Seq(condition), aliases)) {
Some((fields, filters ++
splitConjunctivePredicates(substitutedCondition),
other, aliases))
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/ScanOperationSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/ScanOperationSuite.scala
index 1290f77..b1baecc 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/ScanOperationSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/ScanOperationSuite.scala
@@ -71,31 +71,19 @@ class ScanOperationSuite extends SparkFunSuite {
}
}
- test("Filter which has the same non-deterministic expression with its child
Project") {
- val filter1 = Filter(EqualTo(colR, Literal(1)), Project(Seq(colA, aliasR),
relation))
+ test("Filter with non-deterministic Project") {
+ val filter1 = Filter(EqualTo(colA, Literal(1)), Project(Seq(colA, aliasR),
relation))
assert(ScanOperation.unapply(filter1).isEmpty)
}
- test("Deterministic filter with a child Project with a non-deterministic
expression") {
- val filter2 = Filter(EqualTo(colA, Literal(1)), Project(Seq(colA, aliasR),
relation))
- filter2 match {
- case ScanOperation(projects, filters, _: LocalRelation) =>
- assert(projects.size === 2)
- assert(projects(0) === colA)
- assert(projects(1) === aliasR)
- assert(filters.size === 1)
- case _ => assert(false)
- }
- }
-
- test("Filter which has different non-deterministic expressions with its
child Project") {
+ test("Non-deterministic Filter with deterministic Project") {
val filter3 = Filter(EqualTo(MonotonicallyIncreasingID(), Literal(1)),
- Project(Seq(colA, aliasR), relation))
+ Project(Seq(colA, colB), relation))
filter3 match {
case ScanOperation(projects, filters, _: LocalRelation) =>
assert(projects.size === 2)
assert(projects(0) === colA)
- assert(projects(1) === aliasR)
+ assert(projects(1) === colB)
assert(filters.size === 1)
case _ => assert(false)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]