Repository: spark
Updated Branches:
  refs/heads/branch-2.0 d0707c6ba -> 3276ccfac


[SPARK-16686][SQL] Remove PushProjectThroughSample since it is handled by 
ColumnPruning

We push down `Project` through `Sample` in `Optimizer` by the rule 
`PushProjectThroughSample`. However, if the projected columns produce new 
output, they are evaluated on the whole data instead of the sampled data. This 
introduces an inconsistency between the original plan (Sample then Project) 
and the optimized plan (Project then Sample). In an extreme case, such as the 
one attached in the JIRA, if the projected column is a UDF that is not supposed 
to see the sampled-out data, the result of the UDF will be incorrect.

Since the rule `ColumnPruning` already handles general `Project` pushdown, we 
don't need `PushProjectThroughSample` anymore. The rule `ColumnPruning` also 
avoids the described issue.

Tested with Jenkins tests.

Author: Liang-Chi Hsieh <[email protected]>

Closes #14327 from viirya/fix-sample-pushdown.

(cherry picked from commit 7b06a8948fc16d3c14e240fdd632b79ce1651008)
Signed-off-by: Reynold Xin <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3276ccfa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3276ccfa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3276ccfa

Branch: refs/heads/branch-2.0
Commit: 3276ccfac807514d5a959415bcf58d2aa6ed8fbc
Parents: d0707c6
Author: Liang-Chi Hsieh <[email protected]>
Authored: Tue Jul 26 12:00:01 2016 +0800
Committer: Reynold Xin <[email protected]>
Committed: Fri Aug 19 11:18:55 2016 -0700

----------------------------------------------------------------------
 .../sql/catalyst/optimizer/Optimizer.scala      | 12 ----------
 .../catalyst/optimizer/ColumnPruningSuite.scala | 15 ++++++++++++
 .../optimizer/FilterPushdownSuite.scala         | 17 -------------
 .../org/apache/spark/sql/DatasetSuite.scala     | 25 ++++++++++++++++++++
 4 files changed, 40 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3276ccfa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 19d3c39..88cc0e4 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -75,7 +75,6 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, 
conf: CatalystConf)
     Batch("Operator Optimizations", fixedPoint,
       // Operator push down
       PushThroughSetOperations,
-      PushProjectThroughSample,
       ReorderJoin,
       EliminateOuterJoin,
       PushPredicateThroughJoin,
@@ -147,17 +146,6 @@ class SimpleTestOptimizer extends Optimizer(
   new SimpleCatalystConf(caseSensitiveAnalysis = true))
 
 /**
- * Pushes projects down beneath Sample to enable column pruning with sampling.
- */
-object PushProjectThroughSample extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    // Push down projection into sample
-    case Project(projectList, Sample(lb, up, replace, seed, child)) =>
-      Sample(lb, up, replace, seed, Project(projectList, child))()
-  }
-}
-
-/**
  * Removes the Project only conducting Alias of its child node.
  * It is created mainly for removing extra Project added in 
EliminateSerialization rule,
  * but can also benefit other operators.

http://git-wip-us.apache.org/repos/asf/spark/blob/3276ccfa/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
index b5664a5..589607e 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
@@ -346,5 +346,20 @@ class ColumnPruningSuite extends PlanTest {
     comparePlans(Optimize.execute(plan1.analyze), correctAnswer1)
   }
 
+  test("push project down into sample") {
+    val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
+    val x = testRelation.subquery('x)
+
+    val query1 = Sample(0.0, 0.6, false, 11L, x)().select('a)
+    val optimized1 = Optimize.execute(query1.analyze)
+    val expected1 = Sample(0.0, 0.6, false, 11L, x.select('a))()
+    comparePlans(optimized1, expected1.analyze)
+
+    val query2 = Sample(0.0, 0.6, false, 11L, x)().select('a as 'aa)
+    val optimized2 = Optimize.execute(query2.analyze)
+    val expected2 = Sample(0.0, 0.6, false, 11L, x.select('a))().select('a as 
'aa)
+    comparePlans(optimized2, expected2.analyze)
+  }
+
   // todo: add more tests for column pruning
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/3276ccfa/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 0e5c2acd..1dcabf9 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -34,7 +34,6 @@ class FilterPushdownSuite extends PlanTest {
       Batch("Subqueries", Once,
         EliminateSubqueryAliases) ::
       Batch("Filter Pushdown", FixedPoint(10),
-        PushProjectThroughSample,
         CombineFilters,
         PushDownPredicate,
         BooleanSimplification,
@@ -591,22 +590,6 @@ class FilterPushdownSuite extends PlanTest {
     comparePlans(optimized, originalQuery)
   }
 
-  test("push project and filter down into sample") {
-    val x = testRelation.subquery('x)
-    val originalQuery =
-      Sample(0.0, 0.6, false, 11L, x)().select('a)
-
-    val originalQueryAnalyzed =
-      EliminateSubqueryAliases(analysis.SimpleAnalyzer.execute(originalQuery))
-
-    val optimized = Optimize.execute(originalQueryAnalyzed)
-
-    val correctAnswer =
-      Sample(0.0, 0.6, false, 11L, x.select('a))()
-
-    comparePlans(optimized, correctAnswer.analyze)
-  }
-
   test("aggregate: push down filter when filter on group by expression") {
     val originalQuery = testRelation
                         .groupBy('a)('a, count('b) as 'c)

http://git-wip-us.apache.org/repos/asf/spark/blob/3276ccfa/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 9aeeda4..f897cfb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -422,6 +422,31 @@ class DatasetSuite extends QueryTest with SharedSQLContext 
{
       3, 17, 27, 58, 62)
   }
 
+  test("SPARK-16686: Dataset.sample with seed results shouldn't depend on 
downstream usage") {
+    val simpleUdf = udf((n: Int) => {
+      require(n != 1, "simpleUdf shouldn't see id=1!")
+      1
+    })
+
+    val df = Seq(
+      (0, "string0"),
+      (1, "string1"),
+      (2, "string2"),
+      (3, "string3"),
+      (4, "string4"),
+      (5, "string5"),
+      (6, "string6"),
+      (7, "string7"),
+      (8, "string8"),
+      (9, "string9")
+    ).toDF("id", "stringData")
+    val sampleDF = df.sample(false, 0.7, 50)
+    // After sampling, sampleDF doesn't contain id=1.
+    assert(!sampleDF.select("id").collect.contains(1))
+    // simpleUdf should not encounter id=1.
+    checkAnswer(sampleDF.select(simpleUdf($"id")), 
List.fill(sampleDF.count.toInt)(Row(1)))
+  }
+
   test("SPARK-11436: we should rebind right encoder when join 2 datasets") {
     val ds1 = Seq("1", "2").toDS().as("a")
     val ds2 = Seq(2, 3).toDS().as("b")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to