Repository: spark
Updated Branches:
  refs/heads/master 67e23b39a -> 425ff03f5


[SPARK-11436] [SQL] rebind right encoder when join 2 datasets

When we join two datasets, we combine their two encoders into a tupled one and
use it as the encoder for the joined dataset. Assume both encoders are flat:
each `constructExpression` then references the first element of the input row.
Once the two encoders are combined, however, the schema of the input row
changes, and the right encoder must reference the second element instead. So we
should rebind the right encoder to the new input-row schema before combining
it.
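
The effect of rebinding can be sketched with a toy model. This is only an
illustration under stated assumptions, not Spark's implementation: `ToyEncoder`,
`RebindSketch`, and the string-based schemas are hypothetical stand-ins for
`ExpressionEncoder` and attribute lists, and only the shape of the
`rebind(oldSchema, newSchema)` call mirrors the patch.

```scala
// Toy model (hypothetical, not Spark's ExpressionEncoder): an encoder that
// constructs its value by reading one fixed ordinal of the input row.
final case class ToyEncoder(ordinal: Int) {
  // Build the value by reading the ordinal this encoder is bound to.
  def construct(row: Seq[Any]): Any = row(ordinal)

  // Find the attribute we were bound to in the old schema, then bind to
  // that attribute's position in the new schema.
  def rebind(oldSchema: Seq[String], newSchema: Seq[String]): ToyEncoder =
    ToyEncoder(newSchema.indexOf(oldSchema(ordinal)))
}

object RebindSketch {
  // Combine two encoders into a function that builds a pair from a joined row.
  def tuple(left: ToyEncoder, right: ToyEncoder): Seq[Any] => (Any, Any) =
    row => (left.construct(row), right.construct(row))

  def main(args: Array[String]): Unit = {
    val leftOutput = Seq("a.value")   // schema of the left side
    val rightOutput = Seq("b.value")  // schema of the right side
    val joinedRow: Seq[Any] = Seq("left", 42)

    // Both flat encoders start out bound to ordinal 0 of their own input.
    val leftEncoder = ToyEncoder(0)
    val rightEncoder = ToyEncoder(0)

    // Without rebinding, the right encoder still reads ordinal 0.
    val broken = tuple(leftEncoder, rightEncoder)
    println(broken(joinedRow)) // (left,left)

    // With rebinding, the right encoder reads ordinal 1 of the joined row.
    val fixed =
      tuple(leftEncoder, rightEncoder.rebind(rightOutput, leftOutput ++ rightOutput))
    println(fixed(joinedRow)) // (left,42)
  }
}
```

In this sketch the left encoder needs no rebinding because its attributes keep
their positions at the front of the joined schema.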

Author: Wenchen Fan <[email protected]>

Closes #9391 from cloud-fan/join and squashes the following commits:

846d3ab [Wenchen Fan] rebind right encoder when join 2 datasets


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/425ff03f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/425ff03f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/425ff03f

Branch: refs/heads/master
Commit: 425ff03f5ac4f3ddda1ba06656e620d5426f4209
Parents: 67e23b3
Author: Wenchen Fan <[email protected]>
Authored: Tue Nov 3 12:47:39 2015 +0100
Committer: Michael Armbrust <[email protected]>
Committed: Tue Nov 3 12:47:39 2015 +0100

----------------------------------------------------------------------
 sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala   | 4 +++-
 .../src/test/scala/org/apache/spark/sql/DatasetSuite.scala   | 8 ++++++++
 2 files changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/425ff03f/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index e0ab5f5..ed98a25 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -390,7 +390,9 @@ class Dataset[T] private(
     val rightEncoder =
       if (other.encoder.flat) other.encoder else other.encoder.nested(rightData.toAttribute)
     implicit val tuple2Encoder: Encoder[(T, U)] =
-      ExpressionEncoder.tuple(leftEncoder, rightEncoder)
+      ExpressionEncoder.tuple(
+        leftEncoder,
+        rightEncoder.rebind(right.output, left.output ++ right.output))
 
     withPlan[(T, U)](other) { (left, right) =>
       Project(

http://git-wip-us.apache.org/repos/asf/spark/blob/425ff03f/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 993e6d2..95b8d05 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -214,4 +214,12 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
       cogrouped,
       1 -> "a#", 2 -> "#q", 3 -> "abcfoo#w", 5 -> "hello#er")
   }
+
+  test("SPARK-11436: we should rebind right encoder when join 2 datasets") {
+    val ds1 = Seq("1", "2").toDS().as("a")
+    val ds2 = Seq(2, 3).toDS().as("b")
+
+    val joined = ds1.joinWith(ds2, $"a.value" === $"b.value")
+    checkAnswer(joined, ("2", 2))
+  }
 }

