Repository: spark
Updated Branches:
  refs/heads/branch-2.0 be2d23dfd -> 6dae027a6


[SPARK-15112][SQL] Disables EmbedSerializerInFilter for plan fragments that change schema

## What changes were proposed in this pull request?

`EmbedSerializerInFilter` implicitly assumes that the plan fragment being optimized doesn't change the plan schema, which is reasonable because `Dataset.filter` should never change the schema.

However, due to another issue involving `DeserializeToObject` and `SerializeFromObject`, the typed filter operator *does* change the plan schema (see [SPARK-15632][1]). This breaks `EmbedSerializerInFilter` and causes corrupted data.

This PR disables `EmbedSerializerInFilter` whenever the plan fragment changes the schema, to avoid data corruption. The underlying schema change issue should be addressed in follow-up PRs.
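
As an illustration, here is a minimal sketch of the failure mode, mirroring the new test case below (it assumes a SparkSession's `import spark.implicits._` and the existing `ClassData(a: String, b: Int)` fixture from `DatasetSuite`):

```scala
// The DataFrame columns are ordered (b: Int, a: String), while ClassData
// declares its fields as (a: String, b: Int). Deserializing into ClassData
// therefore reorders fields by name, so the plan fragment around the typed
// filter has different input and output schemata.
val ds = Seq(1 -> "foo", 2 -> "bar").toDF("b", "a").as[ClassData]

// Before this fix, embedding the serializer into the filter produced
// corrupted rows here instead of Seq(ClassData("bar", 2)).
ds.filter(_.b > 1).collect()
```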

## How was this patch tested?

New test case added in `DatasetSuite`.

[1]: https://issues.apache.org/jira/browse/SPARK-15632

Author: Cheng Lian <l...@databricks.com>

Closes #13362 from liancheng/spark-15112-corrupted-filter.

(cherry picked from commit 1360a6d636dd812a27955fc85df8e0255db60dfa)
Signed-off-by: Cheng Lian <l...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6dae027a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6dae027a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6dae027a

Branch: refs/heads/branch-2.0
Commit: 6dae027a6cdd7c862963f71e1ea08f7f1b4b3506
Parents: be2d23d
Author: Cheng Lian <l...@databricks.com>
Authored: Sun May 29 23:19:12 2016 -0700
Committer: Cheng Lian <l...@databricks.com>
Committed: Sun May 29 23:19:29 2016 -0700

----------------------------------------------------------------------
 .../sql/catalyst/optimizer/Optimizer.scala      | 21 +++++++++++++++++++-
 .../org/apache/spark/sql/DatasetSuite.scala     | 16 ++++++++++++++-
 2 files changed, 35 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6dae027a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 48d7009..688c77d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1597,7 +1597,19 @@ case class GetCurrentDatabase(sessionCatalog: SessionCatalog) extends Rule[Logic
  */
 object EmbedSerializerInFilter extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case s @ SerializeFromObject(_, Filter(condition, d: DeserializeToObject)) =>
+    case s @ SerializeFromObject(_, Filter(condition, d: DeserializeToObject))
+      // SPARK-15632: Conceptually, filter operator should never introduce schema change. This
+      // optimization rule also relies on this assumption. However, Dataset typed filter operator
+      // does introduce schema changes in some cases. Thus, we only enable this optimization when
+      //
+      //  1. either input and output schemata are exactly the same, or
+      //  2. both input and output schemata are single-field schema and share the same type.
+      //
+      // The 2nd case is included because encoders for primitive types always have only a single
+      // field with hard-coded field name "value".
+      // TODO Cleans this up after fixing SPARK-15632.
+      if s.schema == d.child.schema || samePrimitiveType(s.schema, d.child.schema) =>
+
       val numObjects = condition.collect {
         case a: Attribute if a == d.output.head => a
       }.length
@@ -1622,6 +1634,13 @@ object EmbedSerializerInFilter extends Rule[LogicalPlan] {
         Project(objAttrs, filter)
       }
   }
+
+  def samePrimitiveType(lhs: StructType, rhs: StructType): Boolean = {
+    (lhs, rhs) match {
+      case (StructType(Array(f1)), StructType(Array(f2))) => f1.dataType == f2.dataType
+      case _ => false
+    }
+  }
 }
 
 /**
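
For context on case 2 in the comment above, a small illustrative sketch (not part of the commit, and assuming `import spark.implicits._`): encoders for primitive types flatten to a single column whose name is hard-coded to "value", so two single-field schemata can differ only in field name while carrying identical data types.

```scala
// Primitive encoders produce a single hard-coded column named "value":
Seq(1, 2, 3).toDS().schema
// => StructType(StructField("value", IntegerType, false))

// Renaming that column yields a single-field schema with the same data type
// but a different field name. samePrimitiveType compares only the data
// types, so the optimization stays enabled for this safe case.
Seq(1, 2, 3).toDF("id").as[Int].filter(_ > 1).collect()
```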

http://git-wip-us.apache.org/repos/asf/spark/blob/6dae027a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index e395007..8fc4dc9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -706,7 +706,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
     val dataset = Seq(1, 2, 3).toDS()
     dataset.createOrReplaceTempView("tempView")
 
-    // Overrrides the existing temporary view with same name
+    // Overrides the existing temporary view with same name
     // No exception should be thrown here.
     dataset.createOrReplaceTempView("tempView")
 
@@ -769,6 +769,20 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
 
     checkShowString(ds, expected)
   }
+
+  test(
+    "SPARK-15112: EmbedDeserializerInFilter should not optimize plan fragment 
that changes schema"
+  ) {
+    val ds = Seq(1 -> "foo", 2 -> "bar").toDF("b", "a").as[ClassData]
+
+    assertResult(Seq(ClassData("foo", 1), ClassData("bar", 2))) {
+      ds.collect().toSeq
+    }
+
+    assertResult(Seq(ClassData("bar", 2))) {
+      ds.filter(_.b > 1).collect().toSeq
+    }
+  }
 }
 
 case class Generic[T](id: T, value: Double)

