Repository: spark
Updated Branches:
  refs/heads/branch-2.0 44052a707 -> e033fd50f


[SPARK-15269][SQL] Removes unexpected empty table directories created while 
creating external Spark SQL data sourcet tables.

This PR is an alternative to #13120 authored by xwu0226.

## What changes were proposed in this pull request?

When creating an external Spark SQL data source table and persisting its 
metadata to Hive metastore, we don't use the standard Hive `Table.dataLocation` 
field because Hive only allows directory paths as data locations while Spark 
SQL also allows file paths. However, if we don't set `Table.dataLocation`, Hive 
always creates an unexpected empty table directory under database location, but 
doesn't remove it while dropping the table (because the table is external).

This PR works around this issue by explicitly setting `Table.dataLocation` and 
then manullay removing the created directory after creating the external table.

Please refer to [this JIRA comment][1] for more details about why we chose this 
approach as a workaround.

[1]: 
https://issues.apache.org/jira/browse/SPARK-15269?focusedCommentId=15297408&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15297408

## How was this patch tested?

1. A new test case is added in `HiveQuerySuite` for this case
2. Updated `ShowCreateTableSuite` to use the same table name in all test cases. 
(This is how I hit this issue at the first place.)

Author: Cheng Lian <l...@databricks.com>

Closes #13270 from liancheng/spark-15269-unpleasant-fix.

(cherry picked from commit 7bb64aae27f670531699f59d3f410e38866609b7)
Signed-off-by: Cheng Lian <l...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e033fd50
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e033fd50
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e033fd50

Branch: refs/heads/branch-2.0
Commit: e033fd50f0fcefb2a6cffb763ff7e026b0066c07
Parents: 44052a7
Author: Cheng Lian <l...@databricks.com>
Authored: Wed Jun 1 16:02:27 2016 -0700
Committer: Cheng Lian <l...@databricks.com>
Committed: Wed Jun 1 16:02:37 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/AnalysisException.scala    |  5 ++-
 .../sql/catalyst/catalog/SessionCatalog.scala   |  2 +-
 .../org/apache/spark/sql/SparkSession.scala     |  6 +--
 .../command/createDataSourceTables.scala        |  4 +-
 .../apache/spark/sql/internal/HiveSerDe.scala   |  1 -
 .../spark/sql/hive/HiveExternalCatalog.scala    | 45 ++++++++++++++++++--
 .../apache/spark/sql/hive/HiveSharedState.scala |  4 +-
 .../spark/sql/hive/client/HiveClientImpl.scala  | 22 ++++++++--
 .../sql/hive/HiveExternalCatalogSuite.scala     |  3 +-
 .../sql/hive/MetastoreDataSourcesSuite.scala    | 13 ++++++
 .../spark/sql/hive/ShowCreateTableSuite.scala   | 36 ++++++++--------
 11 files changed, 105 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e033fd50/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala
