Repository: spark
Updated Branches:
  refs/heads/branch-2.0 3d5878751 -> e21e9d416

[SPARK-17123][SQL][BRANCH-2.0] Use type-widened encoder for DataFrame for set operations

## What changes were proposed in this pull request?

This PR backports https://github.com/apache/spark/pull/15072

Please note that the test code is slightly different from the one in master, as https://github.com/apache/spark/pull/14786 was only merged into master; branch-2.0 therefore does not support type widening between `DateType` and `TimestampType`, so both types were taken out of the test.

## How was this patch tested?

Unit test in `DataFrameSuite`.

Author: hyukjinkwon <[email protected]>

Closes #15601 from HyukjinKwon/backport-17123.
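
For illustration, here is a minimal standalone sketch of the scenario this patch fixes, assuming a local SparkSession (the object name and column names are invented for the example and are not part of the patch): two DataFrames whose schemas only line up after type widening are combined with union/except/intersect.

import java.sql.Timestamp

import org.apache.spark.sql.SparkSession

// Hypothetical standalone example mirroring the backported test.
object SetOpWideningSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
    import spark.implicits._

    val left = Seq(
      (BigDecimal.valueOf(1), new Timestamp(2)),
      (BigDecimal.valueOf(4), new Timestamp(5))
    ).toDF("decimal", "timestamp")

    val right = Seq((10.5D, "string")).toDF("decimal", "timestamp")

    // The analyzer widens the mismatched column types, so the combined plan's
    // schema differs from left's; the fix derives a fresh row encoder from the
    // widened plan instead of reusing left's original row encoder.
    left.union(right).collect()
    left.except(right).collect()
    left.intersect(right).collect()

    spark.stop()
  }
}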

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e21e9d41
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e21e9d41
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e21e9d41

Branch: refs/heads/branch-2.0
Commit: e21e9d4162cc798d9ec43ef984d17b89dab77826
Parents: 3d58787
Author: hyukjinkwon <[email protected]>
Authored: Sun Oct 23 14:00:35 2016 +0200
Committer: Herman van Hovell <[email protected]>
Committed: Sun Oct 23 14:00:35 2016 +0200

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/sql/Dataset.scala   | 16 +++++++++++++---
 .../scala/org/apache/spark/sql/DataFrameSuite.scala | 16 ++++++++++++++++
 2 files changed, 29 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e21e9d41/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 0b236a0..4946bbe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1456,7 +1456,7 @@ class Dataset[T] private[sql](
    * @group typedrel
    * @since 2.0.0
    */
-  def union(other: Dataset[T]): Dataset[T] = withTypedPlan {
+  def union(other: Dataset[T]): Dataset[T] = withSetOperator {
     // This breaks caching, but it's usually ok because it addresses a very specific use case:
     // using union to union many files or partitions.
     CombineUnions(Union(logicalPlan, other.logicalPlan))
@@ -1472,7 +1472,7 @@ class Dataset[T] private[sql](
    * @group typedrel
    * @since 1.6.0
    */
-  def intersect(other: Dataset[T]): Dataset[T] = withTypedPlan {
+  def intersect(other: Dataset[T]): Dataset[T] = withSetOperator {
     Intersect(logicalPlan, other.logicalPlan)
   }
 
@@ -1486,7 +1486,7 @@ class Dataset[T] private[sql](
    * @group typedrel
    * @since 2.0.0
    */
-  def except(other: Dataset[T]): Dataset[T] = withTypedPlan {
+  def except(other: Dataset[T]): Dataset[T] = withSetOperator {
     Except(logicalPlan, other.logicalPlan)
   }
 
@@ -2607,4 +2607,14 @@ class Dataset[T] private[sql](
   @inline private def withTypedPlan[U : Encoder](logicalPlan: => LogicalPlan): Dataset[U] = {
     Dataset(sparkSession, logicalPlan)
   }
+
+  /** A convenient function to wrap a set based logical plan and produce a Dataset. */
+  @inline private def withSetOperator[U : Encoder](logicalPlan: => LogicalPlan): Dataset[U] = {
+    if (classTag.runtimeClass.isAssignableFrom(classOf[Row])) {
+      // Set operators widen types (change the schema), so we cannot reuse the row encoder.
+      Dataset.ofRows(sparkSession, logicalPlan).asInstanceOf[Dataset[U]]
+    } else {
+      Dataset(sparkSession, logicalPlan)
+    }
+  }
 }


http://git-wip-us.apache.org/repos/asf/spark/blob/e21e9d41/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 7ab0fe0..f8d7ddd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql
 
 import java.io.File
 import java.nio.charset.StandardCharsets
+import java.sql.{Date, Timestamp}
 import java.util.UUID
 
 import scala.language.postfixOps
@@ -1585,4 +1586,19 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
       }
     }
   }
+
+  test("SPARK-17123: Performing set operations that combine non-scala native types") {
+    val dates = Seq(
+      (BigDecimal.valueOf(1), new Timestamp(2)),
+      (BigDecimal.valueOf(4), new Timestamp(5))
+    ).toDF("decimal", "timestamp")
+
+    val widenTypedRows = Seq(
+      (10.5D, "string")
+    ).toDF("decimal", "timestamp")
+
+    dates.union(widenTypedRows).collect()
+    dates.except(widenTypedRows).collect()
+    dates.intersect(widenTypedRows).collect()
+  }
 }
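
A closing note on the dispatch inside `withSetOperator`: a DataFrame is just a Dataset[Row], so the element type's ClassTag is enough to tell the untyped case apart from a typed Dataset. The following self-contained sketch mirrors that check with a stand-in Row trait (everything here is invented for illustration and is not Spark API):

import scala.reflect.{classTag, ClassTag}

object SetOperatorDispatchSketch {
  // Stand-in for org.apache.spark.sql.Row; illustrative only.
  trait Row

  // Mirrors the check in withSetOperator: true when the Dataset's element type
  // erases to Row, i.e. the caller holds an untyped DataFrame.
  def isRowDataset[T: ClassTag]: Boolean =
    classTag[T].runtimeClass.isAssignableFrom(classOf[Row])

  def main(args: Array[String]): Unit = {
    println(isRowDataset[Row])    // true  -> rebuild the encoder from the widened schema
    println(isRowDataset[String]) // false -> keep the existing encoder
  }
}

Only the untyped branch rebuilds the encoder: for a typed Dataset[T], both sides of union/intersect/except share the same element type T (the signatures take Dataset[T]), so no widening occurs and the existing encoder still applies.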
