Repository: spark Updated Branches: refs/heads/master 67e23b39a -> 425ff03f5
[SPARK-11436] [SQL] rebind right encoder when join 2 datasets When we join 2 datasets, we will combine 2 encoders into a tupled one, and use it as the encoder for the jioned dataset. Assume both of the 2 encoders are flat, their `constructExpression`s both reference to the first element of input row. However, when we combine 2 encoders, the schema of input row changed, now the right encoder should reference to second element of input row. So we should rebind right encoder to let it know the new schema of input row before combine it. Author: Wenchen Fan <[email protected]> Closes #9391 from cloud-fan/join and squashes the following commits: 846d3ab [Wenchen Fan] rebind right encoder when join 2 datasets Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/425ff03f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/425ff03f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/425ff03f Branch: refs/heads/master Commit: 425ff03f5ac4f3ddda1ba06656e620d5426f4209 Parents: 67e23b3 Author: Wenchen Fan <[email protected]> Authored: Tue Nov 3 12:47:39 2015 +0100 Committer: Michael Armbrust <[email protected]> Committed: Tue Nov 3 12:47:39 2015 +0100 ---------------------------------------------------------------------- sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 4 +++- .../src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 8 ++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/425ff03f/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index e0ab5f5..ed98a25 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -390,7 +390,9 @@ class Dataset[T] private( val rightEncoder = if (other.encoder.flat) other.encoder else other.encoder.nested(rightData.toAttribute) implicit val tuple2Encoder: Encoder[(T, U)] = - ExpressionEncoder.tuple(leftEncoder, rightEncoder) + ExpressionEncoder.tuple( + leftEncoder, + rightEncoder.rebind(right.output, left.output ++ right.output)) withPlan[(T, U)](other) { (left, right) => Project( http://git-wip-us.apache.org/repos/asf/spark/blob/425ff03f/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 993e6d2..95b8d05 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -214,4 +214,12 @@ class DatasetSuite extends QueryTest with SharedSQLContext { cogrouped, 1 -> "a#", 2 -> "#q", 3 -> "abcfoo#w", 5 -> "hello#er") } + + test("SPARK-11436: we should rebind right encoder when join 2 datasets") { + val ds1 = Seq("1", "2").toDS().as("a") + val ds2 = Seq(2, 3).toDS().as("b") + + val joined = ds1.joinWith(ds2, $"a.value" === $"b.value") + checkAnswer(joined, ("2", 2)) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
