This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new d8d3e87 [SPARK-32509][SQL] Ignore unused DPP True Filter in
Canonicalization
d8d3e87 is described below
commit d8d3e870a03f7695eb8fdbaaafa13d7df756801a
Author: Prakhar Jain <[email protected]>
AuthorDate: Mon Aug 3 03:26:03 2020 +0000
[SPARK-32509][SQL] Ignore unused DPP True Filter in Canonicalization
### What changes were proposed in this pull request?
This PR fixes issues related to the canonicalization of FileSourceScanExec when
it contains unused DPP filters.
### Why are the changes needed?
As part of the PlanDynamicPruningFilters rule, unused DPP filters are simply
replaced by `DynamicPruningExpression(TrueLiteral)` so that they become no-ops.
However, these unnecessary `DynamicPruningExpression(TrueLiteral)` partition
filters inside FileSourceScanExec still affect the canonicalization of the node.
As a result, two scans that are otherwise identical no longer have the same
canonical form, and in many cases this prevents ReuseExchange from happening.
This PR fixes the issue by ignoring unused DPP filters (i.e.
`DynamicPruningExpression(TrueLiteral)`) in the `doCanonicalize` method.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added a unit test to DynamicPartitionPruningSuite.
Closes #29318 from prakharjain09/SPARK-32509_df_reuse.
Authored-by: Prakhar Jain <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 7a09e71198a094250f04e0f82f0c7c9860169540)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../spark/sql/execution/DataSourceScanExec.scala | 10 ++++++++-
.../spark/sql/DynamicPartitionPruningSuite.scala | 24 ++++++++++++++++++++++
2 files changed, 33 insertions(+), 1 deletion(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 90a3f97..7f534d5 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -589,12 +589,20 @@ case class FileSourceScanExec(
new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
}
+ // Filters unused DynamicPruningExpression expressions - one which has been
replaced
+ // with DynamicPruningExpression(Literal.TrueLiteral) during Physical
Planning
+ private def filterUnusedDynamicPruningExpressions(
+ predicates: Seq[Expression]): Seq[Expression] = {
+ predicates.filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral))
+ }
+
override def doCanonicalize(): FileSourceScanExec = {
FileSourceScanExec(
relation,
output.map(QueryPlan.normalizeExpressions(_, output)),
requiredSchema,
- QueryPlan.normalizePredicates(partitionFilters, output),
+ QueryPlan.normalizePredicates(
+ filterUnusedDynamicPruningExpressions(partitionFilters), output),
optionalBucketSet,
QueryPlan.normalizePredicates(dataFilters, output),
None)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
index cdf9ea4..1fbc4a6 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -1240,6 +1240,30 @@ abstract class DynamicPartitionPruningSuiteBase
}
}
+ test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " +
+ "canonicalization and exchange reuse") {
+ withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key ->
"true") {
+ withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+ val df = sql(
+ """ WITH view1 as (
+ | SELECT f.store_id FROM fact_stats f WHERE f.units_sold = 70
+ | )
+ |
+ | SELECT * FROM view1 v1 join view1 v2 WHERE v1.store_id =
v2.store_id
+ """.stripMargin)
+
+ checkPartitionPruningPredicate(df, false, false)
+ val reuseExchangeNodes = df.queryExecution.executedPlan.collect {
+ case se: ReusedExchangeExec => se
+ }
+ assert(reuseExchangeNodes.size == 1, "Expected plan to contain 1
ReusedExchangeExec " +
+ s"nodes. Found ${reuseExchangeNodes.size}")
+
+ checkAnswer(df, Row(15, 15) :: Nil)
+ }
+ }
+ }
+
test("Plan broadcast pruning only when the broadcast can be reused") {
Given("dynamic pruning filter on the build side")
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key ->
"true") {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]