Repository: spark
Updated Branches:
refs/heads/master 54032682b -> 928845a42
[SPARK-24172][SQL] we should not apply operator pushdown to data source v2 many times
## What changes were proposed in this pull request?
In `PushDownOperatorsToDataSource`, we use `transformUp` to match
`PhysicalOperation` and apply pushdown. This is problematic if there are
multiple `Filter` and `Project` operators above the data source v2 relation.
For example, given a query plan like
```
Project
Filter
DataSourceV2Relation
```
the pattern match is triggered twice and operator pushdown runs twice. This is
unnecessary; we can use `mapChildren` to apply pushdown only once.
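To make the double application concrete, here is a minimal, self-contained Scala sketch. The toy `Plan` classes and the `Operation` extractor (a simplified stand-in for `PhysicalOperation` that requires at least one operator above the relation) are hypothetical illustrations, not Spark's Catalyst API; the sketch only shows how a bottom-up rewrite re-fires the pattern at every matching layer, while a single top-level match that recurses through children, as the new rule does with `mapChildren`, fires exactly once:
```
object PushdownDemo extends App {
  sealed trait Plan
  case object Relation extends Plan
  case class Filter(child: Plan) extends Plan
  case class Project(child: Plan) extends Plan

  // Simplified stand-in for PhysicalOperation: matches a relation under a
  // non-empty prefix of Filter/Project operators.
  object Operation {
    def unapply(plan: Plan): Option[Plan] = plan match {
      case Filter(child)  => strip(child)
      case Project(child) => strip(child)
      case _              => None
    }
    private def strip(plan: Plan): Option[Plan] = plan match {
      case Relation       => Some(Relation)
      case Filter(child)  => strip(child)
      case Project(child) => strip(child)
    }
  }

  var pushdowns = 0

  // transformUp-style: rewrite children first, then try the pattern at every
  // node on the way up, so it fires at the Filter layer and again at Project.
  def applyUp(plan: Plan): Plan = {
    val rewritten = plan match {
      case Filter(c)  => Filter(applyUp(c))
      case Project(c) => Project(applyUp(c))
      case leaf       => leaf
    }
    rewritten match {
      case Operation(_) => pushdowns += 1; rewritten // pushdown would run here
      case other        => other
    }
  }

  // The fixed style: match once at the top; for non-matching nodes, recurse
  // into children only (the moral equivalent of mapChildren(apply)).
  def applyOnce(plan: Plan): Plan = plan match {
    case Operation(_) => pushdowns += 1; plan // pushdown runs exactly once
    case Filter(c)    => Filter(applyOnce(c))
    case Project(c)   => Project(applyOnce(c))
    case leaf         => leaf
  }

  val plan = Project(Filter(Relation))

  pushdowns = 0; applyUp(plan)
  println(s"bottom-up transform fired $pushdowns times") // prints 2

  pushdowns = 0; applyOnce(plan)
  println(s"top-level match fired $pushdowns times")     // prints 1
}
```
Running it prints 2 for the bottom-up pass and 1 for the top-level match, mirroring the double pushdown described above.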
## How was this patch tested?
Existing tests.
Author: Wenchen Fan <[email protected]>
Closes #21230 from cloud-fan/step2.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/928845a4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/928845a4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/928845a4
Branch: refs/heads/master
Commit: 928845a42230a2c0a318011002a54ad871468b2e
Parents: 5403268
Author: Wenchen Fan <[email protected]>
Authored: Fri May 11 10:00:28 2018 -0700
Committer: gatorsmile <[email protected]>
Committed: Fri May 11 10:00:28 2018 -0700
----------------------------------------------------------------------
.../v2/PushDownOperatorsToDataSource.scala | 15 +++++----------
1 file changed, 5 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/928845a4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
index 9293d4f..e894f8a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
@@ -23,17 +23,10 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
 
 object PushDownOperatorsToDataSource extends Rule[LogicalPlan] {
-  override def apply(
-      plan: LogicalPlan): LogicalPlan = plan transformUp {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan match {
     // PhysicalOperation guarantees that filters are deterministic; no need to check
-    case PhysicalOperation(project, newFilters, relation : DataSourceV2Relation) =>
-      // merge the filters
-      val filters = relation.filters match {
-        case Some(existing) =>
-          existing ++ newFilters
-        case _ =>
-          newFilters
-      }
+    case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
+      assert(relation.filters.isEmpty, "data source v2 should do push down only once.")
 
       val projectAttrs = project.map(_.toAttribute)
       val projectSet = AttributeSet(project.flatMap(_.references))
@@ -67,5 +60,7 @@ object PushDownOperatorsToDataSource extends Rule[LogicalPlan] {
       } else {
         filtered
       }
+
+    case other => other.mapChildren(apply)
   }
 }