Repository: spark
Updated Branches:
  refs/heads/master 324388531 -> 8db4d95c0


[SPARK-18703][SQL] Drop Staging Directories and Data Files After each 
Insertion/CTAS of Hive serde Tables

### What changes were proposed in this pull request?
Below are the files/directories generated for three inserts againsts a Hive 
table:
```
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-29_149_4298858301766472202-1
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-29_149_4298858301766472202-1/-ext-10000
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-29_149_4298858301766472202-1/-ext-10000/._SUCCESS.crc
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-29_149_4298858301766472202-1/-ext-10000/.part-00000.crc
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-29_149_4298858301766472202-1/-ext-10000/_SUCCESS
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-29_149_4298858301766472202-1/-ext-10000/part-00000
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_454_6445008511655931341-1
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_454_6445008511655931341-1/-ext-10000
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_454_6445008511655931341-1/-ext-10000/._SUCCESS.crc
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_454_6445008511655931341-1/-ext-10000/.part-00000.crc
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_454_6445008511655931341-1/-ext-10000/_SUCCESS
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_454_6445008511655931341-1/-ext-10000/part-00000
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_722_3388423608658711001-1
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_722_3388423608658711001-1/-ext-10000
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_722_3388423608658711001-1/-ext-10000/._SUCCESS.crc
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_722_3388423608658711001-1/-ext-10000/.part-00000.crc
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_722_3388423608658711001-1/-ext-10000/_SUCCESS
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_722_3388423608658711001-1/-ext-10000/part-00000
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.part-00000.crc
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/part-00000
```

The first 18 files are temporary. We do not drop it until the end of JVM 
termination. If JVM does not appropriately terminate, these temporary 
files/directories will not be dropped.

Only the last two files are needed, as shown below.
```
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.part-00000.crc
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/part-00000
```
The temporary files/directories could accumulate a lot when we issue many 
inserts, since each insert generats at least six files. This could eat a lot of 
spaces and slow down the JVM termination. When the JVM does not terminates 
approprately, the files might not be dropped.

This PR is to drop the created staging files and temporary data files after 
each insert/CTAS.

### How was this patch tested?
Added a test case

Author: gatorsmile <[email protected]>

Closes #16134 from gatorsmile/deleteFiles.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8db4d95c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8db4d95c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8db4d95c

Branch: refs/heads/master
Commit: 8db4d95c02080cd120740d106292f0281e8f9249
Parents: 3243885
Author: gatorsmile <[email protected]>
Authored: Thu Dec 15 09:23:55 2016 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Thu Dec 15 09:23:55 2016 +0800

----------------------------------------------------------------------
 .../hive/execution/InsertIntoHiveTable.scala    | 16 +++++++++++--
 .../spark/sql/hive/client/VersionsSuite.scala   | 24 ++++++++++++++++++++
 2 files changed, 38 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8db4d95c/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 82c7b1a..aa858e8 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -22,6 +22,8 @@ import java.net.URI
 import java.text.SimpleDateFormat
 import java.util.{Date, Locale, Random}
 
+import scala.util.control.NonFatal
+
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.common.FileUtils
 import org.apache.hadoop.hive.ql.exec.TaskRunner
@@ -84,6 +86,7 @@ case class InsertIntoHiveTable(
   def output: Seq[Attribute] = Seq.empty
 
   val hadoopConf = sessionState.newHadoopConf()
+  var createdTempDir: Option[Path] = None
   val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
   val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive")
 
@@ -111,12 +114,12 @@ case class InsertIntoHiveTable(
       if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) {
         throw new IllegalStateException("Cannot create staging directory  '" + 
dir.toString + "'")
       }
+      createdTempDir = Some(dir)
       fs.deleteOnExit(dir)
     } catch {
       case e: IOException =>
         throw new RuntimeException(
           "Cannot create staging directory '" + dir.toString + "': " + 
e.getMessage, e)
-
     }
     return dir
   }
@@ -163,11 +166,11 @@ case class InsertIntoHiveTable(
       if (!FileUtils.mkdir(fs, dirPath, true, hadoopConf)) {
         throw new IllegalStateException("Cannot create staging directory: " + 
dirPath.toString)
       }
+      createdTempDir = Some(dirPath)
       fs.deleteOnExit(dirPath)
     } catch {
       case e: IOException =>
         throw new RuntimeException("Cannot create staging directory: " + 
dirPath.toString, e)
-
     }
     dirPath
   }
@@ -378,6 +381,15 @@ case class InsertIntoHiveTable(
         isSrcLocal = false)
     }
 
+    // Attempt to delete the staging directory and the inclusive files. If 
failed, the files are
+    // expected to be dropped at the normal termination of VM since 
deleteOnExit is used.
+    try {
+      createdTempDir.foreach { path => 
path.getFileSystem(hadoopConf).delete(path, true) }
+    } catch {
+      case NonFatal(e) =>
+        logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)
+    }
+
     // Invalidate the cache.
     sqlContext.sharedState.cacheManager.invalidateCache(table)
     sqlContext.sessionState.catalog.refreshTable(table.catalogTable.identifier)

http://git-wip-us.apache.org/repos/asf/spark/blob/8db4d95c/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 9b26383..8dd0699 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -546,6 +546,30 @@ class VersionsSuite extends SparkFunSuite with 
SQLTestUtils with TestHiveSinglet
       }
     }
 
+    test(s"$version: Delete the temporary staging directory and files after 
each insert") {
+      withTempDir { tmpDir =>
+        withTable("tab") {
+          spark.sql(
+            s"""
+               |CREATE TABLE tab(c1 string)
+               |location '${tmpDir.toURI.toString}'
+             """.stripMargin)
+
+          (1 to 3).map { i =>
+            spark.sql(s"INSERT OVERWRITE TABLE tab SELECT '$i'")
+          }
+          def listFiles(path: File): List[String] = {
+            val dir = path.listFiles()
+            val folders = dir.filter(_.isDirectory).toList
+            val filePaths = dir.map(_.getName).toList
+            folders.flatMap(listFiles) ++: filePaths
+          }
+          val expectedFiles = ".part-00000.crc" :: "part-00000" :: Nil
+          assert(listFiles(tmpDir).sorted == expectedFiles)
+        }
+      }
+    }
+
     // TODO: add more tests.
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to