This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d5ac07df [KYUUBI #4999] [KSHC] Kyuubi-Spark-Hive-Connector support 
Apache Spark 3.4
1d5ac07df is described below

commit 1d5ac07dfceaead1c1eeb0b2b477e1ab09ccce82
Author: Cheng Pan <[email protected]>
AuthorDate: Tue Jul 4 17:25:57 2023 +0800

    [KYUUBI #4999] [KSHC] Kyuubi-Spark-Hive-Connector support Apache Spark 3.4
    
    ### _Why are the changes needed?_
    
    This pr amins to make KSHC support Apache Spark 3.4.
    
    - KSHC support Apache Spark 3.4
    - Make Apache kyuubi `codecov` module contain the spark-3.4 profile. so 
that Apache kyubbi CI can cover some modules.
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run 
test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests)
 locally before make a pull request
    
    Closes #4999 from Yikf/kudu-spark3.4.
    
    Closes #4999
    
    6a35e54b8 [Cheng Pan] Update 
extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
    66bb742eb [Cheng Pan] Update 
extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
    7be517c7f [Cheng Pan] Update 
extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
    ae23133d1 [Cheng Pan] Update 
extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
    dda5e6521 [Cheng Pan] Update 
extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
    e43a25dff [Cheng Pan] Update 
extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
    54f52f16d [Cheng Pan] Update 
extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
    0955b544b [Cheng Pan] Update pom.xml
    38a1383d9 [yikaifei] codecov module should contain the spark 3.4 profile
    
    Lead-authored-by: Cheng Pan <[email protected]>
    Co-authored-by: yikaifei <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 dev/kyuubi-codecov/pom.xml                         |  10 +
 .../spark/connector/hive/HiveConnectorUtils.scala  | 256 +++++++++++++++++++++
 .../spark/connector/hive/HiveTableCatalog.scala    |   5 +-
 .../connector/hive/read/FilePartitionReader.scala  |   7 +-
 .../hive/read/HivePartitionReaderFactory.scala     |  12 +-
 .../spark/connector/hive/read/HiveScan.scala       |   6 +-
 .../connector/hive/write/HiveBatchWrite.scala      |   6 +-
 .../hive/kyuubi/connector/HiveBridgeHelper.scala   |   1 +
 .../spark/connector/hive/HiveCatalogSuite.scala    |  62 +++--
 .../spark/connector/hive/HiveQuerySuite.scala      |   6 +-
 .../hive/command/CreateNamespaceSuite.scala        |   6 +-
 .../hive/command/DropNamespaceSuite.scala          |   3 +-
 12 files changed, 337 insertions(+), 43 deletions(-)

diff --git a/dev/kyuubi-codecov/pom.xml b/dev/kyuubi-codecov/pom.xml
index ba15ec0f8..3eb922129 100644
--- a/dev/kyuubi-codecov/pom.xml
+++ b/dev/kyuubi-codecov/pom.xml
@@ -209,5 +209,15 @@
                 </dependency>
             </dependencies>
         </profile>
+        <profile>
+            <id>spark-3.4</id>
+            <dependencies>
+                <dependency>
+                    <groupId>org.apache.kyuubi</groupId>
+                    
<artifactId>kyuubi-spark-connector-hive_${scala.binary.version}</artifactId>
+                    <version>${project.version}</version>
+                </dependency>
+            </dependencies>
+        </profile>
     </profiles>
 </project>
diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
new file mode 100644
index 000000000..1d2d2b319
--- /dev/null
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.hive
+
+import org.apache.spark.SPARK_VERSION
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, 
CatalogTablePartition}
+import org.apache.spark.sql.connector.catalog.TableChange
+import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, 
ColumnPosition, DeleteColumn, First, RenameColumn, UpdateColumnComment, 
UpdateColumnNullability, UpdateColumnPosition, UpdateColumnType}
+import org.apache.spark.sql.execution.command.CommandUtils
+import 
org.apache.spark.sql.execution.command.CommandUtils.{calculateMultipleLocationSizes,
 calculateSingleLocationSize}
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
+
+import org.apache.kyuubi.spark.connector.common.SparkUtils
+import org.apache.kyuubi.util.reflect.ReflectUtils.invokeAs
+
+object HiveConnectorUtils extends Logging {
+
+  def partitionedFilePath(file: PartitionedFile): String = {
+    if (SparkUtils.isSparkVersionAtLeast("3.4")) {
+      invokeAs[String](file, "urlEncodedPath")
+    } else if (SparkUtils.isSparkVersionAtLeast("3.3")) {
+      invokeAs[String](file, "filePath")
+    } else {
+      throw KyuubiHiveConnectorException(s"Spark version $SPARK_VERSION " +
+        s"is not supported by Kyuubi spark hive connector.")
+    }
+  }
+
+  def calculateTotalSize(
+      spark: SparkSession,
+      catalogTable: CatalogTable,
+      hiveTableCatalog: HiveTableCatalog): (BigInt, 
Seq[CatalogTablePartition]) = {
+    val sessionState = spark.sessionState
+    val startTime = System.nanoTime()
+    val (totalSize, newPartitions) = if 
(catalogTable.partitionColumnNames.isEmpty) {
+      (
+        calculateSingleLocationSize(
+          sessionState,
+          catalogTable.identifier,
+          catalogTable.storage.locationUri),
+        Seq())
+    } else {
+      // Calculate table size as a sum of the visible partitions. See 
SPARK-21079
+      val partitions = hiveTableCatalog.listPartitions(catalogTable.identifier)
+      logInfo(s"Starting to calculate sizes for ${partitions.length} 
partitions.")
+      val paths = partitions.map(_.storage.locationUri)
+      val sizes = calculateMultipleLocationSizes(spark, 
catalogTable.identifier, paths)
+      val newPartitions = partitions.zipWithIndex.flatMap { case (p, idx) =>
+        val newStats = CommandUtils.compareAndGetNewStats(p.stats, sizes(idx), 
None)
+        newStats.map(_ => p.copy(stats = newStats))
+      }
+      (sizes.sum, newPartitions)
+    }
+    logInfo(s"It took ${(System.nanoTime() - startTime) / (1000 * 1000)} ms to 
calculate" +
+      s" the total size for table ${catalogTable.identifier}.")
+    (totalSize, newPartitions)
+  }
+
+  def applySchemaChanges(schema: StructType, changes: Seq[TableChange]): 
StructType = {
+    changes.foldLeft(schema) { (schema, change) =>
+      change match {
+        case add: AddColumn =>
+          add.fieldNames match {
+            case Array(name) =>
+              val field = StructField(name, add.dataType, nullable = 
add.isNullable)
+              val newField = 
Option(add.comment).map(field.withComment).getOrElse(field)
+              addField(schema, newField, add.position())
+
+            case names =>
+              replace(
+                schema,
+                names.init,
+                parent =>
+                  parent.dataType match {
+                    case parentType: StructType =>
+                      val field = StructField(names.last, add.dataType, 
nullable = add.isNullable)
+                      val newField = 
Option(add.comment).map(field.withComment).getOrElse(field)
+                      Some(parent.copy(dataType = addField(parentType, 
newField, add.position())))
+
+                    case _ =>
+                      throw new IllegalArgumentException(s"Not a struct: 
${names.init.last}")
+                  })
+          }
+
+        case rename: RenameColumn =>
+          replace(
+            schema,
+            rename.fieldNames,
+            field =>
+              Some(StructField(rename.newName, field.dataType, field.nullable, 
field.metadata)))
+
+        case update: UpdateColumnType =>
+          replace(
+            schema,
+            update.fieldNames,
+            field => Some(field.copy(dataType = update.newDataType)))
+
+        case update: UpdateColumnNullability =>
+          replace(
+            schema,
+            update.fieldNames,
+            field => Some(field.copy(nullable = update.nullable)))
+
+        case update: UpdateColumnComment =>
+          replace(
+            schema,
+            update.fieldNames,
+            field => Some(field.withComment(update.newComment)))
+
+        case update: UpdateColumnPosition =>
+          def updateFieldPos(struct: StructType, name: String): StructType = {
+            val oldField = struct.fields.find(_.name == name).getOrElse {
+              throw new IllegalArgumentException("Field not found: " + name)
+            }
+            val withFieldRemoved = StructType(struct.fields.filter(_ != 
oldField))
+            addField(withFieldRemoved, oldField, update.position())
+          }
+
+          update.fieldNames() match {
+            case Array(name) =>
+              updateFieldPos(schema, name)
+            case names =>
+              replace(
+                schema,
+                names.init,
+                parent =>
+                  parent.dataType match {
+                    case parentType: StructType =>
+                      Some(parent.copy(dataType = updateFieldPos(parentType, 
names.last)))
+                    case _ =>
+                      throw new IllegalArgumentException(s"Not a struct: 
${names.init.last}")
+                  })
+          }
+
+        case delete: DeleteColumn =>
+          replace(schema, delete.fieldNames, _ => None, delete.ifExists)
+
+        case _ =>
+          // ignore non-schema changes
+          schema
+      }
+    }
+  }
+
+  private def addField(
+      schema: StructType,
+      field: StructField,
+      position: ColumnPosition): StructType = {
+    if (position == null) {
+      schema.add(field)
+    } else if (position.isInstanceOf[First]) {
+      StructType(field +: schema.fields)
+    } else {
+      val afterCol = position.asInstanceOf[After].column()
+      val fieldIndex = schema.fields.indexWhere(_.name == afterCol)
+      if (fieldIndex == -1) {
+        throw new IllegalArgumentException("AFTER column not found: " + 
afterCol)
+      }
+      val (before, after) = schema.fields.splitAt(fieldIndex + 1)
+      StructType(before ++ (field +: after))
+    }
+  }
+
+  private def replace(
+      struct: StructType,
+      fieldNames: Seq[String],
+      update: StructField => Option[StructField],
+      ifExists: Boolean = false): StructType = {
+
+    val posOpt = fieldNames.zipWithIndex.toMap.get(fieldNames.head)
+    if (posOpt.isEmpty) {
+      if (ifExists) {
+        // We couldn't find the column to replace, but with IF EXISTS, we will 
silence the error
+        // Currently only DROP COLUMN may pass down the IF EXISTS parameter
+        return struct
+      } else {
+        throw new IllegalArgumentException(s"Cannot find field: 
${fieldNames.head}")
+      }
+    }
+
+    val pos = posOpt.get
+    val field = struct.fields(pos)
+    val replacement: Option[StructField] = (fieldNames.tail, field.dataType) 
match {
+      case (Seq(), _) =>
+        update(field)
+
+      case (names, struct: StructType) =>
+        val updatedType: StructType = replace(struct, names, update, ifExists)
+        Some(StructField(field.name, updatedType, field.nullable, 
field.metadata))
+
+      case (Seq("key"), map @ MapType(keyType, _, _)) =>
+        val updated = update(StructField("key", keyType, nullable = false))
+          .getOrElse(throw new IllegalArgumentException(s"Cannot delete map 
key"))
+        Some(field.copy(dataType = map.copy(keyType = updated.dataType)))
+
+      case (Seq("key", names @ _*), map @ MapType(keyStruct: StructType, _, 
_)) =>
+        Some(field.copy(dataType = map.copy(keyType = replace(keyStruct, 
names, update, ifExists))))
+
+      case (Seq("value"), map @ MapType(_, mapValueType, isNullable)) =>
+        val updated = update(StructField("value", mapValueType, nullable = 
isNullable))
+          .getOrElse(throw new IllegalArgumentException(s"Cannot delete map 
value"))
+        Some(field.copy(dataType = map.copy(
+          valueType = updated.dataType,
+          valueContainsNull = updated.nullable)))
+
+      case (Seq("value", names @ _*), map @ MapType(_, valueStruct: 
StructType, _)) =>
+        Some(field.copy(dataType = map.copy(valueType =
+          replace(valueStruct, names, update, ifExists))))
+
+      case (Seq("element"), array @ ArrayType(elementType, isNullable)) =>
+        val updated = update(StructField("element", elementType, nullable = 
isNullable))
+          .getOrElse(throw new IllegalArgumentException(s"Cannot delete array 
element"))
+        Some(field.copy(dataType = array.copy(
+          elementType = updated.dataType,
+          containsNull = updated.nullable)))
+
+      case (Seq("element", names @ _*), array @ ArrayType(elementStruct: 
StructType, _)) =>
+        Some(field.copy(dataType = array.copy(elementType =
+          replace(elementStruct, names, update, ifExists))))
+
+      case (names, dataType) =>
+        if (!ifExists) {
+          throw new IllegalArgumentException(
+            s"Cannot find field: ${names.head} in ${dataType.simpleString}")
+        }
+        None
+    }
+
+    val newFields = struct.fields.zipWithIndex.flatMap {
+      case (_, index) if pos == index =>
+        replacement
+      case (other, _) =>
+        Some(other)
+    }
+
+    new StructType(newFields)
+  }
+}
diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
index 44057be0a..cfc78940b 100644
--- 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
@@ -39,8 +39,7 @@ import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.hive.HiveUDFExpressionBuilder
 import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper._
-import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
-import org.apache.spark.sql.internal.StaticSQLConf.GLOBAL_TEMP_DATABASE
+import org.apache.spark.sql.internal.StaticSQLConf.{CATALOG_IMPLEMENTATION, 
GLOBAL_TEMP_DATABASE}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
@@ -212,7 +211,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
       }
 
     val properties = 
CatalogV2Util.applyPropertiesChanges(catalogTable.properties, changes)
-    val schema = CatalogV2Util.applySchemaChanges(
+    val schema = HiveConnectorUtils.applySchemaChanges(
       catalogTable.schema,
       changes)
     val comment = properties.get(TableCatalog.PROP_COMMENT)
diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/FilePartitionReader.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/FilePartitionReader.scala
index 8ac90b3fe..13b6d4c20 100644
--- 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/FilePartitionReader.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/FilePartitionReader.scala
@@ -26,6 +26,8 @@ import 
org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupporte
 import 
org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.InputFileBlockHolder
 import org.apache.spark.sql.internal.SQLConf
 
+import org.apache.kyuubi.spark.connector.hive.HiveConnectorUtils
+
 // scalastyle:off line.size.limit
 // copy from 
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
 // scalastyle:on line.size.limit
@@ -98,7 +100,10 @@ class FilePartitionReader[T](readers: 
Iterator[HivePartitionedFileReader[T]])
     logInfo(s"Reading file $reader")
     // Sets InputFileBlockHolder for the file block's information
     val file = reader.file
-    InputFileBlockHolder.set(file.filePath, file.start, file.length)
+    InputFileBlockHolder.set(
+      HiveConnectorUtils.partitionedFilePath(file),
+      file.start,
+      file.length)
     reader
   }
 
diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionReaderFactory.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionReaderFactory.scala
index 6770f4144..2b8e2ffd8 100644
--- 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionReaderFactory.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionReaderFactory.scala
@@ -31,7 +31,7 @@ import org.apache.spark.TaskContext
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader}
+import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, 
PartitionReaderFactory}
 import org.apache.spark.sql.execution.datasources.{FilePartition, 
PartitionedFile}
 import org.apache.spark.sql.execution.datasources.v2._
 import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.NextIterator
@@ -40,6 +40,8 @@ import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types._
 import org.apache.spark.util.SerializableConfiguration
 
+import org.apache.kyuubi.spark.connector.hive.HiveConnectorUtils
+
 case class HivePartitionReaderFactory(
     sqlConf: SQLConf,
     broadcastHiveConf: Broadcast[SerializableConfiguration],
@@ -49,7 +51,7 @@ case class HivePartitionReaderFactory(
     partitionSchema: StructType,
     partFileToHivePart: Map[PartitionedFile, HivePartition],
     pushedFilters: Array[Filter] = Array.empty)
-  extends FilePartitionReaderFactory with Logging {
+  extends PartitionReaderFactory with Logging {
 
   private val charset: String =
     sqlConf.getConfString("hive.exec.default.charset", "utf-8")
@@ -57,10 +59,6 @@ case class HivePartitionReaderFactory(
   val tableDesc = HiveReader.getTableDec(hiveTable)
   val nonPartitionReadDataKeys = HiveReader.toAttributes(readDataSchema)
 
-  override def buildReader(partitionedFile: PartitionedFile): 
PartitionReader[InternalRow] = {
-    throw new UnsupportedOperationException("Cannot use buildReader directly.")
-  }
-
   override def createReader(partition: InputPartition): 
PartitionReader[InternalRow] = {
     assert(partition.isInstanceOf[FilePartition])
     val filePartition = partition.asInstanceOf[FilePartition]
@@ -117,7 +115,7 @@ case class HivePartitionReaderFactory(
 
     val jobConf = new JobConf(broadcastHiveConf.value.value)
 
-    val filePath = new Path(new URI(file.filePath))
+    val filePath = new Path(new 
URI(HiveConnectorUtils.partitionedFilePath(file)))
 
     if (tableDesc != null) {
       configureJobPropertiesForStorageHandler(tableDesc, jobConf, true)
diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveScan.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveScan.scala
index 64fcf23f8..e71e428b7 100644
--- 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveScan.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveScan.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
 
-import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorException
+import org.apache.kyuubi.spark.connector.hive.{HiveConnectorUtils, 
KyuubiHiveConnectorException}
 
 case class HiveScan(
     sparkSession: SparkSession,
@@ -88,7 +88,7 @@ case class HiveScan(
     }
     lazy val partitionValueProject =
       GenerateUnsafeProjection.generate(readPartitionAttributes, 
partitionAttributes)
-    val splitFiles = selectedPartitions.flatMap { partition =>
+    val splitFiles: Seq[PartitionedFile] = selectedPartitions.flatMap { 
partition =>
       val partitionValues =
         if (readPartitionAttributes != partitionAttributes) {
           partitionValueProject(partition.values).copy()
@@ -115,7 +115,7 @@ case class HiveScan(
     }
 
     if (splitFiles.length == 1) {
-      val path = new Path(splitFiles(0).filePath)
+      val path = new 
Path(HiveConnectorUtils.partitionedFilePath(splitFiles(0)))
       if (!isSplitable(path) && splitFiles(0).length >
           
sparkSession.sparkContext.getConf.getOption("spark.io.warning.largeFileThreshold")
             .getOrElse("1024000000").toLong) {
diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveBatchWrite.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveBatchWrite.scala
index 625d79d0c..d12fc0efc 100644
--- 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveBatchWrite.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveBatchWrite.scala
@@ -28,13 +28,12 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, 
PhysicalWriteInfo, WriterCommitMessage}
-import org.apache.spark.sql.execution.command.CommandUtils
 import org.apache.spark.sql.execution.datasources.{WriteJobDescription, 
WriteTaskResult}
 import org.apache.spark.sql.execution.datasources.v2.FileBatchWrite
 import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.{hive, 
toSQLValue, HiveExternalCatalog}
 import org.apache.spark.sql.types.StringType
 
-import org.apache.kyuubi.spark.connector.hive.{HiveTableCatalog, 
KyuubiHiveConnectorException}
+import org.apache.kyuubi.spark.connector.hive.{HiveConnectorUtils, 
HiveTableCatalog, KyuubiHiveConnectorException}
 import 
org.apache.kyuubi.spark.connector.hive.write.HiveWriteHelper.getPartitionSpec
 
 class HiveBatchWrite(
@@ -77,7 +76,8 @@ class HiveBatchWrite(
     val catalog = hiveTableCatalog.catalog
     if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) {
       val newTable = catalog.getTableMetadata(table.identifier)
-      val newSize = CommandUtils.calculateTotalSize(sparkSession, newTable)
+      val (newSize, _) =
+        HiveConnectorUtils.calculateTotalSize(sparkSession, newTable, 
hiveTableCatalog)
       val newStats = CatalogStatistics(sizeInBytes = newSize)
       catalog.alterTableStats(table.identifier, Some(newStats))
     } else if (table.stats.nonEmpty) {
diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveBridgeHelper.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveBridgeHelper.scala
index 305c1450e..95def8656 100644
--- 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveBridgeHelper.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveBridgeHelper.scala
@@ -47,6 +47,7 @@ object HiveBridgeHelper {
   val HadoopTableReader = org.apache.spark.sql.hive.HadoopTableReader
   val SparkHadoopUtil = org.apache.spark.deploy.SparkHadoopUtil
   val Utils = org.apache.spark.util.Utils
+  val CatalogV2Implicits = 
org.apache.spark.sql.connector.catalog.CatalogV2Implicits
 
   def postExternalCatalogEvent(sc: SparkContext, event: ExternalCatalogEvent): 
Unit = {
     sc.listenerBus.post(event)
diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
index 9088a6cfe..c1575018e 100644
--- 
a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
@@ -27,9 +27,10 @@ import scala.util.Try
 import com.google.common.collect.Maps
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, 
NoSuchTableException, TableAlreadyExistsException}
+import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, 
NoSuchTableException, TableAlreadyExistsException, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
+import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper._
 import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -119,7 +120,9 @@ class HiveCatalogSuite extends KyuubiHiveTest {
       val exception = intercept[AnalysisException] {
         spark.table("hive.ns1.nonexistent_table")
       }
-      assert(exception.message === "Table or view not found: 
hive.ns1.nonexistent_table")
+      assert(exception.plan.exists { p =>
+        p.exists(child => child.isInstanceOf[UnresolvedRelation])
+      })
     }
   }
 
@@ -131,13 +134,13 @@ class HiveCatalogSuite extends KyuubiHiveTest {
 
     assert(catalog.listTables(Array("ns")).isEmpty)
 
-    catalog.createTable(ident1, schema, Array.empty, emptyProps)
+    catalog.createTable(ident1, schema, Array.empty[Transform], emptyProps)
 
     assert(catalog.listTables(Array("ns")).toSet == Set(ident1))
     assert(catalog.listTables(Array("ns2")).isEmpty)
 
-    catalog.createTable(ident3, schema, Array.empty, emptyProps)
-    catalog.createTable(ident2, schema, Array.empty, emptyProps)
+    catalog.createTable(ident3, schema, Array.empty[Transform], emptyProps)
+    catalog.createTable(ident2, schema, Array.empty[Transform], emptyProps)
 
     assert(catalog.listTables(Array("ns")).toSet == Set(ident1, ident2))
     assert(catalog.listTables(Array("ns2")).toSet == Set(ident3))
@@ -157,10 +160,11 @@ class HiveCatalogSuite extends KyuubiHiveTest {
   test("createTable") {
     assert(!catalog.tableExists(testIdent))
 
-    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    val table =
+      catalog.createTable(testIdent, schema, Array.empty[Transform], 
emptyProps)
 
     val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name)
-    assert(parsed == Seq("db", "test_table"))
+    assert(parsed == Seq("db", "test_table") || parsed == Seq("spark_catalog", 
"db", "test_table"))
     assert(table.schema == schema)
     assert(filterV2TableProperties(table.properties) == Map())
 
@@ -174,10 +178,10 @@ class HiveCatalogSuite extends KyuubiHiveTest {
 
     assert(!catalog.tableExists(testIdent))
 
-    val table = catalog.createTable(testIdent, schema, Array.empty, properties)
+    val table = catalog.createTable(testIdent, schema, Array.empty[Transform], 
properties)
 
     val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name)
-    assert(parsed == Seq("db", "test_table"))
+    assert(parsed == Seq("db", "test_table") || parsed == Seq("spark_catalog", 
"db", "test_table"))
     assert(table.schema == schema)
     assert(filterV2TableProperties(table.properties).asJava == properties)
 
@@ -188,13 +192,13 @@ class HiveCatalogSuite extends KyuubiHiveTest {
   test("createTable: table already exists") {
     assert(!catalog.tableExists(testIdent))
 
-    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, schema, Array.empty[Transform], 
emptyProps)
 
     val exc = intercept[TableAlreadyExistsException] {
-      catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+      catalog.createTable(testIdent, schema, Array.empty[Transform], 
emptyProps)
     }
 
-    assert(exc.message.contains(table.name()))
+    assert(exc.message.contains(testIdent.name()))
     assert(exc.message.contains("already exists"))
 
     assert(catalog.tableExists(testIdent))
@@ -204,7 +208,7 @@ class HiveCatalogSuite extends KyuubiHiveTest {
   test("tableExists") {
     assert(!catalog.tableExists(testIdent))
 
-    catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    catalog.createTable(testIdent, schema, Array.empty[Transform], emptyProps)
 
     assert(catalog.tableExists(testIdent))
 
@@ -218,32 +222,48 @@ class HiveCatalogSuite extends KyuubiHiveTest {
     assert(!catalog.tableExists(testIdent))
 
     // default location
-    val t1 = catalog.createTable(testIdent, schema, Array.empty, 
properties).asInstanceOf[HiveTable]
+    val t1 = catalog.createTable(
+      testIdent,
+      schema,
+      Array.empty[Transform],
+      properties).asInstanceOf[HiveTable]
     assert(t1.catalogTable.location ===
       catalog.catalog.defaultTablePath(testIdent.asTableIdentifier))
     catalog.dropTable(testIdent)
 
     // relative path
     properties.put(TableCatalog.PROP_LOCATION, "relative/path")
-    val t2 = catalog.createTable(testIdent, schema, Array.empty, 
properties).asInstanceOf[HiveTable]
+    val t2 = catalog.createTable(
+      testIdent,
+      schema,
+      Array.empty[Transform],
+      properties).asInstanceOf[HiveTable]
     assert(t2.catalogTable.location === 
makeQualifiedPathWithWarehouse("db.db/relative/path"))
     catalog.dropTable(testIdent)
 
     // absolute path without scheme
     properties.put(TableCatalog.PROP_LOCATION, "/absolute/path")
-    val t3 = catalog.createTable(testIdent, schema, Array.empty, 
properties).asInstanceOf[HiveTable]
+    val t3 = catalog.createTable(
+      testIdent,
+      schema,
+      Array.empty[Transform],
+      properties).asInstanceOf[HiveTable]
     assert(t3.catalogTable.location.toString === "file:/absolute/path")
     catalog.dropTable(testIdent)
 
     // absolute path with scheme
     properties.put(TableCatalog.PROP_LOCATION, "file:/absolute/path")
-    val t4 = catalog.createTable(testIdent, schema, Array.empty, 
properties).asInstanceOf[HiveTable]
+    val t4 = catalog.createTable(
+      testIdent,
+      schema,
+      Array.empty[Transform],
+      properties).asInstanceOf[HiveTable]
     assert(t4.catalogTable.location.toString === "file:/absolute/path")
     catalog.dropTable(testIdent)
   }
 
   test("loadTable") {
-    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, schema, Array.empty[Transform], 
emptyProps)
     val loaded = catalog.loadTable(testIdent)
 
     assert(table.name == loaded.name)
@@ -253,15 +273,13 @@ class HiveCatalogSuite extends KyuubiHiveTest {
   }
 
   test("loadTable: table does not exist") {
-    val exc = intercept[NoSuchTableException] {
+    intercept[NoSuchTableException] {
       catalog.loadTable(testIdent)
     }
-
-    assert(exc.message.contains("Table or view 'test_table' not found in 
database 'db'"))
   }
 
   test("invalidateTable") {
-    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, schema, Array.empty[Transform], 
emptyProps)
     // Hive v2 don't cache table
     catalog.invalidateTable(testIdent)
 
diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala
index 16ea03234..d0b97676b 100644
--- 
a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala
@@ -18,6 +18,7 @@
 package org.apache.kyuubi.spark.connector.hive
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 
 class HiveQuerySuite extends KyuubiHiveTest {
 
@@ -70,7 +71,10 @@ class HiveQuerySuite extends KyuubiHiveTest {
                | SELECT * FROM hive.ns1.tb1
                |""".stripMargin)
         }
-        assert(e.getMessage().contains("Table or view not found: 
hive.ns1.tb1"))
+
+        assert(e.plan.exists { p =>
+          p.exists(child => child.isInstanceOf[UnresolvedRelation])
+        })
       }
     }
   }
diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/CreateNamespaceSuite.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/CreateNamespaceSuite.scala
index 855eb0c67..e2e5b574b 100644
--- 
a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/CreateNamespaceSuite.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/CreateNamespaceSuite.scala
@@ -62,7 +62,8 @@ trait CreateNamespaceSuiteBase extends DDLCommandTestUtils {
         val e = intercept[IllegalArgumentException] {
           sql(s"CREATE NAMESPACE $ns LOCATION ''")
         }
-        assert(e.getMessage.contains("Can not create a Path from an empty 
string"))
+        assert(e.getMessage.contains("Can not create a Path from an empty 
string") ||
+          e.getMessage.contains("The location name cannot be empty string"))
 
         val uri = new Path(path).toUri
         sql(s"CREATE NAMESPACE $ns LOCATION '$uri'")
@@ -83,7 +84,8 @@ trait CreateNamespaceSuiteBase extends DDLCommandTestUtils {
       val e = intercept[NamespaceAlreadyExistsException] {
         sql(s"CREATE NAMESPACE $ns")
       }
-      assert(e.getMessage.contains(s"Namespace '$namespace' already exists"))
+      assert(e.getMessage.contains(s"Namespace '$namespace' already exists") ||
+        e.getMessage.contains(s"Cannot create schema `fakens` because it 
already exists"))
 
       // The following will be no-op since the namespace already exists.
       Try { sql(s"CREATE NAMESPACE IF NOT EXISTS $ns") }.isSuccess
diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/DropNamespaceSuite.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/DropNamespaceSuite.scala
index 66eb42c86..81107c24f 100644
--- 
a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/DropNamespaceSuite.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/DropNamespaceSuite.scala
@@ -60,7 +60,8 @@ trait DropNamespaceSuiteBase extends DDLCommandTestUtils {
     val message = intercept[AnalysisException] {
       sql(s"DROP NAMESPACE $catalogName.unknown")
     }.getMessage
-    assert(message.contains(s"'unknown' not found"))
+    assert(message.contains(s"'unknown' not found") ||
+      message.contains(s"The schema `unknown` cannot be found"))
   }
 
   test("drop non-empty namespace with a non-cascading mode") {


Reply via email to