This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 24cdf7b  [SPARK-36783][SQL] ScanOperation should not push Filter through nondeterministic Project
24cdf7b is described below
commit 24cdf7bb86935b930e7d97966726ee58809479d9
Author: Wenchen Fan <[email protected]>
AuthorDate: Fri Sep 17 10:51:15 2021 +0800
    [SPARK-36783][SQL] ScanOperation should not push Filter through nondeterministic Project
`ScanOperation` collects adjacent Projects and Filters. The caller side
always assume that the collected Filters should run before collected Projects,
which means `ScanOperation` effectively pushes Filter through Project.
Following `PushPredicateThroughNonJoin`, we should not push Filter through
nondeterministic Project. This PR fixes `ScanOperation` to follow this rule.
Fix a bug that violates the semantic of nondeterministic expressions.
Most likely no change, but in some cases, this is a correctness bug fix
which changes the query result.
existing tests
Closes #34023 from cloud-fan/scan.
Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit dfd5237c0cd6e3024032b371f0182d2af691af7d)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../spark/sql/catalyst/planning/patterns.scala | 14 ++++++++------
.../sql/catalyst/planning/ScanOperationSuite.scala | 22 +++++-----------------
2 files changed, 13 insertions(+), 23 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index 5b694d9..d88666d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -142,13 +142,15 @@ object ScanOperation extends OperationHelper with PredicateHelper {
       case Filter(condition, child) =>
         collectProjectsAndFilters(child) match {
           case Some((fields, filters, other, aliases)) =>
-            // Follow CombineFilters and only keep going if 1) the collected Filters
-            // and this filter are all deterministic or 2) if this filter is the first
-            // collected filter and doesn't have common non-deterministic expressions
-            // with lower Project.
+            // When collecting projects and filters, we effectively push down filters through
+            // projects. We need to meet the following conditions to do so:
+            //   1) no Project collected so far or the collected Projects are all deterministic
+            //   2) the collected filters and this filter are all deterministic, or this is the
+            //      first collected filter.
+            val canCombineFilters = fields.forall(_.forall(_.deterministic)) && {
+              filters.isEmpty || (filters.forall(_.deterministic) && condition.deterministic)
+            }
             val substitutedCondition = substitute(aliases)(condition)
-            val canCombineFilters = (filters.nonEmpty && filters.forall(_.deterministic) &&
-              substitutedCondition.deterministic) || filters.isEmpty
             if (canCombineFilters && !hasCommonNonDeterministic(Seq(condition), aliases)) {
               Some((fields, filters ++ splitConjunctivePredicates(substitutedCondition),
                 other, aliases))
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/ScanOperationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/ScanOperationSuite.scala
index 7790f46..9bd3d8e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/ScanOperationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/ScanOperationSuite.scala
@@ -68,35 +68,23 @@ class ScanOperationSuite extends SparkFunSuite {
     }
   }
-  test("Filter which has the same non-deterministic expression with its child Project") {
-    val filter1 = Filter(EqualTo(colR, Literal(1)), Project(Seq(colA, aliasR), relation))
+  test("Filter with non-deterministic Project") {
+    val filter1 = Filter(EqualTo(colA, Literal(1)), Project(Seq(colA, aliasR), relation))
     assert(ScanOperation.unapply(filter1).isEmpty)
   }
-  test("Deterministic filter with a child Project with a non-deterministic expression") {
-    val filter2 = Filter(EqualTo(colA, Literal(1)), Project(Seq(colA, aliasR), relation))
-    filter2 match {
-      case ScanOperation(projects, filters, _: LocalRelation) =>
-        assert(projects.size === 2)
-        assert(projects(0) === colA)
-        assert(projects(1) === aliasR)
-        assert(filters.size === 1)
-    }
-  }
-
-  test("Filter which has different non-deterministic expressions with its child Project") {
+  test("Non-deterministic Filter with deterministic Project") {
     val filter3 = Filter(EqualTo(MonotonicallyIncreasingID(), Literal(1)),
-      Project(Seq(colA, aliasR), relation))
+      Project(Seq(colA, colB), relation))
     filter3 match {
       case ScanOperation(projects, filters, _: LocalRelation) =>
         assert(projects.size === 2)
         assert(projects(0) === colA)
-        assert(projects(1) === aliasR)
+        assert(projects(1) === colB)
         assert(filters.size === 1)
     }
   }
-
   test("Deterministic filter which has a non-deterministic child Filter") {
     val filter4 = Filter(EqualTo(colA, Literal(1)), Filter(EqualTo(aliasR, Literal(1)), relation))
     assert(ScanOperation.unapply(filter4).isEmpty)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]