Repository: spark
Updated Branches:
  refs/heads/branch-2.0 40d24686a -> bf53b96b5


[SPARK-15173][SQL] DataFrameWriter.insertInto should work with datasource table 
stored in hive

When we parse `CREATE TABLE USING`, we should build a `CreateTableUsing` plan 
with the `managedIfNoPath` set to true. Then we will add default table path to 
options when write it to hive.

new test in `SQLQuerySuite`

Author: Wenchen Fan <[email protected]>

Closes #12949 from cloud-fan/bug.

(cherry picked from commit 2adb11f6db591a7d8199e42dd23c7fb23ef5df3b)
Signed-off-by: Yin Huai <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bf53b96b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bf53b96b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bf53b96b

Branch: refs/heads/branch-2.0
Commit: bf53b96b515c99f67f82dbd8dcbc45788a199c46
Parents: 40d2468
Author: Wenchen Fan <[email protected]>
Authored: Mon May 9 12:54:45 2016 -0700
Committer: Yin Huai <[email protected]>
Committed: Mon May 9 12:59:46 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/execution/SparkSqlParser.scala  |  2 +-
 .../execution/command/createDataSourceTables.scala   |  5 +++--
 .../spark/sql/execution/datasources/DataSource.scala | 15 +++++++++++----
 .../spark/sql/hive/execution/SQLQuerySuite.scala     |  8 ++++++++
 4 files changed, 23 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bf53b96b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 146e036b..a85ac16 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -307,7 +307,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
         table, provider, temp, partitionColumnNames, bucketSpec, mode, 
options, query)
     } else {
       val struct = Option(ctx.colTypeList()).map(createStructType)
-      CreateTableUsing(table, struct, provider, temp, options, ifNotExists, 
managedIfNoPath = false)
+      CreateTableUsing(table, struct, provider, temp, options, ifNotExists, 
managedIfNoPath = true)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bf53b96b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index e07ab99..16d6115 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -97,7 +97,7 @@ case class CreateDataSourceTableCommand(
       userSpecifiedSchema = userSpecifiedSchema,
       className = provider,
       bucketSpec = None,
-      options = optionsWithPath).resolveRelation()
+      options = optionsWithPath).resolveRelation(checkPathExist = false)
 
     CreateDataSourceTableUtils.createDataSourceTable(
       sparkSession = sparkSession,
@@ -382,7 +382,8 @@ object CreateDataSourceTableUtils extends Logging {
     // TODO: Support persisting partitioned data source relations in Hive 
compatible format
     val qualifiedTableName = tableIdent.quotedString
     val skipHiveMetadata = options.getOrElse("skipHiveMetadata", 
"false").toBoolean
-    val (hiveCompatibleTable, logMessage) = (maybeSerDe, 
dataSource.resolveRelation()) match {
+    val resolvedRelation = dataSource.resolveRelation(checkPathExist = false)
+    val (hiveCompatibleTable, logMessage) = (maybeSerDe, resolvedRelation) 
match {
       case _ if skipHiveMetadata =>
         val message =
           s"Persisting partitioned data source relation $qualifiedTableName 
into " +

http://git-wip-us.apache.org/repos/asf/spark/blob/bf53b96b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 618ea3d..0342ec5 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -239,8 +239,15 @@ case class DataSource(
     }
   }
 
-  /** Create a resolved [[BaseRelation]] that can be used to read data from 
this [[DataSource]] */
-  def resolveRelation(): BaseRelation = {
+  /**
+   * Create a resolved [[BaseRelation]] that can be used to read data from or 
write data into this
+   * [[DataSource]]
+   *
+   * @param checkPathExist A flag to indicate whether to check the existence 
of path or not.
+   *                       This flag will be set to false when we create an 
empty table (the
+   *                       path of the table does not exist).
+   */
+  def resolveRelation(checkPathExist: Boolean = true): BaseRelation = {
     val caseInsensitiveOptions = new CaseInsensitiveMap(options)
     val relation = (providingClass.newInstance(), userSpecifiedSchema) match {
       // TODO: Throw when too much is given.
@@ -288,11 +295,11 @@ case class DataSource(
           val qualified = hdfsPath.makeQualified(fs.getUri, 
fs.getWorkingDirectory)
           val globPath = SparkHadoopUtil.get.globPathIfNecessary(qualified)
 
-          if (globPath.isEmpty) {
+          if (checkPathExist && globPath.isEmpty) {
             throw new AnalysisException(s"Path does not exist: $qualified")
           }
           // Sufficient to check head of the globPath seq for non-glob scenario
-          if (!fs.exists(globPath.head)) {
+          if (checkPathExist && !fs.exists(globPath.head)) {
             throw new AnalysisException(s"Path does not exist: 
${globPath.head}")
           }
           globPath

http://git-wip-us.apache.org/repos/asf/spark/blob/bf53b96b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 4845da7..1d597fe 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1553,4 +1553,12 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils 
with TestHiveSingleton {
       assert(cause.getMessage.contains("Column ordering must be ASC, was 
'DESC'"))
     }
   }
+
+  test("insert into datasource table") {
+    withTable("tbl") {
+      sql("CREATE TABLE tbl(i INT, j STRING) USING parquet")
+      Seq(1 -> "a").toDF("i", "j").write.mode("overwrite").insertInto("tbl")
+      checkAnswer(sql("SELECT * FROM tbl"), Row(1, "a"))
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to