This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-1.8
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/branch-1.8 by this push:
     new ee68ca147 [KYUUBI #6078] KSHC should handle the commit of the 
partitioned table as dynamic partition at write path
ee68ca147 is described below

commit ee68ca1472b3cb744b5eeff6bc721612301d9187
Author: yikaifei <[email protected]>
AuthorDate: Thu Mar 7 17:47:14 2024 +0800

    [KYUUBI #6078] KSHC should handle the commit of the partitioned table as 
dynamic partition at write path
    
    # :mag: Description
    ## Issue References ๐Ÿ”—
    
    This pull request fixes https://github.com/apache/kyuubi/issues/6078, KSHC 
should handle the commit of the partitioned table as dynamic partition at write 
path, that's beacuse the process of writing with Apache Spark DataSourceV2 
using dynamic partitioning to handle static partitions.
    
    ## Describe Your Solution ๐Ÿ”ง
    
    Please include a summary of the change and which issue is fixed. Please 
also include relevant motivation and context. List any dependencies that are 
required for this change.
    
    ## Types of changes :bookmark:
    
    - [x] Bugfix (non-breaking change which fixes an issue)
    - [ ] New feature (non-breaking change which adds functionality)
    - [ ] Breaking change (fix or feature that would cause existing 
functionality to change)
    
    ## Test Plan ๐Ÿงช
    
    #### Behavior Without This Pull Request :coffin:
    
    #### Behavior With This Pull Request :tada:
    
    #### Related Unit Tests
    
    ---
    
    # Checklist ๐Ÿ“
    
    - [x] This patch was not authored or co-authored using [Generative 
Tooling](https://www.apache.org/legal/generative-tooling.html)
    
    **Be nice. Be informative.**
    
    Closes #6082 from Yikf/KYUUBI-6078.
    
    Closes #6078
    
    2ae183672 [yikaifei] KSHC should handle the commit of the partitioned table 
as dynamic partition at write path
    
    Authored-by: yikaifei <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
    (cherry picked from commit 5bee05e45fdc9edb01f61babd7537ef7333f7468)
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../connector/hive/write/HiveBatchWrite.scala      | 218 ++++++---------------
 .../spark/connector/hive/write/HiveWrite.scala     |  46 +----
 .../connector/hive/write/HiveWriteBuilder.scala    |  43 +---
 .../connector/hive/write/HiveWriteHelper.scala     |  14 --
 .../spark/connector/hive/HiveQuerySuite.scala      |  36 ++--
 5 files changed, 95 insertions(+), 262 deletions(-)

diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveBatchWrite.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveBatchWrite.scala
index d12fc0efc..2a30ac434 100644
--- 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveBatchWrite.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveBatchWrite.scala
@@ -30,21 +30,18 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, 
PhysicalWriteInfo, WriterCommitMessage}
 import org.apache.spark.sql.execution.datasources.{WriteJobDescription, 
WriteTaskResult}
 import org.apache.spark.sql.execution.datasources.v2.FileBatchWrite
-import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.{hive, 
toSQLValue, HiveExternalCatalog}
+import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.toSQLValue
 import org.apache.spark.sql.types.StringType
 
 import org.apache.kyuubi.spark.connector.hive.{HiveConnectorUtils, 
HiveTableCatalog, KyuubiHiveConnectorException}
-import 
org.apache.kyuubi.spark.connector.hive.write.HiveWriteHelper.getPartitionSpec
 
 class HiveBatchWrite(
     sparkSession: SparkSession,
     table: CatalogTable,
     hiveTableCatalog: HiveTableCatalog,
     tmpLocation: Option[Path],
-    partition: Map[String, Option[String]],
-    partitionColumnNames: Seq[String],
+    dynamicPartition: Map[String, Option[String]],
     overwrite: Boolean,
-    ifPartitionNotExists: Boolean,
     hadoopConf: Configuration,
     fileBatchWrite: FileBatchWrite,
     externalCatalog: ExternalCatalog,
@@ -114,13 +111,10 @@ class HiveBatchWrite(
   }
 
   private def commitToMetastore(writtenParts: Set[String]): Unit = {
-    val numDynamicPartitions = partition.values.count(_.isEmpty)
-    val partitionSpec = getPartitionSpec(partition)
-    val staticPartitionSpec = partitionSpec.filter {
-      case (_, v) => !v.equals("")
-    }
+    val numDynamicPartitions = table.partitionColumnNames.size
+    val partitionSpec = table.partitionColumnNames.map(colName => colName -> 
"").toMap
 
-    if (partition.isEmpty) {
+    if (dynamicPartition.isEmpty) {
       externalCatalog.loadTable(
         table.database,
         table.identifier.table,
@@ -130,155 +124,67 @@ class HiveBatchWrite(
       return
     }
 
-    if (numDynamicPartitions > 0) {
-      if (overwrite && table.tableType == CatalogTableType.EXTERNAL) {
-        val numWrittenParts = writtenParts.size
-        val maxDynamicPartitionsKey = 
HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname
-        val maxDynamicPartitions = hadoopConf.getInt(
-          maxDynamicPartitionsKey,
-          HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.defaultIntVal)
-        if (numWrittenParts > maxDynamicPartitions) {
-          throw KyuubiHiveConnectorException(
-            s"Number of dynamic partitions created is $numWrittenParts, " +
-              s"which is more than $maxDynamicPartitions. " +
-              s"To solve this try to set $maxDynamicPartitionsKey " +
-              s"to at least $numWrittenParts.")
-        }
-        // SPARK-29295: When insert overwrite to a Hive external table 
partition, if the
-        // partition does not exist, Hive will not check if the external 
partition directory
-        // exists or not before copying files. So if users drop the partition, 
and then do
-        // insert overwrite to the same partition, the partition will have 
both old and new
-        // data. We construct partition path. If the path exists, we delete it 
manually.
-        writtenParts.foreach { partPath =>
-          val dpMap = partPath.split("/").map { part =>
-            val splitPart = part.split("=")
-            assert(splitPart.size == 2, s"Invalid written partition path: 
$part")
-            ExternalCatalogUtils.unescapePathName(splitPart(0)) ->
-              ExternalCatalogUtils.unescapePathName(splitPart(1))
-          }.toMap
-
-          val caseInsensitiveDpMap = CaseInsensitiveMap(dpMap)
-
-          val updatedPartitionSpec = partition.map {
-            case (key, Some(null)) => key -> 
ExternalCatalogUtils.DEFAULT_PARTITION_NAME
-            case (key, Some(value)) => key -> value
-            case (key, None) if caseInsensitiveDpMap.contains(key) =>
-              key -> caseInsensitiveDpMap(key)
-            case (key, _) =>
-              throw KyuubiHiveConnectorException(
-                s"Dynamic partition key ${toSQLValue(key, StringType)} " +
-                  "is not among written partition paths.")
-          }
-          val partitionColumnNames = table.partitionColumnNames
-          val tablePath = new Path(table.location)
-          val partitionPath = ExternalCatalogUtils.generatePartitionPath(
-            updatedPartitionSpec,
-            partitionColumnNames,
-            tablePath)
-
-          val fs = partitionPath.getFileSystem(hadoopConf)
-          if (fs.exists(partitionPath)) {
-            if (!fs.delete(partitionPath, true)) {
-              throw KyuubiHiveConnectorException(s"Cannot remove partition 
directory " +
-                s"'$partitionPath'")
-            }
-          }
-        }
+    if (overwrite && table.tableType == CatalogTableType.EXTERNAL) {
+      val numWrittenParts = writtenParts.size
+      val maxDynamicPartitionsKey = 
HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname
+      val maxDynamicPartitions = hadoopConf.getInt(
+        maxDynamicPartitionsKey,
+        HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.defaultIntVal)
+      if (numWrittenParts > maxDynamicPartitions) {
+        throw KyuubiHiveConnectorException(
+          s"Number of dynamic partitions created is $numWrittenParts, " +
+            s"which is more than $maxDynamicPartitions. " +
+            s"To solve this try to set $maxDynamicPartitionsKey " +
+            s"to at least $numWrittenParts.")
       }
-
-      val loadPath = ExternalCatalogUtils.generatePartitionPath(
-        staticPartitionSpec,
-        partitionColumnNames.take(partitionColumnNames.length - 
numDynamicPartitions),
-        tmpLocation.get)
-
-      externalCatalog.loadDynamicPartitions(
-        db = table.database,
-        table = table.identifier.table,
-        loadPath.toString,
-        partitionSpec,
-        overwrite,
-        numDynamicPartitions)
-    } else {
-      // scalastyle:off
-      // ifNotExists is only valid with static partition, refer to
-      // 
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-InsertingdataintoHiveTablesfromqueries
-      // scalastyle:on
-      val oldPart =
-        externalCatalog.getPartitionOption(
-          table.database,
-          table.identifier.table,
-          partitionSpec)
-
-      var doHiveOverwrite = overwrite
-
-      if (oldPart.isEmpty || !ifPartitionNotExists) {
-        // SPARK-29295: When insert overwrite to a Hive external table 
partition, if the
-        // partition does not exist, Hive will not check if the external 
partition directory
-        // exists or not before copying files. So if users drop the partition, 
and then do
-        // insert overwrite to the same partition, the partition will have 
both old and new
-        // data. We construct partition path. If the path exists, we delete it 
manually.
-        val partitionPath =
-          if (oldPart.isEmpty && overwrite
-            && table.tableType == CatalogTableType.EXTERNAL) {
-            val partitionColumnNames = table.partitionColumnNames
-            val tablePath = new Path(table.location)
-            Some(ExternalCatalogUtils.generatePartitionPath(
-              partitionSpec,
-              partitionColumnNames,
-              tablePath))
-          } else {
-            oldPart.flatMap(_.storage.locationUri.map(uri => new Path(uri)))
-          }
-
-        import hive._
-        // SPARK-18107: Insert overwrite runs much slower than hive-client.
-        // Newer Hive largely improves insert overwrite performance. As Spark 
uses older Hive
-        // version and we may not want to catch up new Hive version every 
time. We delete the
-        // Hive partition first and then load data file into the Hive 
partition.
-        val hiveVersion = 
externalCatalog.asInstanceOf[ExternalCatalogWithListener]
-          .unwrapped.asInstanceOf[HiveExternalCatalog]
-          .client
-          .version
-        // SPARK-31684:
-        // For Hive 2.0.0 and onwards, as 
https://issues.apache.org/jira/browse/HIVE-11940
-        // has been fixed, and there is no performance issue anymore. We 
should leave the
-        // overwrite logic to hive to avoid failure in `FileSystem#checkPath` 
when the table
-        // and partition locations do not belong to the same `FileSystem`
-        // TODO(SPARK-31675): For Hive 2.2.0 and earlier, if the table and 
partition locations
-        // do not belong together, we will still get the same error thrown by 
hive encryption
-        // check. see https://issues.apache.org/jira/browse/HIVE-14380.
-        // So we still disable for Hive overwrite for Hive 1.x for better 
performance because
-        // the partition and table are on the same cluster in most cases.
-        if (partitionPath.nonEmpty && overwrite && hiveVersion < v2_0) {
-          partitionPath.foreach { path =>
-            val fs = path.getFileSystem(hadoopConf)
-            if (fs.exists(path)) {
-              if (!fs.delete(path, true)) {
-                throw new RuntimeException(s"Cannot remove partition directory 
'$partitionPath'")
-              }
-              // Don't let Hive do overwrite operation since it is slower.
-              doHiveOverwrite = false
-            }
+      // SPARK-29295: When insert overwrite to a Hive external table 
partition, if the
+      // partition does not exist, Hive will not check if the external 
partition directory
+      // exists or not before copying files. So if users drop the partition, 
and then do
+      // insert overwrite to the same partition, the partition will have both 
old and new
+      // data. We construct partition path. If the path exists, we delete it 
manually.
+      writtenParts.foreach { partPath =>
+        val dpMap = partPath.split("/").map { part =>
+          val splitPart = part.split("=")
+          assert(splitPart.size == 2, s"Invalid written partition path: $part")
+          ExternalCatalogUtils.unescapePathName(splitPart(0)) ->
+            ExternalCatalogUtils.unescapePathName(splitPart(1))
+        }.toMap
+
+        val caseInsensitiveDpMap = CaseInsensitiveMap(dpMap)
+
+        val updatedPartitionSpec = dynamicPartition.map {
+          case (key, Some(null)) => key -> 
ExternalCatalogUtils.DEFAULT_PARTITION_NAME
+          case (key, Some(value)) => key -> value
+          case (key, None) if caseInsensitiveDpMap.contains(key) =>
+            key -> caseInsensitiveDpMap(key)
+          case (key, _) =>
+            throw KyuubiHiveConnectorException(
+              s"Dynamic partition key ${toSQLValue(key, StringType)} " +
+                "is not among written partition paths.")
+        }
+        val partitionColumnNames = table.partitionColumnNames
+        val tablePath = new Path(table.location)
+        val partitionPath = ExternalCatalogUtils.generatePartitionPath(
+          updatedPartitionSpec,
+          partitionColumnNames,
+          tablePath)
+
+        val fs = partitionPath.getFileSystem(hadoopConf)
+        if (fs.exists(partitionPath)) {
+          if (!fs.delete(partitionPath, true)) {
+            throw KyuubiHiveConnectorException(s"Cannot remove partition 
directory " +
+              s"'$partitionPath'")
           }
         }
-
-        // inheritTableSpecs is set to true. It should be set to false for an 
IMPORT query
-        // which is currently considered as a Hive native command.
-        val inheritTableSpecs = true
-        val loadPath = ExternalCatalogUtils.generatePartitionPath(
-          staticPartitionSpec,
-          partitionColumnNames.take(partitionColumnNames.length - 
numDynamicPartitions),
-          tmpLocation.get)
-
-        externalCatalog.loadPartition(
-          table.database,
-          table.identifier.table,
-          loadPath.toString,
-          partitionSpec,
-          isOverwrite = doHiveOverwrite,
-          inheritTableSpecs = inheritTableSpecs,
-          isSrcLocal = false)
       }
     }
+
+    externalCatalog.loadDynamicPartitions(
+      db = table.database,
+      table = table.identifier.table,
+      tmpLocation.get.toString,
+      partitionSpec,
+      overwrite,
+      numDynamicPartitions)
   }
 }
diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWrite.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWrite.scala
index 2ee338673..957c19582 100644
--- 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWrite.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWrite.scala
@@ -23,7 +23,6 @@ import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hive.ql.ErrorMsg
 import org.apache.hadoop.hive.ql.plan.TableDesc
 import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
@@ -42,8 +41,7 @@ import 
org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.{FileSinkDesc
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
 
-import org.apache.kyuubi.spark.connector.hive.{HiveTableCatalog, 
KyuubiHiveConnectorException}
-import 
org.apache.kyuubi.spark.connector.hive.write.HiveWriteHelper.getPartitionSpec
+import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog
 
 case class HiveWrite(
     sparkSession: SparkSession,
@@ -51,8 +49,7 @@ case class HiveWrite(
     info: LogicalWriteInfo,
     hiveTableCatalog: HiveTableCatalog,
     forceOverwrite: Boolean,
-    partition: Map[String, Option[String]],
-    ifPartitionNotExists: Boolean) extends Write with Logging {
+    dynamicPartition: Map[String, Option[String]]) extends Write with Logging {
 
   private val options = info.options()
 
@@ -81,8 +78,6 @@ case class HiveWrite(
     val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
     handleCompression(fileSinkConf, hadoopConf)
 
-    val partitionColumnNames = extractAndValidatePartitionCols(fileSinkConf, 
hadoopConf)
-
     val committer = FileCommitProtocol.instantiate(
       className = sparkSession.sessionState.conf.fileCommitProtocolClass,
       jobId = java.util.UUID.randomUUID().toString,
@@ -106,10 +101,8 @@ case class HiveWrite(
       table,
       hiveTableCatalog,
       Some(tmpLocation),
-      partition,
-      partitionColumnNames,
+      dynamicPartition,
       forceOverwrite,
-      ifPartitionNotExists,
       hadoopConf,
       new FileBatchWrite(job, description, committer),
       externalCatalog,
@@ -180,37 +173,4 @@ case class HiveWrite(
         .foreach { case (compression, codec) => hadoopConf.set(compression, 
codec) }
     }
   }
-
-  private def extractAndValidatePartitionCols(
-      fileSinkConf: FileSinkDesc,
-      hadoopConf: Configuration): Seq[String] = {
-    val partitionSpec = getPartitionSpec(partition)
-    val numDynamicPartitions = partitionSpec.values.count(_.equals(""))
-    val numStaticPartitions = partitionSpec.size - numDynamicPartitions
-
-    // All partition column names in the format of "<column name 1>/<column 
name 2>/..."
-    val partitionColumns = 
fileSinkConf.getTableInfo.getProperties.getProperty("partition_columns")
-    val partitionColumnNames = 
Option(partitionColumns).map(_.split("/")).getOrElse(Array.empty)
-
-    // Validate partition spec if there exist any dynamic partitions
-    if (numDynamicPartitions > 0) {
-      // Report error if dynamic partitioning is not enabled
-      if (!hadoopConf.get("hive.exec.dynamic.partition", "true").toBoolean) {
-        throw 
KyuubiHiveConnectorException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg)
-      }
-
-      // Report error if dynamic partition strict mode is on but no static 
partition is found
-      if (numStaticPartitions == 0 &&
-        hadoopConf.get("hive.exec.dynamic.partition.mode", 
"strict").equalsIgnoreCase("strict")) {
-        throw 
KyuubiHiveConnectorException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg)
-      }
-
-      // Report error if any static partition appears after a dynamic partition
-      val isDynamic = partitionColumnNames.map(partitionSpec(_).equals(""))
-      if (isDynamic.init.zip(isDynamic.tail).contains((true, false))) {
-        throw 
KyuubiHiveConnectorException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg)
-      }
-    }
-    partitionColumnNames
-  }
 }
diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWriteBuilder.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWriteBuilder.scala
index 6f0668993..750afc947 100644
--- 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWriteBuilder.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWriteBuilder.scala
@@ -19,11 +19,10 @@ package org.apache.kyuubi.spark.connector.hive.write
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.connector.write._
-import org.apache.spark.sql.sources.{AlwaysTrue, And, EqualNullSafe, EqualTo, 
Filter}
+import org.apache.spark.sql.sources.Filter
 
-import org.apache.kyuubi.spark.connector.hive.{HiveTableCatalog, 
KyuubiHiveConnectorException}
+import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog
 
 case class HiveWriteBuilder(
     sparkSession: SparkSession,
@@ -33,11 +32,6 @@ case class HiveWriteBuilder(
   with SupportsDynamicOverwrite {
 
   private var forceOverwrite = false
-
-  private var staticPartition: TablePartitionSpec = Map.empty
-
-  private val ifPartitionNotExists = false
-
   private val parts = catalogTable.partitionColumnNames
 
   override def build(): Write = {
@@ -47,16 +41,12 @@ case class HiveWriteBuilder(
       info,
       hiveTableCatalog,
       forceOverwrite,
-      mergePartitionSpec(),
-      ifPartitionNotExists)
+      dynamicPartitionSpec())
   }
 
   override def overwrite(filters: Array[Filter]): WriteBuilder = {
-    filters match {
-      case Array(AlwaysTrue) => // no partition, do nothing
-      case _ => staticPartition = deduplicateFilters(filters)
-    }
-    overwriteDynamicPartitions()
+    forceOverwrite = true
+    this
   }
 
   override def overwriteDynamicPartitions(): WriteBuilder = {
@@ -64,28 +54,9 @@ case class HiveWriteBuilder(
     this
   }
 
-  private def mergePartitionSpec(): Map[String, Option[String]] = {
+  private def dynamicPartitionSpec(): Map[String, Option[String]] = {
     var partSpec = Map.empty[String, Option[String]]
-
-    staticPartition.foreach {
-      case (p, v) => partSpec = partSpec.updated(p, Some(v))
-    }
-
-    val dynamicCols = parts diff staticPartition.keySet.toSeq
-    dynamicCols.foreach(p => partSpec = partSpec.updated(p, None))
+    parts.foreach(p => partSpec = partSpec.updated(p, None))
     partSpec
   }
-
-  private def deduplicateFilters(filters: Array[Filter]): TablePartitionSpec = 
{
-    filters.map(extractConjunctions).reduce((partDesc1, partDesc2) => 
partDesc1 ++ partDesc2)
-  }
-
-  private def extractConjunctions(filter: Filter): TablePartitionSpec = {
-    filter match {
-      case And(l, r) => extractConjunctions(l) ++ extractConjunctions(r)
-      case EqualNullSafe(att, value) => Map(att -> value.toString)
-      case EqualTo(att, value) => Map(att -> value.toString)
-      case _ => throw KyuubiHiveConnectorException(s"Unsupported static insert 
condition $filter")
-    }
-  }
 }
diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWriteHelper.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWriteHelper.scala
index 25bca911f..c3e73c011 100644
--- 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWriteHelper.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWriteHelper.scala
@@ -27,12 +27,9 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.common.FileUtils
 import org.apache.hadoop.hive.ql.exec.TaskRunner
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener
 import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.{hive, 
HiveExternalCatalog, HiveVersion}
 
-import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorException
-
 // scalastyle:off line.size.limit
 /**
  * A helper for v2 hive writer.
@@ -198,15 +195,4 @@ object HiveWriteHelper extends Logging {
   def cannotCreateStagingDirError(message: String, e: IOException = null): 
Throwable = {
     new RuntimeException(s"Cannot create staging directory: $message", e)
   }
-
-  def getPartitionSpec(partition: Map[String, Option[String]]): Map[String, 
String] = {
-    partition.map {
-      case (key, Some(null)) => key -> 
ExternalCatalogUtils.DEFAULT_PARTITION_NAME
-      case (key, Some(value)) if value.equals("") =>
-        throw KyuubiHiveConnectorException(s"Partition spec is invalid. " +
-          s"The spec ($key='$value') contains an empty partition column value")
-      case (key, Some(value)) => key -> value
-      case (key, None) => key -> ""
-    }
-  }
 }
diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala
index 0dd1efdec..b217d00b4 100644
--- 
a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala
@@ -159,7 +159,7 @@ class HiveQuerySuite extends KyuubiHiveTest {
     }
   }
 
-  test("Partitioned table insert and static and dynamic insert") {
+  test("Partitioned table insert overwrite static and dynamic insert") {
     withSparkSession() { spark =>
       val table = "hive.default.employee"
       withTempPartitionedTable(spark, table) {
@@ -196,18 +196,14 @@ class HiveQuerySuite extends KyuubiHiveTest {
     withSparkSession() { spark =>
       val table = "hive.default.employee"
       withTempPartitionedTable(spark, table) {
-        val exception = intercept[KyuubiHiveConnectorException] {
-          spark.sql(
-            s"""
-               | INSERT OVERWRITE
-               | $table PARTITION(year = '', month = '08')
-               | VALUES("yi")
-               |""".stripMargin).collect()
-        }
-        // 1. not thrown `Dynamic partition cannot be the parent of a static 
partition`
-        // 2. thrown `Partition spec is invalid`, should be consist with spark 
v1.
-        assert(exception.message.contains("Partition spec is invalid. The spec 
(year='') " +
-          "contains an empty partition column value"))
+        spark.sql(
+          s"""
+             | INSERT OVERWRITE
+             | $table PARTITION(year = '', month = '08')
+             | VALUES("yi")
+             |""".stripMargin).collect()
+
+        checkQueryResult(s"select * from $table", spark, Array(Row.apply("yi", 
null, "08")))
       }
     }
   }
@@ -252,6 +248,20 @@ class HiveQuerySuite extends KyuubiHiveTest {
     readUnPartitionedTable("ORC", false)
   }
 
+  test("Partitioned table insert into static and dynamic insert") {
+    val table = "hive.default.employee"
+    withTempPartitionedTable(spark, table) {
+      spark.sql(
+        s"""
+           | INSERT INTO
+           | $table PARTITION(year = '2022')
+           | SELECT * FROM VALUES("yi", "08")
+           |""".stripMargin).collect()
+
+      checkQueryResult(s"select * from $table", spark, Array(Row.apply("yi", 
"2022", "08")))
+    }
+  }
+
   private def readPartitionedTable(format: String, hiveTable: Boolean): Unit = 
{
     withSparkSession() { spark =>
       val table = "hive.default.employee"

Reply via email to