This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 27e20fe9eb1 [SPARK-41708][SQL] Pull v1write information to `WriteFiles`
27e20fe9eb1 is described below
commit 27e20fe9eb1b1ef1b3d32e180de55931f31fc345
Author: ulysses-you <[email protected]>
AuthorDate: Fri Jan 6 12:13:30 2023 +0800
[SPARK-41708][SQL] Pull v1write information to `WriteFiles`
### What changes were proposed in this pull request?
This PR pulls the v1 write information out of `V1WriteCommand` and into
`WriteFiles`:
```scala
case class WriteFiles(child: LogicalPlan)
=>
case class WriteFiles(
child: LogicalPlan,
fileFormat: FileFormat,
partitionColumns: Seq[Attribute],
bucketSpec: Option[BucketSpec],
options: Map[String, String],
staticPartitions: TablePartitionSpec)
```
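For reference, `V1WriteCommand` now exposes the same write information (as the `V1Writes.scala` hunk below shows), so the `V1Writes` rule can simply copy it onto the injected `WriteFiles` node:
```scala
trait V1WriteCommand extends DataWritingCommand {
  def fileFormat: FileFormat
  def partitionColumns: Seq[Attribute]   // pre-existing
  def staticPartitions: TablePartitionSpec
  def bucketSpec: Option[BucketSpec]
  def options: Map[String, String]
  def requiredOrdering: Seq[SortOrder]   // pre-existing
}
```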
This PR also removes `WriteSpec`, which is no longer necessary.
### Why are the changes needed?
After this PR, `WriteFiles` carries the write information itself, so developers can
read the file format, partition columns, bucket spec, options, and static partitions
directly from the plan node.
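As a hypothetical sketch (the rule below is illustrative and not part of this patch), an extension or planner rule can now obtain the write layout by pattern-matching on `WriteFiles` instead of reaching back into the originating command:
```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.WriteFiles

// Hypothetical rule: log the layout of every v1 write found in the plan.
object InspectV1Writes extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = {
    plan.foreach {
      case WriteFiles(_, fileFormat, partitionColumns, bucketSpec, options, staticPartitions) =>
        logInfo(s"v1 write: format=$fileFormat, partitionColumns=$partitionColumns, " +
          s"bucketSpec=$bucketSpec, options=$options, staticPartitions=$staticPartitions")
      case _ => // not a v1 write node, nothing to do
    }
    plan
  }
}
```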
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
Pass CI
Closes #39277 from ulysses-you/SPARK-41708.
Authored-by: ulysses-you <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../org/apache/spark/sql/internal/WriteSpec.java | 33 ---
.../org/apache/spark/sql/execution/SparkPlan.scala | 9 +-
.../spark/sql/execution/SparkStrategies.scala | 5 +-
.../spark/sql/execution/datasources/V1Writes.scala | 24 ++-
.../sql/execution/datasources/WriteFiles.scala | 26 ++-
.../org/apache/spark/sql/hive/HiveStrategies.scala | 3 +-
.../{SaveAsHiveFile.scala => HiveTempPath.scala} | 204 ++++++-------------
.../hive/execution/InsertIntoHiveDirCommand.scala | 13 +-
.../sql/hive/execution/InsertIntoHiveTable.scala | 88 +++++---
.../spark/sql/hive/execution/SaveAsHiveFile.scala | 221 +--------------------
.../sql/hive/execution/V1WritesHiveUtils.scala | 33 ++-
.../org/apache/spark/sql/hive/InsertSuite.scala | 15 +-
12 files changed, 224 insertions(+), 450 deletions(-)
diff --git
a/sql/core/src/main/java/org/apache/spark/sql/internal/WriteSpec.java
b/sql/core/src/main/java/org/apache/spark/sql/internal/WriteSpec.java
deleted file mode 100644
index c51a3ed7dc6..00000000000
--- a/sql/core/src/main/java/org/apache/spark/sql/internal/WriteSpec.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.internal;
-
-import java.io.Serializable;
-
-/**
- * Write spec is a input parameter of
- * {@link org.apache.spark.sql.execution.SparkPlan#executeWrite}.
- *
- * <p>
- * This is an empty interface, the concrete class which implements
- * {@link org.apache.spark.sql.execution.SparkPlan#doExecuteWrite}
- * should define its own class and use it.
- *
- * @since 3.4.0
- */
-public interface WriteSpec extends Serializable {}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 401302e5bde..5ca36a8a216 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -36,8 +36,9 @@ import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.trees.{BinaryLike, LeafLike, TreeNodeTag,
UnaryLike}
import org.apache.spark.sql.connector.write.WriterCommitMessage
import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.datasources.WriteFilesSpec
import org.apache.spark.sql.execution.metric.SQLMetric
-import org.apache.spark.sql.internal.{SQLConf, WriteSpec}
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.NextIterator
import org.apache.spark.util.io.{ChunkedByteBuffer,
ChunkedByteBufferOutputStream}
@@ -230,11 +231,11 @@ abstract class SparkPlan extends QueryPlan[SparkPlan]
with Logging with Serializ
*
* Concrete implementations of SparkPlan should override `doExecuteWrite`.
*/
- def executeWrite(writeSpec: WriteSpec): RDD[WriterCommitMessage] =
executeQuery {
+ def executeWrite(writeFilesSpec: WriteFilesSpec): RDD[WriterCommitMessage] =
executeQuery {
if (isCanonicalizedPlan) {
throw SparkException.internalError("A canonicalized plan is not supposed
to be executed.")
}
- doExecuteWrite(writeSpec)
+ doExecuteWrite(writeFilesSpec)
}
/**
@@ -343,7 +344,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with
Logging with Serializ
*
* Overridden by concrete implementations of SparkPlan.
*/
- protected def doExecuteWrite(writeSpec: WriteSpec): RDD[WriterCommitMessage]
= {
+ protected def doExecuteWrite(writeFilesSpec: WriteFilesSpec):
RDD[WriterCommitMessage] = {
throw SparkException.internalError(s"Internal Error ${this.getClass} has
write support" +
s" mismatch:\n${this}")
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 51a0c837c3e..110fe45cc12 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -895,8 +895,9 @@ abstract class SparkStrategies extends
QueryPlanner[SparkPlan] {
throw QueryExecutionErrors.ddlUnsupportedTemporarilyError("MERGE INTO
TABLE")
case logical.CollectMetrics(name, metrics, child) =>
execution.CollectMetricsExec(name, metrics, planLater(child)) :: Nil
- case WriteFiles(child) =>
- WriteFilesExec(planLater(child)) :: Nil
+ case WriteFiles(child, fileFormat, partitionColumns, bucket, options,
staticPartitions) =>
+ WriteFilesExec(planLater(child), fileFormat, partitionColumns, bucket,
options,
+ staticPartitions) :: Nil
case _ => Nil
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
index 3ed04e5bd6d..7b4fa7ad80b 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Attribute,
AttributeMap, AttributeSet, BitwiseAnd, Expression, HiveHash, Literal,
NamedExpression, Pmod, SortOrder, String2StringExpression, UnaryExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext,
ExprCode}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Sort}
@@ -31,11 +32,31 @@ import org.apache.spark.sql.types.StringType
import org.apache.spark.unsafe.types.UTF8String
trait V1WriteCommand extends DataWritingCommand {
+ /**
+ * Specify the [[FileFormat]] of the provider of V1 write command.
+ */
+ def fileFormat: FileFormat
+
/**
* Specify the partition columns of the V1 write command.
*/
def partitionColumns: Seq[Attribute]
+ /**
+ * Specify the partition spec of the V1 write command.
+ */
+ def staticPartitions: TablePartitionSpec
+
+ /**
+ * Specify the bucket spec of the V1 write command.
+ */
+ def bucketSpec: Option[BucketSpec]
+
+ /**
+ * Specify the storage options of the V1 write command.
+ */
+ def options: Map[String, String]
+
/**
* Specify the required ordering for the V1 write command.
`FileFormatWriter` will
* add SortExec if necessary when the requiredOrdering is empty.
@@ -56,7 +77,8 @@ object V1Writes extends Rule[LogicalPlan] with SQLConfHelper {
case write: V1WriteCommand if !write.child.isInstanceOf[WriteFiles] =>
val newQuery = prepareQuery(write, write.query)
val attrMap = AttributeMap(write.query.output.zip(newQuery.output))
- val newChild = WriteFiles(newQuery)
+ val newChild = WriteFiles(newQuery, write.fileFormat,
write.partitionColumns,
+ write.bucketSpec, write.options, write.staticPartitions)
val newWrite = write.withNewChildren(newChild ::
Nil).transformExpressions {
case a: Attribute if attrMap.contains(a) =>
a.withExprId(attrMap(a).exprId)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteFiles.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteFiles.scala
index 5bc8f9db32b..53d94704471 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteFiles.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteFiles.scala
@@ -23,12 +23,13 @@ import org.apache.spark.{SparkException, TaskContext}
import org.apache.spark.internal.io.{FileCommitProtocol,
SparkHadoopWriterUtils}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
import org.apache.spark.sql.connector.write.WriterCommitMessage
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import
org.apache.spark.sql.execution.datasources.FileFormatWriter.ConcurrentOutputWriterSpec
-import org.apache.spark.sql.internal.WriteSpec
/**
* The write files spec holds all information of [[V1WriteCommand]] if its
provider is
@@ -38,13 +39,18 @@ case class WriteFilesSpec(
description: WriteJobDescription,
committer: FileCommitProtocol,
concurrentOutputWriterSpecFunc: SparkPlan =>
Option[ConcurrentOutputWriterSpec])
- extends WriteSpec
/**
* During Optimizer, [[V1Writes]] injects the [[WriteFiles]] between
[[V1WriteCommand]] and query.
* [[WriteFiles]] must be the root plan as the child of [[V1WriteCommand]].
*/
-case class WriteFiles(child: LogicalPlan) extends UnaryNode {
+case class WriteFiles(
+ child: LogicalPlan,
+ fileFormat: FileFormat,
+ partitionColumns: Seq[Attribute],
+ bucketSpec: Option[BucketSpec],
+ options: Map[String, String],
+ staticPartitions: TablePartitionSpec) extends UnaryNode {
override def output: Seq[Attribute] = child.output
override protected def withNewChildInternal(newChild: LogicalPlan):
WriteFiles =
copy(child = newChild)
@@ -53,13 +59,17 @@ case class WriteFiles(child: LogicalPlan) extends UnaryNode
{
/**
* Responsible for writing files.
*/
-case class WriteFilesExec(child: SparkPlan) extends UnaryExecNode {
+case class WriteFilesExec(
+ child: SparkPlan,
+ fileFormat: FileFormat,
+ partitionColumns: Seq[Attribute],
+ bucketSpec: Option[BucketSpec],
+ options: Map[String, String],
+ staticPartitions: TablePartitionSpec) extends UnaryExecNode {
override def output: Seq[Attribute] = Seq.empty
- override protected def doExecuteWrite(writeSpec: WriteSpec):
RDD[WriterCommitMessage] = {
- assert(writeSpec.isInstanceOf[WriteFilesSpec])
- val writeFilesSpec: WriteFilesSpec = writeSpec.asInstanceOf[WriteFilesSpec]
-
+ override protected def doExecuteWrite(
+ writeFilesSpec: WriteFilesSpec): RDD[WriterCommitMessage] = {
val rdd = child.execute()
// SPARK-23271 If we are attempting to write a zero partition rdd, create
a dummy single
// partition rdd to make sure we at least set up one write task to write
the metadata.
diff --git
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index af727f966e5..6c5646a2416 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -238,7 +238,8 @@ case class RelationConversions(
// that only matches table insertion inside Hive CTAS.
// This pattern would not cause conflicts because this rule is always
applied before
// `HiveAnalysis` and both of these rules are running once.
- case InsertIntoHiveTable(tableDesc, _, query, overwrite,
ifPartitionNotExists, _)
+ case InsertIntoHiveTable(
+ tableDesc, _, query, overwrite, ifPartitionNotExists, _, _, _, _, _, _)
if query.resolved && DDLUtils.isHiveTable(tableDesc) &&
tableDesc.partitionColumnNames.isEmpty && isConvertible(tableDesc)
&&
conf.getConf(HiveUtils.CONVERT_METASTORE_CTAS) =>
diff --git
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala
similarity index 54%
copy from
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
copy to
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala
index 799cea42e1e..9981ae4cc31 100644
---
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
+++
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala
@@ -29,83 +29,19 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.common.FileUtils
import org.apache.hadoop.hive.ql.exec.TaskRunner
-import org.apache.spark.internal.io.FileCommitProtocol
+import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.catalog.BucketSpec
-import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.command.DataWritingCommand
-import org.apache.spark.sql.execution.datasources.FileFormatWriter
import org.apache.spark.sql.hive.HiveExternalCatalog
-import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
import org.apache.spark.sql.hive.client.HiveVersion
-// Base trait from which all hive insert statement physical execution extends.
-private[hive] trait SaveAsHiveFile extends DataWritingCommand with
V1WritesHiveUtils {
+class HiveTempPath(session: SparkSession, val hadoopConf: Configuration, path:
Path)
+ extends Logging {
+ private var stagingDirForCreating: Option[Path] = None
- var createdTempDir: Option[Path] = None
+ lazy val externalTempPath: Path = getExternalTmpPath(path)
- protected def saveAsHiveFile(
- sparkSession: SparkSession,
- plan: SparkPlan,
- hadoopConf: Configuration,
- fileSinkConf: FileSinkDesc,
- outputLocation: String,
- customPartitionLocations: Map[TablePartitionSpec, String] = Map.empty,
- partitionAttributes: Seq[Attribute] = Nil,
- bucketSpec: Option[BucketSpec] = None): Set[String] = {
-
- val isCompressed =
-
fileSinkConf.getTableInfo.getOutputFileFormatClassName.toLowerCase(Locale.ROOT)
match {
- case formatName if formatName.endsWith("orcoutputformat") =>
- // For ORC,"mapreduce.output.fileoutputformat.compress",
- // "mapreduce.output.fileoutputformat.compress.codec", and
- // "mapreduce.output.fileoutputformat.compress.type"
- // have no impact because it uses table properties to store
compression information.
- false
- case _ => hadoopConf.get("hive.exec.compress.output",
"false").toBoolean
- }
-
- if (isCompressed) {
- hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true")
- fileSinkConf.setCompressed(true)
- fileSinkConf.setCompressCodec(hadoopConf
- .get("mapreduce.output.fileoutputformat.compress.codec"))
- fileSinkConf.setCompressType(hadoopConf
- .get("mapreduce.output.fileoutputformat.compress.type"))
- } else {
- // Set compression by priority
- HiveOptions.getHiveWriteCompression(fileSinkConf.getTableInfo,
sparkSession.sessionState.conf)
- .foreach { case (compression, codec) => hadoopConf.set(compression,
codec) }
- }
-
- val committer = FileCommitProtocol.instantiate(
- sparkSession.sessionState.conf.fileCommitProtocolClass,
- jobId = java.util.UUID.randomUUID().toString,
- outputPath = outputLocation)
-
- val options = getOptionsWithHiveBucketWrite(bucketSpec)
-
- FileFormatWriter.write(
- sparkSession = sparkSession,
- plan = plan,
- fileFormat = new HiveFileFormat(fileSinkConf),
- committer = committer,
- outputSpec =
- FileFormatWriter.OutputSpec(outputLocation, customPartitionLocations,
outputColumns),
- hadoopConf = hadoopConf,
- partitionColumns = partitionAttributes,
- bucketSpec = bucketSpec,
- statsTrackers = Seq(basicWriteJobStatsTracker(hadoopConf)),
- options = options)
- }
-
- protected def getExternalTmpPath(
- sparkSession: SparkSession,
- hadoopConf: Configuration,
- path: Path): Path = {
+ private def getExternalTmpPath(path: Path): Path = {
import org.apache.spark.sql.hive.client.hive._
// Before Hive 1.1, when inserting into a table, Hive will create the
staging directory under
@@ -125,43 +61,22 @@ private[hive] trait SaveAsHiveFile extends
DataWritingCommand with V1WritesHiveU
assert(hiveVersionsUsingNewExternalTempPath ++
hiveVersionsUsingOldExternalTempPath ==
allSupportedHiveVersions)
- val externalCatalog = sparkSession.sharedState.externalCatalog
+ val externalCatalog = session.sharedState.externalCatalog
val hiveVersion =
externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client.version
val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive")
if (hiveVersionsUsingOldExternalTempPath.contains(hiveVersion)) {
- oldVersionExternalTempPath(path, hadoopConf, scratchDir)
+ oldVersionExternalTempPath(path, scratchDir)
} else if (hiveVersionsUsingNewExternalTempPath.contains(hiveVersion)) {
- newVersionExternalTempPath(path, hadoopConf, stagingDir)
+ newVersionExternalTempPath(path, stagingDir)
} else {
throw new IllegalStateException("Unsupported hive version: " +
hiveVersion.fullVersion)
}
}
- protected def deleteExternalTmpPath(hadoopConf: Configuration) : Unit = {
- // Attempt to delete the staging directory and the inclusive files. If
failed, the files are
- // expected to be dropped at the normal termination of VM since
deleteOnExit is used.
- try {
- createdTempDir.foreach { path =>
- val fs = path.getFileSystem(hadoopConf)
- if (fs.delete(path, true)) {
- // If we successfully delete the staging directory, remove it from
FileSystem's cache.
- fs.cancelDeleteOnExit(path)
- }
- }
- } catch {
- case NonFatal(e) =>
- val stagingDir = hadoopConf.get("hive.exec.stagingdir",
".hive-staging")
- logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)
- }
- }
-
// Mostly copied from Context.java#getExternalTmpPath of Hive 0.13
- private def oldVersionExternalTempPath(
- path: Path,
- hadoopConf: Configuration,
- scratchDir: String): Path = {
+ private def oldVersionExternalTempPath(path: Path, scratchDir: String): Path
= {
val extURI: URI = path.toUri
val scratchPath = new Path(scratchDir, executionId)
var dirPath = new Path(
@@ -169,56 +84,34 @@ private[hive] trait SaveAsHiveFile extends
DataWritingCommand with V1WritesHiveU
extURI.getAuthority,
scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID())
- try {
- val fs: FileSystem = dirPath.getFileSystem(hadoopConf)
- dirPath = new Path(fs.makeQualified(dirPath).toString())
-
- if (!FileUtils.mkdir(fs, dirPath, true, hadoopConf)) {
- throw new IllegalStateException("Cannot create staging directory: " +
dirPath.toString)
- }
- createdTempDir = Some(dirPath)
- fs.deleteOnExit(dirPath)
- } catch {
- case e: IOException =>
- throw
QueryExecutionErrors.cannotCreateStagingDirError(dirPath.toString, e)
- }
+ val fs = dirPath.getFileSystem(hadoopConf)
+ dirPath = new Path(fs.makeQualified(dirPath).toString())
+ stagingDirForCreating = Some(dirPath)
dirPath
}
// Mostly copied from Context.java#getExternalTmpPath of Hive 1.2
- private def newVersionExternalTempPath(
- path: Path,
- hadoopConf: Configuration,
- stagingDir: String): Path = {
+ private def newVersionExternalTempPath(path: Path, stagingDir: String): Path
= {
val extURI: URI = path.toUri
if (extURI.getScheme == "viewfs") {
- getExtTmpPathRelTo(path, hadoopConf, stagingDir)
+ val qualifiedStagingDir = getStagingDir(path, stagingDir)
+ stagingDirForCreating = Some(qualifiedStagingDir)
+ // Hive uses 10000
+ new Path(qualifiedStagingDir, "-ext-10000")
} else {
- new Path(getExternalScratchDir(extURI, hadoopConf, stagingDir),
"-ext-10000")
+ val qualifiedStagingDir = getExternalScratchDir(extURI, stagingDir)
+ stagingDirForCreating = Some(qualifiedStagingDir)
+ new Path(qualifiedStagingDir, "-ext-10000")
}
}
- private def getExtTmpPathRelTo(
- path: Path,
- hadoopConf: Configuration,
- stagingDir: String): Path = {
- new Path(getStagingDir(path, hadoopConf, stagingDir), "-ext-10000") //
Hive uses 10000
- }
-
- private def getExternalScratchDir(
- extURI: URI,
- hadoopConf: Configuration,
- stagingDir: String): Path = {
+ private def getExternalScratchDir(extURI: URI, stagingDir: String): Path = {
getStagingDir(
new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath),
- hadoopConf,
stagingDir)
}
- private[hive] def getStagingDir(
- inputPath: Path,
- hadoopConf: Configuration,
- stagingDir: String): Path = {
+ private[hive] def getStagingDir(inputPath: Path, stagingDir: String): Path =
{
val inputPathName: String = inputPath.toString
val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
var stagingPathName: String =
@@ -243,17 +136,6 @@ private[hive] trait SaveAsHiveFile extends
DataWritingCommand with V1WritesHiveU
fs.makeQualified(
new Path(stagingPathName + "_" + executionId + "-" +
TaskRunner.getTaskRunnerID))
logDebug("Created staging dir = " + dir + " for path = " + inputPath)
- try {
- if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) {
- throw new IllegalStateException("Cannot create staging directory '" +
dir.toString + "'")
- }
- createdTempDir = Some(dir)
- fs.deleteOnExit(dir)
- } catch {
- case e: IOException =>
- throw QueryExecutionErrors.cannotCreateStagingDirError(
- s"'${dir.toString}': ${e.getMessage}", e)
- }
dir
}
@@ -269,5 +151,43 @@ private[hive] trait SaveAsHiveFile extends
DataWritingCommand with V1WritesHiveU
val format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS", Locale.US)
"hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
}
-}
+ def deleteTmpPath() : Unit = {
+ // Attempt to delete the staging directory and the inclusive files. If
failed, the files are
+ // expected to be dropped at the normal termination of VM since
deleteOnExit is used.
+ try {
+ stagingDirForCreating.foreach { stagingDir =>
+ val fs = stagingDir.getFileSystem(hadoopConf)
+ if (fs.delete(stagingDir, true)) {
+ // If we successfully delete the staging directory, remove it from
FileSystem's cache.
+ fs.cancelDeleteOnExit(stagingDir)
+ }
+ }
+ } catch {
+ case NonFatal(e) =>
+ val stagingDir = hadoopConf.get("hive.exec.stagingdir",
".hive-staging")
+ logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)
+ }
+ }
+
+ def createTmpPath(): Unit = {
+ try {
+ stagingDirForCreating.foreach { stagingDir =>
+ val fs: FileSystem = stagingDir.getFileSystem(hadoopConf)
+ if (!FileUtils.mkdir(fs, stagingDir, true, hadoopConf)) {
+ throw new IllegalStateException(
+ "Cannot create staging directory '" + stagingDir.toString + "'")
+ }
+ fs.deleteOnExit(stagingDir)
+ }
+ } catch {
+ case e: IOException =>
+ throw QueryExecutionErrors.cannotCreateStagingDirError(
+ s"'${stagingDirForCreating.toString}': ${e.getMessage}", e)
+ }
+ }
+
+ def deleteIfNotStagingDir(path: Path, fs: FileSystem): Unit = {
+ if (Option(path) != stagingDirForCreating) fs.delete(path, true)
+ }
+}
diff --git
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
index fb15432013d..bd6278473a7 100644
---
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
+++
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
@@ -55,7 +55,7 @@ case class InsertIntoHiveDirCommand(
storage: CatalogStorageFormat,
query: LogicalPlan,
overwrite: Boolean,
- outputColumnNames: Seq[String]) extends SaveAsHiveFile {
+ outputColumnNames: Seq[String]) extends SaveAsHiveFile with
V1WritesHiveUtils {
override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
assert(storage.locationUri.nonEmpty)
@@ -99,21 +99,24 @@ case class InsertIntoHiveDirCommand(
}
// The temporary path must be a HDFS path, not a local path.
- val tmpPath = getExternalTmpPath(sparkSession, hadoopConf, qualifiedPath)
+ val hiveTempPath = new HiveTempPath(sparkSession, hadoopConf,
qualifiedPath)
+ val tmpPath = hiveTempPath.externalTempPath
val fileSinkConf = new org.apache.spark.sql.hive.HiveShim.ShimFileSinkDesc(
tmpPath.toString, tableDesc, false)
+ setupHadoopConfForCompression(fileSinkConf, hadoopConf, sparkSession)
+ hiveTempPath.createTmpPath()
try {
saveAsHiveFile(
sparkSession = sparkSession,
plan = child,
hadoopConf = hadoopConf,
- fileSinkConf = fileSinkConf,
+ fileFormat = new HiveFileFormat(fileSinkConf),
outputLocation = tmpPath.toString)
if (overwrite && fs.exists(writeToPath)) {
fs.listStatus(writeToPath).foreach { existFile =>
- if (Option(existFile.getPath) != createdTempDir)
fs.delete(existFile.getPath, true)
+ hiveTempPath.deleteIfNotStagingDir(existFile.getPath, fs)
}
}
@@ -131,7 +134,7 @@ case class InsertIntoHiveDirCommand(
throw new SparkException(
"Failed inserting overwrite directory " + storage.locationUri.get, e)
} finally {
- deleteExternalTmpPath(hadoopConf)
+ hiveTempPath.deleteTmpPath()
}
Seq.empty[Row]
diff --git
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 8c3aa0a80c1..6785b5d96d9 100644
---
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -22,15 +22,17 @@ import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.plan.TableDesc
+import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.CommandUtils
-import org.apache.spark.sql.execution.datasources.{V1WriteCommand,
V1WritesUtils}
+import org.apache.spark.sql.execution.datasources.{FileFormat, V1WriteCommand,
V1WritesUtils}
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
import org.apache.spark.sql.hive.client.HiveClientImpl
@@ -73,16 +75,20 @@ case class InsertIntoHiveTable(
query: LogicalPlan,
overwrite: Boolean,
ifPartitionNotExists: Boolean,
- outputColumnNames: Seq[String]
+ outputColumnNames: Seq[String],
+ partitionColumns: Seq[Attribute],
+ bucketSpec: Option[BucketSpec],
+ options: Map[String, String],
+ fileFormat: FileFormat,
+ @transient hiveTmpPath: HiveTempPath
) extends SaveAsHiveFile with V1WriteCommand with V1WritesHiveUtils {
- override lazy val partitionColumns: Seq[Attribute] = {
- getDynamicPartitionColumns(table, partition, query)
+ override def staticPartitions: TablePartitionSpec = {
+ partition.filter(_._2.nonEmpty).map { case (k, v) => k -> v.get }
}
override def requiredOrdering: Seq[SortOrder] = {
- val options = getOptionsWithHiveBucketWrite(table.bucketSpec)
- V1WritesUtils.getSortOrder(outputColumns, partitionColumns,
table.bucketSpec, options)
+ V1WritesUtils.getSortOrder(outputColumns, partitionColumns, bucketSpec,
options)
}
/**
@@ -92,29 +98,16 @@ case class InsertIntoHiveTable(
*/
override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
val externalCatalog = sparkSession.sharedState.externalCatalog
- val hadoopConf = sparkSession.sessionState.newHadoopConf()
-
- val hiveQlTable = HiveClientImpl.toHiveTable(table)
- // Have to pass the TableDesc object to RDD.mapPartitions and then
instantiate new serializer
- // instances within the closure, since Serializer is not serializable
while TableDesc is.
- val tableDesc = new TableDesc(
- hiveQlTable.getInputFormatClass,
- // The class of table should be org.apache.hadoop.hive.ql.metadata.Table
because
- // getOutputFormatClass will use
HiveFileFormatUtils.getOutputFormatSubstitute to
- // substitute some output formats, e.g. substituting
SequenceFileOutputFormat to
- // HiveSequenceFileOutputFormat.
- hiveQlTable.getOutputFormatClass,
- hiveQlTable.getMetadata
- )
- val tableLocation = hiveQlTable.getDataLocation
- val tmpLocation = getExternalTmpPath(sparkSession, hadoopConf,
tableLocation)
+ val hadoopConf = hiveTmpPath.hadoopConf
+ val tmpLocation = hiveTmpPath.externalTempPath
+ hiveTmpPath.createTmpPath()
try {
- processInsert(sparkSession, externalCatalog, hadoopConf, tableDesc,
tmpLocation, child)
+ processInsert(sparkSession, externalCatalog, hadoopConf, tmpLocation,
child)
} finally {
// Attempt to delete the staging directory and the inclusive files. If
failed, the files are
// expected to be dropped at the normal termination of VM since
deleteOnExit is used.
- deleteExternalTmpPath(hadoopConf)
+ hiveTmpPath.deleteTmpPath()
}
// un-cache this table.
@@ -134,23 +127,21 @@ case class InsertIntoHiveTable(
sparkSession: SparkSession,
externalCatalog: ExternalCatalog,
hadoopConf: Configuration,
- tableDesc: TableDesc,
tmpLocation: Path,
child: SparkPlan): Unit = {
- val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
val numDynamicPartitions = partition.values.count(_.isEmpty)
val partitionSpec = getPartitionSpec(partition)
- val partitionAttributes = getDynamicPartitionColumns(table, partition,
query)
val writtenParts = saveAsHiveFile(
sparkSession = sparkSession,
plan = child,
hadoopConf = hadoopConf,
- fileSinkConf = fileSinkConf,
+ fileFormat = fileFormat,
outputLocation = tmpLocation.toString,
- partitionAttributes = partitionAttributes,
- bucketSpec = table.bucketSpec)
+ partitionAttributes = partitionColumns,
+ bucketSpec = bucketSpec,
+ options = options)
if (partition.nonEmpty) {
if (numDynamicPartitions > 0) {
@@ -294,3 +285,40 @@ case class InsertIntoHiveTable(
override protected def withNewChildInternal(newChild: LogicalPlan):
InsertIntoHiveTable =
copy(query = newChild)
}
+
+object InsertIntoHiveTable extends V1WritesHiveUtils with Logging {
+ def apply(
+ table: CatalogTable,
+ partition: Map[String, Option[String]],
+ query: LogicalPlan,
+ overwrite: Boolean,
+ ifPartitionNotExists: Boolean,
+ outputColumnNames: Seq[String]): InsertIntoHiveTable = {
+ val sparkSession = SparkSession.getActiveSession.orNull
+ val hiveQlTable = HiveClientImpl.toHiveTable(table)
+ // Have to pass the TableDesc object to RDD.mapPartitions and then
instantiate new serializer
+ // instances within the closure, since Serializer is not serializable
while TableDesc is.
+ val tableDesc = new TableDesc(
+ hiveQlTable.getInputFormatClass,
+ // The class of table should be org.apache.hadoop.hive.ql.metadata.Table
because
+ // getOutputFormatClass will use
HiveFileFormatUtils.getOutputFormatSubstitute to
+ // substitute some output formats, e.g. substituting
SequenceFileOutputFormat to
+ // HiveSequenceFileOutputFormat.
+ hiveQlTable.getOutputFormatClass,
+ hiveQlTable.getMetadata
+ )
+ val hadoopConf = sparkSession.sessionState.newHadoopConf()
+ val tableLocation = hiveQlTable.getDataLocation
+ val hiveTempPath = new HiveTempPath(sparkSession, hadoopConf,
tableLocation)
+ val fileSinkConf = new
FileSinkDesc(hiveTempPath.externalTempPath.toString, tableDesc, false)
+ setupHadoopConfForCompression(fileSinkConf, hadoopConf, sparkSession)
+ val fileFormat: FileFormat = new HiveFileFormat(fileSinkConf)
+
+ val partitionColumns = getDynamicPartitionColumns(table, partition, query)
+ val bucketSpec = table.bucketSpec
+ val options = getOptionsWithHiveBucketWrite(bucketSpec)
+
+ new InsertIntoHiveTable(table, partition, query, overwrite,
ifPartitionNotExists,
+ outputColumnNames, partitionColumns, bucketSpec, options, fileFormat,
hiveTempPath)
+ }
+}
diff --git
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
index 799cea42e1e..47d402c2e8b 100644
---
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
+++
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
@@ -17,81 +17,40 @@
package org.apache.spark.sql.hive.execution
-import java.io.IOException
-import java.net.URI
-import java.text.SimpleDateFormat
-import java.util.{Date, Locale, Random}
-
-import scala.util.control.NonFatal
-
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.hive.common.FileUtils
-import org.apache.hadoop.hive.ql.exec.TaskRunner
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.DataWritingCommand
-import org.apache.spark.sql.execution.datasources.FileFormatWriter
-import org.apache.spark.sql.hive.HiveExternalCatalog
-import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
-import org.apache.spark.sql.hive.client.HiveVersion
+import org.apache.spark.sql.execution.datasources.{FileFormat,
FileFormatWriter}
// Base trait from which all hive insert statement physical execution extends.
-private[hive] trait SaveAsHiveFile extends DataWritingCommand with
V1WritesHiveUtils {
-
- var createdTempDir: Option[Path] = None
+private[hive] trait SaveAsHiveFile extends DataWritingCommand {
protected def saveAsHiveFile(
sparkSession: SparkSession,
plan: SparkPlan,
hadoopConf: Configuration,
- fileSinkConf: FileSinkDesc,
+ fileFormat: FileFormat,
outputLocation: String,
customPartitionLocations: Map[TablePartitionSpec, String] = Map.empty,
partitionAttributes: Seq[Attribute] = Nil,
- bucketSpec: Option[BucketSpec] = None): Set[String] = {
-
- val isCompressed =
-
fileSinkConf.getTableInfo.getOutputFileFormatClassName.toLowerCase(Locale.ROOT)
match {
- case formatName if formatName.endsWith("orcoutputformat") =>
- // For ORC,"mapreduce.output.fileoutputformat.compress",
- // "mapreduce.output.fileoutputformat.compress.codec", and
- // "mapreduce.output.fileoutputformat.compress.type"
- // have no impact because it uses table properties to store
compression information.
- false
- case _ => hadoopConf.get("hive.exec.compress.output",
"false").toBoolean
- }
-
- if (isCompressed) {
- hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true")
- fileSinkConf.setCompressed(true)
- fileSinkConf.setCompressCodec(hadoopConf
- .get("mapreduce.output.fileoutputformat.compress.codec"))
- fileSinkConf.setCompressType(hadoopConf
- .get("mapreduce.output.fileoutputformat.compress.type"))
- } else {
- // Set compression by priority
- HiveOptions.getHiveWriteCompression(fileSinkConf.getTableInfo,
sparkSession.sessionState.conf)
- .foreach { case (compression, codec) => hadoopConf.set(compression,
codec) }
- }
+ bucketSpec: Option[BucketSpec] = None,
+ options: Map[String, String] = Map.empty): Set[String] = {
val committer = FileCommitProtocol.instantiate(
sparkSession.sessionState.conf.fileCommitProtocolClass,
jobId = java.util.UUID.randomUUID().toString,
outputPath = outputLocation)
- val options = getOptionsWithHiveBucketWrite(bucketSpec)
-
FileFormatWriter.write(
sparkSession = sparkSession,
plan = plan,
- fileFormat = new HiveFileFormat(fileSinkConf),
+ fileFormat = fileFormat,
committer = committer,
outputSpec =
FileFormatWriter.OutputSpec(outputLocation, customPartitionLocations,
outputColumns),
@@ -101,173 +60,5 @@ private[hive] trait SaveAsHiveFile extends
DataWritingCommand with V1WritesHiveU
statsTrackers = Seq(basicWriteJobStatsTracker(hadoopConf)),
options = options)
}
-
- protected def getExternalTmpPath(
- sparkSession: SparkSession,
- hadoopConf: Configuration,
- path: Path): Path = {
- import org.apache.spark.sql.hive.client.hive._
-
- // Before Hive 1.1, when inserting into a table, Hive will create the
staging directory under
- // a common scratch directory. After the writing is finished, Hive will
simply empty the table
- // directory and move the staging directory to it.
- // After Hive 1.1, Hive will create the staging directory under the table
directory, and when
- // moving staging directory to table directory, Hive will still empty the
table directory, but
- // will exclude the staging directory there.
- // We have to follow the Hive behavior here, to avoid troubles. For
example, if we create
- // staging directory under the table director for Hive prior to 1.1, the
staging directory will
- // be removed by Hive when Hive is trying to empty the table directory.
- val hiveVersionsUsingOldExternalTempPath: Set[HiveVersion] = Set(v12, v13,
v14, v1_0)
- val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] =
- Set(v1_1, v1_2, v2_0, v2_1, v2_2, v2_3, v3_0, v3_1)
-
- // Ensure all the supported versions are considered here.
- assert(hiveVersionsUsingNewExternalTempPath ++
hiveVersionsUsingOldExternalTempPath ==
- allSupportedHiveVersions)
-
- val externalCatalog = sparkSession.sharedState.externalCatalog
- val hiveVersion =
externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client.version
- val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
- val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive")
-
- if (hiveVersionsUsingOldExternalTempPath.contains(hiveVersion)) {
- oldVersionExternalTempPath(path, hadoopConf, scratchDir)
- } else if (hiveVersionsUsingNewExternalTempPath.contains(hiveVersion)) {
- newVersionExternalTempPath(path, hadoopConf, stagingDir)
- } else {
- throw new IllegalStateException("Unsupported hive version: " +
hiveVersion.fullVersion)
- }
- }
-
- protected def deleteExternalTmpPath(hadoopConf: Configuration) : Unit = {
- // Attempt to delete the staging directory and the inclusive files. If
failed, the files are
- // expected to be dropped at the normal termination of VM since
deleteOnExit is used.
- try {
- createdTempDir.foreach { path =>
- val fs = path.getFileSystem(hadoopConf)
- if (fs.delete(path, true)) {
- // If we successfully delete the staging directory, remove it from
FileSystem's cache.
- fs.cancelDeleteOnExit(path)
- }
- }
- } catch {
- case NonFatal(e) =>
- val stagingDir = hadoopConf.get("hive.exec.stagingdir",
".hive-staging")
- logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)
- }
- }
-
- // Mostly copied from Context.java#getExternalTmpPath of Hive 0.13
- private def oldVersionExternalTempPath(
- path: Path,
- hadoopConf: Configuration,
- scratchDir: String): Path = {
- val extURI: URI = path.toUri
- val scratchPath = new Path(scratchDir, executionId)
- var dirPath = new Path(
- extURI.getScheme,
- extURI.getAuthority,
- scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID())
-
- try {
- val fs: FileSystem = dirPath.getFileSystem(hadoopConf)
- dirPath = new Path(fs.makeQualified(dirPath).toString())
-
- if (!FileUtils.mkdir(fs, dirPath, true, hadoopConf)) {
- throw new IllegalStateException("Cannot create staging directory: " +
dirPath.toString)
- }
- createdTempDir = Some(dirPath)
- fs.deleteOnExit(dirPath)
- } catch {
- case e: IOException =>
- throw
QueryExecutionErrors.cannotCreateStagingDirError(dirPath.toString, e)
- }
- dirPath
- }
-
- // Mostly copied from Context.java#getExternalTmpPath of Hive 1.2
- private def newVersionExternalTempPath(
- path: Path,
- hadoopConf: Configuration,
- stagingDir: String): Path = {
- val extURI: URI = path.toUri
- if (extURI.getScheme == "viewfs") {
- getExtTmpPathRelTo(path, hadoopConf, stagingDir)
- } else {
- new Path(getExternalScratchDir(extURI, hadoopConf, stagingDir),
"-ext-10000")
- }
- }
-
- private def getExtTmpPathRelTo(
- path: Path,
- hadoopConf: Configuration,
- stagingDir: String): Path = {
- new Path(getStagingDir(path, hadoopConf, stagingDir), "-ext-10000") //
Hive uses 10000
- }
-
- private def getExternalScratchDir(
- extURI: URI,
- hadoopConf: Configuration,
- stagingDir: String): Path = {
- getStagingDir(
- new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath),
- hadoopConf,
- stagingDir)
- }
-
- private[hive] def getStagingDir(
- inputPath: Path,
- hadoopConf: Configuration,
- stagingDir: String): Path = {
- val inputPathName: String = inputPath.toString
- val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
- var stagingPathName: String =
- if (inputPathName.indexOf(stagingDir) == -1) {
- new Path(inputPathName, stagingDir).toString
- } else {
- inputPathName.substring(0, inputPathName.indexOf(stagingDir) +
stagingDir.length)
- }
-
- // SPARK-20594: This is a walk-around fix to resolve a Hive bug. Hive
requires that the
- // staging directory needs to avoid being deleted when users set
hive.exec.stagingdir
- // under the table directory.
- if (isSubDir(new Path(stagingPathName), inputPath, fs) &&
-
!stagingPathName.stripPrefix(inputPathName).stripPrefix("/").startsWith(".")) {
- logDebug(s"The staging dir '$stagingPathName' should be a child
directory starts " +
- "with '.' to avoid being deleted if we set hive.exec.stagingdir under
the table " +
- "directory.")
- stagingPathName = new Path(inputPathName, ".hive-staging").toString
- }
-
- val dir: Path =
- fs.makeQualified(
- new Path(stagingPathName + "_" + executionId + "-" +
TaskRunner.getTaskRunnerID))
- logDebug("Created staging dir = " + dir + " for path = " + inputPath)
- try {
- if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) {
- throw new IllegalStateException("Cannot create staging directory '" +
dir.toString + "'")
- }
- createdTempDir = Some(dir)
- fs.deleteOnExit(dir)
- } catch {
- case e: IOException =>
- throw QueryExecutionErrors.cannotCreateStagingDirError(
- s"'${dir.toString}': ${e.getMessage}", e)
- }
- dir
- }
-
- // HIVE-14259 removed FileUtils.isSubDir(). Adapted it from Hive 1.2's
FileUtils.isSubDir().
- private def isSubDir(p1: Path, p2: Path, fs: FileSystem): Boolean = {
- val path1 = fs.makeQualified(p1).toString + Path.SEPARATOR
- val path2 = fs.makeQualified(p2).toString + Path.SEPARATOR
- path1.startsWith(path2)
- }
-
- private def executionId: String = {
- val rand: Random = new Random
- val format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS", Locale.US)
- "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
- }
}
diff --git
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/V1WritesHiveUtils.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/V1WritesHiveUtils.scala
index 752753f334a..6421dd184ae 100644
---
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/V1WritesHiveUtils.scala
+++
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/V1WritesHiveUtils.scala
@@ -19,8 +19,9 @@ package org.apache.spark.sql.hive.execution
import java.util.Locale
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.ql.ErrorMsg
-import org.apache.hadoop.hive.ql.plan.TableDesc
+import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
import org.apache.spark.SparkException
import org.apache.spark.sql.{AnalysisException, SparkSession}
@@ -32,6 +33,7 @@ import
org.apache.spark.sql.execution.datasources.BucketingUtils
import org.apache.spark.sql.hive.client.HiveClientImpl
trait V1WritesHiveUtils {
+
def getPartitionSpec(partition: Map[String, Option[String]]): Map[String,
String] = {
partition.map {
case (key, Some(null)) => key ->
ExternalCatalogUtils.DEFAULT_PARTITION_NAME
@@ -105,4 +107,33 @@ trait V1WritesHiveUtils {
.map(_ => Map(BucketingUtils.optionForHiveCompatibleBucketWrite ->
"true"))
.getOrElse(Map.empty)
}
+
+ def setupHadoopConfForCompression(
+ fileSinkConf: FileSinkDesc,
+ hadoopConf: Configuration,
+ sparkSession: SparkSession): Unit = {
+ val isCompressed =
+
fileSinkConf.getTableInfo.getOutputFileFormatClassName.toLowerCase(Locale.ROOT)
match {
+ case formatName if formatName.endsWith("orcoutputformat") =>
+ // For ORC,"mapreduce.output.fileoutputformat.compress",
+ // "mapreduce.output.fileoutputformat.compress.codec", and
+ // "mapreduce.output.fileoutputformat.compress.type"
+ // have no impact because it uses table properties to store
compression information.
+ false
+ case _ => hadoopConf.get("hive.exec.compress.output",
"false").toBoolean
+ }
+
+ if (isCompressed) {
+ hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true")
+ fileSinkConf.setCompressed(true)
+ fileSinkConf.setCompressCodec(hadoopConf
+ .get("mapreduce.output.fileoutputformat.compress.codec"))
+ fileSinkConf.setCompressType(hadoopConf
+ .get("mapreduce.output.fileoutputformat.compress.type"))
+ } else {
+ // Set compression by priority
+ HiveOptions.getHiveWriteCompression(fileSinkConf.getTableInfo,
sparkSession.sessionState.conf)
+ .foreach { case (compression, codec) => hadoopConf.set(compression,
codec) }
+ }
+ }
}
diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
index f62d941746b..13f2a865936 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
@@ -27,7 +27,7 @@ import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
import org.apache.spark.SparkException
import org.apache.spark.sql.{QueryTest, _}
import org.apache.spark.sql.catalyst.parser.ParseException
-import org.apache.spark.sql.hive.execution.InsertIntoHiveTable
+import org.apache.spark.sql.hive.execution.HiveTempPath
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestUtils
@@ -541,25 +541,24 @@ class InsertSuite extends QueryTest with
TestHiveSingleton with BeforeAndAfter
val conf = spark.sessionState.newHadoopConf()
val inputPath = new Path("/tmp/b/c")
var stagingDir = "tmp/b"
- val saveHiveFile = InsertIntoHiveTable(null, Map.empty, null, false,
false, null)
- val getStagingDir = PrivateMethod[Path](Symbol("getStagingDir"))
- var path = saveHiveFile invokePrivate getStagingDir(inputPath, conf,
stagingDir)
+ val hiveTempPath = new HiveTempPath(null, conf, null)
+ var path = hiveTempPath.getStagingDir(inputPath, stagingDir)
assert(path.toString.indexOf("/tmp/b_hive_") != -1)
stagingDir = "tmp/b/c"
- path = saveHiveFile invokePrivate getStagingDir(inputPath, conf,
stagingDir)
+ path = hiveTempPath.getStagingDir(inputPath, stagingDir)
assert(path.toString.indexOf("/tmp/b/c/.hive-staging_hive_") != -1)
stagingDir = "d/e"
- path = saveHiveFile invokePrivate getStagingDir(inputPath, conf,
stagingDir)
+ path = hiveTempPath.getStagingDir(inputPath, stagingDir)
assert(path.toString.indexOf("/tmp/b/c/.hive-staging_hive_") != -1)
stagingDir = ".d/e"
- path = saveHiveFile invokePrivate getStagingDir(inputPath, conf,
stagingDir)
+ path = hiveTempPath.getStagingDir(inputPath, stagingDir)
assert(path.toString.indexOf("/tmp/b/c/.d/e_hive_") != -1)
stagingDir = "/tmp/c/"
- path = saveHiveFile invokePrivate getStagingDir(inputPath, conf,
stagingDir)
+ path = hiveTempPath.getStagingDir(inputPath, stagingDir)
assert(path.toString.indexOf("/tmp/c_hive_") != -1)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]