Repository: spark
Updated Branches:
  refs/heads/branch-1.3 1fe677a36 -> c59871cca


[SPARK-6073][SQL] Need to refresh metastore cache after appending data in CreateMetastoreDataSourceAsSelect

JIRA: https://issues.apache.org/jira/browse/SPARK-6073

liancheng

Author: Yin Huai <yh...@databricks.com>

Closes #4824 from yhuai/refreshCache and squashes the following commits:

b9542ef [Yin Huai] Refresh metadata cache in the Catalog in CreateMetastoreDataSourceAsSelect.

(cherry picked from commit 39a54b40aff66816f8b8f5c6133eaaad6eaecae1)
Signed-off-by: Cheng Lian <l...@databricks.com>
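
For context, a minimal sketch of the stale-cache symptom this patch fixes,
assuming a Spark 1.3 HiveContext named hiveContext is in scope; the table
name t is illustrative, not taken from the patch. Before the fix, the second
saveAsTable could leave the catalog's cached relation pointing at the
pre-append file listing, so a follow-up query would miss the new rows:

    import org.apache.spark.sql.SaveMode

    val df1 = hiveContext.createDataFrame((1 to 10).map(i => (i, s"str$i"))).toDF("c1", "c2")
    df1.saveAsTable("t", "parquet")                  // creates the table; its metadata gets cached

    val df2 = hiveContext.createDataFrame((11 to 20).map(i => (i, s"str$i"))).toDF("c1", "c2")
    df2.saveAsTable("t", "parquet", SaveMode.Append) // writes new Parquet files on disk

    // Without refreshing the cached metadata, this count could still
    // reflect only the first batch of 10 rows.
    hiveContext.sql("SELECT COUNT(*) FROM t").collect()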


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c59871cc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c59871cc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c59871cc

Branch: refs/heads/branch-1.3
Commit: c59871cca5695402e13a89344bd12f5952704019
Parents: 1fe677a
Author: Yin Huai <yh...@databricks.com>
Authored: Mon Mar 2 22:42:18 2015 +0800
Committer: Cheng Lian <l...@databricks.com>
Committed: Mon Mar 2 22:44:05 2015 +0800

----------------------------------------------------------------------
 .../spark/sql/hive/execution/commands.scala     |  2 +
 .../sql/hive/MetastoreDataSourcesSuite.scala    | 52 ++++++++++++++++++++
 2 files changed, 54 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c59871cc/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 9934a5d..ffaef8e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -248,6 +248,8 @@ case class CreateMetastoreDataSourceAsSelect(
         isExternal)
     }
 
+    // Refresh the cache of the table in the catalog.
+    hiveContext.refreshTable(tableName)
     Seq.empty[Row]
   }
 }
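
The fix itself is the single refreshTable call above: once the data source
table has been created or appended to, it invalidates the catalog's cached
entry, so the next lookup rebuilds the relation from the current metadata
and file listing. A rough sketch of the invalidate-on-write pattern this
relies on (an illustrative cache, not Spark's actual HiveMetastoreCatalog):

    import scala.collection.mutable

    class TableCache[V](load: String => V) {
      private val cache = mutable.Map.empty[String, V]

      // Populate on first lookup, then serve from the cache.
      def get(table: String): V = cache.getOrElseUpdate(table, load(table))

      // What refreshTable does conceptually: drop the stale entry so the
      // next get() re-reads schema and file listing from the source.
      def refresh(table: String): Unit = cache.remove(table)
    }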

http://git-wip-us.apache.org/repos/asf/spark/blob/c59871cc/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 00306f1..868c35f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -612,4 +612,56 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
     val actualSchema = table("wide_schema").schema
     assert(schema === actualSchema)
   }
+
+  test("insert into a table") {
+    def createDF(from: Int, to: Int): DataFrame =
+      createDataFrame((from to to).map(i => Tuple2(i, s"str$i"))).toDF("c1", "c2")
+
+    createDF(0, 9).saveAsTable("insertParquet", "parquet")
+    checkAnswer(
+      sql("SELECT p.c1, p.c2 FROM insertParquet p WHERE p.c1 > 5"),
+      (6 to 9).map(i => Row(i, s"str$i")))
+
+    intercept[AnalysisException] {
+      createDF(10, 19).saveAsTable("insertParquet", "parquet")
+    }
+
+    createDF(10, 19).saveAsTable("insertParquet", "parquet", SaveMode.Append)
+    checkAnswer(
+      sql("SELECT p.c1, p.c2 FROM insertParquet p WHERE p.c1 > 5"),
+      (6 to 19).map(i => Row(i, s"str$i")))
+
+    createDF(20, 29).saveAsTable("insertParquet", "parquet", SaveMode.Append)
+    checkAnswer(
+      sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 25"),
+      (6 to 24).map(i => Row(i, s"str$i")))
+
+    intercept[AnalysisException] {
+      createDF(30, 39).saveAsTable("insertParquet")
+    }
+
+    createDF(30, 39).saveAsTable("insertParquet", SaveMode.Append)
+    checkAnswer(
+      sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 35"),
+      (6 to 34).map(i => Row(i, s"str$i")))
+
+    createDF(40, 49).insertInto("insertParquet")
+    checkAnswer(
+      sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 45"),
+      (6 to 44).map(i => Row(i, s"str$i")))
+
+    createDF(50, 59).saveAsTable("insertParquet", SaveMode.Overwrite)
+    checkAnswer(
+      sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 51 AND p.c1 < 
55"),
+      (52 to 54).map(i => Row(i, s"str$i")))
+    createDF(60, 69).saveAsTable("insertParquet", SaveMode.Ignore)
+    checkAnswer(
+      sql("SELECT p.c1, c2 FROM insertParquet p"),
+      (50 to 59).map(i => Row(i, s"str$i")))
+
+    createDF(70, 79).insertInto("insertParquet", overwrite = true)
+    checkAnswer(
+      sql("SELECT p.c1, c2 FROM insertParquet p"),
+      (70 to 79).map(i => Row(i, s"str$i")))
+  }
 }
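
The new test walks the full SaveMode matrix against the same metastore table
(the default ErrorIfExists, then Append, Overwrite, and Ignore) plus
insertInto, checking after each write that queries see the refreshed data.
For reference, a short paraphrase of the SaveMode semantics the assertions
rely on, restated from the Spark 1.3 API docs rather than taken from the
patch:

    import org.apache.spark.sql.SaveMode

    def describe(mode: SaveMode): String = mode match {
      case SaveMode.ErrorIfExists => "default: fail (AnalysisException) if the table already exists"
      case SaveMode.Append        => "add the new rows to the existing table"
      case SaveMode.Overwrite     => "replace the existing data with the new rows"
      case SaveMode.Ignore        => "no-op: keep the existing data and discard the new rows"
    }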

