This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 36fc73e7c42 [SPARK-39672][SQL][3.1] Fix removing project before filter
with correlated subquery
36fc73e7c42 is described below
commit 36fc73e7c42b84e05b15b2caecc0f804610dce20
Author: tianlzhang <[email protected]>
AuthorDate: Thu Jul 14 12:49:57 2022 +0800
[SPARK-39672][SQL][3.1] Fix removing project before filter with correlated
subquery
### What changes were proposed in this pull request?
Add more checks to `removeProjectBeforeFilter` in `ColumnPruning` so that the
project is kept (i.e. not removed) when both of the following hold:
1. the filter condition contains a subquery expression (e.g. a correlated subquery)
2. the same attribute exists in both the output of the Project's child and the output of the subquery
### Why are the changes needed?
The following is a legitimate self-join query and should not throw an exception
when attributes in the subquery and the outer plan are de-duplicated.
```sql
select * from
(
select v1.a, v1.b, v2.c
from v1
inner join v2
on v1.a=v2.a) t3
where not exists (
select 1
from v2
where t3.a=v2.a and t3.b=v2.b and t3.c=v2.c
)
```
Here's what happens with the current code. The above query is analyzed into
the following `LogicalPlan` before `ColumnPruning`.
```
Project [a#250, b#251, c#268]
+- Filter NOT exists#272 [(a#250 = a#266) && (b#251 = b#267) && (c#268 =
c#268#277)]
: +- Project [1 AS 1#273, _1#259 AS a#266, _2#260 AS b#267, _3#261 AS
c#268#277]
: +- LocalRelation [_1#259, _2#260, _3#261]
+- Project [a#250, b#251, c#268]
+- Join Inner, (a#250 = a#266)
:- Project [a#250, b#251]
: +- Project [_1#243 AS a#250, _2#244 AS b#251]
: +- LocalRelation [_1#243, _2#244, _3#245]
+- Project [a#266, c#268]
+- Project [_1#259 AS a#266, _3#261 AS c#268]
+- LocalRelation [_1#259, _2#260, _3#261]
```
Then in `ColumnPruning`, the Project before Filter (between Filter and
Join) is removed. This changes the `outputSet` of the Filter's child so that
it now contains an attribute (`a#266`) that also exists in the output of the
subquery. Later, when `RewritePredicateSubquery` de-duplicates conflicting
attributes, it fails with `Found conflicting attributes a#266 in the
condition joining outer plan`.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added a unit test in `SubquerySuite`.
Closes #37074 from manuzhang/spark-39672.
Lead-authored-by: tianlzhang <[email protected]>
Co-authored-by: Manu Zhang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../spark/sql/catalyst/optimizer/Optimizer.scala | 16 +++++-
.../scala/org/apache/spark/sql/SubquerySuite.scala | 58 +++++++++++++++++++++-
2 files changed, 71 insertions(+), 3 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index fe6e9c52b04..8b777bed706 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -742,12 +742,24 @@ object ColumnPruning extends Rule[LogicalPlan] {
* order, otherwise lower Projects can be missed.
*/
private def removeProjectBeforeFilter(plan: LogicalPlan): LogicalPlan = plan
transformUp {
- case p1 @ Project(_, f @ Filter(_, p2 @ Project(_, child)))
+ case p1 @ Project(_, f @ Filter(e, p2 @ Project(_, child)))
if p2.outputSet.subsetOf(child.outputSet) &&
// We only remove attribute-only project.
- p2.projectList.forall(_.isInstanceOf[AttributeReference]) =>
+ p2.projectList.forall(_.isInstanceOf[AttributeReference]) &&
+ // We can't remove project when the child has conflicting attributes
+ // with the subquery in filter predicate
+ !hasConflictingAttrsWithSubquery(e, child) =>
p1.copy(child = f.copy(child = child))
}
+
+ private def hasConflictingAttrsWithSubquery(
+ predicate: Expression,
+ child: LogicalPlan): Boolean = {
+ predicate.find {
+ case s: SubqueryExpression if
s.plan.outputSet.intersect(child.outputSet).nonEmpty => true
+ case _ => false
+ }.isDefined
+ }
}
/**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index 8ea6ec0c7f1..6c8bcd38466 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
-import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Sort}
+import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan,
Project, Sort}
import org.apache.spark.sql.execution.{ColumnarToRowExec,
ExecSubqueryExpression, FileSourceScanExec, InputAdapter, ReusedSubqueryExec,
ScalarSubquery, SubqueryExec, WholeStageCodegenExec}
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper,
DisableAdaptiveExecution}
import org.apache.spark.sql.execution.datasources.FileScanRDD
@@ -1788,4 +1788,60 @@ class SubquerySuite extends QueryTest with
SharedSparkSession with AdaptiveSpark
}.getMessage.contains("Correlated column is not allowed in predicate"))
}
}
+
+ test("SPARK-39672: Fix removing project before filter with correlated
subquery") {
+ withTempView("v1", "v2") {
+ Seq((1, 2, 3), (4, 5, 6)).toDF("a", "b", "c").createTempView("v1")
+ Seq((1, 3, 5), (4, 5, 6)).toDF("a", "b", "c").createTempView("v2")
+
+ def findProject(df: DataFrame): Seq[Project] = {
+ df.queryExecution.optimizedPlan.collect {
+ case p: Project => p
+ }
+ }
+
+ // project before filter cannot be removed since subquery has
conflicting attributes
+ // with outer reference
+ val df1 = sql(
+ """
+ |select * from
+ |(
+ |select
+ |v1.a,
+ |v1.b,
+ |v2.c
+ |from v1
+ |inner join v2
+ |on v1.a=v2.a) t3
+ |where not exists (
+ | select 1
+ | from v2
+ | where t3.a=v2.a and t3.b=v2.b and t3.c=v2.c
+ |)
+ |""".stripMargin)
+ checkAnswer(df1, Row(1, 2, 5))
+ assert(findProject(df1).size == 4)
+
+ // project before filter can be removed when there are no conflicting
attributes
+ val df2 = sql(
+ """
+ |select * from
+ |(
+ |select
+ |v1.b,
+ |v2.c
+ |from v1
+ |inner join v2
+ |on v1.b=v2.c) t3
+ |where not exists (
+ | select 1
+ | from v2
+ | where t3.b=v2.b and t3.c=v2.c
+ |)
+ |""".stripMargin)
+
+ checkAnswer(df2, Row(5, 5))
+ assert(findProject(df2).size == 3)
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]