This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 256227477b0 [SPARK-37287][SQL] Pull out dynamic partition and bucket sort from FileFormatWriter
256227477b0 is described below

commit 256227477b027ea8160c4c266c3e8bc7d302a7e2
Author: allisonwang-db <[email protected]>
AuthorDate: Tue Jul 19 21:31:37 2022 +0800

    [SPARK-37287][SQL] Pull out dynamic partition and bucket sort from FileFormatWriter
    
    ### What changes were proposed in this pull request?
    `FileFormatWriter.write` is used by all V1 write commands, including data
    source and Hive table writes. Depending on the dynamic partition, bucket, and
    sort columns of a V1 write command, `FileFormatWriter` can add a physical sort
    on top of the query plan, and that sort is not directly visible from the plan.
    
    This PR (based on https://github.com/apache/spark/pull/34568) pulls the
    physical sort added by `FileFormatWriter` up into logical planning. It adds a
    new optimizer rule, `V1Writes`, which inserts logical Sort operators based on
    the required ordering of a V1 write command. This behavior is controlled by
    the new config **spark.sql.optimizer.plannedWrite.enabled** (default: true).
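
    For example, a minimal sketch (not part of this patch) of how the behavior can
    be observed from user code, assuming a running SparkSession `spark` and a
    hypothetical partitioned table `t`:

    ```scala
    // Sketch only: table name and data are hypothetical.
    spark.sql("CREATE TABLE t (i INT) USING parquet PARTITIONED BY (p STRING)")

    // With planned writes enabled (the default), the optimized plan of the write
    // command should contain a logical Sort on the dynamic partition column.
    spark.conf.set("spark.sql.optimizer.plannedWrite.enabled", "true")
    spark.sql("EXPLAIN EXTENDED INSERT INTO t SELECT 1, 'a'").show(truncate = false)

    // With it disabled, no logical Sort is added and FileFormatWriter inserts the
    // physical sort itself, as before.
    spark.conf.set("spark.sql.optimizer.plannedWrite.enabled", "false")
    spark.sql("EXPLAIN EXTENDED INSERT INTO t SELECT 1, 'a'").show(truncate = false)
    ```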
    
    ### Why are the changes needed?
    
    Improve the observability of V1 writes and unify the logic of V1 and V2 write
    commands.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    New unit tests.
    
    Closes #37099 from allisonwang-db/spark-37287-v1-writes.
    
    Authored-by: allisonwang-db <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    |  10 +
 .../spark/sql/execution/SparkOptimizer.scala       |   4 +-
 .../adaptive/InsertAdaptiveSparkPlan.scala         |   4 +-
 .../execution/command/createDataSourceTables.scala |  15 +-
 .../sql/execution/datasources/DataSource.scala     |  39 ++--
 .../execution/datasources/FileFormatWriter.scala   |  51 ++---
 .../InsertIntoHadoopFsRelationCommand.scala        |   7 +-
 .../spark/sql/execution/datasources/V1Writes.scala | 137 +++++++++++++
 .../adaptive/AdaptiveQueryExecSuite.scala          |  37 ++--
 .../datasources/V1WriteCommandSuite.scala          | 217 +++++++++++++++++++++
 .../execution/CreateHiveTableAsSelectCommand.scala |  20 +-
 .../sql/hive/execution/InsertIntoHiveTable.scala   |  69 ++-----
 .../spark/sql/hive/execution/SaveAsHiveFile.scala  |   8 +-
 .../sql/hive/execution/V1WritesHiveUtils.scala     | 108 ++++++++++
 .../command/V1WriteHiveCommandSuite.scala          | 103 ++++++++++
 15 files changed, 703 insertions(+), 126 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 1b7857ead59..631c89d798f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -412,6 +412,14 @@ object SQLConf {
       .longConf
       .createWithDefault(67108864L)
 
+  val PLANNED_WRITE_ENABLED = 
buildConf("spark.sql.optimizer.plannedWrite.enabled")
+    .internal()
+    .doc("When set to true, Spark optimizer will add logical sort operators to 
V1 write commands " +
+      "if needed so that `FileFormatWriter` does not need to insert physical 
sorts.")
+    .version("3.4.0")
+    .booleanConf
+    .createWithDefault(true)
+
   val COMPRESS_CACHED = 
buildConf("spark.sql.inMemoryColumnarStorage.compressed")
     .doc("When set to true Spark SQL will automatically select a compression 
codec for each " +
       "column based on statistics of the data.")
@@ -4617,6 +4625,8 @@ class SQLConf extends Serializable with Logging {
 
   def maxConcurrentOutputFileWriters: Int = 
getConf(SQLConf.MAX_CONCURRENT_OUTPUT_FILE_WRITERS)
 
+  def plannedWriteEnabled: Boolean = getConf(SQLConf.PLANNED_WRITE_ENABLED)
+
   def inferDictAsStruct: Boolean = getConf(SQLConf.INFER_NESTED_DICT_AS_STRUCT)
 
   def legacyInferArrayTypeFromFirstElement: Boolean = getConf(
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
index bffa1d1dae7..72bdab409a9 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
@@ -23,8 +23,7 @@ import org.apache.spark.sql.catalyst.optimizer._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.catalog.CatalogManager
-import org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions
-import org.apache.spark.sql.execution.datasources.SchemaPruning
+import org.apache.spark.sql.execution.datasources.{PruneFileSourcePartitions, 
SchemaPruning, V1Writes}
 import 
org.apache.spark.sql.execution.datasources.v2.{GroupBasedRowLevelOperationScanPlanning,
 OptimizeMetadataOnlyDeleteFromTable, V2ScanPartitioningAndOrdering, 
V2ScanRelationPushDown, V2Writes}
 import 
org.apache.spark.sql.execution.dynamicpruning.{CleanupDynamicPruningFilters, 
PartitionPruning}
 import 
org.apache.spark.sql.execution.python.{ExtractGroupingPythonUDFFromAggregate, 
ExtractPythonUDFFromAggregate, ExtractPythonUDFs}
@@ -39,6 +38,7 @@ class SparkOptimizer(
     // TODO: move SchemaPruning into catalyst
     Seq(SchemaPruning) :+
       GroupBasedRowLevelOperationScanPlanning :+
+      V1Writes :+
       V2ScanRelationPushDown :+
       V2ScanPartitioningAndOrdering :+
       V2Writes :+
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
index 4410f7fea81..94cef481ecf 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern._
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.{DataWritingCommandExec, 
ExecutedCommandExec}
+import org.apache.spark.sql.execution.datasources.V1WriteCommand
 import org.apache.spark.sql.execution.datasources.v2.V2CommandExec
 import org.apache.spark.sql.execution.exchange.Exchange
 import org.apache.spark.sql.internal.SQLConf
@@ -46,8 +47,9 @@ case class InsertAdaptiveSparkPlan(
     case _ if !conf.adaptiveExecutionEnabled => plan
     case _: ExecutedCommandExec => plan
     case _: CommandResultExec => plan
-    case c: DataWritingCommandExec => c.copy(child = apply(c.child))
     case c: V2CommandExec => c.withNewChildren(c.children.map(apply))
+    case c: DataWritingCommandExec if !c.cmd.isInstanceOf[V1WriteCommand] =>
+      c.copy(child = apply(c.child))
     case _ if shouldApplyAQE(plan, isSubquery) =>
       if (supportAdaptive(plan)) {
         try {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index e64426f8de8..50ad55d5633 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -20,7 +20,9 @@ package org.apache.spark.sql.execution.command
 import java.net.URI
 
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.expressions.SortOrder
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -141,7 +143,18 @@ case class CreateDataSourceTableAsSelectCommand(
     mode: SaveMode,
     query: LogicalPlan,
     outputColumnNames: Seq[String])
-  extends DataWritingCommand {
+  extends V1WriteCommand {
+
+  override def requiredOrdering: Seq[SortOrder] = {
+    val unresolvedPartitionColumns = 
table.partitionColumnNames.map(UnresolvedAttribute.quoted)
+    val partitionColumns = DataSource.resolvePartitionColumns(
+      unresolvedPartitionColumns,
+      outputColumns,
+      query,
+      SparkSession.active.sessionState.conf.resolver)
+    val options = table.storage.properties
+    V1WritesUtils.getSortOrder(outputColumns, partitionColumns, 
table.bucketSpec, options)
+  }
 
   override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
     assert(table.tableType != CatalogTableType.VIEW)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index a220fd334a5..8f8846b89f3 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -29,8 +29,9 @@ import org.apache.spark.SparkException
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, 
CatalogStorageFormat, CatalogTable, CatalogUtils}
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, TypeUtils}
 import org.apache.spark.sql.connector.catalog.TableProvider
@@ -519,18 +520,8 @@ case class DataSource(
       case format: FileFormat =>
         disallowWritingIntervals(outputColumns.map(_.dataType), 
forbidAnsiIntervals = false)
         val cmd = planForWritingFileFormat(format, mode, data)
-        val resolvedPartCols = cmd.partitionColumns.map { col =>
-          // The partition columns created in `planForWritingFileFormat` 
should always be
-          // `UnresolvedAttribute` with a single name part.
-          assert(col.isInstanceOf[UnresolvedAttribute])
-          val unresolved = col.asInstanceOf[UnresolvedAttribute]
-          assert(unresolved.nameParts.length == 1)
-          val name = unresolved.nameParts.head
-          outputColumns.find(a => equality(a.name, name)).getOrElse {
-            throw QueryCompilationErrors.cannotResolveAttributeError(
-              name, data.output.map(_.name).mkString(", "))
-          }
-        }
+        val resolvedPartCols =
+          DataSource.resolvePartitionColumns(cmd.partitionColumns, 
outputColumns, data, equality)
         val resolved = cmd.copy(
           partitionColumns = resolvedPartCols,
           outputColumnNames = outputColumnNames)
@@ -836,4 +827,26 @@ object DataSource extends Logging {
       throw 
QueryCompilationErrors.writeEmptySchemasUnsupportedByDataSourceError()
     }
   }
+
+  /**
+   * Resolve partition columns using output columns of the query plan.
+   */
+  def resolvePartitionColumns(
+      partitionColumns: Seq[Attribute],
+      outputColumns: Seq[Attribute],
+      plan: LogicalPlan,
+      resolver: Resolver): Seq[Attribute] = {
+    partitionColumns.map { col =>
+      // The partition columns created in `planForWritingFileFormat` should 
always be
+      // `UnresolvedAttribute` with a single name part.
+      assert(col.isInstanceOf[UnresolvedAttribute])
+      val unresolved = col.asInstanceOf[UnresolvedAttribute]
+      assert(unresolved.nameParts.length == 1)
+      val name = unresolved.nameParts.head
+      outputColumns.find(a => resolver(a.name, name)).getOrElse {
+        throw QueryCompilationErrors.cannotResolveAttributeError(
+          name, plan.output.map(_.name).mkString(", "))
+      }
+    }
+  }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index e75041b7fb0..6b48420920d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -36,7 +36,6 @@ import 
org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, 
ExprCode}
-import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.{ProjectExec, SortExec, SparkPlan, 
SQLExecution, UnsafeExternalRowSorter}
@@ -78,6 +77,12 @@ object FileFormatWriter extends Logging {
       maxWriters: Int,
       createSorter: () => UnsafeExternalRowSorter)
 
+  /**
+   * A variable used in tests to check whether the output ordering of the 
query matches the
+   * required ordering of the write command.
+   */
+  private[sql] var outputOrderingMatched: Boolean = false
+
   /**
    * Basic work flow of this command is:
    * 1. Driver side setup, including output committer initialization and data 
source specific
@@ -126,38 +131,8 @@ object FileFormatWriter extends Logging {
     }
     val empty2NullPlan = if (needConvert) ProjectExec(projectList, plan) else 
plan
 
-    val writerBucketSpec = bucketSpec.map { spec =>
-      val bucketColumns = spec.bucketColumnNames.map(c => 
dataColumns.find(_.name == c).get)
-
-      if (options.getOrElse(BucketingUtils.optionForHiveCompatibleBucketWrite, 
"false") ==
-        "true") {
-        // Hive bucketed table: use `HiveHash` and bitwise-and as bucket id 
expression.
-        // Without the extra bitwise-and operation, we can get wrong bucket id 
when hash value of
-        // columns is negative. See Hive implementation in
-        // 
`org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils#getBucketNumber()`.
-        val hashId = BitwiseAnd(HiveHash(bucketColumns), Literal(Int.MaxValue))
-        val bucketIdExpression = Pmod(hashId, Literal(spec.numBuckets))
-
-        // The bucket file name prefix is following Hive, Presto and Trino 
conversion, so this
-        // makes sure Hive bucketed table written by Spark, can be read by 
other SQL engines.
-        //
-        // Hive: 
`org.apache.hadoop.hive.ql.exec.Utilities#getBucketIdFromFile()`.
-        // Trino: 
`io.trino.plugin.hive.BackgroundHiveSplitLoader#BUCKET_PATTERNS`.
-        val fileNamePrefix = (bucketId: Int) => f"$bucketId%05d_0_"
-        WriterBucketSpec(bucketIdExpression, fileNamePrefix)
-      } else {
-        // Spark bucketed table: use `HashPartitioning.partitionIdExpression` 
as bucket id
-        // expression, so that we can guarantee the data distribution is same 
between shuffle and
-        // bucketed data source, which enables us to only shuffle one side 
when join a bucketed
-        // table and a normal one.
-        val bucketIdExpression = HashPartitioning(bucketColumns, 
spec.numBuckets)
-          .partitionIdExpression
-        WriterBucketSpec(bucketIdExpression, (_: Int) => "")
-      }
-    }
-    val sortColumns = bucketSpec.toSeq.flatMap {
-      spec => spec.sortColumnNames.map(c => dataColumns.find(_.name == c).get)
-    }
+    val writerBucketSpec = V1WritesUtils.getWriterBucketSpec(bucketSpec, 
dataColumns, options)
+    val sortColumns = V1WritesUtils.getBucketSortColumns(bucketSpec, 
dataColumns)
 
     val caseInsensitiveOptions = CaseInsensitiveMap(options)
 
@@ -209,6 +184,16 @@ object FileFormatWriter extends Logging {
     // prepares the job, any exception thrown from here shouldn't cause 
abortJob() to be called.
     committer.setupJob(job)
 
+    // When `PLANNED_WRITE_ENABLED` is true, the optimizer rule V1Writes will 
add logical sort
+    // operator based on the required ordering of the V1 write command. So the 
output
+    // ordering of the physical plan should always match the required 
ordering. Here
+    // we set the variable to verify this behavior in tests.
+    // There are two cases where FileFormatWriter still needs to add physical 
sort:
+    // 1) When the planned write config is disabled.
+    // 2) When the concurrent writers are enabled (in this case the required 
ordering of a
+    //    V1 write command will be empty).
+    if (Utils.isTesting) outputOrderingMatched = orderingMatched
+
     try {
       val (rdd, concurrentOutputWriterSpec) = if (orderingMatched) {
         (empty2NullPlan.execute(), None)
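
As the comment above notes, FileFormatWriter still adds the physical sort itself when the
planned write config is disabled or when concurrent output writers are enabled (the required
ordering of a V1 write command is then empty). A minimal sketch of the second case, assuming a
running SparkSession `spark` and the hypothetical partitioned table `t` from the description:

```scala
// Sketch only: with concurrent output writers enabled, V1Writes emits an empty
// required ordering, so the optimizer adds no logical Sort and FileFormatWriter
// either sorts or falls back to concurrent writers at execution time.
spark.conf.set("spark.sql.maxConcurrentOutputFileWriters", "8")
spark.sql("INSERT INTO t SELECT 2, 'b'")
```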
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index d773d4bd271..e20d9ed8b53 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, 
CatalogTablePartition}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.errors.{QueryCompilationErrors, 
QueryExecutionErrors}
@@ -57,7 +57,7 @@ case class InsertIntoHadoopFsRelationCommand(
     catalogTable: Option[CatalogTable],
     fileIndex: Option[FileIndex],
     outputColumnNames: Seq[String])
-  extends DataWritingCommand {
+  extends V1WriteCommand {
 
   private lazy val parameters = CaseInsensitiveMap(options)
 
@@ -74,6 +74,9 @@ case class InsertIntoHadoopFsRelationCommand(
       staticPartitions.size < partitionColumns.length
   }
 
+  override def requiredOrdering: Seq[SortOrder] =
+    V1WritesUtils.getSortOrder(outputColumns, partitionColumns, bucketSpec, 
options)
+
   override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
     // Most formats don't do well with duplicate columns, so lets not allow 
that
     SchemaUtils.checkColumnNameDuplication(
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
new file mode 100644
index 00000000000..94e5f3ccaf4
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, 
AttributeSet, BitwiseAnd, HiveHash, Literal, Pmod, SortOrder}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Sort}
+import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.DataWritingCommand
+import org.apache.spark.sql.internal.SQLConf
+
+trait V1WriteCommand extends DataWritingCommand {
+  // Specify the required ordering for the V1 write command. 
`FileFormatWriter` will
+  // add SortExec if necessary when the requiredOrdering is empty.
+  def requiredOrdering: Seq[SortOrder]
+}
+
+/**
+ * A rule that adds logical sorts to V1 data writing commands.
+ */
+object V1Writes extends Rule[LogicalPlan] with SQLConfHelper {
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    if (conf.plannedWriteEnabled) {
+      plan.transformDown {
+        case write: V1WriteCommand =>
+          val newQuery = prepareQuery(write, write.query)
+          write.withNewChildren(newQuery :: Nil)
+      }
+    } else {
+      plan
+    }
+  }
+
+  private def prepareQuery(write: V1WriteCommand, query: LogicalPlan): 
LogicalPlan = {
+    val requiredOrdering = write.requiredOrdering
+    val outputOrdering = query.outputOrdering
+    // Check if the ordering is already matched. It is needed to ensure the
+    // idempotency of the rule.
+    val orderingMatched = if (requiredOrdering.length > outputOrdering.length) 
{
+      false
+    } else {
+      requiredOrdering.zip(outputOrdering).forall {
+        case (requiredOrder, outputOrder) => 
requiredOrder.semanticEquals(outputOrder)
+      }
+    }
+    if (orderingMatched) {
+      query
+    } else {
+      Sort(requiredOrdering, global = false, query)
+    }
+  }
+}
+
+object V1WritesUtils {
+
+  def getWriterBucketSpec(
+      bucketSpec: Option[BucketSpec],
+      dataColumns: Seq[Attribute],
+      options: Map[String, String]): Option[WriterBucketSpec] = {
+    bucketSpec.map { spec =>
+      val bucketColumns = spec.bucketColumnNames.map(c => 
dataColumns.find(_.name == c).get)
+
+      if (options.getOrElse(BucketingUtils.optionForHiveCompatibleBucketWrite, 
"false") ==
+        "true") {
+        // Hive bucketed table: use `HiveHash` and bitwise-and as bucket id 
expression.
+        // Without the extra bitwise-and operation, we can get wrong bucket id 
when hash value of
+        // columns is negative. See Hive implementation in
+        // 
`org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils#getBucketNumber()`.
+        val hashId = BitwiseAnd(HiveHash(bucketColumns), Literal(Int.MaxValue))
+        val bucketIdExpression = Pmod(hashId, Literal(spec.numBuckets))
+
+        // The bucket file name prefix is following Hive, Presto and Trino 
conversion, so this
+        // makes sure Hive bucketed table written by Spark, can be read by 
other SQL engines.
+        //
+        // Hive: 
`org.apache.hadoop.hive.ql.exec.Utilities#getBucketIdFromFile()`.
+        // Trino: 
`io.trino.plugin.hive.BackgroundHiveSplitLoader#BUCKET_PATTERNS`.
+        val fileNamePrefix = (bucketId: Int) => f"$bucketId%05d_0_"
+        WriterBucketSpec(bucketIdExpression, fileNamePrefix)
+      } else {
+        // Spark bucketed table: use `HashPartitioning.partitionIdExpression` 
as bucket id
+        // expression, so that we can guarantee the data distribution is same 
between shuffle and
+        // bucketed data source, which enables us to only shuffle one side 
when join a bucketed
+        // table and a normal one.
+        val bucketIdExpression = HashPartitioning(bucketColumns, 
spec.numBuckets)
+          .partitionIdExpression
+        WriterBucketSpec(bucketIdExpression, (_: Int) => "")
+      }
+    }
+  }
+
+  def getBucketSortColumns(
+      bucketSpec: Option[BucketSpec],
+      dataColumns: Seq[Attribute]): Seq[Attribute] = {
+    bucketSpec.toSeq.flatMap {
+      spec => spec.sortColumnNames.map(c => dataColumns.find(_.name == c).get)
+    }
+  }
+
+  def getSortOrder(
+      outputColumns: Seq[Attribute],
+      partitionColumns: Seq[Attribute],
+      bucketSpec: Option[BucketSpec],
+      options: Map[String, String]): Seq[SortOrder] = {
+    val partitionSet = AttributeSet(partitionColumns)
+    val dataColumns = outputColumns.filterNot(partitionSet.contains)
+    val writerBucketSpec = V1WritesUtils.getWriterBucketSpec(bucketSpec, 
dataColumns, options)
+    val sortColumns = V1WritesUtils.getBucketSortColumns(bucketSpec, 
dataColumns)
+
+    if (SQLConf.get.maxConcurrentOutputFileWriters > 0 && sortColumns.isEmpty) 
{
+      // Do not insert logical sort when concurrent output writers are enabled.
+      Seq.empty
+    } else {
+      // We should first sort by partition columns, then bucket id, and 
finally sorting columns.
+      // Note we do not need to convert empty string partition columns to null 
when sorting the
+      // columns since null and empty string values will be next to each other.
+      (partitionColumns ++writerBucketSpec.map(_.bucketIdExpression) ++ 
sortColumns)
+        .map(SortOrder(_, Ascending))
+    }
+  }
+}
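
A minimal sketch (not part of this patch) of how `V1WritesUtils.getSortOrder` composes the
required ordering: partition columns first, then the bucket id expression, then the bucket
sort columns. The attribute names and bucket spec below are hypothetical, and the call
assumes compiling against Spark's internal sql/core classes:

```scala
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.execution.datasources.V1WritesUtils
import org.apache.spark.sql.types.{IntegerType, StringType}

// Hypothetical output columns: data columns i, j and partition column k,
// bucketed by i and sorted within buckets by j.
val i = AttributeReference("i", IntegerType)()
val j = AttributeReference("j", IntegerType)()
val k = AttributeReference("k", StringType)()

val requiredOrdering = V1WritesUtils.getSortOrder(
  outputColumns = Seq(i, j, k),
  partitionColumns = Seq(k),
  bucketSpec = Some(BucketSpec(2, Seq("i"), Seq("j"))),
  options = Map.empty)

// Expected sort keys, all ascending: k, <bucket id expression>, j.
requiredOrdering.foreach(println)
```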
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 77d7261fb65..5d8e4bbecfe 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.scheduler.{SparkListener, 
SparkListenerEvent, SparkListe
 import org.apache.spark.sql.{Dataset, QueryTest, Row, SparkSession, Strategy}
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
-import org.apache.spark.sql.execution.{CollectLimitExec, CommandResultExec, 
LocalTableScanExec, PartialReducerPartitionSpec, QueryExecution, 
ReusedSubqueryExec, ShuffledRowRDD, SortExec, SparkPlan, UnaryExecNode, 
UnionExec}
+import org.apache.spark.sql.execution.{CollectLimitExec, LocalTableScanExec, 
PartialReducerPartitionSpec, QueryExecution, ReusedSubqueryExec, 
ShuffledRowRDD, SortExec, SparkPlan, UnionExec}
 import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
 import org.apache.spark.sql.execution.command.DataWritingCommandExec
 import org.apache.spark.sql.execution.datasources.noop.NoopDataSource
@@ -1119,16 +1119,27 @@ class AdaptiveQueryExecSuite
     }
   }
 
-  test("SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of 
write commands") {
+  test("SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of 
v2 write commands") {
     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
       SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") {
+      var plan: SparkPlan = null
+      val listener = new QueryExecutionListener {
+        override def onSuccess(funcName: String, qe: QueryExecution, 
durationNs: Long): Unit = {
+          plan = qe.executedPlan
+        }
+        override def onFailure(
+          funcName: String, qe: QueryExecution, exception: Exception): Unit = 
{}
+      }
+      spark.listenerManager.register(listener)
       withTable("t1") {
-        val plan = sql("CREATE TABLE t1 USING parquet AS SELECT 1 
col").queryExecution.executedPlan
-        assert(plan.isInstanceOf[CommandResultExec])
-        val commandResultExec = plan.asInstanceOf[CommandResultExec]
-        
assert(commandResultExec.commandPhysicalPlan.isInstanceOf[DataWritingCommandExec])
-        
assert(commandResultExec.commandPhysicalPlan.asInstanceOf[DataWritingCommandExec]
-          .child.isInstanceOf[AdaptiveSparkPlanExec])
+        val format = classOf[NoopDataSource].getName
+        Seq((0, 1)).toDF("x", 
"y").write.format(format).mode("overwrite").save()
+
+        sparkContext.listenerBus.waitUntilEmpty()
+        assert(plan.isInstanceOf[V2TableWriteExec])
+        
assert(plan.asInstanceOf[V2TableWriteExec].child.isInstanceOf[AdaptiveSparkPlanExec])
+
+        spark.listenerManager.unregister(listener)
       }
     }
   }
@@ -1179,7 +1190,10 @@ class AdaptiveQueryExecSuite
           override def onOtherEvent(event: SparkListenerEvent): Unit = {
             event match {
               case SparkListenerSQLAdaptiveExecutionUpdate(_, _, planInfo) =>
-                assert(planInfo.nodeName == "Execute 
CreateDataSourceTableAsSelectCommand")
+                assert(planInfo.nodeName == "AdaptiveSparkPlan")
+                assert(planInfo.children.size == 1)
+                assert(planInfo.children.head.nodeName ==
+                  "Execute CreateDataSourceTableAsSelectCommand")
                 checkDone = true
               case _ => // ignore other events
             }
@@ -1584,9 +1598,8 @@ class AdaptiveQueryExecSuite
       var noLocalread: Boolean = false
       val listener = new QueryExecutionListener {
         override def onSuccess(funcName: String, qe: QueryExecution, 
durationNs: Long): Unit = {
-          qe.executedPlan match {
-            case plan@(_: DataWritingCommandExec | _: V2TableWriteExec) =>
-              
assert(plan.asInstanceOf[UnaryExecNode].child.isInstanceOf[AdaptiveSparkPlanExec])
+          stripAQEPlan(qe.executedPlan) match {
+            case plan @ (_: DataWritingCommandExec | _: V2TableWriteExec) =>
               noLocalread = collect(plan) {
                 case exec: AQEShuffleReadExec if exec.isLocalRead => exec
               }.isEmpty
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
new file mode 100644
index 00000000000..350cac49139
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Sort}
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
+import org.apache.spark.sql.util.QueryExecutionListener
+
+abstract class V1WriteCommandSuiteBase extends QueryTest with SQLTestUtils {
+
+  import testImplicits._
+
+  setupTestData()
+
+  protected override def beforeAll(): Unit = {
+    super.beforeAll()
+    (0 to 20).map(i => (i, i % 5, (i % 10).toString))
+      .toDF("i", "j", "k")
+      .write
+      .saveAsTable("t0")
+  }
+
+  protected override def afterAll(): Unit = {
+    sql("drop table if exists t0")
+    super.afterAll()
+  }
+
+  protected def withPlannedWrite(testFunc: Boolean => Any): Unit = {
+    Seq(true, false).foreach { enabled =>
+      withSQLConf(SQLConf.PLANNED_WRITE_ENABLED.key -> enabled.toString) {
+        testFunc(enabled)
+      }
+    }
+  }
+
+  // Execute a write query and check ordering of the plan.
+  protected def executeAndCheckOrdering(
+      hasLogicalSort: Boolean, orderingMatched: Boolean)(query: => Unit): Unit 
= {
+    var optimizedPlan: LogicalPlan = null
+
+    val listener = new QueryExecutionListener {
+      override def onSuccess(funcName: String, qe: QueryExecution, durationNs: 
Long): Unit = {
+        qe.optimizedPlan match {
+          case w: V1WriteCommand =>
+            optimizedPlan = w.query
+          case _ =>
+        }
+      }
+      override def onFailure(funcName: String, qe: QueryExecution, exception: 
Exception): Unit = {}
+    }
+    spark.listenerManager.register(listener)
+
+    query
+
+    // Check whether the output ordering is matched before FileFormatWriter 
executes rdd.
+    assert(FileFormatWriter.outputOrderingMatched == orderingMatched,
+      s"Expect: $orderingMatched, Actual: 
${FileFormatWriter.outputOrderingMatched}")
+
+    sparkContext.listenerBus.waitUntilEmpty()
+
+    // Check whether a logical sort node is at the top of the logical plan of 
the write query.
+    if (optimizedPlan != null) {
+      assert(optimizedPlan.isInstanceOf[Sort] == hasLogicalSort,
+        s"Expect hasLogicalSort: $hasLogicalSort, Actual: 
${optimizedPlan.isInstanceOf[Sort]}")
+    }
+
+    spark.listenerManager.unregister(listener)
+  }
+}
+
+class V1WriteCommandSuite extends V1WriteCommandSuiteBase with 
SharedSparkSession {
+
+  import testImplicits._
+
+  test("v1 write without partition columns") {
+    withPlannedWrite { enabled =>
+      withTable("t") {
+        executeAndCheckOrdering(hasLogicalSort = false, orderingMatched = 
true) {
+          sql("CREATE TABLE t USING PARQUET AS SELECT * FROM t0")
+        }
+      }
+    }
+  }
+
+  test("v1 write with non-string partition columns") {
+    withPlannedWrite { enabled =>
+      withTable("t") {
+        executeAndCheckOrdering(hasLogicalSort = enabled, orderingMatched = 
enabled) {
+          sql("CREATE TABLE t USING PARQUET PARTITIONED BY (j) AS SELECT i, k, 
j FROM t0")
+        }
+      }
+    }
+  }
+
+  test("v1 write with string partition columns") {
+    withPlannedWrite { enabled =>
+      withTable("t") {
+        executeAndCheckOrdering(hasLogicalSort = enabled, orderingMatched = 
enabled) {
+          sql("CREATE TABLE t USING PARQUET PARTITIONED BY (k) AS SELECT * 
FROM t0")
+        }
+      }
+    }
+  }
+
+  test("v1 write with partition, bucketed and sort columns") {
+    withPlannedWrite { enabled =>
+      withTable("t") {
+        sql(
+          """
+            |CREATE TABLE t(i INT, j INT) USING PARQUET
+            |PARTITIONED BY (k STRING)
+            |CLUSTERED BY (i, j) SORTED BY (j) INTO 2 BUCKETS
+            |""".stripMargin)
+        executeAndCheckOrdering(hasLogicalSort = enabled, orderingMatched = 
enabled) {
+          sql("INSERT INTO t SELECT * FROM t0")
+        }
+      }
+    }
+  }
+
+  test("v1 write with already sorted plan - non-string partition column") {
+    withPlannedWrite { enabled =>
+      withTable("t") {
+        sql(
+          """
+            |CREATE TABLE t(i INT, k STRING) USING PARQUET
+            |PARTITIONED BY (j INT)
+            |""".stripMargin)
+        executeAndCheckOrdering(hasLogicalSort = true, orderingMatched = true) 
{
+          sql("INSERT INTO t SELECT i, k, j FROM t0 ORDER BY j")
+        }
+      }
+    }
+  }
+
+  test("v1 write with already sorted plan - string partition column") {
+    withPlannedWrite { enabled =>
+      withTable("t") {
+        sql(
+          """
+            |CREATE TABLE t(i INT, j INT) USING PARQUET
+            |PARTITIONED BY (k STRING)
+            |""".stripMargin)
+        executeAndCheckOrdering(hasLogicalSort = true, orderingMatched = true) 
{
+          sql("INSERT INTO t SELECT * FROM t0 ORDER BY k")
+        }
+      }
+    }
+  }
+
+  test("v1 write with null and empty string column values") {
+    withPlannedWrite { enabled =>
+      withTempPath { path =>
+        executeAndCheckOrdering(hasLogicalSort = enabled, orderingMatched = 
enabled) {
+          Seq((0, None), (1, Some("")), (2, None), (3, Some("x")))
+            .toDF("id", "p")
+            .write
+            .partitionBy("p")
+            .parquet(path.toString)
+          checkAnswer(
+            spark.read.parquet(path.toString).where("p IS NULL").sort($"id"),
+            Seq(Row(0, null), Row(1, null), Row(2, null)))
+          // Check the empty string and null values should be written to the 
same file.
+          val files = path.listFiles().filterNot(
+            f => f.getName.startsWith(".") || f.getName.startsWith("_"))
+          assert(files.length == 2)
+        }
+      }
+    }
+  }
+
+  test("v1 write with AQE changing SMJ to BHJ") {
+    withPlannedWrite { enabled =>
+      withTable("t") {
+        sql(
+          """
+            |CREATE TABLE t(key INT, value STRING) USING PARQUET
+            |PARTITIONED BY (a INT)
+            |""".stripMargin)
+        withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") {
+          // The top level sort added by V1 write will be removed by the 
physical rule
+          // RemoveRedundantSorts initially, and during the execution AQE will 
change
+          // SMJ to BHJ which will remove the original output ordering from 
the SMJ.
+          // In this case AQE should still add back the sort node from the 
logical plan
+          // during re-planning, and ordering should be matched in 
FileFormatWriter.
+          executeAndCheckOrdering(hasLogicalSort = enabled, orderingMatched = 
enabled) {
+            sql(
+              """
+                |INSERT INTO t
+                |SELECT key, value, a
+                |FROM testData JOIN testData2 ON key = a
+                |WHERE value = '1'
+                |""".stripMargin)
+          }
+        }
+      }
+    }
+  }
+}
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index 96b41dd8e35..55644e6a341 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -21,16 +21,17 @@ import scala.util.control.NonFatal
 
 import org.apache.spark.sql.{Row, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions.SortOrder
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.{DataWritingCommand, DDLUtils}
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
InsertIntoHadoopFsRelationCommand, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
InsertIntoHadoopFsRelationCommand, LogicalRelation, V1WriteCommand, 
V1WritesUtils}
 import org.apache.spark.sql.hive.HiveSessionCatalog
 import org.apache.spark.util.Utils
 
-trait CreateHiveTableAsSelectBase extends DataWritingCommand {
+trait CreateHiveTableAsSelectBase extends V1WriteCommand with 
V1WritesHiveUtils {
   val tableDesc: CatalogTable
   val query: LogicalPlan
   val outputColumnNames: Seq[String]
@@ -38,6 +39,21 @@ trait CreateHiveTableAsSelectBase extends DataWritingCommand 
{
 
   protected val tableIdentifier = tableDesc.identifier
 
+  override def requiredOrdering: Seq[SortOrder] = {
+    // If the table does not exist the schema should always be empty.
+    val table = if (tableDesc.schema.isEmpty) {
+      val tableSchema = 
CharVarcharUtils.getRawSchema(outputColumns.toStructType, conf)
+      tableDesc.copy(schema = tableSchema)
+    } else {
+      tableDesc
+    }
+    // For CTAS, there is no static partition values to insert.
+    val partition = tableDesc.partitionColumnNames.map(_ -> None).toMap
+    val partitionColumns = getDynamicPartitionColumns(table, partition, query)
+    val options = getOptionsWithHiveBucketWrite(tableDesc.bucketSpec)
+    V1WritesUtils.getSortOrder(outputColumns, partitionColumns, 
tableDesc.bucketSpec, options)
+  }
+
   override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
     val tableExists = catalog.tableExists(tableIdentifier)
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 8fca95130dd..dcaeac63fb2 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -17,23 +17,20 @@
 
 package org.apache.spark.sql.hive.execution
 
-import java.util.Locale
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.ql.ErrorMsg
 import org.apache.hadoop.hive.ql.plan.TableDesc
 
-import org.apache.spark.SparkException
-import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.SortOrder
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
-import org.apache.spark.sql.errors.{QueryCompilationErrors, 
QueryExecutionErrors}
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.CommandUtils
+import org.apache.spark.sql.execution.datasources.{V1WriteCommand, 
V1WritesUtils}
 import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.sql.hive.client.HiveClientImpl
@@ -76,7 +73,14 @@ case class InsertIntoHiveTable(
     query: LogicalPlan,
     overwrite: Boolean,
     ifPartitionNotExists: Boolean,
-    outputColumnNames: Seq[String]) extends SaveAsHiveFile {
+    outputColumnNames: Seq[String]
+  ) extends SaveAsHiveFile with V1WriteCommand with V1WritesHiveUtils {
+
+  override def requiredOrdering: Seq[SortOrder] = {
+    val partitionColumns = getDynamicPartitionColumns(table, partition, query)
+    val options = getOptionsWithHiveBucketWrite(table.bucketSpec)
+    V1WritesUtils.getSortOrder(outputColumns, partitionColumns, 
table.bucketSpec, options)
+  }
 
   /**
    * Inserts all the rows in the table into Hive.  Row objects are properly 
serialized with the
@@ -133,53 +137,8 @@ case class InsertIntoHiveTable(
     val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
 
     val numDynamicPartitions = partition.values.count(_.isEmpty)
-    val numStaticPartitions = partition.values.count(_.nonEmpty)
-    val partitionSpec = partition.map {
-      case (key, Some(null)) => key -> 
ExternalCatalogUtils.DEFAULT_PARTITION_NAME
-      case (key, Some(value)) => key -> value
-      case (key, None) => key -> ""
-    }
-
-    // All partition column names in the format of "<column name 1>/<column 
name 2>/..."
-    val partitionColumns = 
fileSinkConf.getTableInfo.getProperties.getProperty("partition_columns")
-    val partitionColumnNames = 
Option(partitionColumns).map(_.split("/")).getOrElse(Array.empty)
-
-    // By this time, the partition map must match the table's partition columns
-    if (partitionColumnNames.toSet != partition.keySet) {
-      throw 
QueryExecutionErrors.requestedPartitionsMismatchTablePartitionsError(table, 
partition)
-    }
-
-    // Validate partition spec if there exist any dynamic partitions
-    if (numDynamicPartitions > 0) {
-      // Report error if dynamic partitioning is not enabled
-      if (!hadoopConf.get("hive.exec.dynamic.partition", "true").toBoolean) {
-        throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg)
-      }
-
-      // Report error if dynamic partition strict mode is on but no static 
partition is found
-      if (numStaticPartitions == 0 &&
-        hadoopConf.get("hive.exec.dynamic.partition.mode", 
"strict").equalsIgnoreCase("strict")) {
-        throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg)
-      }
-
-      // Report error if any static partition appears after a dynamic partition
-      val isDynamic = partitionColumnNames.map(partitionSpec(_).isEmpty)
-      if (isDynamic.init.zip(isDynamic.tail).contains((true, false))) {
-        throw new AnalysisException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg)
-      }
-    }
-
-    val partitionAttributes = 
partitionColumnNames.takeRight(numDynamicPartitions).map { name =>
-      val attr = query.resolve(name :: Nil, 
sparkSession.sessionState.analyzer.resolver).getOrElse {
-        throw QueryCompilationErrors.cannotResolveAttributeError(
-          name, query.output.map(_.name).mkString(", "))
-      }.asInstanceOf[Attribute]
-      // SPARK-28054: Hive metastore is not case preserving and keeps 
partition columns
-      // with lower cased names. Hive will validate the column names in the 
partition directories
-      // during `loadDynamicPartitions`. Spark needs to write partition 
directories with lower-cased
-      // column names in order to make `loadDynamicPartitions` work.
-      attr.withName(name.toLowerCase(Locale.ROOT))
-    }
+    val partitionSpec = getPartitionSpec(partition)
+    val partitionAttributes = getDynamicPartitionColumns(table, partition, 
query)
 
     val writtenParts = saveAsHiveFile(
       sparkSession = sparkSession,
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
index 7f885729bd2..799cea42e1e 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
@@ -37,13 +37,13 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.DataWritingCommand
-import org.apache.spark.sql.execution.datasources.{BucketingUtils, 
FileFormatWriter}
+import org.apache.spark.sql.execution.datasources.FileFormatWriter
 import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.sql.hive.client.HiveVersion
 
 // Base trait from which all hive insert statement physical execution extends.
-private[hive] trait SaveAsHiveFile extends DataWritingCommand {
+private[hive] trait SaveAsHiveFile extends DataWritingCommand with 
V1WritesHiveUtils {
 
   var createdTempDir: Option[Path] = None
 
@@ -86,9 +86,7 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand 
{
       jobId = java.util.UUID.randomUUID().toString,
       outputPath = outputLocation)
 
-    val options = bucketSpec
-      .map(_ => Map(BucketingUtils.optionForHiveCompatibleBucketWrite -> 
"true"))
-      .getOrElse(Map.empty)
+    val options = getOptionsWithHiveBucketWrite(bucketSpec)
 
     FileFormatWriter.write(
       sparkSession = sparkSession,
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/V1WritesHiveUtils.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/V1WritesHiveUtils.scala
new file mode 100644
index 00000000000..752753f334a
--- /dev/null
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/V1WritesHiveUtils.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.util.Locale
+
+import org.apache.hadoop.hive.ql.ErrorMsg
+import org.apache.hadoop.hive.ql.plan.TableDesc
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, 
ExternalCatalogUtils}
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.errors.{QueryCompilationErrors, 
QueryExecutionErrors}
+import org.apache.spark.sql.execution.datasources.BucketingUtils
+import org.apache.spark.sql.hive.client.HiveClientImpl
+
+trait V1WritesHiveUtils {
+  def getPartitionSpec(partition: Map[String, Option[String]]): Map[String, 
String] = {
+    partition.map {
+      case (key, Some(null)) => key -> 
ExternalCatalogUtils.DEFAULT_PARTITION_NAME
+      case (key, Some(value)) => key -> value
+      case (key, None) => key -> ""
+    }
+  }
+
+  def getDynamicPartitionColumns(
+      table: CatalogTable,
+      partition: Map[String, Option[String]],
+      query: LogicalPlan): Seq[Attribute] = {
+    val numDynamicPartitions = partition.values.count(_.isEmpty)
+    val numStaticPartitions = partition.values.count(_.nonEmpty)
+    val partitionSpec = getPartitionSpec(partition)
+
+    val hiveQlTable = HiveClientImpl.toHiveTable(table)
+    val tableDesc = new TableDesc(
+      hiveQlTable.getInputFormatClass,
+      hiveQlTable.getOutputFormatClass,
+      hiveQlTable.getMetadata
+    )
+
+    // All partition column names in the format of "<column name 1>/<column 
name 2>/..."
+    val partitionColumns = 
tableDesc.getProperties.getProperty("partition_columns")
+    val partitionColumnNames = 
Option(partitionColumns).map(_.split("/")).getOrElse(Array.empty)
+
+    // By this time, the partition map must match the table's partition columns
+    if (partitionColumnNames.toSet != partition.keySet) {
+      throw 
QueryExecutionErrors.requestedPartitionsMismatchTablePartitionsError(table, 
partition)
+    }
+
+    val sessionState = SparkSession.active.sessionState
+    val hadoopConf = sessionState.newHadoopConf()
+
+    // Validate partition spec if there exist any dynamic partitions
+    if (numDynamicPartitions > 0) {
+      // Report error if dynamic partitioning is not enabled
+      if (!hadoopConf.get("hive.exec.dynamic.partition", "true").toBoolean) {
+        throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg)
+      }
+
+      // Report error if dynamic partition strict mode is on but no static 
partition is found
+      if (numStaticPartitions == 0 &&
+        hadoopConf.get("hive.exec.dynamic.partition.mode", 
"strict").equalsIgnoreCase("strict")) {
+        throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg)
+      }
+
+      // Report error if any static partition appears after a dynamic partition
+      val isDynamic = partitionColumnNames.map(partitionSpec(_).isEmpty)
+      if (isDynamic.init.zip(isDynamic.tail).contains((true, false))) {
+        throw new AnalysisException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg)
+      }
+    }
+
+    partitionColumnNames.takeRight(numDynamicPartitions).map { name =>
+      val attr = query.resolve(name :: Nil, 
sessionState.analyzer.resolver).getOrElse {
+        throw QueryCompilationErrors.cannotResolveAttributeError(
+          name, query.output.map(_.name).mkString(", "))
+      }.asInstanceOf[Attribute]
+      // SPARK-28054: Hive metastore is not case preserving and keeps 
partition columns
+      // with lower cased names. Hive will validate the column names in the 
partition directories
+      // during `loadDynamicPartitions`. Spark needs to write partition 
directories with lower-cased
+      // column names in order to make `loadDynamicPartitions` work.
+      attr.withName(name.toLowerCase(Locale.ROOT))
+    }
+  }
+
+  def getOptionsWithHiveBucketWrite(bucketSpec: Option[BucketSpec]): 
Map[String, String] = {
+    bucketSpec
+      .map(_ => Map(BucketingUtils.optionForHiveCompatibleBucketWrite -> 
"true"))
+      .getOrElse(Map.empty)
+  }
+}
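
A small sketch (not part of this patch) of how `getPartitionSpec` normalizes a partition map,
assuming the spark-hive module on the classpath; the map below is hypothetical, with `None`
marking a dynamic partition and `Some(null)` the default Hive partition:

```scala
import org.apache.spark.sql.hive.execution.V1WritesHiveUtils

object PartitionSpecDemo extends V1WritesHiveUtils {
  def main(args: Array[String]): Unit = {
    val spec = getPartitionSpec(Map(
      "year" -> Some("2022"), // static value, kept as-is
      "month" -> None,        // dynamic partition, mapped to ""
      "day" -> Some(null)))   // mapped to the default partition name
    println(spec)
  }
}
```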
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala
new file mode 100644
index 00000000000..2c8b2001501
--- /dev/null
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution.command
+
+import org.apache.spark.sql.execution.datasources.V1WriteCommandSuiteBase
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+
+class V1WriteHiveCommandSuite extends V1WriteCommandSuiteBase with 
TestHiveSingleton {
+
+  test("create hive table as select - no partition column") {
+    withPlannedWrite { enabled =>
+      withTable("t") {
+        executeAndCheckOrdering(hasLogicalSort = false, orderingMatched = 
true) {
+          sql("CREATE TABLE t AS SELECT * FROM t0")
+        }
+      }
+    }
+  }
+
+  test("create hive table as select") {
+    withPlannedWrite { enabled =>
+      withTable("t") {
+        withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+          executeAndCheckOrdering(hasLogicalSort = enabled, orderingMatched = 
enabled) {
+            sql(
+              """
+                |CREATE TABLE t
+                |PARTITIONED BY (k)
+                |AS SELECT * FROM t0
+                |""".stripMargin)
+          }
+        }
+      }
+    }
+  }
+
+  test("insert into hive table") {
+    withPlannedWrite { enabled =>
+      withTable("t") {
+        sql(
+          """
+            |CREATE TABLE t (i INT, j INT)
+            |PARTITIONED BY (k STRING)
+            |CLUSTERED BY (i, j) SORTED BY (j) INTO 2 BUCKETS
+            |""".stripMargin)
+        withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+          executeAndCheckOrdering(hasLogicalSort = enabled, orderingMatched = 
enabled) {
+            sql("INSERT INTO t SELECT * FROM t0")
+          }
+        }
+      }
+    }
+  }
+
+  test("insert overwrite hive table") {
+    withPlannedWrite { enabled =>
+      withTable("t") {
+        withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+        sql(
+          """
+            |CREATE TABLE t
+            |PARTITIONED BY (k)
+            |AS SELECT * FROM t0
+            |""".stripMargin)
+          executeAndCheckOrdering(hasLogicalSort = enabled, orderingMatched = 
enabled) {
+            sql("INSERT OVERWRITE t SELECT j AS i, i AS j, k FROM t0")
+          }
+        }
+      }
+    }
+  }
+
+  test("insert into hive table with static partitions only") {
+    withPlannedWrite { enabled =>
+      withTable("t") {
+        sql(
+          """
+            |CREATE TABLE t (i INT, j INT)
+            |PARTITIONED BY (k STRING)
+            |""".stripMargin)
+        // No dynamic partition so no sort is needed.
+        executeAndCheckOrdering(hasLogicalSort = false, orderingMatched = 
true) {
+          sql("INSERT INTO t PARTITION (k='0') SELECT i, j FROM t0 WHERE k = 
'0'")
+        }
+      }
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
