Repository: spark
Updated Branches:
  refs/heads/branch-2.1 415730e19 -> e374b2426


[SPARK-18659][SQL] Incorrect behaviors in overwrite table for datasource tables

## What changes were proposed in this pull request?

Two bugs are addressed here:
1. INSERT OVERWRITE TABLE sometimes crashed when catalog partition management was
enabled. When partitions were dropped after an overwrite operation, the Hive client
would attempt to delete the partition files; if the overwrite had already removed
the entire partition directory, that deletion failed. The PR fixes this by adding a
flag that controls whether the Hive client should attempt to delete the files (see
the first sketch below).
2. The static partition spec for INSERT OVERWRITE TABLE was not resolved to the
case-sensitive original partition names. As a result, the entire table could be
overwritten if the partition names in the spec were not capitalized exactly as in
the table schema (see the second sketch below).
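
Below is a minimal, self-contained sketch of the first fix in plain Scala (no Spark
dependency). The `dropPartition` helper and the path are hypothetical stand-ins for
the real Hive client call; only the `deleteData = !retainData` relationship mirrors
the patched `HiveClientImpl.dropPartitions`:

```scala
import java.nio.file.{Files, Paths}

object RetainDataSketch {
  // Hypothetical stand-in for the Hive client's per-partition drop. After this
  // patch the client passes deleteData = !retainData down to the Hive shim, so
  // callers that already removed the partition directory can skip file deletion.
  def dropPartition(path: String, retainData: Boolean): Unit = {
    val deleteData = !retainData
    if (deleteData) {
      // In the real code path, this file deletion is what failed once the
      // overwrite had already removed the partition directory.
      Files.deleteIfExists(Paths.get(path))
    }
    println(s"dropped partition metadata for $path (deleteData = $deleteData)")
  }

  def main(args: Array[String]): Unit = {
    // The overwrite already deleted the files, so retain them: only the catalog
    // entry is dropped and no file deletion is attempted.
    dropPartition("/tmp/warehouse/test/a=1/b=x", retainData = true)
  }
}
```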
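
And a sketch of the second fix, again in plain Scala with an illustrative object
name: user-supplied static partition keys are matched case-insensitively against
the table's partition columns and rewritten to the schema's original casing before
the insert is planned, so `a=1` constrains the overwrite to partition column `A`
instead of the spec being dropped and the whole table overwritten:

```scala
object StaticPartitionKeyResolution {
  // partitionColumns carries the original-case column names from the table schema.
  def resolve(
      partitionColumns: Seq[String],
      staticPartitionKeys: Map[String, String]): Map[String, String] = {
    staticPartitionKeys.map { case (k, v) =>
      val resolved = partitionColumns
        .find(_.equalsIgnoreCase(k))
        .getOrElse(throw new IllegalArgumentException(s"$k is not a partition column"))
      resolved -> v
    }
  }

  def main(args: Array[String]): Unit = {
    // "a" resolves to the schema's "A", so only that partition is overwritten.
    println(resolve(Seq("A", "B"), Map("a" -> "1")))  // Map(A -> 1)
  }
}
```

This mirrors the `equalsIgnoreCase` lookup added to `DataSourceAnalysis` in this
commit; the error handling here is illustrative only.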

cc yhuai cloud-fan

## How was this patch tested?

Unit tests. Surprisingly, the existing overwrite table tests did not catch 
these edge cases.

Author: Eric Liang <[email protected]>

Closes #16088 from ericl/spark-18659.

(cherry picked from commit 7935c8470c5c162ef7213e394fe8588e5dd42ca2)
Signed-off-by: Wenchen Fan <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e374b242
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e374b242
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e374b242

Branch: refs/heads/branch-2.1
Commit: e374b2426114d841e1935719f6e21919475f6804
Parents: 415730e
Author: Eric Liang <[email protected]>
Authored: Fri Dec 2 21:59:02 2016 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Fri Dec 2 21:59:21 2016 +0800

----------------------------------------------------------------------
 .../sql/catalyst/catalog/ExternalCatalog.scala  |  3 +-
 .../sql/catalyst/catalog/InMemoryCatalog.scala  | 10 ++++--
 .../sql/catalyst/catalog/SessionCatalog.scala   |  5 +--
 .../catalyst/catalog/ExternalCatalogSuite.scala | 21 +++++++-----
 .../catalyst/catalog/SessionCatalogSuite.scala  | 27 ++++++++++-----
 .../spark/sql/execution/SparkSqlParser.scala    |  5 +--
 .../spark/sql/execution/command/ddl.scala       |  6 ++--
 .../datasources/DataSourceStrategy.scala        | 13 ++++++--
 .../sql/execution/command/DDLCommandSuite.scala |  3 +-
 .../spark/sql/hive/HiveExternalCatalog.scala    |  6 ++--
 .../spark/sql/hive/client/HiveClient.scala      |  3 +-
 .../spark/sql/hive/client/HiveClientImpl.scala  |  6 ++--
 .../PartitionProviderCompatibilitySuite.scala   | 35 ++++++++++++++++++++
 .../spark/sql/hive/client/VersionsSuite.scala   |  4 +--
 14 files changed, 110 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e374b242/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index 14dd707..259008f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -154,7 +154,8 @@ abstract class ExternalCatalog {
       table: String,
       parts: Seq[TablePartitionSpec],
       ignoreIfNotExists: Boolean,
-      purge: Boolean): Unit
+      purge: Boolean,
+      retainData: Boolean): Unit
 
   /**
   * Override the specs of one or many existing table partitions, assuming they exist.

http://git-wip-us.apache.org/repos/asf/spark/blob/e374b242/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index a3ffeaa..880a7a0 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -385,7 +385,8 @@ class InMemoryCatalog(
       table: String,
       partSpecs: Seq[TablePartitionSpec],
       ignoreIfNotExists: Boolean,
-      purge: Boolean): Unit = synchronized {
+      purge: Boolean,
+      retainData: Boolean): Unit = synchronized {
     requireTableExists(db, table)
     val existingParts = catalog(db).tables(table).partitions
     if (!ignoreIfNotExists) {
@@ -395,7 +396,12 @@ class InMemoryCatalog(
       }
     }
 
-    val shouldRemovePartitionLocation = getTable(db, table).tableType == CatalogTableType.MANAGED
+    val shouldRemovePartitionLocation = if (retainData) {
+      false
+    } else {
+      getTable(db, table).tableType == CatalogTableType.MANAGED
+    }
+
     // TODO: we should follow hive to roll back if one partition path failed to delete, and support
     // partial partition spec.
     partSpecs.foreach { p =>

http://git-wip-us.apache.org/repos/asf/spark/blob/e374b242/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 0b6a91f..da3a207 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -687,13 +687,14 @@ class SessionCatalog(
       tableName: TableIdentifier,
       specs: Seq[TablePartitionSpec],
       ignoreIfNotExists: Boolean,
-      purge: Boolean): Unit = {
+      purge: Boolean,
+      retainData: Boolean): Unit = {
     val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(tableName.table)
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Option(db)))
     requirePartialMatchedPartitionSpec(specs, getTableMetadata(tableName))
-    externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists, purge)
+    externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists, purge, retainData)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/e374b242/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 303a866..3b39f42 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -361,13 +361,14 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     val catalog = newBasicCatalog()
     assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part1, part2)))
     catalog.dropPartitions(
-      "db2", "tbl2", Seq(part1.spec), ignoreIfNotExists = false, purge = false)
+      "db2", "tbl2", Seq(part1.spec), ignoreIfNotExists = false, purge = 
false, retainData = false)
     assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part2)))
     resetState()
     val catalog2 = newBasicCatalog()
     assert(catalogPartitionsEqual(catalog2, "db2", "tbl2", Seq(part1, part2)))
     catalog2.dropPartitions(
-      "db2", "tbl2", Seq(part1.spec, part2.spec), ignoreIfNotExists = false, 
purge = false)
+      "db2", "tbl2", Seq(part1.spec, part2.spec), ignoreIfNotExists = false, 
purge = false,
+      retainData = false)
     assert(catalog2.listPartitions("db2", "tbl2").isEmpty)
   }
 
@@ -375,11 +376,13 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     val catalog = newBasicCatalog()
     intercept[AnalysisException] {
       catalog.dropPartitions(
-        "does_not_exist", "tbl1", Seq(), ignoreIfNotExists = false, purge = 
false)
+        "does_not_exist", "tbl1", Seq(), ignoreIfNotExists = false, purge = 
false,
+        retainData = false)
     }
     intercept[AnalysisException] {
       catalog.dropPartitions(
-        "db2", "does_not_exist", Seq(), ignoreIfNotExists = false, purge = 
false)
+        "db2", "does_not_exist", Seq(), ignoreIfNotExists = false, purge = 
false,
+        retainData = false)
     }
   }
 
@@ -387,10 +390,11 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     val catalog = newBasicCatalog()
     intercept[AnalysisException] {
       catalog.dropPartitions(
-        "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = false, purge = 
false)
+        "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = false, purge = 
false,
+        retainData = false)
     }
     catalog.dropPartitions(
-      "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = true, purge = false)
+      "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = true, purge = false, 
retainData = false)
   }
 
   test("get partition") {
@@ -713,7 +717,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     assert(exists(tableLocation, "partCol1=5", "partCol2=6"))
 
     catalog.dropPartitions("db1", "tbl", Seq(part2.spec, part3.spec), 
ignoreIfNotExists = false,
-      purge = false)
+      purge = false, retainData = false)
     assert(!exists(tableLocation, "partCol1=3", "partCol2=4"))
     assert(!exists(tableLocation, "partCol1=5", "partCol2=6"))
 
@@ -745,7 +749,8 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     val fs = partPath.getFileSystem(new Configuration)
     assert(fs.exists(partPath))
 
-    catalog.dropPartitions("db2", "tbl1", Seq(part1.spec), ignoreIfNotExists = 
false, purge = false)
+    catalog.dropPartitions(
+      "db2", "tbl1", Seq(part1.spec), ignoreIfNotExists = false, purge = 
false, retainData = false)
     assert(fs.exists(partPath))
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/e374b242/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 3f27160..f9c4b26 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -618,7 +618,8 @@ class SessionCatalogSuite extends SparkFunSuite {
       TableIdentifier("tbl2", Some("db2")),
       Seq(part1.spec),
       ignoreIfNotExists = false,
-      purge = false)
+      purge = false,
+      retainData = false)
     assert(catalogPartitionsEqual(externalCatalog.listPartitions("db2", "tbl2"), part2))
     // Drop partitions without explicitly specifying database
     sessionCatalog.setCurrentDatabase("db2")
@@ -626,7 +627,8 @@ class SessionCatalogSuite extends SparkFunSuite {
       TableIdentifier("tbl2"),
       Seq(part2.spec),
       ignoreIfNotExists = false,
-      purge = false)
+      purge = false,
+      retainData = false)
     assert(externalCatalog.listPartitions("db2", "tbl2").isEmpty)
     // Drop multiple partitions at once
     sessionCatalog.createPartitions(
@@ -636,7 +638,8 @@ class SessionCatalogSuite extends SparkFunSuite {
       TableIdentifier("tbl2", Some("db2")),
       Seq(part1.spec, part2.spec),
       ignoreIfNotExists = false,
-      purge = false)
+      purge = false,
+      retainData = false)
     assert(externalCatalog.listPartitions("db2", "tbl2").isEmpty)
   }
 
@@ -647,14 +650,16 @@ class SessionCatalogSuite extends SparkFunSuite {
         TableIdentifier("tbl1", Some("unknown_db")),
         Seq(),
         ignoreIfNotExists = false,
-        purge = false)
+        purge = false,
+        retainData = false)
     }
     intercept[NoSuchTableException] {
       catalog.dropPartitions(
         TableIdentifier("does_not_exist", Some("db2")),
         Seq(),
         ignoreIfNotExists = false,
-        purge = false)
+        purge = false,
+        retainData = false)
     }
   }
 
@@ -665,13 +670,15 @@ class SessionCatalogSuite extends SparkFunSuite {
         TableIdentifier("tbl2", Some("db2")),
         Seq(part3.spec),
         ignoreIfNotExists = false,
-        purge = false)
+        purge = false,
+        retainData = false)
     }
     catalog.dropPartitions(
       TableIdentifier("tbl2", Some("db2")),
       Seq(part3.spec),
       ignoreIfNotExists = true,
-      purge = false)
+      purge = false,
+      retainData = false)
   }
 
   test("drop partitions with invalid partition spec") {
@@ -681,7 +688,8 @@ class SessionCatalogSuite extends SparkFunSuite {
         TableIdentifier("tbl2", Some("db2")),
         Seq(partWithMoreColumns.spec),
         ignoreIfNotExists = false,
-        purge = false)
+        purge = false,
+        retainData = false)
     }
     assert(e.getMessage.contains(
       "Partition spec is invalid. The spec (a, b, c) must be contained within 
" +
@@ -691,7 +699,8 @@ class SessionCatalogSuite extends SparkFunSuite {
         TableIdentifier("tbl2", Some("db2")),
         Seq(partWithUnknownColumns.spec),
         ignoreIfNotExists = false,
-        purge = false)
+        purge = false,
+        retainData = false)
     }
     assert(e.getMessage.contains(
       "Partition spec is invalid. The spec (a, unknown) must be contained 
within " +

http://git-wip-us.apache.org/repos/asf/spark/blob/e374b242/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 5f89a22..7a659ea 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -833,8 +833,9 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     AlterTableDropPartitionCommand(
       visitTableIdentifier(ctx.tableIdentifier),
       ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec),
-      ctx.EXISTS != null,
-      ctx.PURGE != null)
+      ifExists = ctx.EXISTS != null,
+      purge = ctx.PURGE != null,
+      retainData = false)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/e374b242/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 0f126d0..c62c142 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -421,7 +421,8 @@ case class AlterTableDropPartitionCommand(
     tableName: TableIdentifier,
     specs: Seq[TablePartitionSpec],
     ifExists: Boolean,
-    purge: Boolean)
+    purge: Boolean,
+    retainData: Boolean)
   extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
@@ -439,7 +440,8 @@ case class AlterTableDropPartitionCommand(
     }
 
     catalog.dropPartitions(
-      table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge)
+      table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge,
+      retainData = retainData)
     Seq.empty[Row]
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e374b242/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index f3d92bf..4468dc5 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -217,16 +217,25 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
             if (deletedPartitions.nonEmpty) {
               AlterTableDropPartitionCommand(
                 l.catalogTable.get.identifier, deletedPartitions.toSeq,
-                ifExists = true, purge = true).run(t.sparkSession)
+                ifExists = true, purge = false,
+                retainData = true /* already deleted */).run(t.sparkSession)
             }
           }
         }
         t.location.refresh()
       }
 
+      val staticPartitionKeys: TablePartitionSpec = if (overwrite.enabled) {
+        overwrite.staticPartitionKeys.map { case (k, v) =>
+          (partitionSchema.map(_.name).find(_.equalsIgnoreCase(k)).get, v)
+        }
+      } else {
+        Map.empty
+      }
+
       val insertCmd = InsertIntoHadoopFsRelationCommand(
         outputPath,
-        if (overwrite.enabled) overwrite.staticPartitionKeys else Map.empty,
+        staticPartitionKeys,
         customPartitionLocations,
         partitionSchema,
         t.bucketSpec,

http://git-wip-us.apache.org/repos/asf/spark/blob/e374b242/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index d31e7ae..5ef5f8e 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -615,7 +615,8 @@ class DDLCommandSuite extends PlanTest {
         Map("dt" -> "2008-08-08", "country" -> "us"),
         Map("dt" -> "2009-09-09", "country" -> "uk")),
       ifExists = true,
-      purge = false)
+      purge = false,
+      retainData = false)
     val expected2_table = expected1_table.copy(ifExists = false)
     val expected1_purge = expected1_table.copy(purge = true)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e374b242/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 0658832..c213e8e 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -850,9 +850,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       table: String,
       parts: Seq[TablePartitionSpec],
       ignoreIfNotExists: Boolean,
-      purge: Boolean): Unit = withClient {
+      purge: Boolean,
+      retainData: Boolean): Unit = withClient {
     requireTableExists(db, table)
-    client.dropPartitions(db, table, parts.map(lowerCasePartitionSpec), ignoreIfNotExists, purge)
+    client.dropPartitions(
+      db, table, parts.map(lowerCasePartitionSpec), ignoreIfNotExists, purge, retainData)
   }
 
   override def renamePartitions(

http://git-wip-us.apache.org/repos/asf/spark/blob/e374b242/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index 569a9c1..4c76932 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -125,7 +125,8 @@ private[hive] trait HiveClient {
       table: String,
       specs: Seq[TablePartitionSpec],
       ignoreIfNotExists: Boolean,
-      purge: Boolean): Unit
+      purge: Boolean,
+      retainData: Boolean): Unit
 
   /**
    * Rename one or many existing table partitions, assuming they exist.

http://git-wip-us.apache.org/repos/asf/spark/blob/e374b242/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 590029a..bd840af 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -453,7 +453,8 @@ private[hive] class HiveClientImpl(
       table: String,
       specs: Seq[TablePartitionSpec],
       ignoreIfNotExists: Boolean,
-      purge: Boolean): Unit = withHiveState {
+      purge: Boolean,
+      retainData: Boolean): Unit = withHiveState {
     // TODO: figure out how to drop multiple partitions in one call
     val hiveTable = client.getTable(db, table, true /* throw exception */)
     // do the check at first and collect all the matching partitions
@@ -473,8 +474,7 @@ private[hive] class HiveClientImpl(
     var droppedParts = ArrayBuffer.empty[java.util.List[String]]
     matchingParts.foreach { partition =>
       try {
-        val deleteData = true
-        shim.dropPartition(client, db, table, partition, deleteData, purge)
+        shim.dropPartition(client, db, table, partition, !retainData, purge)
       } catch {
         case e: Exception =>
           val remainingParts = matchingParts.toBuffer -- droppedParts

http://git-wip-us.apache.org/repos/asf/spark/blob/e374b242/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
index e8e4238..c2ac032 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
@@ -259,6 +259,41 @@ class PartitionProviderCompatibilitySuite
         }
       }
     }
+
+    test(s"SPARK-18659 insert overwrite table files - partition management 
$enabled") {
+      withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> enabled.toString) {
+        withTable("test") {
+          spark.range(10)
+            .selectExpr("id", "id as A", "'x' as B")
+            .write.partitionBy("A", "B").mode("overwrite")
+            .saveAsTable("test")
+          spark.sql("insert overwrite table test select id, id, 'x' from 
range(1)")
+          assert(spark.sql("select * from test").count() == 1)
+
+          spark.range(10)
+            .selectExpr("id", "id as A", "'x' as B")
+            .write.partitionBy("A", "B").mode("overwrite")
+            .saveAsTable("test")
+          spark.sql(
+            "insert overwrite table test partition (A, B) select id, id, 'x' 
from range(1)")
+          assert(spark.sql("select * from test").count() == 1)
+        }
+      }
+    }
+
+    test(s"SPARK-18659 insert overwrite table with lowercase - partition 
management $enabled") {
+      withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> enabled.toString) {
+        withTable("test") {
+          spark.range(10)
+            .selectExpr("id", "id as A", "'x' as B")
+            .write.partitionBy("A", "B").mode("overwrite")
+            .saveAsTable("test")
+          // note that 'A', 'B' are lowercase instead of their original case here
+          spark.sql("insert overwrite table test partition (a=1, b) select id, 
'x' from range(1)")
+          assert(spark.sql("select * from test").count() == 10)
+        }
+      }
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/e374b242/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 081b0ed..16ae345 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -352,13 +352,13 @@ class VersionsSuite extends SparkFunSuite with Logging {
       // with a version that is older than the minimum (1.2 in this case).
       try {
         client.dropPartitions("default", "src_part", Seq(spec), 
ignoreIfNotExists = true,
-          purge = true)
+          purge = true, retainData = false)
         assert(!versionsWithoutPurge.contains(version))
       } catch {
         case _: UnsupportedOperationException =>
           assert(versionsWithoutPurge.contains(version))
           client.dropPartitions("default", "src_part", Seq(spec), 
ignoreIfNotExists = true,
-            purge = false)
+            purge = false, retainData = false)
       }
 
       assert(client.getPartitionOption("default", "src_part", spec).isEmpty)


