This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 27e20fe9eb1 [SPARK-41708][SQL] Pull v1write information to `WriteFiles`
27e20fe9eb1 is described below

commit 27e20fe9eb1b1ef1b3d32e180de55931f31fc345
Author: ulysses-you <ulyssesyo...@gmail.com>
AuthorDate: Fri Jan 6 12:13:30 2023 +0800

    [SPARK-41708][SQL] Pull v1write information to `WriteFiles`
    
    ### What changes were proposed in this pull request?
    
    This PR pulls the v1 write information out of `V1WriteCommand` and into
    `WriteFiles`:
    ```scala
    case class WriteFiles(child: LogicalPlan)
    
    =>
    
    case class WriteFiles(
        child: LogicalPlan,
        fileFormat: FileFormat,
        partitionColumns: Seq[Attribute],
        bucketSpec: Option[BucketSpec],
        options: Map[String, String],
        staticPartitions: TablePartitionSpec)
    ```
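
    For reference, this is how the `V1Writes` rule now forwards that information when it
    injects `WriteFiles` under a `V1WriteCommand` (a simplified sketch of the change in
    `V1Writes.scala`; the helper name is illustrative and attribute rewriting is elided):
    ```scala
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.execution.datasources.{V1WriteCommand, WriteFiles}

    // Copy the write information from the command onto the injected WriteFiles node.
    def injectWriteFiles(write: V1WriteCommand, newQuery: LogicalPlan): WriteFiles =
      WriteFiles(newQuery, write.fileFormat, write.partitionColumns,
        write.bucketSpec, write.options, write.staticPartitions)
    ```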
    
    This PR also removes `WriteSpec`, which is no longer necessary.
    
    ### Why are the changes needed?
    
    After this PR, `WriteFiles` holds the v1 write information (file format,
    partition columns, bucket spec, options, static partitions), so developers
    can read it directly from the plan node, as sketched below.
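
    An illustrative sketch (not part of this patch; the helper name is made up):
    ```scala
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.execution.datasources.WriteFiles

    // Summarize the write information carried by a WriteFiles node, if any.
    def describeWrite(plan: LogicalPlan): Option[String] = plan.collectFirst {
      case WriteFiles(_, fileFormat, partitionColumns, bucketSpec, options, staticPartitions) =>
        s"format=${fileFormat.getClass.getSimpleName}, " +
          s"partitionCols=[${partitionColumns.map(_.name).mkString(", ")}], " +
          s"bucketed=${bucketSpec.isDefined}, options=${options.size}, " +
          s"staticPartitions=${staticPartitions.size}"
    }
    ```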
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    Pass CI
    
    Closes #39277 from ulysses-you/SPARK-41708.
    
    Authored-by: ulysses-you <ulyssesyo...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../org/apache/spark/sql/internal/WriteSpec.java   |  33 ---
 .../org/apache/spark/sql/execution/SparkPlan.scala |   9 +-
 .../spark/sql/execution/SparkStrategies.scala      |   5 +-
 .../spark/sql/execution/datasources/V1Writes.scala |  24 ++-
 .../sql/execution/datasources/WriteFiles.scala     |  26 ++-
 .../org/apache/spark/sql/hive/HiveStrategies.scala |   3 +-
 .../{SaveAsHiveFile.scala => HiveTempPath.scala}   | 204 ++++++-------------
 .../hive/execution/InsertIntoHiveDirCommand.scala  |  13 +-
 .../sql/hive/execution/InsertIntoHiveTable.scala   |  88 +++++---
 .../spark/sql/hive/execution/SaveAsHiveFile.scala  | 221 +--------------------
 .../sql/hive/execution/V1WritesHiveUtils.scala     |  33 ++-
 .../org/apache/spark/sql/hive/InsertSuite.scala    |  15 +-
 12 files changed, 224 insertions(+), 450 deletions(-)

diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/internal/WriteSpec.java 
b/sql/core/src/main/java/org/apache/spark/sql/internal/WriteSpec.java
deleted file mode 100644
index c51a3ed7dc6..00000000000
--- a/sql/core/src/main/java/org/apache/spark/sql/internal/WriteSpec.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.internal;
-
-import java.io.Serializable;
-
-/**
- * Write spec is a input parameter of
- * {@link org.apache.spark.sql.execution.SparkPlan#executeWrite}.
- *
- * <p>
- * This is an empty interface, the concrete class which implements
- * {@link org.apache.spark.sql.execution.SparkPlan#doExecuteWrite}
- * should define its own class and use it.
- *
- * @since 3.4.0
- */
-public interface WriteSpec extends Serializable {}
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 401302e5bde..5ca36a8a216 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -36,8 +36,9 @@ import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.catalyst.trees.{BinaryLike, LeafLike, TreeNodeTag, 
UnaryLike}
 import org.apache.spark.sql.connector.write.WriterCommitMessage
 import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.datasources.WriteFilesSpec
 import org.apache.spark.sql.execution.metric.SQLMetric
-import org.apache.spark.sql.internal.{SQLConf, WriteSpec}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.NextIterator
 import org.apache.spark.util.io.{ChunkedByteBuffer, 
ChunkedByteBufferOutputStream}
@@ -230,11 +231,11 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] 
with Logging with Serializ
    *
    * Concrete implementations of SparkPlan should override `doExecuteWrite`.
    */
-  def executeWrite(writeSpec: WriteSpec): RDD[WriterCommitMessage] = 
executeQuery {
+  def executeWrite(writeFilesSpec: WriteFilesSpec): RDD[WriterCommitMessage] = 
executeQuery {
     if (isCanonicalizedPlan) {
       throw SparkException.internalError("A canonicalized plan is not supposed 
to be executed.")
     }
-    doExecuteWrite(writeSpec)
+    doExecuteWrite(writeFilesSpec)
   }
 
   /**
@@ -343,7 +344,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with 
Logging with Serializ
    *
    * Overridden by concrete implementations of SparkPlan.
    */
-  protected def doExecuteWrite(writeSpec: WriteSpec): RDD[WriterCommitMessage] 
= {
+  protected def doExecuteWrite(writeFilesSpec: WriteFilesSpec): 
RDD[WriterCommitMessage] = {
     throw SparkException.internalError(s"Internal Error ${this.getClass} has 
write support" +
       s" mismatch:\n${this}")
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 51a0c837c3e..110fe45cc12 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -895,8 +895,9 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
         throw QueryExecutionErrors.ddlUnsupportedTemporarilyError("MERGE INTO 
TABLE")
       case logical.CollectMetrics(name, metrics, child) =>
         execution.CollectMetricsExec(name, metrics, planLater(child)) :: Nil
-      case WriteFiles(child) =>
-        WriteFilesExec(planLater(child)) :: Nil
+      case WriteFiles(child, fileFormat, partitionColumns, bucket, options, 
staticPartitions) =>
+        WriteFilesExec(planLater(child), fileFormat, partitionColumns, bucket, 
options,
+          staticPartitions) :: Nil
       case _ => Nil
     }
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
index 3ed04e5bd6d..7b4fa7ad80b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources
 
 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Attribute, 
AttributeMap, AttributeSet, BitwiseAnd, Expression, HiveHash, Literal, 
NamedExpression, Pmod, SortOrder, String2StringExpression, UnaryExpression}
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, 
ExprCode}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Sort}
@@ -31,11 +32,31 @@ import org.apache.spark.sql.types.StringType
 import org.apache.spark.unsafe.types.UTF8String
 
 trait V1WriteCommand extends DataWritingCommand {
+  /**
+   * Specify the [[FileFormat]] of the provider of V1 write command.
+   */
+  def fileFormat: FileFormat
+
   /**
    * Specify the partition columns of the V1 write command.
    */
   def partitionColumns: Seq[Attribute]
 
+  /**
+   * Specify the partition spec of the V1 write command.
+   */
+  def staticPartitions: TablePartitionSpec
+
+  /**
+   * Specify the bucket spec of the V1 write command.
+   */
+  def bucketSpec: Option[BucketSpec]
+
+  /**
+   * Specify the storage options of the V1 write command.
+   */
+  def options: Map[String, String]
+
   /**
    * Specify the required ordering for the V1 write command. 
`FileFormatWriter` will
    * add SortExec if necessary when the requiredOrdering is empty.
@@ -56,7 +77,8 @@ object V1Writes extends Rule[LogicalPlan] with SQLConfHelper {
         case write: V1WriteCommand if !write.child.isInstanceOf[WriteFiles] =>
           val newQuery = prepareQuery(write, write.query)
           val attrMap = AttributeMap(write.query.output.zip(newQuery.output))
-          val newChild = WriteFiles(newQuery)
+          val newChild = WriteFiles(newQuery, write.fileFormat, 
write.partitionColumns,
+            write.bucketSpec, write.options, write.staticPartitions)
           val newWrite = write.withNewChildren(newChild :: 
Nil).transformExpressions {
             case a: Attribute if attrMap.contains(a) =>
               a.withExprId(attrMap(a).exprId)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteFiles.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteFiles.scala
index 5bc8f9db32b..53d94704471 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteFiles.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteFiles.scala
@@ -23,12 +23,13 @@ import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.internal.io.{FileCommitProtocol, 
SparkHadoopWriterUtils}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
 import org.apache.spark.sql.connector.write.WriterCommitMessage
 import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 import 
org.apache.spark.sql.execution.datasources.FileFormatWriter.ConcurrentOutputWriterSpec
-import org.apache.spark.sql.internal.WriteSpec
 
 /**
  * The write files spec holds all information of [[V1WriteCommand]] if its 
provider is
@@ -38,13 +39,18 @@ case class WriteFilesSpec(
     description: WriteJobDescription,
     committer: FileCommitProtocol,
     concurrentOutputWriterSpecFunc: SparkPlan => 
Option[ConcurrentOutputWriterSpec])
-  extends WriteSpec
 
 /**
  * During Optimizer, [[V1Writes]] injects the [[WriteFiles]] between 
[[V1WriteCommand]] and query.
  * [[WriteFiles]] must be the root plan as the child of [[V1WriteCommand]].
  */
-case class WriteFiles(child: LogicalPlan) extends UnaryNode {
+case class WriteFiles(
+    child: LogicalPlan,
+    fileFormat: FileFormat,
+    partitionColumns: Seq[Attribute],
+    bucketSpec: Option[BucketSpec],
+    options: Map[String, String],
+    staticPartitions: TablePartitionSpec) extends UnaryNode {
   override def output: Seq[Attribute] = child.output
   override protected def withNewChildInternal(newChild: LogicalPlan): 
WriteFiles =
     copy(child = newChild)
@@ -53,13 +59,17 @@ case class WriteFiles(child: LogicalPlan) extends UnaryNode 
{
 /**
  * Responsible for writing files.
  */
-case class WriteFilesExec(child: SparkPlan) extends UnaryExecNode {
+case class WriteFilesExec(
+    child: SparkPlan,
+    fileFormat: FileFormat,
+    partitionColumns: Seq[Attribute],
+    bucketSpec: Option[BucketSpec],
+    options: Map[String, String],
+    staticPartitions: TablePartitionSpec) extends UnaryExecNode {
   override def output: Seq[Attribute] = Seq.empty
 
-  override protected def doExecuteWrite(writeSpec: WriteSpec): 
RDD[WriterCommitMessage] = {
-    assert(writeSpec.isInstanceOf[WriteFilesSpec])
-    val writeFilesSpec: WriteFilesSpec = writeSpec.asInstanceOf[WriteFilesSpec]
-
+  override protected def doExecuteWrite(
+      writeFilesSpec: WriteFilesSpec): RDD[WriterCommitMessage] = {
     val rdd = child.execute()
     // SPARK-23271 If we are attempting to write a zero partition rdd, create 
a dummy single
     // partition rdd to make sure we at least set up one write task to write 
the metadata.
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index af727f966e5..6c5646a2416 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -238,7 +238,8 @@ case class RelationConversions(
       // that only matches table insertion inside Hive CTAS.
       // This pattern would not cause conflicts because this rule is always 
applied before
       // `HiveAnalysis` and both of these rules are running once.
-      case InsertIntoHiveTable(tableDesc, _, query, overwrite, 
ifPartitionNotExists, _)
+      case InsertIntoHiveTable(
+        tableDesc, _, query, overwrite, ifPartitionNotExists, _, _, _, _, _, _)
           if query.resolved && DDLUtils.isHiveTable(tableDesc) &&
             tableDesc.partitionColumnNames.isEmpty && isConvertible(tableDesc) 
&&
             conf.getConf(HiveUtils.CONVERT_METASTORE_CTAS) =>
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala
similarity index 54%
copy from 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
copy to 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala
index 799cea42e1e..9981ae4cc31 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala
@@ -29,83 +29,19 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.common.FileUtils
 import org.apache.hadoop.hive.ql.exec.TaskRunner
 
-import org.apache.spark.internal.io.FileCommitProtocol
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.catalog.BucketSpec
-import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.command.DataWritingCommand
-import org.apache.spark.sql.execution.datasources.FileFormatWriter
 import org.apache.spark.sql.hive.HiveExternalCatalog
-import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.sql.hive.client.HiveVersion
 
-// Base trait from which all hive insert statement physical execution extends.
-private[hive] trait SaveAsHiveFile extends DataWritingCommand with 
V1WritesHiveUtils {
+class HiveTempPath(session: SparkSession, val hadoopConf: Configuration, path: 
Path)
+  extends Logging {
+  private var stagingDirForCreating: Option[Path] = None
 
-  var createdTempDir: Option[Path] = None
+  lazy val externalTempPath: Path = getExternalTmpPath(path)
 
-  protected def saveAsHiveFile(
-      sparkSession: SparkSession,
-      plan: SparkPlan,
-      hadoopConf: Configuration,
-      fileSinkConf: FileSinkDesc,
-      outputLocation: String,
-      customPartitionLocations: Map[TablePartitionSpec, String] = Map.empty,
-      partitionAttributes: Seq[Attribute] = Nil,
-      bucketSpec: Option[BucketSpec] = None): Set[String] = {
-
-    val isCompressed =
-      
fileSinkConf.getTableInfo.getOutputFileFormatClassName.toLowerCase(Locale.ROOT) 
match {
-        case formatName if formatName.endsWith("orcoutputformat") =>
-          // For ORC,"mapreduce.output.fileoutputformat.compress",
-          // "mapreduce.output.fileoutputformat.compress.codec", and
-          // "mapreduce.output.fileoutputformat.compress.type"
-          // have no impact because it uses table properties to store 
compression information.
-          false
-        case _ => hadoopConf.get("hive.exec.compress.output", 
"false").toBoolean
-    }
-
-    if (isCompressed) {
-      hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true")
-      fileSinkConf.setCompressed(true)
-      fileSinkConf.setCompressCodec(hadoopConf
-        .get("mapreduce.output.fileoutputformat.compress.codec"))
-      fileSinkConf.setCompressType(hadoopConf
-        .get("mapreduce.output.fileoutputformat.compress.type"))
-    } else {
-      // Set compression by priority
-      HiveOptions.getHiveWriteCompression(fileSinkConf.getTableInfo, 
sparkSession.sessionState.conf)
-        .foreach { case (compression, codec) => hadoopConf.set(compression, 
codec) }
-    }
-
-    val committer = FileCommitProtocol.instantiate(
-      sparkSession.sessionState.conf.fileCommitProtocolClass,
-      jobId = java.util.UUID.randomUUID().toString,
-      outputPath = outputLocation)
-
-    val options = getOptionsWithHiveBucketWrite(bucketSpec)
-
-    FileFormatWriter.write(
-      sparkSession = sparkSession,
-      plan = plan,
-      fileFormat = new HiveFileFormat(fileSinkConf),
-      committer = committer,
-      outputSpec =
-        FileFormatWriter.OutputSpec(outputLocation, customPartitionLocations, 
outputColumns),
-      hadoopConf = hadoopConf,
-      partitionColumns = partitionAttributes,
-      bucketSpec = bucketSpec,
-      statsTrackers = Seq(basicWriteJobStatsTracker(hadoopConf)),
-      options = options)
-  }
-
-  protected def getExternalTmpPath(
-      sparkSession: SparkSession,
-      hadoopConf: Configuration,
-      path: Path): Path = {
+  private def getExternalTmpPath(path: Path): Path = {
     import org.apache.spark.sql.hive.client.hive._
 
     // Before Hive 1.1, when inserting into a table, Hive will create the 
staging directory under
@@ -125,43 +61,22 @@ private[hive] trait SaveAsHiveFile extends 
DataWritingCommand with V1WritesHiveU
     assert(hiveVersionsUsingNewExternalTempPath ++ 
hiveVersionsUsingOldExternalTempPath ==
       allSupportedHiveVersions)
 
-    val externalCatalog = sparkSession.sharedState.externalCatalog
+    val externalCatalog = session.sharedState.externalCatalog
     val hiveVersion = 
externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client.version
     val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
     val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive")
 
     if (hiveVersionsUsingOldExternalTempPath.contains(hiveVersion)) {
-      oldVersionExternalTempPath(path, hadoopConf, scratchDir)
+      oldVersionExternalTempPath(path, scratchDir)
     } else if (hiveVersionsUsingNewExternalTempPath.contains(hiveVersion)) {
-      newVersionExternalTempPath(path, hadoopConf, stagingDir)
+      newVersionExternalTempPath(path, stagingDir)
     } else {
       throw new IllegalStateException("Unsupported hive version: " + 
hiveVersion.fullVersion)
     }
   }
 
-  protected def deleteExternalTmpPath(hadoopConf: Configuration) : Unit = {
-    // Attempt to delete the staging directory and the inclusive files. If 
failed, the files are
-    // expected to be dropped at the normal termination of VM since 
deleteOnExit is used.
-    try {
-      createdTempDir.foreach { path =>
-        val fs = path.getFileSystem(hadoopConf)
-        if (fs.delete(path, true)) {
-          // If we successfully delete the staging directory, remove it from 
FileSystem's cache.
-          fs.cancelDeleteOnExit(path)
-        }
-      }
-    } catch {
-      case NonFatal(e) =>
-        val stagingDir = hadoopConf.get("hive.exec.stagingdir", 
".hive-staging")
-        logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)
-    }
-  }
-
   // Mostly copied from Context.java#getExternalTmpPath of Hive 0.13
-  private def oldVersionExternalTempPath(
-      path: Path,
-      hadoopConf: Configuration,
-      scratchDir: String): Path = {
+  private def oldVersionExternalTempPath(path: Path, scratchDir: String): Path 
= {
     val extURI: URI = path.toUri
     val scratchPath = new Path(scratchDir, executionId)
     var dirPath = new Path(
@@ -169,56 +84,34 @@ private[hive] trait SaveAsHiveFile extends 
DataWritingCommand with V1WritesHiveU
       extURI.getAuthority,
       scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID())
 
-    try {
-      val fs: FileSystem = dirPath.getFileSystem(hadoopConf)
-      dirPath = new Path(fs.makeQualified(dirPath).toString())
-
-      if (!FileUtils.mkdir(fs, dirPath, true, hadoopConf)) {
-        throw new IllegalStateException("Cannot create staging directory: " + 
dirPath.toString)
-      }
-      createdTempDir = Some(dirPath)
-      fs.deleteOnExit(dirPath)
-    } catch {
-      case e: IOException =>
-        throw 
QueryExecutionErrors.cannotCreateStagingDirError(dirPath.toString, e)
-    }
+    val fs = dirPath.getFileSystem(hadoopConf)
+    dirPath = new Path(fs.makeQualified(dirPath).toString())
+    stagingDirForCreating = Some(dirPath)
     dirPath
   }
 
   // Mostly copied from Context.java#getExternalTmpPath of Hive 1.2
-  private def newVersionExternalTempPath(
-      path: Path,
-      hadoopConf: Configuration,
-      stagingDir: String): Path = {
+  private def newVersionExternalTempPath(path: Path, stagingDir: String): Path 
= {
     val extURI: URI = path.toUri
     if (extURI.getScheme == "viewfs") {
-      getExtTmpPathRelTo(path, hadoopConf, stagingDir)
+      val qualifiedStagingDir = getStagingDir(path, stagingDir)
+      stagingDirForCreating = Some(qualifiedStagingDir)
+      // Hive uses 10000
+      new Path(qualifiedStagingDir, "-ext-10000")
     } else {
-      new Path(getExternalScratchDir(extURI, hadoopConf, stagingDir), 
"-ext-10000")
+      val qualifiedStagingDir = getExternalScratchDir(extURI, stagingDir)
+      stagingDirForCreating = Some(qualifiedStagingDir)
+      new Path(qualifiedStagingDir, "-ext-10000")
     }
   }
 
-  private def getExtTmpPathRelTo(
-      path: Path,
-      hadoopConf: Configuration,
-      stagingDir: String): Path = {
-    new Path(getStagingDir(path, hadoopConf, stagingDir), "-ext-10000") // 
Hive uses 10000
-  }
-
-  private def getExternalScratchDir(
-      extURI: URI,
-      hadoopConf: Configuration,
-      stagingDir: String): Path = {
+  private def getExternalScratchDir(extURI: URI, stagingDir: String): Path = {
     getStagingDir(
       new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath),
-      hadoopConf,
       stagingDir)
   }
 
-  private[hive] def getStagingDir(
-      inputPath: Path,
-      hadoopConf: Configuration,
-      stagingDir: String): Path = {
+  private[hive] def getStagingDir(inputPath: Path, stagingDir: String): Path = 
{
     val inputPathName: String = inputPath.toString
     val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
     var stagingPathName: String =
@@ -243,17 +136,6 @@ private[hive] trait SaveAsHiveFile extends 
DataWritingCommand with V1WritesHiveU
       fs.makeQualified(
         new Path(stagingPathName + "_" + executionId + "-" + 
TaskRunner.getTaskRunnerID))
     logDebug("Created staging dir = " + dir + " for path = " + inputPath)
-    try {
-      if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) {
-        throw new IllegalStateException("Cannot create staging directory  '" + 
dir.toString + "'")
-      }
-      createdTempDir = Some(dir)
-      fs.deleteOnExit(dir)
-    } catch {
-      case e: IOException =>
-        throw QueryExecutionErrors.cannotCreateStagingDirError(
-          s"'${dir.toString}': ${e.getMessage}", e)
-    }
     dir
   }
 
@@ -269,5 +151,43 @@ private[hive] trait SaveAsHiveFile extends 
DataWritingCommand with V1WritesHiveU
     val format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS", Locale.US)
     "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
   }
-}
 
+  def deleteTmpPath() : Unit = {
+    // Attempt to delete the staging directory and the inclusive files. If 
failed, the files are
+    // expected to be dropped at the normal termination of VM since 
deleteOnExit is used.
+    try {
+      stagingDirForCreating.foreach { stagingDir =>
+        val fs = stagingDir.getFileSystem(hadoopConf)
+        if (fs.delete(stagingDir, true)) {
+          // If we successfully delete the staging directory, remove it from 
FileSystem's cache.
+          fs.cancelDeleteOnExit(stagingDir)
+        }
+      }
+    } catch {
+      case NonFatal(e) =>
+        val stagingDir = hadoopConf.get("hive.exec.stagingdir", 
".hive-staging")
+        logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)
+    }
+  }
+
+  def createTmpPath(): Unit = {
+    try {
+      stagingDirForCreating.foreach { stagingDir =>
+        val fs: FileSystem = stagingDir.getFileSystem(hadoopConf)
+        if (!FileUtils.mkdir(fs, stagingDir, true, hadoopConf)) {
+          throw new IllegalStateException(
+            "Cannot create staging directory  '" + stagingDir.toString + "'")
+        }
+        fs.deleteOnExit(stagingDir)
+      }
+    } catch {
+      case e: IOException =>
+        throw QueryExecutionErrors.cannotCreateStagingDirError(
+          s"'${stagingDirForCreating.toString}': ${e.getMessage}", e)
+    }
+  }
+
+  def deleteIfNotStagingDir(path: Path, fs: FileSystem): Unit = {
+    if (Option(path) != stagingDirForCreating) fs.delete(path, true)
+  }
+}
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
index fb15432013d..bd6278473a7 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
@@ -55,7 +55,7 @@ case class InsertIntoHiveDirCommand(
     storage: CatalogStorageFormat,
     query: LogicalPlan,
     overwrite: Boolean,
-    outputColumnNames: Seq[String]) extends SaveAsHiveFile {
+    outputColumnNames: Seq[String]) extends SaveAsHiveFile with 
V1WritesHiveUtils {
 
   override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
     assert(storage.locationUri.nonEmpty)
@@ -99,21 +99,24 @@ case class InsertIntoHiveDirCommand(
     }
 
     // The temporary path must be a HDFS path, not a local path.
-    val tmpPath = getExternalTmpPath(sparkSession, hadoopConf, qualifiedPath)
+    val hiveTempPath = new HiveTempPath(sparkSession, hadoopConf, 
qualifiedPath)
+    val tmpPath = hiveTempPath.externalTempPath
     val fileSinkConf = new org.apache.spark.sql.hive.HiveShim.ShimFileSinkDesc(
       tmpPath.toString, tableDesc, false)
+    setupHadoopConfForCompression(fileSinkConf, hadoopConf, sparkSession)
+    hiveTempPath.createTmpPath()
 
     try {
       saveAsHiveFile(
         sparkSession = sparkSession,
         plan = child,
         hadoopConf = hadoopConf,
-        fileSinkConf = fileSinkConf,
+        fileFormat = new HiveFileFormat(fileSinkConf),
         outputLocation = tmpPath.toString)
 
       if (overwrite && fs.exists(writeToPath)) {
         fs.listStatus(writeToPath).foreach { existFile =>
-          if (Option(existFile.getPath) != createdTempDir) 
fs.delete(existFile.getPath, true)
+          hiveTempPath.deleteIfNotStagingDir(existFile.getPath, fs)
         }
       }
 
@@ -131,7 +134,7 @@ case class InsertIntoHiveDirCommand(
         throw new SparkException(
           "Failed inserting overwrite directory " + storage.locationUri.get, e)
     } finally {
-      deleteExternalTmpPath(hadoopConf)
+      hiveTempPath.deleteTmpPath()
     }
 
     Seq.empty[Row]
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 8c3aa0a80c1..6785b5d96d9 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -22,15 +22,17 @@ import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.ql.plan.TableDesc
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.CommandUtils
-import org.apache.spark.sql.execution.datasources.{V1WriteCommand, 
V1WritesUtils}
+import org.apache.spark.sql.execution.datasources.{FileFormat, V1WriteCommand, 
V1WritesUtils}
 import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.sql.hive.client.HiveClientImpl
@@ -73,16 +75,20 @@ case class InsertIntoHiveTable(
     query: LogicalPlan,
     overwrite: Boolean,
     ifPartitionNotExists: Boolean,
-    outputColumnNames: Seq[String]
+    outputColumnNames: Seq[String],
+    partitionColumns: Seq[Attribute],
+    bucketSpec: Option[BucketSpec],
+    options: Map[String, String],
+    fileFormat: FileFormat,
+    @transient hiveTmpPath: HiveTempPath
   ) extends SaveAsHiveFile with V1WriteCommand with V1WritesHiveUtils {
 
-  override lazy val partitionColumns: Seq[Attribute] = {
-    getDynamicPartitionColumns(table, partition, query)
+  override def staticPartitions: TablePartitionSpec = {
+    partition.filter(_._2.nonEmpty).map { case (k, v) => k -> v.get }
   }
 
   override def requiredOrdering: Seq[SortOrder] = {
-    val options = getOptionsWithHiveBucketWrite(table.bucketSpec)
-    V1WritesUtils.getSortOrder(outputColumns, partitionColumns, 
table.bucketSpec, options)
+    V1WritesUtils.getSortOrder(outputColumns, partitionColumns, bucketSpec, 
options)
   }
 
   /**
@@ -92,29 +98,16 @@ case class InsertIntoHiveTable(
    */
   override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
     val externalCatalog = sparkSession.sharedState.externalCatalog
-    val hadoopConf = sparkSession.sessionState.newHadoopConf()
-
-    val hiveQlTable = HiveClientImpl.toHiveTable(table)
-    // Have to pass the TableDesc object to RDD.mapPartitions and then 
instantiate new serializer
-    // instances within the closure, since Serializer is not serializable 
while TableDesc is.
-    val tableDesc = new TableDesc(
-      hiveQlTable.getInputFormatClass,
-      // The class of table should be org.apache.hadoop.hive.ql.metadata.Table 
because
-      // getOutputFormatClass will use 
HiveFileFormatUtils.getOutputFormatSubstitute to
-      // substitute some output formats, e.g. substituting 
SequenceFileOutputFormat to
-      // HiveSequenceFileOutputFormat.
-      hiveQlTable.getOutputFormatClass,
-      hiveQlTable.getMetadata
-    )
-    val tableLocation = hiveQlTable.getDataLocation
-    val tmpLocation = getExternalTmpPath(sparkSession, hadoopConf, 
tableLocation)
+    val hadoopConf = hiveTmpPath.hadoopConf
+    val tmpLocation = hiveTmpPath.externalTempPath
 
+    hiveTmpPath.createTmpPath()
     try {
-      processInsert(sparkSession, externalCatalog, hadoopConf, tableDesc, 
tmpLocation, child)
+      processInsert(sparkSession, externalCatalog, hadoopConf, tmpLocation, 
child)
     } finally {
       // Attempt to delete the staging directory and the inclusive files. If 
failed, the files are
       // expected to be dropped at the normal termination of VM since 
deleteOnExit is used.
-      deleteExternalTmpPath(hadoopConf)
+      hiveTmpPath.deleteTmpPath()
     }
 
     // un-cache this table.
@@ -134,23 +127,21 @@ case class InsertIntoHiveTable(
       sparkSession: SparkSession,
       externalCatalog: ExternalCatalog,
       hadoopConf: Configuration,
-      tableDesc: TableDesc,
       tmpLocation: Path,
       child: SparkPlan): Unit = {
-    val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
 
     val numDynamicPartitions = partition.values.count(_.isEmpty)
     val partitionSpec = getPartitionSpec(partition)
-    val partitionAttributes = getDynamicPartitionColumns(table, partition, 
query)
 
     val writtenParts = saveAsHiveFile(
       sparkSession = sparkSession,
       plan = child,
       hadoopConf = hadoopConf,
-      fileSinkConf = fileSinkConf,
+      fileFormat = fileFormat,
       outputLocation = tmpLocation.toString,
-      partitionAttributes = partitionAttributes,
-      bucketSpec = table.bucketSpec)
+      partitionAttributes = partitionColumns,
+      bucketSpec = bucketSpec,
+      options = options)
 
     if (partition.nonEmpty) {
       if (numDynamicPartitions > 0) {
@@ -294,3 +285,40 @@ case class InsertIntoHiveTable(
   override protected def withNewChildInternal(newChild: LogicalPlan): 
InsertIntoHiveTable =
     copy(query = newChild)
 }
+
+object InsertIntoHiveTable extends V1WritesHiveUtils with Logging {
+  def apply(
+      table: CatalogTable,
+      partition: Map[String, Option[String]],
+      query: LogicalPlan,
+      overwrite: Boolean,
+      ifPartitionNotExists: Boolean,
+      outputColumnNames: Seq[String]): InsertIntoHiveTable = {
+    val sparkSession = SparkSession.getActiveSession.orNull
+    val hiveQlTable = HiveClientImpl.toHiveTable(table)
+    // Have to pass the TableDesc object to RDD.mapPartitions and then 
instantiate new serializer
+    // instances within the closure, since Serializer is not serializable 
while TableDesc is.
+    val tableDesc = new TableDesc(
+      hiveQlTable.getInputFormatClass,
+      // The class of table should be org.apache.hadoop.hive.ql.metadata.Table 
because
+      // getOutputFormatClass will use 
HiveFileFormatUtils.getOutputFormatSubstitute to
+      // substitute some output formats, e.g. substituting 
SequenceFileOutputFormat to
+      // HiveSequenceFileOutputFormat.
+      hiveQlTable.getOutputFormatClass,
+      hiveQlTable.getMetadata
+    )
+    val hadoopConf = sparkSession.sessionState.newHadoopConf()
+    val tableLocation = hiveQlTable.getDataLocation
+    val hiveTempPath = new HiveTempPath(sparkSession, hadoopConf, 
tableLocation)
+    val fileSinkConf = new 
FileSinkDesc(hiveTempPath.externalTempPath.toString, tableDesc, false)
+    setupHadoopConfForCompression(fileSinkConf, hadoopConf, sparkSession)
+    val fileFormat: FileFormat = new HiveFileFormat(fileSinkConf)
+
+    val partitionColumns = getDynamicPartitionColumns(table, partition, query)
+    val bucketSpec = table.bucketSpec
+    val options = getOptionsWithHiveBucketWrite(bucketSpec)
+
+    new InsertIntoHiveTable(table, partition, query, overwrite, 
ifPartitionNotExists,
+      outputColumnNames, partitionColumns, bucketSpec, options, fileFormat, 
hiveTempPath)
+  }
+}
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
index 799cea42e1e..47d402c2e8b 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
@@ -17,81 +17,40 @@
 
 package org.apache.spark.sql.hive.execution
 
-import java.io.IOException
-import java.net.URI
-import java.text.SimpleDateFormat
-import java.util.{Date, Locale, Random}
-
-import scala.util.control.NonFatal
-
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.hive.common.FileUtils
-import org.apache.hadoop.hive.ql.exec.TaskRunner
 
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.DataWritingCommand
-import org.apache.spark.sql.execution.datasources.FileFormatWriter
-import org.apache.spark.sql.hive.HiveExternalCatalog
-import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
-import org.apache.spark.sql.hive.client.HiveVersion
+import org.apache.spark.sql.execution.datasources.{FileFormat, 
FileFormatWriter}
 
 // Base trait from which all hive insert statement physical execution extends.
-private[hive] trait SaveAsHiveFile extends DataWritingCommand with 
V1WritesHiveUtils {
-
-  var createdTempDir: Option[Path] = None
+private[hive] trait SaveAsHiveFile extends DataWritingCommand {
 
   protected def saveAsHiveFile(
       sparkSession: SparkSession,
       plan: SparkPlan,
       hadoopConf: Configuration,
-      fileSinkConf: FileSinkDesc,
+      fileFormat: FileFormat,
       outputLocation: String,
       customPartitionLocations: Map[TablePartitionSpec, String] = Map.empty,
       partitionAttributes: Seq[Attribute] = Nil,
-      bucketSpec: Option[BucketSpec] = None): Set[String] = {
-
-    val isCompressed =
-      
fileSinkConf.getTableInfo.getOutputFileFormatClassName.toLowerCase(Locale.ROOT) 
match {
-        case formatName if formatName.endsWith("orcoutputformat") =>
-          // For ORC,"mapreduce.output.fileoutputformat.compress",
-          // "mapreduce.output.fileoutputformat.compress.codec", and
-          // "mapreduce.output.fileoutputformat.compress.type"
-          // have no impact because it uses table properties to store 
compression information.
-          false
-        case _ => hadoopConf.get("hive.exec.compress.output", 
"false").toBoolean
-    }
-
-    if (isCompressed) {
-      hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true")
-      fileSinkConf.setCompressed(true)
-      fileSinkConf.setCompressCodec(hadoopConf
-        .get("mapreduce.output.fileoutputformat.compress.codec"))
-      fileSinkConf.setCompressType(hadoopConf
-        .get("mapreduce.output.fileoutputformat.compress.type"))
-    } else {
-      // Set compression by priority
-      HiveOptions.getHiveWriteCompression(fileSinkConf.getTableInfo, 
sparkSession.sessionState.conf)
-        .foreach { case (compression, codec) => hadoopConf.set(compression, 
codec) }
-    }
+      bucketSpec: Option[BucketSpec] = None,
+      options: Map[String, String] = Map.empty): Set[String] = {
 
     val committer = FileCommitProtocol.instantiate(
       sparkSession.sessionState.conf.fileCommitProtocolClass,
       jobId = java.util.UUID.randomUUID().toString,
       outputPath = outputLocation)
 
-    val options = getOptionsWithHiveBucketWrite(bucketSpec)
-
     FileFormatWriter.write(
       sparkSession = sparkSession,
       plan = plan,
-      fileFormat = new HiveFileFormat(fileSinkConf),
+      fileFormat = fileFormat,
       committer = committer,
       outputSpec =
         FileFormatWriter.OutputSpec(outputLocation, customPartitionLocations, 
outputColumns),
@@ -101,173 +60,5 @@ private[hive] trait SaveAsHiveFile extends 
DataWritingCommand with V1WritesHiveU
       statsTrackers = Seq(basicWriteJobStatsTracker(hadoopConf)),
       options = options)
   }
-
-  protected def getExternalTmpPath(
-      sparkSession: SparkSession,
-      hadoopConf: Configuration,
-      path: Path): Path = {
-    import org.apache.spark.sql.hive.client.hive._
-
-    // Before Hive 1.1, when inserting into a table, Hive will create the 
staging directory under
-    // a common scratch directory. After the writing is finished, Hive will 
simply empty the table
-    // directory and move the staging directory to it.
-    // After Hive 1.1, Hive will create the staging directory under the table 
directory, and when
-    // moving staging directory to table directory, Hive will still empty the 
table directory, but
-    // will exclude the staging directory there.
-    // We have to follow the Hive behavior here, to avoid troubles. For 
example, if we create
-    // staging directory under the table director for Hive prior to 1.1, the 
staging directory will
-    // be removed by Hive when Hive is trying to empty the table directory.
-    val hiveVersionsUsingOldExternalTempPath: Set[HiveVersion] = Set(v12, v13, 
v14, v1_0)
-    val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] =
-      Set(v1_1, v1_2, v2_0, v2_1, v2_2, v2_3, v3_0, v3_1)
-
-    // Ensure all the supported versions are considered here.
-    assert(hiveVersionsUsingNewExternalTempPath ++ 
hiveVersionsUsingOldExternalTempPath ==
-      allSupportedHiveVersions)
-
-    val externalCatalog = sparkSession.sharedState.externalCatalog
-    val hiveVersion = 
externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client.version
-    val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
-    val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive")
-
-    if (hiveVersionsUsingOldExternalTempPath.contains(hiveVersion)) {
-      oldVersionExternalTempPath(path, hadoopConf, scratchDir)
-    } else if (hiveVersionsUsingNewExternalTempPath.contains(hiveVersion)) {
-      newVersionExternalTempPath(path, hadoopConf, stagingDir)
-    } else {
-      throw new IllegalStateException("Unsupported hive version: " + 
hiveVersion.fullVersion)
-    }
-  }
-
-  protected def deleteExternalTmpPath(hadoopConf: Configuration) : Unit = {
-    // Attempt to delete the staging directory and the inclusive files. If 
failed, the files are
-    // expected to be dropped at the normal termination of VM since 
deleteOnExit is used.
-    try {
-      createdTempDir.foreach { path =>
-        val fs = path.getFileSystem(hadoopConf)
-        if (fs.delete(path, true)) {
-          // If we successfully delete the staging directory, remove it from 
FileSystem's cache.
-          fs.cancelDeleteOnExit(path)
-        }
-      }
-    } catch {
-      case NonFatal(e) =>
-        val stagingDir = hadoopConf.get("hive.exec.stagingdir", 
".hive-staging")
-        logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)
-    }
-  }
-
-  // Mostly copied from Context.java#getExternalTmpPath of Hive 0.13
-  private def oldVersionExternalTempPath(
-      path: Path,
-      hadoopConf: Configuration,
-      scratchDir: String): Path = {
-    val extURI: URI = path.toUri
-    val scratchPath = new Path(scratchDir, executionId)
-    var dirPath = new Path(
-      extURI.getScheme,
-      extURI.getAuthority,
-      scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID())
-
-    try {
-      val fs: FileSystem = dirPath.getFileSystem(hadoopConf)
-      dirPath = new Path(fs.makeQualified(dirPath).toString())
-
-      if (!FileUtils.mkdir(fs, dirPath, true, hadoopConf)) {
-        throw new IllegalStateException("Cannot create staging directory: " + 
dirPath.toString)
-      }
-      createdTempDir = Some(dirPath)
-      fs.deleteOnExit(dirPath)
-    } catch {
-      case e: IOException =>
-        throw 
QueryExecutionErrors.cannotCreateStagingDirError(dirPath.toString, e)
-    }
-    dirPath
-  }
-
-  // Mostly copied from Context.java#getExternalTmpPath of Hive 1.2
-  private def newVersionExternalTempPath(
-      path: Path,
-      hadoopConf: Configuration,
-      stagingDir: String): Path = {
-    val extURI: URI = path.toUri
-    if (extURI.getScheme == "viewfs") {
-      getExtTmpPathRelTo(path, hadoopConf, stagingDir)
-    } else {
-      new Path(getExternalScratchDir(extURI, hadoopConf, stagingDir), 
"-ext-10000")
-    }
-  }
-
-  private def getExtTmpPathRelTo(
-      path: Path,
-      hadoopConf: Configuration,
-      stagingDir: String): Path = {
-    new Path(getStagingDir(path, hadoopConf, stagingDir), "-ext-10000") // 
Hive uses 10000
-  }
-
-  private def getExternalScratchDir(
-      extURI: URI,
-      hadoopConf: Configuration,
-      stagingDir: String): Path = {
-    getStagingDir(
-      new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath),
-      hadoopConf,
-      stagingDir)
-  }
-
-  private[hive] def getStagingDir(
-      inputPath: Path,
-      hadoopConf: Configuration,
-      stagingDir: String): Path = {
-    val inputPathName: String = inputPath.toString
-    val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
-    var stagingPathName: String =
-      if (inputPathName.indexOf(stagingDir) == -1) {
-        new Path(inputPathName, stagingDir).toString
-      } else {
-        inputPathName.substring(0, inputPathName.indexOf(stagingDir) + 
stagingDir.length)
-      }
-
-    // SPARK-20594: This is a walk-around fix to resolve a Hive bug. Hive 
requires that the
-    // staging directory needs to avoid being deleted when users set 
hive.exec.stagingdir
-    // under the table directory.
-    if (isSubDir(new Path(stagingPathName), inputPath, fs) &&
-      
!stagingPathName.stripPrefix(inputPathName).stripPrefix("/").startsWith(".")) {
-      logDebug(s"The staging dir '$stagingPathName' should be a child 
directory starts " +
-        "with '.' to avoid being deleted if we set hive.exec.stagingdir under 
the table " +
-        "directory.")
-      stagingPathName = new Path(inputPathName, ".hive-staging").toString
-    }
-
-    val dir: Path =
-      fs.makeQualified(
-        new Path(stagingPathName + "_" + executionId + "-" + 
TaskRunner.getTaskRunnerID))
-    logDebug("Created staging dir = " + dir + " for path = " + inputPath)
-    try {
-      if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) {
-        throw new IllegalStateException("Cannot create staging directory  '" + 
dir.toString + "'")
-      }
-      createdTempDir = Some(dir)
-      fs.deleteOnExit(dir)
-    } catch {
-      case e: IOException =>
-        throw QueryExecutionErrors.cannotCreateStagingDirError(
-          s"'${dir.toString}': ${e.getMessage}", e)
-    }
-    dir
-  }
-
-  // HIVE-14259 removed FileUtils.isSubDir(). Adapted it from Hive 1.2's 
FileUtils.isSubDir().
-  private def isSubDir(p1: Path, p2: Path, fs: FileSystem): Boolean = {
-    val path1 = fs.makeQualified(p1).toString + Path.SEPARATOR
-    val path2 = fs.makeQualified(p2).toString + Path.SEPARATOR
-    path1.startsWith(path2)
-  }
-
-  private def executionId: String = {
-    val rand: Random = new Random
-    val format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS", Locale.US)
-    "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
-  }
 }
 
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/V1WritesHiveUtils.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/V1WritesHiveUtils.scala
index 752753f334a..6421dd184ae 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/V1WritesHiveUtils.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/V1WritesHiveUtils.scala
@@ -19,8 +19,9 @@ package org.apache.spark.sql.hive.execution
 
 import java.util.Locale
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hive.ql.ErrorMsg
-import org.apache.hadoop.hive.ql.plan.TableDesc
+import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, SparkSession}
@@ -32,6 +33,7 @@ import 
org.apache.spark.sql.execution.datasources.BucketingUtils
 import org.apache.spark.sql.hive.client.HiveClientImpl
 
 trait V1WritesHiveUtils {
+
   def getPartitionSpec(partition: Map[String, Option[String]]): Map[String, 
String] = {
     partition.map {
       case (key, Some(null)) => key -> 
ExternalCatalogUtils.DEFAULT_PARTITION_NAME
@@ -105,4 +107,33 @@ trait V1WritesHiveUtils {
       .map(_ => Map(BucketingUtils.optionForHiveCompatibleBucketWrite -> 
"true"))
       .getOrElse(Map.empty)
   }
+
+  def setupHadoopConfForCompression(
+      fileSinkConf: FileSinkDesc,
+      hadoopConf: Configuration,
+      sparkSession: SparkSession): Unit = {
+    val isCompressed =
+      
fileSinkConf.getTableInfo.getOutputFileFormatClassName.toLowerCase(Locale.ROOT) 
match {
+        case formatName if formatName.endsWith("orcoutputformat") =>
+          // For ORC,"mapreduce.output.fileoutputformat.compress",
+          // "mapreduce.output.fileoutputformat.compress.codec", and
+          // "mapreduce.output.fileoutputformat.compress.type"
+          // have no impact because it uses table properties to store 
compression information.
+          false
+        case _ => hadoopConf.get("hive.exec.compress.output", 
"false").toBoolean
+      }
+
+    if (isCompressed) {
+      hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true")
+      fileSinkConf.setCompressed(true)
+      fileSinkConf.setCompressCodec(hadoopConf
+        .get("mapreduce.output.fileoutputformat.compress.codec"))
+      fileSinkConf.setCompressType(hadoopConf
+        .get("mapreduce.output.fileoutputformat.compress.type"))
+    } else {
+      // Set compression by priority
+      HiveOptions.getHiveWriteCompression(fileSinkConf.getTableInfo, 
sparkSession.sessionState.conf)
+        .foreach { case (compression, codec) => hadoopConf.set(compression, 
codec) }
+    }
+  }
 }
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
index f62d941746b..13f2a865936 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
@@ -27,7 +27,7 @@ import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
 import org.apache.spark.SparkException
 import org.apache.spark.sql.{QueryTest, _}
 import org.apache.spark.sql.catalyst.parser.ParseException
-import org.apache.spark.sql.hive.execution.InsertIntoHiveTable
+import org.apache.spark.sql.hive.execution.HiveTempPath
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
@@ -541,25 +541,24 @@ class InsertSuite extends QueryTest with 
TestHiveSingleton with BeforeAndAfter
     val conf = spark.sessionState.newHadoopConf()
     val inputPath = new Path("/tmp/b/c")
     var stagingDir = "tmp/b"
-    val saveHiveFile = InsertIntoHiveTable(null, Map.empty, null, false, 
false, null)
-    val getStagingDir = PrivateMethod[Path](Symbol("getStagingDir"))
-    var path = saveHiveFile invokePrivate getStagingDir(inputPath, conf, 
stagingDir)
+    val hiveTempPath = new HiveTempPath(null, conf, null)
+    var path = hiveTempPath.getStagingDir(inputPath, stagingDir)
     assert(path.toString.indexOf("/tmp/b_hive_") != -1)
 
     stagingDir = "tmp/b/c"
-    path = saveHiveFile invokePrivate getStagingDir(inputPath, conf, 
stagingDir)
+    path = hiveTempPath.getStagingDir(inputPath, stagingDir)
     assert(path.toString.indexOf("/tmp/b/c/.hive-staging_hive_") != -1)
 
     stagingDir = "d/e"
-    path = saveHiveFile invokePrivate getStagingDir(inputPath, conf, 
stagingDir)
+    path = hiveTempPath.getStagingDir(inputPath, stagingDir)
     assert(path.toString.indexOf("/tmp/b/c/.hive-staging_hive_") != -1)
 
     stagingDir = ".d/e"
-    path = saveHiveFile invokePrivate getStagingDir(inputPath, conf, 
stagingDir)
+    path = hiveTempPath.getStagingDir(inputPath, stagingDir)
     assert(path.toString.indexOf("/tmp/b/c/.d/e_hive_") != -1)
 
     stagingDir = "/tmp/c/"
-    path = saveHiveFile invokePrivate getStagingDir(inputPath, conf, 
stagingDir)
+    path = hiveTempPath.getStagingDir(inputPath, stagingDir)
     assert(path.toString.indexOf("/tmp/c_hive_") != -1)
   }
 

