Repository: spark Updated Branches: refs/heads/branch-1.3 1fe677a36 -> c59871cca
[SPARK-6073][SQL] Need to refresh metastore cache after append data in CreateMetastoreDataSourceAsSelect JIRA: https://issues.apache.org/jira/browse/SPARK-6073 liancheng Author: Yin Huai <yh...@databricks.com> Closes #4824 from yhuai/refreshCache and squashes the following commits: b9542ef [Yin Huai] Refresh metadata cache in the Catalog in CreateMetastoreDataSourceAsSelect. (cherry picked from commit 39a54b40aff66816f8b8f5c6133eaaad6eaecae1) Signed-off-by: Cheng Lian <l...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c59871cc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c59871cc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c59871cc Branch: refs/heads/branch-1.3 Commit: c59871cca5695402e13a89344bd12f5952704019 Parents: 1fe677a Author: Yin Huai <yh...@databricks.com> Authored: Mon Mar 2 22:42:18 2015 +0800 Committer: Cheng Lian <l...@databricks.com> Committed: Mon Mar 2 22:44:05 2015 +0800 ---------------------------------------------------------------------- .../spark/sql/hive/execution/commands.scala | 2 + .../sql/hive/MetastoreDataSourcesSuite.scala | 52 ++++++++++++++++++++ 2 files changed, 54 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/c59871cc/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala index 9934a5d..ffaef8e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala @@ -248,6 +248,8 @@ case class CreateMetastoreDataSourceAsSelect( isExternal) } + // Refresh the cache of the table in the catalog. + hiveContext.refreshTable(tableName) Seq.empty[Row] } } http://git-wip-us.apache.org/repos/asf/spark/blob/c59871cc/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 00306f1..868c35f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -612,4 +612,56 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { val actualSchema = table("wide_schema").schema assert(schema === actualSchema) } + + test("insert into a table") { + def createDF(from: Int, to: Int): DataFrame = + createDataFrame((from to to).map(i => Tuple2(i, s"str$i"))).toDF("c1", "c2") + + createDF(0, 9).saveAsTable("insertParquet", "parquet") + checkAnswer( + sql("SELECT p.c1, p.c2 FROM insertParquet p WHERE p.c1 > 5"), + (6 to 9).map(i => Row(i, s"str$i"))) + + intercept[AnalysisException] { + createDF(10, 19).saveAsTable("insertParquet", "parquet") + } + + createDF(10, 19).saveAsTable("insertParquet", "parquet", SaveMode.Append) + checkAnswer( + sql("SELECT p.c1, p.c2 FROM insertParquet p WHERE p.c1 > 5"), + (6 to 19).map(i => Row(i, s"str$i"))) + + createDF(20, 29).saveAsTable("insertParquet", "parquet", SaveMode.Append) + checkAnswer( + sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 25"), + (6 to 24).map(i => Row(i, s"str$i"))) + + intercept[AnalysisException] { + createDF(30, 39).saveAsTable("insertParquet") + } + + createDF(30, 39).saveAsTable("insertParquet", SaveMode.Append) + checkAnswer( + sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 35"), + (6 to 34).map(i => Row(i, s"str$i"))) + + createDF(40, 49).insertInto("insertParquet") + checkAnswer( + sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 45"), + (6 to 44).map(i => Row(i, s"str$i"))) + + createDF(50, 59).saveAsTable("insertParquet", SaveMode.Overwrite) + checkAnswer( + sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 51 AND p.c1 < 55"), + (52 to 54).map(i => Row(i, s"str$i"))) + createDF(60, 69).saveAsTable("insertParquet", SaveMode.Ignore) + checkAnswer( + sql("SELECT p.c1, c2 FROM insertParquet p"), + (50 to 59).map(i => Row(i, s"str$i"))) + + createDF(70, 79).insertInto("insertParquet", overwrite = true) + checkAnswer( + sql("SELECT p.c1, c2 FROM insertParquet p"), + (70 to 79).map(i => Row(i, s"str$i"))) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org