This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 256227477b0 [SPARK-37287][SQL] Pull out dynamic partition and bucket
sort from FileFormatWriter
256227477b0 is described below
commit 256227477b027ea8160c4c266c3e8bc7d302a7e2
Author: allisonwang-db <[email protected]>
AuthorDate: Tue Jul 19 21:31:37 2022 +0800
[SPARK-37287][SQL] Pull out dynamic partition and bucket sort from
FileFormatWriter
### What changes were proposed in this pull request?
`FileFormatWriter.write` is used by all V1 write commands, including data
source and Hive table writes. Depending on the dynamic partition, bucket, and
sort columns of a V1 write command, `FileFormatWriter` may add a physical sort
on top of the query plan, which is not directly visible from the plan.
This PR (based on https://github.com/apache/spark/pull/34568) pulls the
physical sort added by `FileFormatWriter` out into logical planning. It adds a
new optimizer rule `V1Writes` that inserts logical Sort operators based on the
required ordering of a V1 write command. This behavior is controlled by the new
config **spark.sql.optimizer.plannedWrite.enabled** (default: true).
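For illustration only (not part of this patch), a minimal sketch of toggling the new config from a local Spark shell; the output path is hypothetical:
```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Default: the V1Writes rule plans a logical Sort on the dynamic partition
// column, so FileFormatWriter does not need to insert a physical sort.
spark.conf.set("spark.sql.optimizer.plannedWrite.enabled", "true")
Seq((1, "a"), (2, "b")).toDF("id", "p")
  .write.mode("overwrite").partitionBy("p").parquet("/tmp/planned_write_demo")

// Disabling the config restores the old behavior: FileFormatWriter adds the
// physical sort itself, and it is not visible in the optimized logical plan.
spark.conf.set("spark.sql.optimizer.plannedWrite.enabled", "false")
```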
### Why are the changes needed?
Improve the observability of V1 writes and unify the logic of V1 and V2 write
commands.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
New unit tests.
Closes #37099 from allisonwang-db/spark-37287-v1-writes.
Authored-by: allisonwang-db <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../org/apache/spark/sql/internal/SQLConf.scala | 10 +
.../spark/sql/execution/SparkOptimizer.scala | 4 +-
.../adaptive/InsertAdaptiveSparkPlan.scala | 4 +-
.../execution/command/createDataSourceTables.scala | 15 +-
.../sql/execution/datasources/DataSource.scala | 39 ++--
.../execution/datasources/FileFormatWriter.scala | 51 ++---
.../InsertIntoHadoopFsRelationCommand.scala | 7 +-
.../spark/sql/execution/datasources/V1Writes.scala | 137 +++++++++++++
.../adaptive/AdaptiveQueryExecSuite.scala | 37 ++--
.../datasources/V1WriteCommandSuite.scala | 217 +++++++++++++++++++++
.../execution/CreateHiveTableAsSelectCommand.scala | 20 +-
.../sql/hive/execution/InsertIntoHiveTable.scala | 69 ++-----
.../spark/sql/hive/execution/SaveAsHiveFile.scala | 8 +-
.../sql/hive/execution/V1WritesHiveUtils.scala | 108 ++++++++++
.../command/V1WriteHiveCommandSuite.scala | 103 ++++++++++
15 files changed, 703 insertions(+), 126 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 1b7857ead59..631c89d798f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -412,6 +412,14 @@ object SQLConf {
.longConf
.createWithDefault(67108864L)
+ val PLANNED_WRITE_ENABLED =
buildConf("spark.sql.optimizer.plannedWrite.enabled")
+ .internal()
+ .doc("When set to true, Spark optimizer will add logical sort operators to
V1 write commands " +
+ "if needed so that `FileFormatWriter` does not need to insert physical
sorts.")
+ .version("3.4.0")
+ .booleanConf
+ .createWithDefault(true)
+
val COMPRESS_CACHED =
buildConf("spark.sql.inMemoryColumnarStorage.compressed")
.doc("When set to true Spark SQL will automatically select a compression
codec for each " +
"column based on statistics of the data.")
@@ -4617,6 +4625,8 @@ class SQLConf extends Serializable with Logging {
def maxConcurrentOutputFileWriters: Int =
getConf(SQLConf.MAX_CONCURRENT_OUTPUT_FILE_WRITERS)
+ def plannedWriteEnabled: Boolean = getConf(SQLConf.PLANNED_WRITE_ENABLED)
+
def inferDictAsStruct: Boolean = getConf(SQLConf.INFER_NESTED_DICT_AS_STRUCT)
def legacyInferArrayTypeFromFirstElement: Boolean = getConf(
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
index bffa1d1dae7..72bdab409a9 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
@@ -23,8 +23,7 @@ import org.apache.spark.sql.catalyst.optimizer._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.CatalogManager
-import org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions
-import org.apache.spark.sql.execution.datasources.SchemaPruning
+import org.apache.spark.sql.execution.datasources.{PruneFileSourcePartitions,
SchemaPruning, V1Writes}
import
org.apache.spark.sql.execution.datasources.v2.{GroupBasedRowLevelOperationScanPlanning,
OptimizeMetadataOnlyDeleteFromTable, V2ScanPartitioningAndOrdering,
V2ScanRelationPushDown, V2Writes}
import
org.apache.spark.sql.execution.dynamicpruning.{CleanupDynamicPruningFilters,
PartitionPruning}
import
org.apache.spark.sql.execution.python.{ExtractGroupingPythonUDFFromAggregate,
ExtractPythonUDFFromAggregate, ExtractPythonUDFs}
@@ -39,6 +38,7 @@ class SparkOptimizer(
// TODO: move SchemaPruning into catalyst
Seq(SchemaPruning) :+
GroupBasedRowLevelOperationScanPlanning :+
+ V1Writes :+
V2ScanRelationPushDown :+
V2ScanPartitioningAndOrdering :+
V2Writes :+
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
index 4410f7fea81..94cef481ecf 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern._
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command.{DataWritingCommandExec,
ExecutedCommandExec}
+import org.apache.spark.sql.execution.datasources.V1WriteCommand
import org.apache.spark.sql.execution.datasources.v2.V2CommandExec
import org.apache.spark.sql.execution.exchange.Exchange
import org.apache.spark.sql.internal.SQLConf
@@ -46,8 +47,9 @@ case class InsertAdaptiveSparkPlan(
case _ if !conf.adaptiveExecutionEnabled => plan
case _: ExecutedCommandExec => plan
case _: CommandResultExec => plan
- case c: DataWritingCommandExec => c.copy(child = apply(c.child))
case c: V2CommandExec => c.withNewChildren(c.children.map(apply))
+ case c: DataWritingCommandExec if !c.cmd.isInstanceOf[V1WriteCommand] =>
+ c.copy(child = apply(c.child))
case _ if shouldApplyAQE(plan, isSubquery) =>
if (supportAdaptive(plan)) {
try {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index e64426f8de8..50ad55d5633 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -20,7 +20,9 @@ package org.apache.spark.sql.execution.command
import java.net.URI
import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.expressions.SortOrder
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -141,7 +143,18 @@ case class CreateDataSourceTableAsSelectCommand(
mode: SaveMode,
query: LogicalPlan,
outputColumnNames: Seq[String])
- extends DataWritingCommand {
+ extends V1WriteCommand {
+
+ override def requiredOrdering: Seq[SortOrder] = {
+ val unresolvedPartitionColumns =
table.partitionColumnNames.map(UnresolvedAttribute.quoted)
+ val partitionColumns = DataSource.resolvePartitionColumns(
+ unresolvedPartitionColumns,
+ outputColumns,
+ query,
+ SparkSession.active.sessionState.conf.resolver)
+ val options = table.storage.properties
+ V1WritesUtils.getSortOrder(outputColumns, partitionColumns,
table.bucketSpec, options)
+ }
override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
assert(table.tableType != CatalogTableType.VIEW)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index a220fd334a5..8f8846b89f3 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -29,8 +29,9 @@ import org.apache.spark.SparkException
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.catalog.{BucketSpec,
CatalogStorageFormat, CatalogTable, CatalogUtils}
+import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, TypeUtils}
import org.apache.spark.sql.connector.catalog.TableProvider
@@ -519,18 +520,8 @@ case class DataSource(
case format: FileFormat =>
disallowWritingIntervals(outputColumns.map(_.dataType),
forbidAnsiIntervals = false)
val cmd = planForWritingFileFormat(format, mode, data)
- val resolvedPartCols = cmd.partitionColumns.map { col =>
- // The partition columns created in `planForWritingFileFormat`
should always be
- // `UnresolvedAttribute` with a single name part.
- assert(col.isInstanceOf[UnresolvedAttribute])
- val unresolved = col.asInstanceOf[UnresolvedAttribute]
- assert(unresolved.nameParts.length == 1)
- val name = unresolved.nameParts.head
- outputColumns.find(a => equality(a.name, name)).getOrElse {
- throw QueryCompilationErrors.cannotResolveAttributeError(
- name, data.output.map(_.name).mkString(", "))
- }
- }
+ val resolvedPartCols =
+ DataSource.resolvePartitionColumns(cmd.partitionColumns,
outputColumns, data, equality)
val resolved = cmd.copy(
partitionColumns = resolvedPartCols,
outputColumnNames = outputColumnNames)
@@ -836,4 +827,26 @@ object DataSource extends Logging {
throw
QueryCompilationErrors.writeEmptySchemasUnsupportedByDataSourceError()
}
}
+
+ /**
+ * Resolve partition columns using output columns of the query plan.
+ */
+ def resolvePartitionColumns(
+ partitionColumns: Seq[Attribute],
+ outputColumns: Seq[Attribute],
+ plan: LogicalPlan,
+ resolver: Resolver): Seq[Attribute] = {
+ partitionColumns.map { col =>
+ // The partition columns created in `planForWritingFileFormat` should
always be
+ // `UnresolvedAttribute` with a single name part.
+ assert(col.isInstanceOf[UnresolvedAttribute])
+ val unresolved = col.asInstanceOf[UnresolvedAttribute]
+ assert(unresolved.nameParts.length == 1)
+ val name = unresolved.nameParts.head
+ outputColumns.find(a => resolver(a.name, name)).getOrElse {
+ throw QueryCompilationErrors.cannotResolveAttributeError(
+ name, plan.output.map(_.name).mkString(", "))
+ }
+ }
+ }
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index e75041b7fb0..6b48420920d 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -36,7 +36,6 @@ import
org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext,
ExprCode}
-import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.{ProjectExec, SortExec, SparkPlan,
SQLExecution, UnsafeExternalRowSorter}
@@ -78,6 +77,12 @@ object FileFormatWriter extends Logging {
maxWriters: Int,
createSorter: () => UnsafeExternalRowSorter)
+ /**
+ * A variable used in tests to check whether the output ordering of the
query matches the
+ * required ordering of the write command.
+ */
+ private[sql] var outputOrderingMatched: Boolean = false
+
/**
* Basic work flow of this command is:
* 1. Driver side setup, including output committer initialization and data
source specific
@@ -126,38 +131,8 @@ object FileFormatWriter extends Logging {
}
val empty2NullPlan = if (needConvert) ProjectExec(projectList, plan) else
plan
- val writerBucketSpec = bucketSpec.map { spec =>
- val bucketColumns = spec.bucketColumnNames.map(c =>
dataColumns.find(_.name == c).get)
-
- if (options.getOrElse(BucketingUtils.optionForHiveCompatibleBucketWrite,
"false") ==
- "true") {
- // Hive bucketed table: use `HiveHash` and bitwise-and as bucket id
expression.
- // Without the extra bitwise-and operation, we can get wrong bucket id
when hash value of
- // columns is negative. See Hive implementation in
- //
`org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils#getBucketNumber()`.
- val hashId = BitwiseAnd(HiveHash(bucketColumns), Literal(Int.MaxValue))
- val bucketIdExpression = Pmod(hashId, Literal(spec.numBuckets))
-
- // The bucket file name prefix is following Hive, Presto and Trino
conversion, so this
- // makes sure Hive bucketed table written by Spark, can be read by
other SQL engines.
- //
- // Hive:
`org.apache.hadoop.hive.ql.exec.Utilities#getBucketIdFromFile()`.
- // Trino:
`io.trino.plugin.hive.BackgroundHiveSplitLoader#BUCKET_PATTERNS`.
- val fileNamePrefix = (bucketId: Int) => f"$bucketId%05d_0_"
- WriterBucketSpec(bucketIdExpression, fileNamePrefix)
- } else {
- // Spark bucketed table: use `HashPartitioning.partitionIdExpression`
as bucket id
- // expression, so that we can guarantee the data distribution is same
between shuffle and
- // bucketed data source, which enables us to only shuffle one side
when join a bucketed
- // table and a normal one.
- val bucketIdExpression = HashPartitioning(bucketColumns,
spec.numBuckets)
- .partitionIdExpression
- WriterBucketSpec(bucketIdExpression, (_: Int) => "")
- }
- }
- val sortColumns = bucketSpec.toSeq.flatMap {
- spec => spec.sortColumnNames.map(c => dataColumns.find(_.name == c).get)
- }
+ val writerBucketSpec = V1WritesUtils.getWriterBucketSpec(bucketSpec,
dataColumns, options)
+ val sortColumns = V1WritesUtils.getBucketSortColumns(bucketSpec,
dataColumns)
val caseInsensitiveOptions = CaseInsensitiveMap(options)
@@ -209,6 +184,16 @@ object FileFormatWriter extends Logging {
// prepares the job, any exception thrown from here shouldn't cause
abortJob() to be called.
committer.setupJob(job)
+ // When `PLANNED_WRITE_ENABLED` is true, the optimizer rule V1Writes will
add logical sort
+ // operator based on the required ordering of the V1 write command. So the
output
+ // ordering of the physical plan should always match the required
ordering. Here
+ // we set the variable to verify this behavior in tests.
+ // There are two cases where FileFormatWriter still needs to add physical
sort:
+ // 1) When the planned write config is disabled.
+ // 2) When the concurrent writers are enabled (in this case the required
ordering of a
+ // V1 write command will be empty).
+ if (Utils.isTesting) outputOrderingMatched = orderingMatched
+
try {
val (rdd, concurrentOutputWriterSpec) = if (orderingMatched) {
(empty2NullPlan.execute(), None)
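For context only (not part of the patch), a hedged sketch of case 2 from the comment above: enabling concurrent output writers makes the required ordering of a V1 write command empty, so no logical sort is planned and `FileFormatWriter` handles any sorting itself. The output path is hypothetical:
```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// With concurrent output writers enabled, V1WritesUtils.getSortOrder returns an
// empty ordering, so V1Writes adds no logical Sort; FileFormatWriter either uses
// concurrent writers or falls back to its own physical sort.
spark.conf.set("spark.sql.maxConcurrentOutputFileWriters", "4")
Seq((1, "a"), (2, "b")).toDF("id", "p")
  .write.mode("overwrite").partitionBy("p").parquet("/tmp/concurrent_writers_demo")
```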
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index d773d4bd271..e20d9ed8b53 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable,
CatalogTablePartition}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.errors.{QueryCompilationErrors,
QueryExecutionErrors}
@@ -57,7 +57,7 @@ case class InsertIntoHadoopFsRelationCommand(
catalogTable: Option[CatalogTable],
fileIndex: Option[FileIndex],
outputColumnNames: Seq[String])
- extends DataWritingCommand {
+ extends V1WriteCommand {
private lazy val parameters = CaseInsensitiveMap(options)
@@ -74,6 +74,9 @@ case class InsertIntoHadoopFsRelationCommand(
staticPartitions.size < partitionColumns.length
}
+ override def requiredOrdering: Seq[SortOrder] =
+ V1WritesUtils.getSortOrder(outputColumns, partitionColumns, bucketSpec,
options)
+
override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
// Most formats don't do well with duplicate columns, so lets not allow
that
SchemaUtils.checkColumnNameDuplication(
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
new file mode 100644
index 00000000000..94e5f3ccaf4
--- /dev/null
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute,
AttributeSet, BitwiseAnd, HiveHash, Literal, Pmod, SortOrder}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Sort}
+import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.DataWritingCommand
+import org.apache.spark.sql.internal.SQLConf
+
+trait V1WriteCommand extends DataWritingCommand {
+ // Specify the required ordering for the V1 write command.
`FileFormatWriter` will
+ // add SortExec if necessary when the requiredOrdering is empty.
+ def requiredOrdering: Seq[SortOrder]
+}
+
+/**
+ * A rule that adds logical sorts to V1 data writing commands.
+ */
+object V1Writes extends Rule[LogicalPlan] with SQLConfHelper {
+ override def apply(plan: LogicalPlan): LogicalPlan = {
+ if (conf.plannedWriteEnabled) {
+ plan.transformDown {
+ case write: V1WriteCommand =>
+ val newQuery = prepareQuery(write, write.query)
+ write.withNewChildren(newQuery :: Nil)
+ }
+ } else {
+ plan
+ }
+ }
+
+ private def prepareQuery(write: V1WriteCommand, query: LogicalPlan):
LogicalPlan = {
+ val requiredOrdering = write.requiredOrdering
+ val outputOrdering = query.outputOrdering
+ // Check if the ordering is already matched. It is needed to ensure the
+ // idempotency of the rule.
+ val orderingMatched = if (requiredOrdering.length > outputOrdering.length)
{
+ false
+ } else {
+ requiredOrdering.zip(outputOrdering).forall {
+ case (requiredOrder, outputOrder) =>
requiredOrder.semanticEquals(outputOrder)
+ }
+ }
+ if (orderingMatched) {
+ query
+ } else {
+ Sort(requiredOrdering, global = false, query)
+ }
+ }
+}
+
+object V1WritesUtils {
+
+ def getWriterBucketSpec(
+ bucketSpec: Option[BucketSpec],
+ dataColumns: Seq[Attribute],
+ options: Map[String, String]): Option[WriterBucketSpec] = {
+ bucketSpec.map { spec =>
+ val bucketColumns = spec.bucketColumnNames.map(c =>
dataColumns.find(_.name == c).get)
+
+ if (options.getOrElse(BucketingUtils.optionForHiveCompatibleBucketWrite,
"false") ==
+ "true") {
+ // Hive bucketed table: use `HiveHash` and bitwise-and as bucket id
expression.
+ // Without the extra bitwise-and operation, we can get wrong bucket id
when hash value of
+ // columns is negative. See Hive implementation in
+ //
`org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils#getBucketNumber()`.
+ val hashId = BitwiseAnd(HiveHash(bucketColumns), Literal(Int.MaxValue))
+ val bucketIdExpression = Pmod(hashId, Literal(spec.numBuckets))
+
+ // The bucket file name prefix is following Hive, Presto and Trino
conversion, so this
+ // makes sure Hive bucketed table written by Spark, can be read by
other SQL engines.
+ //
+ // Hive:
`org.apache.hadoop.hive.ql.exec.Utilities#getBucketIdFromFile()`.
+ // Trino:
`io.trino.plugin.hive.BackgroundHiveSplitLoader#BUCKET_PATTERNS`.
+ val fileNamePrefix = (bucketId: Int) => f"$bucketId%05d_0_"
+ WriterBucketSpec(bucketIdExpression, fileNamePrefix)
+ } else {
+ // Spark bucketed table: use `HashPartitioning.partitionIdExpression`
as bucket id
+ // expression, so that we can guarantee the data distribution is same
between shuffle and
+ // bucketed data source, which enables us to only shuffle one side
when join a bucketed
+ // table and a normal one.
+ val bucketIdExpression = HashPartitioning(bucketColumns,
spec.numBuckets)
+ .partitionIdExpression
+ WriterBucketSpec(bucketIdExpression, (_: Int) => "")
+ }
+ }
+ }
+
+ def getBucketSortColumns(
+ bucketSpec: Option[BucketSpec],
+ dataColumns: Seq[Attribute]): Seq[Attribute] = {
+ bucketSpec.toSeq.flatMap {
+ spec => spec.sortColumnNames.map(c => dataColumns.find(_.name == c).get)
+ }
+ }
+
+ def getSortOrder(
+ outputColumns: Seq[Attribute],
+ partitionColumns: Seq[Attribute],
+ bucketSpec: Option[BucketSpec],
+ options: Map[String, String]): Seq[SortOrder] = {
+ val partitionSet = AttributeSet(partitionColumns)
+ val dataColumns = outputColumns.filterNot(partitionSet.contains)
+ val writerBucketSpec = V1WritesUtils.getWriterBucketSpec(bucketSpec,
dataColumns, options)
+ val sortColumns = V1WritesUtils.getBucketSortColumns(bucketSpec,
dataColumns)
+
+ if (SQLConf.get.maxConcurrentOutputFileWriters > 0 && sortColumns.isEmpty)
{
+ // Do not insert logical sort when concurrent output writers are enabled.
+ Seq.empty
+ } else {
+ // We should first sort by partition columns, then bucket id, and
finally sorting columns.
+ // Note we do not need to convert empty string partition columns to null
when sorting the
+ // columns since null and empty string values will be next to each other.
+ (partitionColumns ++writerBucketSpec.map(_.bucketIdExpression) ++
sortColumns)
+ .map(SortOrder(_, Ascending))
+ }
+ }
+}
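To make the new API concrete, a hypothetical (non-shipping) command sketch showing how a `V1WriteCommand` implementation can derive `requiredOrdering` from `V1WritesUtils.getSortOrder`; the class name and constructor fields are illustrative assumptions, not part of this commit:
```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.{V1WriteCommand, V1WritesUtils}

// Hypothetical example command: the V1Writes rule reads requiredOrdering and
// inserts a logical Sort (partition columns, then bucket id, then sort columns).
case class ExampleWriteCommand(
    query: LogicalPlan,
    outputColumnNames: Seq[String],
    partitionColumns: Seq[Attribute],
    bucketSpec: Option[BucketSpec],
    options: Map[String, String]) extends V1WriteCommand {

  override def requiredOrdering: Seq[SortOrder] =
    V1WritesUtils.getSortOrder(outputColumns, partitionColumns, bucketSpec, options)

  override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
    // A real command would delegate the actual file writing to FileFormatWriter.
    Seq.empty
  }

  override protected def withNewChildInternal(newChild: LogicalPlan): ExampleWriteCommand =
    copy(query = newChild)
}
```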
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 77d7261fb65..5d8e4bbecfe 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.scheduler.{SparkListener,
SparkListenerEvent, SparkListe
import org.apache.spark.sql.{Dataset, QueryTest, Row, SparkSession, Strategy}
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
-import org.apache.spark.sql.execution.{CollectLimitExec, CommandResultExec,
LocalTableScanExec, PartialReducerPartitionSpec, QueryExecution,
ReusedSubqueryExec, ShuffledRowRDD, SortExec, SparkPlan, UnaryExecNode,
UnionExec}
+import org.apache.spark.sql.execution.{CollectLimitExec, LocalTableScanExec,
PartialReducerPartitionSpec, QueryExecution, ReusedSubqueryExec,
ShuffledRowRDD, SortExec, SparkPlan, UnionExec}
import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
import org.apache.spark.sql.execution.command.DataWritingCommandExec
import org.apache.spark.sql.execution.datasources.noop.NoopDataSource
@@ -1119,16 +1119,27 @@ class AdaptiveQueryExecSuite
}
}
- test("SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of
write commands") {
+ test("SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of
v2 write commands") {
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") {
+ var plan: SparkPlan = null
+ val listener = new QueryExecutionListener {
+ override def onSuccess(funcName: String, qe: QueryExecution,
durationNs: Long): Unit = {
+ plan = qe.executedPlan
+ }
+ override def onFailure(
+ funcName: String, qe: QueryExecution, exception: Exception): Unit =
{}
+ }
+ spark.listenerManager.register(listener)
withTable("t1") {
- val plan = sql("CREATE TABLE t1 USING parquet AS SELECT 1
col").queryExecution.executedPlan
- assert(plan.isInstanceOf[CommandResultExec])
- val commandResultExec = plan.asInstanceOf[CommandResultExec]
-
assert(commandResultExec.commandPhysicalPlan.isInstanceOf[DataWritingCommandExec])
-
assert(commandResultExec.commandPhysicalPlan.asInstanceOf[DataWritingCommandExec]
- .child.isInstanceOf[AdaptiveSparkPlanExec])
+ val format = classOf[NoopDataSource].getName
+ Seq((0, 1)).toDF("x",
"y").write.format(format).mode("overwrite").save()
+
+ sparkContext.listenerBus.waitUntilEmpty()
+ assert(plan.isInstanceOf[V2TableWriteExec])
+
assert(plan.asInstanceOf[V2TableWriteExec].child.isInstanceOf[AdaptiveSparkPlanExec])
+
+ spark.listenerManager.unregister(listener)
}
}
}
@@ -1179,7 +1190,10 @@ class AdaptiveQueryExecSuite
override def onOtherEvent(event: SparkListenerEvent): Unit = {
event match {
case SparkListenerSQLAdaptiveExecutionUpdate(_, _, planInfo) =>
- assert(planInfo.nodeName == "Execute
CreateDataSourceTableAsSelectCommand")
+ assert(planInfo.nodeName == "AdaptiveSparkPlan")
+ assert(planInfo.children.size == 1)
+ assert(planInfo.children.head.nodeName ==
+ "Execute CreateDataSourceTableAsSelectCommand")
checkDone = true
case _ => // ignore other events
}
@@ -1584,9 +1598,8 @@ class AdaptiveQueryExecSuite
var noLocalread: Boolean = false
val listener = new QueryExecutionListener {
override def onSuccess(funcName: String, qe: QueryExecution,
durationNs: Long): Unit = {
- qe.executedPlan match {
- case plan@(_: DataWritingCommandExec | _: V2TableWriteExec) =>
-
assert(plan.asInstanceOf[UnaryExecNode].child.isInstanceOf[AdaptiveSparkPlanExec])
+ stripAQEPlan(qe.executedPlan) match {
+ case plan @ (_: DataWritingCommandExec | _: V2TableWriteExec) =>
noLocalread = collect(plan) {
case exec: AQEShuffleReadExec if exec.isLocalRead => exec
}.isEmpty
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
new file mode 100644
index 00000000000..350cac49139
--- /dev/null
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Sort}
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
+import org.apache.spark.sql.util.QueryExecutionListener
+
+abstract class V1WriteCommandSuiteBase extends QueryTest with SQLTestUtils {
+
+ import testImplicits._
+
+ setupTestData()
+
+ protected override def beforeAll(): Unit = {
+ super.beforeAll()
+ (0 to 20).map(i => (i, i % 5, (i % 10).toString))
+ .toDF("i", "j", "k")
+ .write
+ .saveAsTable("t0")
+ }
+
+ protected override def afterAll(): Unit = {
+ sql("drop table if exists t0")
+ super.afterAll()
+ }
+
+ protected def withPlannedWrite(testFunc: Boolean => Any): Unit = {
+ Seq(true, false).foreach { enabled =>
+ withSQLConf(SQLConf.PLANNED_WRITE_ENABLED.key -> enabled.toString) {
+ testFunc(enabled)
+ }
+ }
+ }
+
+ // Execute a write query and check ordering of the plan.
+ protected def executeAndCheckOrdering(
+ hasLogicalSort: Boolean, orderingMatched: Boolean)(query: => Unit): Unit
= {
+ var optimizedPlan: LogicalPlan = null
+
+ val listener = new QueryExecutionListener {
+ override def onSuccess(funcName: String, qe: QueryExecution, durationNs:
Long): Unit = {
+ qe.optimizedPlan match {
+ case w: V1WriteCommand =>
+ optimizedPlan = w.query
+ case _ =>
+ }
+ }
+ override def onFailure(funcName: String, qe: QueryExecution, exception:
Exception): Unit = {}
+ }
+ spark.listenerManager.register(listener)
+
+ query
+
+ // Check whether the output ordering is matched before FileFormatWriter
executes rdd.
+ assert(FileFormatWriter.outputOrderingMatched == orderingMatched,
+ s"Expect: $orderingMatched, Actual:
${FileFormatWriter.outputOrderingMatched}")
+
+ sparkContext.listenerBus.waitUntilEmpty()
+
+ // Check whether a logical sort node is at the top of the logical plan of
the write query.
+ if (optimizedPlan != null) {
+ assert(optimizedPlan.isInstanceOf[Sort] == hasLogicalSort,
+ s"Expect hasLogicalSort: $hasLogicalSort, Actual:
${optimizedPlan.isInstanceOf[Sort]}")
+ }
+
+ spark.listenerManager.unregister(listener)
+ }
+}
+
+class V1WriteCommandSuite extends V1WriteCommandSuiteBase with
SharedSparkSession {
+
+ import testImplicits._
+
+ test("v1 write without partition columns") {
+ withPlannedWrite { enabled =>
+ withTable("t") {
+ executeAndCheckOrdering(hasLogicalSort = false, orderingMatched =
true) {
+ sql("CREATE TABLE t USING PARQUET AS SELECT * FROM t0")
+ }
+ }
+ }
+ }
+
+ test("v1 write with non-string partition columns") {
+ withPlannedWrite { enabled =>
+ withTable("t") {
+ executeAndCheckOrdering(hasLogicalSort = enabled, orderingMatched =
enabled) {
+ sql("CREATE TABLE t USING PARQUET PARTITIONED BY (j) AS SELECT i, k,
j FROM t0")
+ }
+ }
+ }
+ }
+
+ test("v1 write with string partition columns") {
+ withPlannedWrite { enabled =>
+ withTable("t") {
+ executeAndCheckOrdering(hasLogicalSort = enabled, orderingMatched =
enabled) {
+ sql("CREATE TABLE t USING PARQUET PARTITIONED BY (k) AS SELECT *
FROM t0")
+ }
+ }
+ }
+ }
+
+ test("v1 write with partition, bucketed and sort columns") {
+ withPlannedWrite { enabled =>
+ withTable("t") {
+ sql(
+ """
+ |CREATE TABLE t(i INT, j INT) USING PARQUET
+ |PARTITIONED BY (k STRING)
+ |CLUSTERED BY (i, j) SORTED BY (j) INTO 2 BUCKETS
+ |""".stripMargin)
+ executeAndCheckOrdering(hasLogicalSort = enabled, orderingMatched =
enabled) {
+ sql("INSERT INTO t SELECT * FROM t0")
+ }
+ }
+ }
+ }
+
+ test("v1 write with already sorted plan - non-string partition column") {
+ withPlannedWrite { enabled =>
+ withTable("t") {
+ sql(
+ """
+ |CREATE TABLE t(i INT, k STRING) USING PARQUET
+ |PARTITIONED BY (j INT)
+ |""".stripMargin)
+ executeAndCheckOrdering(hasLogicalSort = true, orderingMatched = true)
{
+ sql("INSERT INTO t SELECT i, k, j FROM t0 ORDER BY j")
+ }
+ }
+ }
+ }
+
+ test("v1 write with already sorted plan - string partition column") {
+ withPlannedWrite { enabled =>
+ withTable("t") {
+ sql(
+ """
+ |CREATE TABLE t(i INT, j INT) USING PARQUET
+ |PARTITIONED BY (k STRING)
+ |""".stripMargin)
+ executeAndCheckOrdering(hasLogicalSort = true, orderingMatched = true)
{
+ sql("INSERT INTO t SELECT * FROM t0 ORDER BY k")
+ }
+ }
+ }
+ }
+
+ test("v1 write with null and empty string column values") {
+ withPlannedWrite { enabled =>
+ withTempPath { path =>
+ executeAndCheckOrdering(hasLogicalSort = enabled, orderingMatched =
enabled) {
+ Seq((0, None), (1, Some("")), (2, None), (3, Some("x")))
+ .toDF("id", "p")
+ .write
+ .partitionBy("p")
+ .parquet(path.toString)
+ checkAnswer(
+ spark.read.parquet(path.toString).where("p IS NULL").sort($"id"),
+ Seq(Row(0, null), Row(1, null), Row(2, null)))
+ // Check the empty string and null values should be written to the
same file.
+ val files = path.listFiles().filterNot(
+ f => f.getName.startsWith(".") || f.getName.startsWith("_"))
+ assert(files.length == 2)
+ }
+ }
+ }
+ }
+
+ test("v1 write with AQE changing SMJ to BHJ") {
+ withPlannedWrite { enabled =>
+ withTable("t") {
+ sql(
+ """
+ |CREATE TABLE t(key INT, value STRING) USING PARQUET
+ |PARTITIONED BY (a INT)
+ |""".stripMargin)
+ withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") {
+ // The top level sort added by V1 write will be removed by the
physical rule
+ // RemoveRedundantSorts initially, and during the execution AQE will
change
+ // SMJ to BHJ which will remove the original output ordering from
the SMJ.
+ // In this case AQE should still add back the sort node from the
logical plan
+ // during re-planning, and ordering should be matched in
FileFormatWriter.
+ executeAndCheckOrdering(hasLogicalSort = enabled, orderingMatched =
enabled) {
+ sql(
+ """
+ |INSERT INTO t
+ |SELECT key, value, a
+ |FROM testData JOIN testData2 ON key = a
+ |WHERE value = '1'
+ |""".stripMargin)
+ }
+ }
+ }
+ }
+ }
+}
diff --git
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index 96b41dd8e35..55644e6a341 100644
---
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -21,16 +21,17 @@ import scala.util.control.NonFatal
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions.SortOrder
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.{DataWritingCommand, DDLUtils}
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation,
InsertIntoHadoopFsRelationCommand, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation,
InsertIntoHadoopFsRelationCommand, LogicalRelation, V1WriteCommand,
V1WritesUtils}
import org.apache.spark.sql.hive.HiveSessionCatalog
import org.apache.spark.util.Utils
-trait CreateHiveTableAsSelectBase extends DataWritingCommand {
+trait CreateHiveTableAsSelectBase extends V1WriteCommand with
V1WritesHiveUtils {
val tableDesc: CatalogTable
val query: LogicalPlan
val outputColumnNames: Seq[String]
@@ -38,6 +39,21 @@ trait CreateHiveTableAsSelectBase extends DataWritingCommand
{
protected val tableIdentifier = tableDesc.identifier
+ override def requiredOrdering: Seq[SortOrder] = {
+ // If the table does not exist the schema should always be empty.
+ val table = if (tableDesc.schema.isEmpty) {
+ val tableSchema =
CharVarcharUtils.getRawSchema(outputColumns.toStructType, conf)
+ tableDesc.copy(schema = tableSchema)
+ } else {
+ tableDesc
+ }
+ // For CTAS, there is no static partition values to insert.
+ val partition = tableDesc.partitionColumnNames.map(_ -> None).toMap
+ val partitionColumns = getDynamicPartitionColumns(table, partition, query)
+ val options = getOptionsWithHiveBucketWrite(tableDesc.bucketSpec)
+ V1WritesUtils.getSortOrder(outputColumns, partitionColumns,
tableDesc.bucketSpec, options)
+ }
+
override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val tableExists = catalog.tableExists(tableIdentifier)
diff --git
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 8fca95130dd..dcaeac63fb2 100644
---
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -17,23 +17,20 @@
package org.apache.spark.sql.hive.execution
-import java.util.Locale
-
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.ql.ErrorMsg
import org.apache.hadoop.hive.ql.plan.TableDesc
-import org.apache.spark.SparkException
-import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.SortOrder
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
-import org.apache.spark.sql.errors.{QueryCompilationErrors,
QueryExecutionErrors}
+import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.CommandUtils
+import org.apache.spark.sql.execution.datasources.{V1WriteCommand,
V1WritesUtils}
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
import org.apache.spark.sql.hive.client.HiveClientImpl
@@ -76,7 +73,14 @@ case class InsertIntoHiveTable(
query: LogicalPlan,
overwrite: Boolean,
ifPartitionNotExists: Boolean,
- outputColumnNames: Seq[String]) extends SaveAsHiveFile {
+ outputColumnNames: Seq[String]
+ ) extends SaveAsHiveFile with V1WriteCommand with V1WritesHiveUtils {
+
+ override def requiredOrdering: Seq[SortOrder] = {
+ val partitionColumns = getDynamicPartitionColumns(table, partition, query)
+ val options = getOptionsWithHiveBucketWrite(table.bucketSpec)
+ V1WritesUtils.getSortOrder(outputColumns, partitionColumns,
table.bucketSpec, options)
+ }
/**
* Inserts all the rows in the table into Hive. Row objects are properly
serialized with the
@@ -133,53 +137,8 @@ case class InsertIntoHiveTable(
val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
val numDynamicPartitions = partition.values.count(_.isEmpty)
- val numStaticPartitions = partition.values.count(_.nonEmpty)
- val partitionSpec = partition.map {
- case (key, Some(null)) => key ->
ExternalCatalogUtils.DEFAULT_PARTITION_NAME
- case (key, Some(value)) => key -> value
- case (key, None) => key -> ""
- }
-
- // All partition column names in the format of "<column name 1>/<column
name 2>/..."
- val partitionColumns =
fileSinkConf.getTableInfo.getProperties.getProperty("partition_columns")
- val partitionColumnNames =
Option(partitionColumns).map(_.split("/")).getOrElse(Array.empty)
-
- // By this time, the partition map must match the table's partition columns
- if (partitionColumnNames.toSet != partition.keySet) {
- throw
QueryExecutionErrors.requestedPartitionsMismatchTablePartitionsError(table,
partition)
- }
-
- // Validate partition spec if there exist any dynamic partitions
- if (numDynamicPartitions > 0) {
- // Report error if dynamic partitioning is not enabled
- if (!hadoopConf.get("hive.exec.dynamic.partition", "true").toBoolean) {
- throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg)
- }
-
- // Report error if dynamic partition strict mode is on but no static
partition is found
- if (numStaticPartitions == 0 &&
- hadoopConf.get("hive.exec.dynamic.partition.mode",
"strict").equalsIgnoreCase("strict")) {
- throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg)
- }
-
- // Report error if any static partition appears after a dynamic partition
- val isDynamic = partitionColumnNames.map(partitionSpec(_).isEmpty)
- if (isDynamic.init.zip(isDynamic.tail).contains((true, false))) {
- throw new AnalysisException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg)
- }
- }
-
- val partitionAttributes =
partitionColumnNames.takeRight(numDynamicPartitions).map { name =>
- val attr = query.resolve(name :: Nil,
sparkSession.sessionState.analyzer.resolver).getOrElse {
- throw QueryCompilationErrors.cannotResolveAttributeError(
- name, query.output.map(_.name).mkString(", "))
- }.asInstanceOf[Attribute]
- // SPARK-28054: Hive metastore is not case preserving and keeps
partition columns
- // with lower cased names. Hive will validate the column names in the
partition directories
- // during `loadDynamicPartitions`. Spark needs to write partition
directories with lower-cased
- // column names in order to make `loadDynamicPartitions` work.
- attr.withName(name.toLowerCase(Locale.ROOT))
- }
+ val partitionSpec = getPartitionSpec(partition)
+ val partitionAttributes = getDynamicPartitionColumns(table, partition,
query)
val writtenParts = saveAsHiveFile(
sparkSession = sparkSession,
diff --git
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
index 7f885729bd2..799cea42e1e 100644
---
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
+++
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
@@ -37,13 +37,13 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.DataWritingCommand
-import org.apache.spark.sql.execution.datasources.{BucketingUtils,
FileFormatWriter}
+import org.apache.spark.sql.execution.datasources.FileFormatWriter
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
import org.apache.spark.sql.hive.client.HiveVersion
// Base trait from which all hive insert statement physical execution extends.
-private[hive] trait SaveAsHiveFile extends DataWritingCommand {
+private[hive] trait SaveAsHiveFile extends DataWritingCommand with
V1WritesHiveUtils {
var createdTempDir: Option[Path] = None
@@ -86,9 +86,7 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand
{
jobId = java.util.UUID.randomUUID().toString,
outputPath = outputLocation)
- val options = bucketSpec
- .map(_ => Map(BucketingUtils.optionForHiveCompatibleBucketWrite ->
"true"))
- .getOrElse(Map.empty)
+ val options = getOptionsWithHiveBucketWrite(bucketSpec)
FileFormatWriter.write(
sparkSession = sparkSession,
diff --git
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/V1WritesHiveUtils.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/V1WritesHiveUtils.scala
new file mode 100644
index 00000000000..752753f334a
--- /dev/null
+++
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/V1WritesHiveUtils.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.util.Locale
+
+import org.apache.hadoop.hive.ql.ErrorMsg
+import org.apache.hadoop.hive.ql.plan.TableDesc
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable,
ExternalCatalogUtils}
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.errors.{QueryCompilationErrors,
QueryExecutionErrors}
+import org.apache.spark.sql.execution.datasources.BucketingUtils
+import org.apache.spark.sql.hive.client.HiveClientImpl
+
+trait V1WritesHiveUtils {
+ def getPartitionSpec(partition: Map[String, Option[String]]): Map[String,
String] = {
+ partition.map {
+ case (key, Some(null)) => key ->
ExternalCatalogUtils.DEFAULT_PARTITION_NAME
+ case (key, Some(value)) => key -> value
+ case (key, None) => key -> ""
+ }
+ }
+
+ def getDynamicPartitionColumns(
+ table: CatalogTable,
+ partition: Map[String, Option[String]],
+ query: LogicalPlan): Seq[Attribute] = {
+ val numDynamicPartitions = partition.values.count(_.isEmpty)
+ val numStaticPartitions = partition.values.count(_.nonEmpty)
+ val partitionSpec = getPartitionSpec(partition)
+
+ val hiveQlTable = HiveClientImpl.toHiveTable(table)
+ val tableDesc = new TableDesc(
+ hiveQlTable.getInputFormatClass,
+ hiveQlTable.getOutputFormatClass,
+ hiveQlTable.getMetadata
+ )
+
+ // All partition column names in the format of "<column name 1>/<column
name 2>/..."
+ val partitionColumns =
tableDesc.getProperties.getProperty("partition_columns")
+ val partitionColumnNames =
Option(partitionColumns).map(_.split("/")).getOrElse(Array.empty)
+
+ // By this time, the partition map must match the table's partition columns
+ if (partitionColumnNames.toSet != partition.keySet) {
+ throw
QueryExecutionErrors.requestedPartitionsMismatchTablePartitionsError(table,
partition)
+ }
+
+ val sessionState = SparkSession.active.sessionState
+ val hadoopConf = sessionState.newHadoopConf()
+
+ // Validate partition spec if there exist any dynamic partitions
+ if (numDynamicPartitions > 0) {
+ // Report error if dynamic partitioning is not enabled
+ if (!hadoopConf.get("hive.exec.dynamic.partition", "true").toBoolean) {
+ throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg)
+ }
+
+ // Report error if dynamic partition strict mode is on but no static
partition is found
+ if (numStaticPartitions == 0 &&
+ hadoopConf.get("hive.exec.dynamic.partition.mode",
"strict").equalsIgnoreCase("strict")) {
+ throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg)
+ }
+
+ // Report error if any static partition appears after a dynamic partition
+ val isDynamic = partitionColumnNames.map(partitionSpec(_).isEmpty)
+ if (isDynamic.init.zip(isDynamic.tail).contains((true, false))) {
+ throw new AnalysisException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg)
+ }
+ }
+
+ partitionColumnNames.takeRight(numDynamicPartitions).map { name =>
+ val attr = query.resolve(name :: Nil,
sessionState.analyzer.resolver).getOrElse {
+ throw QueryCompilationErrors.cannotResolveAttributeError(
+ name, query.output.map(_.name).mkString(", "))
+ }.asInstanceOf[Attribute]
+ // SPARK-28054: Hive metastore is not case preserving and keeps
partition columns
+ // with lower cased names. Hive will validate the column names in the
partition directories
+ // during `loadDynamicPartitions`. Spark needs to write partition
directories with lower-cased
+ // column names in order to make `loadDynamicPartitions` work.
+ attr.withName(name.toLowerCase(Locale.ROOT))
+ }
+ }
+
+ def getOptionsWithHiveBucketWrite(bucketSpec: Option[BucketSpec]):
Map[String, String] = {
+ bucketSpec
+ .map(_ => Map(BucketingUtils.optionForHiveCompatibleBucketWrite ->
"true"))
+ .getOrElse(Map.empty)
+ }
+}
diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala
new file mode 100644
index 00000000000..2c8b2001501
--- /dev/null
+++
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution.command
+
+import org.apache.spark.sql.execution.datasources.V1WriteCommandSuiteBase
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+
+class V1WriteHiveCommandSuite extends V1WriteCommandSuiteBase with
TestHiveSingleton {
+
+ test("create hive table as select - no partition column") {
+ withPlannedWrite { enabled =>
+ withTable("t") {
+ executeAndCheckOrdering(hasLogicalSort = false, orderingMatched =
true) {
+ sql("CREATE TABLE t AS SELECT * FROM t0")
+ }
+ }
+ }
+ }
+
+ test("create hive table as select") {
+ withPlannedWrite { enabled =>
+ withTable("t") {
+ withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+ executeAndCheckOrdering(hasLogicalSort = enabled, orderingMatched =
enabled) {
+ sql(
+ """
+ |CREATE TABLE t
+ |PARTITIONED BY (k)
+ |AS SELECT * FROM t0
+ |""".stripMargin)
+ }
+ }
+ }
+ }
+ }
+
+ test("insert into hive table") {
+ withPlannedWrite { enabled =>
+ withTable("t") {
+ sql(
+ """
+ |CREATE TABLE t (i INT, j INT)
+ |PARTITIONED BY (k STRING)
+ |CLUSTERED BY (i, j) SORTED BY (j) INTO 2 BUCKETS
+ |""".stripMargin)
+ withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+ executeAndCheckOrdering(hasLogicalSort = enabled, orderingMatched =
enabled) {
+ sql("INSERT INTO t SELECT * FROM t0")
+ }
+ }
+ }
+ }
+ }
+
+ test("insert overwrite hive table") {
+ withPlannedWrite { enabled =>
+ withTable("t") {
+ withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+ sql(
+ """
+ |CREATE TABLE t
+ |PARTITIONED BY (k)
+ |AS SELECT * FROM t0
+ |""".stripMargin)
+ executeAndCheckOrdering(hasLogicalSort = enabled, orderingMatched =
enabled) {
+ sql("INSERT OVERWRITE t SELECT j AS i, i AS j, k FROM t0")
+ }
+ }
+ }
+ }
+ }
+
+ test("insert into hive table with static partitions only") {
+ withPlannedWrite { enabled =>
+ withTable("t") {
+ sql(
+ """
+ |CREATE TABLE t (i INT, j INT)
+ |PARTITIONED BY (k STRING)
+ |""".stripMargin)
+ // No dynamic partition so no sort is needed.
+ executeAndCheckOrdering(hasLogicalSort = false, orderingMatched =
true) {
+ sql("INSERT INTO t PARTITION (k='0') SELECT i, j FROM t0 WHERE k =
'0'")
+ }
+ }
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]