Repository: spark
Updated Branches:
refs/heads/branch-2.0 2ce240cfe -> 6cb24de99
[SPARK-16164][SQL] Update `CombineFilters` to try to construct predicates with child predicate first
## What changes were proposed in this pull request?
This PR changes `CombineFilters` to build the combined predicate as
(`child predicate` AND `parent predicate`) instead of (`parent predicate`
AND `child predicate`), so the child's predicate is placed first. This is a
best-effort approach: other optimization rules may still change this order
when they reorganize conjunctive predicates.
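The following toy Scala model (not Spark code) shows why the conjunct order can matter. It assumes, as the reported failure suggests, that the combined filter evaluates its left conjunct first and skips the right one when the left is false. The label map, `isKnown`, and `idxGreaterThan2` are hypothetical stand-ins for the `StringIndexer` "skip" filter and the user's `idx > 2` filter.

```scala
// Toy model only: a predicate is a function from a row to Boolean.
object ShortCircuitDemo {
  type Row = Map[String, String]
  type Pred = Row => Boolean

  // Hypothetical label index, as if learned by a StringIndexer-like model on "0".."2".
  val labels: Map[String, Double] = Map("0" -> 0.0, "1" -> 1.0, "2" -> 2.0)

  // Child predicate: the "skip" filter that drops labels not seen during fitting.
  val isKnown: Pred = row => labels.contains(row("value"))

  // Parent predicate: idx > 2, where computing idx throws on an unseen label.
  val idxGreaterThan2: Pred = row => {
    val v = row("value")
    val idx = labels.getOrElse(v, throw new IllegalArgumentException(s"Unseen label: $v"))
    idx > 2
  }

  // Conjunction with left-to-right short-circuit: `right` runs only if `left` is true.
  def and(left: Pred, right: Pred): Pred = row => left(row) && right(row)

  val rows: Seq[Row] = (0 until 5).map(i => Map("value" -> i.toString))

  // Child predicate first (the order this commit aims for): unseen labels never reach the indexer.
  def childFirst(): Seq[Row] = rows.filter(and(isKnown, idxGreaterThan2))

  // Parent predicate first (the order before this commit): "3" reaches the indexer and throws.
  def parentFirst(): Seq[Row] = rows.filter(and(idxGreaterThan2, isKnown))
}
```

With the child predicate first, `childFirst()` returns an empty result (no known label has an index above 2); with the parent predicate first, `parentFirst()` throws as soon as it reaches the unseen label `"3"`.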
**Reported Error Scenario**
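Chris McCubbin reported a bug when using `StringIndexer` with
`setHandleInvalid("skip")` in an ML pipeline with additional filters. It
appears that filter pushdown changed the predicate order in the logical plan:
the user's filter on `idx` ended up ahead of the indexer's own filter that
skips unseen labels, so the indexer could be applied to a label it never saw.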
```scala
// In spark-shell, enter this block with :paste; spark.implicits._ must be
// in scope for toDF and the 'idx column syntax.
import org.apache.spark.ml.feature._
val df1 = (0 until 3).map(_.toString).toDF
val indexer = new StringIndexer()
  .setInputCol("value")
  .setOutputCol("idx")
  .setHandleInvalid("skip")
  .fit(df1)
val df2 = (0 until 5).map(_.toString).toDF
val predictions = indexer.transform(df2)
predictions.show() // this is okay
predictions.where('idx > 2).show() // this will throw an exception
```
Please see the notebook at
https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1233855/2159162931615821/588180/latest.html
for the error messages.
## How was this patch tested?
Passes the Jenkins tests (including a new test case).
Author: Dongjoon Hyun <[email protected]>
Closes #13872 from dongjoon-hyun/SPARK-16164.
(cherry picked from commit 91b1ef28d134313d7b6faaffa1c390f3ca4455d0)
Signed-off-by: Xiangrui Meng <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6cb24de9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6cb24de9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6cb24de9
Branch: refs/heads/branch-2.0
Commit: 6cb24de99e011ce97fb7d3513a2760b0d1a85a45
Parents: 2ce240c
Author: Dongjoon Hyun <[email protected]>
Authored: Thu Jun 23 15:27:43 2016 -0700
Committer: Xiangrui Meng <[email protected]>
Committed: Thu Jun 23 15:27:50 2016 -0700
----------------------------------------------------------------------
.../spark/sql/catalyst/optimizer/Optimizer.scala | 2 +-
.../catalyst/optimizer/FilterPushdownSuite.scala | 18 ++++++++++++++++++
2 files changed, 19 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/6cb24de9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 6190f7a..6b10484 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -963,7 +963,7 @@ object CombineFilters extends Rule[LogicalPlan] with PredicateHelper {
       (ExpressionSet(splitConjunctivePredicates(fc)) --
         ExpressionSet(splitConjunctivePredicates(nc))).reduceOption(And) match {
         case Some(ac) =>
-          Filter(And(ac, nc), grandChild)
+          Filter(And(nc, ac), grandChild)
         case None =>
           nf
       }
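The one-line change can be sketched with a hypothetical mini plan ADT (`Pred`, `And`, `Relation`, and `Filter` below are illustrative, not Catalyst's classes). As in the diff, parent conjuncts already present in the child condition are dropped and the remainder is placed after the child condition. Unlike Catalyst, this sketch uses an ordered `Seq` plus a `Set` lookup instead of `ExpressionSet`, and it does not model pushing the parent filter through an intervening `Project`.

```scala
// Hypothetical mini expression/plan ADT; not Catalyst's classes.
object CombineFiltersSketch {
  sealed trait Expr
  final case class Pred(name: String) extends Expr
  final case class And(left: Expr, right: Expr) extends Expr

  sealed trait Plan
  final case class Relation(name: String) extends Plan
  final case class Filter(condition: Expr, child: Plan) extends Plan

  // Flatten nested Ands into their conjuncts, preserving left-to-right order.
  def splitConjuncts(e: Expr): Seq[Expr] = e match {
    case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
    case other     => Seq(other)
  }

  // Merge Filter(parent, Filter(child, g)) into a single Filter with the child
  // condition first, dropping parent conjuncts the child already checks.
  def combineFilters(plan: Plan): Plan = plan match {
    case Filter(fc, nf @ Filter(nc, grandChild)) =>
      val childConjuncts = splitConjuncts(nc).toSet
      splitConjuncts(fc).filterNot(childConjuncts.contains).reduceOption[Expr](And(_, _)) match {
        case Some(ac) => combineFilters(Filter(And(nc, ac), grandChild))
        case None     => combineFilters(nf)
      }
    case Filter(c, child) => Filter(c, combineFilters(child))
    case other            => other
  }
}
```

For example, combining `Filter(b, Filter(a, t))` yields `Filter(And(a, b), t)`: the child's `a` comes first, mirroring the `correctAnswer` in the new test.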
http://git-wip-us.apache.org/repos/asf/spark/blob/6cb24de9/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index b8f28e8..9cb49e7 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -94,6 +94,24 @@ class FilterPushdownSuite extends PlanTest {
     comparePlans(optimized, correctAnswer)
   }
 
+  test("SPARK-16164: Filter pushdown should keep the ordering in the logical plan") {
+    val originalQuery =
+      testRelation
+        .where('a === 1)
+        .select('a, 'b)
+        .where('b === 1)
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val correctAnswer =
+      testRelation
+        .where('a === 1 && 'b === 1)
+        .select('a, 'b)
+        .analyze
+
+    // We can not use comparePlans here because it normalized the plan.
+    assert(optimized == correctAnswer)
+  }
+
   test("can't push without rewrite") {
     val originalQuery =
       testRelation
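The comment in the new test notes that `comparePlans` normalizes the plan. Assuming that normalization puts conjuncts into a canonical order, an order-insensitive comparison could not detect this regression, which is why the test compares with `==`. The toy sketch below uses a hypothetical `normalize` that sorts conjuncts by their string form; it is not `PlanTest`'s implementation.

```scala
// Toy comparison sketch; normalize is hypothetical, not PlanTest's code.
object PlanComparisonSketch {
  sealed trait Expr
  final case class Pred(name: String) extends Expr
  final case class And(left: Expr, right: Expr) extends Expr

  // Flatten nested Ands into their conjuncts, preserving left-to-right order.
  def splitConjuncts(e: Expr): Seq[Expr] = e match {
    case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
    case other     => Seq(other)
  }

  // Sort conjuncts so that their original order no longer affects equality.
  def normalize(e: Expr): Expr =
    splitConjuncts(e).sortBy(_.toString).reduce[Expr](And(_, _))

  // Order-insensitive: And(a, b) and And(b, a) compare equal.
  def normalizedEqual(x: Expr, y: Expr): Boolean = normalize(x) == normalize(y)

  // Order-sensitive structural equality, as with `==` in the new test.
  def exactEqual(x: Expr, y: Expr): Boolean = x == y
}
```

Under normalization, `And(a, b)` and `And(b, a)` are indistinguishable, so only the exact comparison would catch the parent predicate being placed first.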