Repository: spark
Updated Branches:
refs/heads/branch-2.0 5a835b99f -> d17db34e0
[SPARK-15620][SQL] Fix transformed dataset attributes resolve failure
## What changes were proposed in this pull request?
Join on a transformed dataset has attribute conflicts, which cause query
execution to fail, for example:
```
val dataset = Seq(1, 2, 3).toDS()
val mappedDs = dataset.map(_ + 1)
mappedDs.as("t1").joinWith(mappedDs.as("t2"), $"t1.value" ===
$"t2.value").show()
```
will throw exception:
```
org.apache.spark.sql.AnalysisException: cannot resolve '`t1.value`' given input
columns: [value];
at
org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:62)
at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:59)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:287)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:287)
```
## How was this patch tested?
Unit test.
Author: jerryshao <[email protected]>
Closes #13399 from jerryshao/SPARK-15620.
(cherry picked from commit 8288e16a5a5a12a45207c13a1341c707c6b4b940)
Signed-off-by: Wenchen Fan <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d17db34e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d17db34e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d17db34e
Branch: refs/heads/branch-2.0
Commit: d17db34e073311687d128a28fe95ba36a07394a1
Parents: 5a835b9
Author: jerryshao <[email protected]>
Authored: Wed Jun 1 21:58:05 2016 -0700
Committer: Wenchen Fan <[email protected]>
Committed: Wed Jun 1 21:58:14 2016 -0700
----------------------------------------------------------------------
.../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 4 ++++
.../test/scala/org/apache/spark/sql/DatasetSuite.scala | 10 ++++++++++
2 files changed, 14 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/d17db34e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index bf221e0..eb46c0e 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -524,6 +524,10 @@ class Analyzer(
val newVersion = oldVersion.newInstance()
(oldVersion, newVersion)
+ case oldVersion: SerializeFromObject
+ if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty
=>
+ (oldVersion, oldVersion.copy(serializer =
oldVersion.serializer.map(_.newInstance())))
+
// Handle projects that create conflicting aliases.
case oldVersion @ Project(projectList, _)
if
findAliases(projectList).intersect(conflictingAttributes).nonEmpty =>
http://git-wip-us.apache.org/repos/asf/spark/blob/d17db34e/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 0b6874e..a3881ff 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -769,6 +769,16 @@ class DatasetSuite extends QueryTest with SharedSQLContext
{
}
}
+ test("mapped dataset should resolve duplicated attributes for self join") {
+ val ds = Seq(1, 2, 3).toDS().map(_ + 1)
+ val ds1 = ds.as("d1")
+ val ds2 = ds.as("d2")
+
+ checkDataset(ds1.joinWith(ds2, $"d1.value" === $"d2.value"), (2, 2), (3,
3), (4, 4))
+ checkDataset(ds1.intersect(ds2), 2, 3, 4)
+ checkDataset(ds1.except(ds1))
+ }
+
test("SPARK-15441: Dataset outer join") {
val left = Seq(ClassData("a", 1), ClassData("b", 2)).toDS().as("left")
val right = Seq(ClassData("x", 2), ClassData("y", 3)).toDS().as("right")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]