index d2003fd..6911843 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala
@@ -32,8 +32,9 @@ class AnalysisException protected[sql] (
     val message: String,
     val line: Option[Int] = None,
     val startPosition: Option[Int] = None,
-    val plan: Option[LogicalPlan] = None)
-  extends Exception with Serializable {
+    val plan: Option[LogicalPlan] = None,
+    val cause: Option[Throwable] = None)
+  extends Exception(message, cause.orNull) with Serializable {
 
   def withPosition(line: Option[Int], startPosition: Option[Int]): 
AnalysisException = {
     val newException = new AnalysisException(message, line, startPosition)

http://git-wip-us.apache.org/repos/asf/spark/blob/e033fd50/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index cf9286e..371c198 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -22,7 +22,7 @@ import javax.annotation.concurrent.GuardedBy
 import scala.collection.mutable
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException

http://git-wip-us.apache.org/repos/asf/spark/blob/e033fd50/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 52bedf9..7d7fd03 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -557,9 +557,9 @@ class SparkSession private(
   }
 
 
-  /* ------------------------ *
-   |  Catalog-related methods |
-   * ----------------- ------ */
+  /* ------------------------- *
+   |  Catalog-related methods  |
+   * ------------------------- */
 
   /**
    * Interface through which the user may create, drop, alter or query 
underlying

http://git-wip-us.apache.org/repos/asf/spark/blob/e033fd50/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 4b9aab6..9956c5b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -118,8 +118,8 @@ case class CreateDataSourceTableCommand(
 /**
  * A command used to create a data source table using the result of a query.
  *
- * Note: This is different from [[CreateTableAsSelect]]. Please check the 
syntax for difference.
- * This is not intended for temporary tables.
+ * Note: This is different from [[CreateTableAsSelectLogicalPlan]]. Please 
check the syntax for
+ * difference. This is not intended for temporary tables.
  *
  * The syntax of using this command in SQL is:
  * {{{

http://git-wip-us.apache.org/repos/asf/spark/blob/e033fd50/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala
index 38317d4..d554937 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala
@@ -77,4 +77,3 @@ object HiveSerDe {
     serdeMap.get(key)
   }
 }
-

http://git-wip-us.apache.org/repos/asf/spark/blob/e033fd50/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 5ffd8ef..b8bc9ab 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -21,6 +21,8 @@ import java.util
 
 import scala.util.control.NonFatal
 
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.ql.metadata.HiveException
 import org.apache.thrift.TException
 
@@ -35,7 +37,9 @@ import org.apache.spark.sql.hive.client.HiveClient
  * A persistent implementation of the system catalog using Hive.
  * All public methods must be synchronized for thread-safety.
  */
-private[spark] class HiveExternalCatalog(client: HiveClient) extends 
ExternalCatalog with Logging {
+private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: 
Configuration)
+  extends ExternalCatalog with Logging {
+
   import CatalogTypes.TablePartitionSpec
 
   // Exceptions thrown by the hive client that we would like to wrap
@@ -68,7 +72,8 @@ private[spark] class HiveExternalCatalog(client: HiveClient) 
extends ExternalCat
       body
     } catch {
       case NonFatal(e) if isClientException(e) =>
-        throw new AnalysisException(e.getClass.getCanonicalName + ": " + 
e.getMessage)
+        throw new AnalysisException(
+          e.getClass.getCanonicalName + ": " + e.getMessage, cause = Some(e))
     }
   }
 
