Repository: spark
Updated Branches:
refs/heads/branch-1.6 1026aba16 -> c25aa8fca
[SPARK-16329][SQL][BACKPORT-1.6] Star Expansion over Table Containing No Column
#14040
#### What changes were proposed in this pull request?
Star expansion over a table containing zero columns has not worked since 1.6.
However, it works in Spark 1.5.1. This PR is to fix the issue in the master
branch.
For example,
```scala
val rddNoCols = sqlContext.sparkContext.parallelize(1 to 10).map(_ => Row.empty)
val dfNoCols = sqlContext.createDataFrame(rddNoCols, StructType(Seq.empty))
dfNoCols.registerTempTable("temp_table_no_cols")
sqlContext.sql("select * from temp_table_no_cols").show
```
Without the fix, users will get the following exception:
```
java.lang.IllegalArgumentException: requirement failed
at scala.Predef$.require(Predef.scala:221)
at
org.apache.spark.sql.catalyst.analysis.UnresolvedStar.expand(unresolved.scala:199)
```
#### How was this patch tested?
Tests are added
Author: gatorsmile <[email protected]>
Closes #14042 from gatorsmile/starExpansionEmpty.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c25aa8fc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c25aa8fc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c25aa8fc
Branch: refs/heads/branch-1.6
Commit: c25aa8fca64a1a83e909a8f9baddb7b2a3fdaec5
Parents: 1026aba
Author: gatorsmile <[email protected]>
Authored: Tue Jul 5 00:40:08 2016 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Tue Jul 5 00:40:08 2016 +0800
----------------------------------------------------------------------
.../sql/catalyst/analysis/unresolved.scala | 15 ++++------
.../org/apache/spark/sql/SQLQuerySuite.scala | 31 ++++++++++++++++++++
2 files changed, 37 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/c25aa8fc/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
----------------------------------------------------------------------
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index 4f89b46..390bd1f 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -180,23 +180,20 @@ abstract class Star extends LeafExpression with
NamedExpression {
case class UnresolvedStar(target: Option[Seq[String]]) extends Star with
Unevaluable {
override def expand(input: LogicalPlan, resolver: Resolver):
Seq[NamedExpression] = {
+ // If there is no table specified, use all input attributes.
+ if (target.isEmpty) return input.output
- // First try to expand assuming it is table.*.
- val expandedAttributes: Seq[Attribute] = target match {
- // If there is no table specified, use all input attributes.
- case None => input.output
- // If there is a table, pick out attributes that are part of this table.
- case Some(t) => if (t.size == 1) {
- input.output.filter(_.qualifiers.exists(resolver(_, t.head)))
+ val expandedAttributes =
+ if (target.get.size == 1) {
+ // If there is a table, pick out attributes that are part of this
table.
+ input.output.filter(_.qualifiers.exists(resolver(_, target.get.head)))
} else {
List()
}
- }
if (expandedAttributes.nonEmpty) return expandedAttributes
// Try to resolve it as a struct expansion. If there is a conflict and
both are possible,
// (i.e. [name].* is both a table and a struct), the struct path can
always be qualified.
- require(target.isDefined)
val attribute = input.resolve(target.get, resolver)
if (attribute.isDefined) {
// This target resolved to an attribute in child. It must be a struct.
Expand it.
http://git-wip-us.apache.org/repos/asf/spark/blob/c25aa8fc/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 2be8343..6fec580 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1950,6 +1950,37 @@ class SQLQuerySuite extends QueryTest with
SharedSQLContext {
}
}
+ test("Star Expansion - table with zero column") {
+ withTempTable("temp_table_no_cols") {
+ val rddNoCols = sparkContext.parallelize(1 to 10).map(_ => Row.empty)
+ val dfNoCols = sqlContext.createDataFrame(rddNoCols,
StructType(Seq.empty))
+ dfNoCols.registerTempTable("temp_table_no_cols")
+
+ // ResolvedStar
+ checkAnswer(
+ dfNoCols,
+ dfNoCols.select(dfNoCols.col("*")))
+
+ // UnresolvedStar
+ checkAnswer(
+ dfNoCols,
+ sql("SELECT * FROM temp_table_no_cols"))
+ checkAnswer(
+ dfNoCols,
+ dfNoCols.select($"*"))
+
+ var e = intercept[AnalysisException] {
+ sql("SELECT a.* FROM temp_table_no_cols a")
+ }.getMessage
+ assert(e.contains("cannot resolve 'a.*' give input columns ''"))
+
+ e = intercept[AnalysisException] {
+ dfNoCols.select($"b.*")
+ }.getMessage
+ assert(e.contains("cannot resolve 'b.*' give input columns ''"))
+ }
+ }
+
test("Common subexpression elimination") {
// select from a table to prevent constant folding.
val df = sql("SELECT a, b from testData2 limit 1")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]