Repository: spark Updated Branches: refs/heads/branch-1.3 419865475 -> a15a0a02c
[SPARK-5839][SQL]HiveMetastoreCatalog does not recognize table names and aliases of data source tables. JIRA: https://issues.apache.org/jira/browse/SPARK-5839 Author: Yin Huai <[email protected]> Closes #4626 from yhuai/SPARK-5839 and squashes the following commits: f779d85 [Yin Huai] Use subqeury to wrap replaced ParquetRelation. 2695f13 [Yin Huai] Merge remote-tracking branch 'upstream/master' into SPARK-5839 f1ba6ca [Yin Huai] Address comment. 2c7fa08 [Yin Huai] Use Subqueries to wrap a data source table. (cherry picked from commit f3ff1eb2985ff3e1567645b898f6b42e4b01f237) Signed-off-by: Michael Armbrust <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a15a0a02 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a15a0a02 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a15a0a02 Branch: refs/heads/branch-1.3 Commit: a15a0a02c654cd70bb84f6d610e1c178c9cb98a6 Parents: 4198654 Author: Yin Huai <[email protected]> Authored: Mon Feb 16 15:54:01 2015 -0800 Committer: Michael Armbrust <[email protected]> Committed: Mon Feb 16 15:54:13 2015 -0800 ---------------------------------------------------------------------- .../spark/sql/parquet/ParquetQuerySuite.scala | 5 +-- .../spark/sql/hive/HiveMetastoreCatalog.scala | 18 +++++++++-- .../sql/hive/MetastoreDataSourcesSuite.scala | 34 ++++++++++++++++++++ 3 files changed, 53 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/a15a0a02/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala index 9318c15..8b4d05e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala @@ -30,9 +30,10 @@ import org.apache.spark.sql.test.TestSQLContext._ class ParquetQuerySuiteBase extends QueryTest with ParquetTest { val sqlContext = TestSQLContext - test("simple projection") { + test("simple select queries") { withParquetTable((0 until 10).map(i => (i, i.toString)), "t") { - checkAnswer(sql("SELECT _1 FROM t"), (0 until 10).map(Row.apply(_))) + checkAnswer(sql("SELECT _1 FROM t where t._1 > 5"), (6 until 10).map(Row.apply(_))) + checkAnswer(sql("SELECT _1 FROM t as tmp where tmp._1 < 5"), (0 until 5).map(Row.apply(_))) } } http://git-wip-us.apache.org/repos/asf/spark/blob/a15a0a02/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 72211fe..87bc9fe 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -160,7 +160,15 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with } if (table.getProperty("spark.sql.sources.provider") != null) { - cachedDataSourceTables(QualifiedTableName(databaseName, tblName).toLowerCase) + val dataSourceTable = + cachedDataSourceTables(QualifiedTableName(databaseName, tblName).toLowerCase) + // Then, if alias is specified, wrap the table with a Subquery using the alias. + // Othersie, wrap the table with a Subquery using the table name. + val withAlias = + alias.map(a => Subquery(a, dataSourceTable)).getOrElse( + Subquery(tableIdent.last, dataSourceTable)) + + withAlias } else if (table.isView) { // if the unresolved relation is from hive view // parse the text into logic node. @@ -433,7 +441,13 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with val attributedRewrites = AttributeMap(relation.output.zip(parquetRelation.output)) lastPlan.transformUp { - case r: MetastoreRelation if r == relation => parquetRelation + case r: MetastoreRelation if r == relation => { + val withAlias = + r.alias.map(a => Subquery(a, parquetRelation)).getOrElse( + Subquery(r.tableName, parquetRelation)) + + withAlias + } case other => other.transformExpressions { case a: Attribute if a.resolved => attributedRewrites.getOrElse(a, a) } http://git-wip-us.apache.org/repos/asf/spark/blob/a15a0a02/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 375aae5..0263e3b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -401,6 +401,40 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { sql("DROP TABLE jsonTable").collect().foreach(println) } + test("SPARK-5839 HiveMetastoreCatalog does not recognize table aliases of data source tables.") { + val originalDefaultSource = conf.defaultDataSourceName + + val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}""")) + val df = jsonRDD(rdd) + + conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.json") + // Save the df as a managed table (by not specifiying the path). + df.saveAsTable("savedJsonTable") + + checkAnswer( + sql("SELECT * FROM savedJsonTable where savedJsonTable.a < 5"), + (1 to 4).map(i => Row(i, s"str${i}"))) + + checkAnswer( + sql("SELECT * FROM savedJsonTable tmp where tmp.a > 5"), + (6 to 10).map(i => Row(i, s"str${i}"))) + + invalidateTable("savedJsonTable") + + checkAnswer( + sql("SELECT * FROM savedJsonTable where savedJsonTable.a < 5"), + (1 to 4).map(i => Row(i, s"str${i}"))) + + checkAnswer( + sql("SELECT * FROM savedJsonTable tmp where tmp.a > 5"), + (6 to 10).map(i => Row(i, s"str${i}"))) + + // Drop table will also delete the data. + sql("DROP TABLE savedJsonTable") + + conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, originalDefaultSource) + } + test("save table") { val originalDefaultSource = conf.defaultDataSourceName --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