@@ -147,7 +152,41 @@ private[spark] class HiveExternalCatalog(client: 
HiveClient) extends ExternalCat
       ignoreIfExists: Boolean): Unit = withClient {
     requireDbExists(db)
     requireDbMatches(db, tableDefinition)
-    client.createTable(tableDefinition, ignoreIfExists)
+
+    if (
+    // If this is an external data source table...
+      tableDefinition.properties.contains("spark.sql.sources.provider") &&
+        tableDefinition.tableType == CatalogTableType.EXTERNAL &&
+        // ... that is not persisted as Hive compatible format (external 
tables in Hive compatible
+        // format always set `locationUri` to the actual data location and 
should NOT be hacked as
+        // following.)
+        tableDefinition.storage.locationUri.isEmpty
+    ) {
+      // !! HACK ALERT !!
+      //
+      // Due to a restriction of Hive metastore, here we have to set 
`locationUri` to a temporary
+      // directory that doesn't exist yet but can definitely be successfully 
created, and then
+      // delete it right after creating the external data source table. This 
location will be
+      // persisted to Hive metastore as standard Hive table location URI, but 
Spark SQL doesn't
+      // really use it. Also, since we only do this workaround for external 
tables, deleting the
+      // directory after the fact doesn't do any harm.
+      //
+      // Please refer to https://issues.apache.org/jira/browse/SPARK-15269 for 
more details.
+      val tempPath = {
+        val dbLocation = getDatabase(tableDefinition.database).locationUri
+        new Path(dbLocation, tableDefinition.identifier.table + 
"-__PLACEHOLDER__")
+      }
+
+      try {
+        client.createTable(
+          tableDefinition.withNewStorage(locationUri = 
Some(tempPath.toString)),
+          ignoreIfExists)
+      } finally {
+        FileSystem.get(tempPath.toUri, hadoopConf).delete(tempPath, true)
+      }
+    } else {
+      client.createTable(tableDefinition, ignoreIfExists)
+    }
   }
 
   override def dropTable(

http://git-wip-us.apache.org/repos/asf/spark/blob/e033fd50/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala
index f0d9640..a0106ee 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala
@@ -51,6 +51,6 @@ private[hive] class HiveSharedState(override val 
sparkContext: SparkContext)
   /**
    * A catalog that interacts with the Hive metastore.
    */
-  override lazy val externalCatalog = new HiveExternalCatalog(metadataHive)
-
+  override lazy val externalCatalog =
+    new HiveExternalCatalog(metadataHive, sparkContext.hadoopConfiguration)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/e033fd50/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 71d5c99..47fa418 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -43,6 +43,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution.QueryExecutionException
+import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.util.{CircularBuffer, Utils}
 
 /**
@@ -323,7 +324,7 @@ private[hive] class HiveClientImpl(
   }
 
   override def listDatabases(pattern: String): Seq[String] = withHiveState {
-    client.getDatabasesByPattern(pattern).asScala.toSeq
+    client.getDatabasesByPattern(pattern).asScala
   }
 
   override def getTableOption(
@@ -351,6 +352,8 @@ private[hive] class HiveClientImpl(
         unsupportedFeatures += "bucketing"
       }
 
+      val properties = h.getParameters.asScala.toMap
+
       CatalogTable(
         identifier = TableIdentifier(h.getTableName, Option(h.getDbName)),
         tableType = h.getTableType match {
@@ -368,14 +371,27 @@ private[hive] class HiveClientImpl(
         createTime = h.getTTable.getCreateTime.toLong * 1000,
         lastAccessTime = h.getLastAccessTime.toLong * 1000,
         storage = CatalogStorageFormat(
-          locationUri = shim.getDataLocation(h),
+          locationUri = shim.getDataLocation(h).filterNot { _ =>
+            // SPARK-15269: Persisted data source tables always store the 
location URI as a SerDe
+            // property named "path" instead of standard Hive `dataLocation`, 
because Hive only
+            // allows directory paths as location URIs while Spark SQL data 
source tables also
+            // allows file paths. So the standard Hive `dataLocation` is 
meaningless for Spark SQL
+            // data source tables.
+            DDLUtils.isDatasourceTable(properties) &&
+              h.getTableType == HiveTableType.EXTERNAL_TABLE &&
+              // Spark SQL may also save external data source in Hive 
compatible format when
+              // possible, so that these tables can be directly accessed by 
Hive. For these tables,
+              // `dataLocation` is still necessary. Here we also check for 
input format class
+              // because only these Hive compatible tables set this field.
+              h.getInputFormatClass == null
+          },
           inputFormat = Option(h.getInputFormatClass).map(_.getName),
           outputFormat = Option(h.getOutputFormatClass).map(_.getName),
           serde = Option(h.getSerializationLib),
           compressed = h.getTTable.getSd.isCompressed,
           serdeProperties = 
h.getTTable.getSd.getSerdeInfo.getParameters.asScala.toMap
         ),
-        properties = h.getParameters.asScala.toMap,
+        properties = properties,
         viewOriginalText = Option(h.getViewOriginalText),
         viewText = Option(h.getViewExpandedText),
         unsupportedFeatures = unsupportedFeatures)

http://git-wip-us.apache.org/repos/asf/spark/blob/e033fd50/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
index bf9935a..175889b 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
@@ -37,7 +37,8 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
   protected override val utils: CatalogTestUtils = new CatalogTestUtils {
     override val tableInputFormat: String = 
"org.apache.hadoop.mapred.SequenceFileInputFormat"
     override val tableOutputFormat: String = 
"org.apache.hadoop.mapred.SequenceFileOutputFormat"
-    override def newEmptyCatalog(): ExternalCatalog = new 
HiveExternalCatalog(client)
+    override def newEmptyCatalog(): ExternalCatalog =
+      new HiveExternalCatalog(client, new Configuration())
   }
 
   protected override def resetState(): Unit = client.reset()

http://git-wip-us.apache.org/repos/asf/spark/blob/e033fd50/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 2c50cc8..3d8123d 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -1104,4 +1104,17 @@ class MetastoreDataSourcesSuite extends QueryTest with 
SQLTestUtils with TestHiv
       }
     }
   }
+
+  test("SPARK-15269 external data source table creation") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      spark.range(1).write.json(path)
+
+      withTable("t") {
+        sql(s"CREATE TABLE t USING json OPTIONS (PATH '$path')")
+        sql("DROP TABLE t")
+        sql(s"CREATE TABLE t USING json AS SELECT 1 AS c")
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/e033fd50/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala
index f789d88..3f3dc12 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala
@@ -28,11 +28,11 @@ class ShowCreateTableSuite extends QueryTest with 
SQLTestUtils with TestHiveSing
   import testImplicits._
 
   test("data source table with user specified schema") {
-    withTable("ddl_test1") {
+    withTable("ddl_test") {
       val jsonFilePath = 
Utils.getSparkClassLoader.getResource("sample.json").getFile
 
       sql(
-        s"""CREATE TABLE ddl_test1 (
+        s"""CREATE TABLE ddl_test (
            |  a STRING,
            |  b STRING,
            |  `extra col` ARRAY<INT>,
@@ -45,55 +45,55 @@ class ShowCreateTableSuite extends QueryTest with 
SQLTestUtils with TestHiveSing
          """.stripMargin
       )
 
-      checkCreateTable("ddl_test1")
+      checkCreateTable("ddl_test")
     }
   }
 
   test("data source table CTAS") {
-    withTable("ddl_test2") {
+    withTable("ddl_test") {
       sql(
-        s"""CREATE TABLE ddl_test2
+        s"""CREATE TABLE ddl_test
            |USING json
            |AS SELECT 1 AS a, "foo" AS b
          """.stripMargin
       )
 
-      checkCreateTable("ddl_test2")
+      checkCreateTable("ddl_test")
     }
   }
 
   test("partitioned data source table") {
-    withTable("ddl_test3") {
+    withTable("ddl_test") {
       sql(
-        s"""CREATE TABLE ddl_test3
+        s"""CREATE TABLE ddl_test
            |USING json
            |PARTITIONED BY (b)
            |AS SELECT 1 AS a, "foo" AS b
          """.stripMargin
       )
 
-      checkCreateTable("ddl_test3")
+      checkCreateTable("ddl_test")
     }
   }
 
   test("bucketed data source table") {
-    withTable("ddl_test3") {
+    withTable("ddl_test") {
       sql(
-        s"""CREATE TABLE ddl_test3
+        s"""CREATE TABLE ddl_test
            |USING json
            |CLUSTERED BY (a) SORTED BY (b) INTO 2 BUCKETS
            |AS SELECT 1 AS a, "foo" AS b
          """.stripMargin
       )
 
-      checkCreateTable("ddl_test3")
+      checkCreateTable("ddl_test")
     }
   }
 
   test("partitioned bucketed data source table") {
-    withTable("ddl_test4") {
+    withTable("ddl_test") {
       sql(
-        s"""CREATE TABLE ddl_test4
+        s"""CREATE TABLE ddl_test
            |USING json
            |PARTITIONED BY (c)
            |CLUSTERED BY (a) SORTED BY (b) INTO 2 BUCKETS
@@ -101,12 +101,12 @@ class ShowCreateTableSuite extends QueryTest with 
SQLTestUtils with TestHiveSing
          """.stripMargin
       )
 
-      checkCreateTable("ddl_test4")
+      checkCreateTable("ddl_test")
     }
   }
 
   test("data source table using Dataset API") {
-    withTable("ddl_test5") {
+    withTable("ddl_test") {
       spark
         .range(3)
         .select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd, 'id as 'e)
@@ -114,9 +114,9 @@ class ShowCreateTableSuite extends QueryTest with 
SQLTestUtils with TestHiveSing
         .mode("overwrite")
         .partitionBy("a", "b")
         .bucketBy(2, "c", "d")
-        .saveAsTable("ddl_test5")
+        .saveAsTable("ddl_test")
 
-      checkCreateTable("ddl_test5")
+      checkCreateTable("ddl_test")
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to