This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 1d5ac07df [KYUUBI #4999] [KSHC] Kyuubi-Spark-Hive-Connector support Apache Spark 3.4
1d5ac07df is described below
commit 1d5ac07dfceaead1c1eeb0b2b477e1ab09ccce82
Author: Cheng Pan <[email protected]>
AuthorDate: Tue Jul 4 17:25:57 2023 +0800
[KYUUBI #4999] [KSHC] Kyuubi-Spark-Hive-Connector support Apache Spark 3.4
### _Why are the changes needed?_
This PR aims to make KSHC (Kyuubi Spark Hive Connector) support Apache Spark 3.4:
- Make KSHC support Apache Spark 3.4 (see the compatibility sketch below).
- Make the Apache Kyuubi `codecov` module include the spark-3.4 profile, so that the Apache Kyuubi CI can cover these modules.
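The core compatibility point is that Spark 3.4 changed `PartitionedFile.filePath` from a plain `String` to a `SparkPath`, so the connector now resolves the path accessor by Spark version at runtime (see `HiveConnectorUtils.partitionedFilePath` in the diff below). A minimal, self-contained sketch of that version dispatch; the object name and the hand-rolled version comparison are illustrative only, while the actual patch relies on Kyuubi's `SparkUtils.isSparkVersionAtLeast` and `ReflectUtils.invokeAs` helpers:

```scala
import org.apache.spark.SPARK_VERSION
import org.apache.spark.sql.execution.datasources.PartitionedFile

// Hypothetical compat helper mirroring HiveConnectorUtils.partitionedFilePath:
// resolve the PartitionedFile path accessor reflectively so the same jar
// compiles and runs against both Spark 3.3 and Spark 3.4.
object PartitionedFilePathCompat {

  // Compare the running Spark version against a "major.minor" target.
  private def isSparkAtLeast(target: String): Boolean = {
    def majorMinor(v: String): (Int, Int) = {
      val parts = v.split('.')
      (parts(0).toInt, parts(1).takeWhile(_.isDigit).toInt)
    }
    val (tMajor, tMinor) = majorMinor(target)
    val (major, minor) = majorMinor(SPARK_VERSION)
    major > tMajor || (major == tMajor && minor >= tMinor)
  }

  def path(file: PartitionedFile): String =
    if (isSparkAtLeast("3.4")) {
      // Spark 3.4+: filePath is a SparkPath; urlEncodedPath exposes its String form.
      file.getClass.getMethod("urlEncodedPath").invoke(file).asInstanceOf[String]
    } else {
      // Spark 3.3: filePath is still a plain String accessor.
      file.getClass.getMethod("filePath").invoke(file).asInstanceOf[String]
    }
}
```

Call sites such as `InputFileBlockHolder.set(...)` and `new Path(new URI(...))` then go through this helper instead of touching `file.filePath` directly, which is exactly what the connector changes below do.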
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly, including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request
Closes #4999 from Yikf/kudu-spark3.4.
Closes #4999
6a35e54b8 [Cheng Pan] Update extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
66bb742eb [Cheng Pan] Update extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
7be517c7f [Cheng Pan] Update extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
ae23133d1 [Cheng Pan] Update extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
dda5e6521 [Cheng Pan] Update extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
e43a25dff [Cheng Pan] Update extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
54f52f16d [Cheng Pan] Update extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
0955b544b [Cheng Pan] Update pom.xml
38a1383d9 [yikaifei] codecov module should contain the spark 3.4 profile
Lead-authored-by: Cheng Pan <[email protected]>
Co-authored-by: yikaifei <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
dev/kyuubi-codecov/pom.xml | 10 +
.../spark/connector/hive/HiveConnectorUtils.scala | 256 +++++++++++++++++++++
.../spark/connector/hive/HiveTableCatalog.scala | 5 +-
.../connector/hive/read/FilePartitionReader.scala | 7 +-
.../hive/read/HivePartitionReaderFactory.scala | 12 +-
.../spark/connector/hive/read/HiveScan.scala | 6 +-
.../connector/hive/write/HiveBatchWrite.scala | 6 +-
.../hive/kyuubi/connector/HiveBridgeHelper.scala | 1 +
.../spark/connector/hive/HiveCatalogSuite.scala | 62 +++--
.../spark/connector/hive/HiveQuerySuite.scala | 6 +-
.../hive/command/CreateNamespaceSuite.scala | 6 +-
.../hive/command/DropNamespaceSuite.scala | 3 +-
12 files changed, 337 insertions(+), 43 deletions(-)
diff --git a/dev/kyuubi-codecov/pom.xml b/dev/kyuubi-codecov/pom.xml
index ba15ec0f8..3eb922129 100644
--- a/dev/kyuubi-codecov/pom.xml
+++ b/dev/kyuubi-codecov/pom.xml
@@ -209,5 +209,15 @@
</dependency>
</dependencies>
</profile>
+ <profile>
+ <id>spark-3.4</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.kyuubi</groupId>
+ <artifactId>kyuubi-spark-connector-hive_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+ </profile>
</profiles>
</project>
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
new file mode 100644
index 000000000..1d2d2b319
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.hive
+
+import org.apache.spark.SPARK_VERSION
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition}
+import org.apache.spark.sql.connector.catalog.TableChange
+import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, ColumnPosition, DeleteColumn, First, RenameColumn, UpdateColumnComment, UpdateColumnNullability, UpdateColumnPosition, UpdateColumnType}
+import org.apache.spark.sql.execution.command.CommandUtils
+import org.apache.spark.sql.execution.command.CommandUtils.{calculateMultipleLocationSizes, calculateSingleLocationSize}
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
+
+import org.apache.kyuubi.spark.connector.common.SparkUtils
+import org.apache.kyuubi.util.reflect.ReflectUtils.invokeAs
+
+object HiveConnectorUtils extends Logging {
+
+ def partitionedFilePath(file: PartitionedFile): String = {
+ if (SparkUtils.isSparkVersionAtLeast("3.4")) {
+ invokeAs[String](file, "urlEncodedPath")
+ } else if (SparkUtils.isSparkVersionAtLeast("3.3")) {
+ invokeAs[String](file, "filePath")
+ } else {
+ throw KyuubiHiveConnectorException(s"Spark version $SPARK_VERSION " +
+ s"is not supported by Kyuubi spark hive connector.")
+ }
+ }
+
+ def calculateTotalSize(
+ spark: SparkSession,
+ catalogTable: CatalogTable,
+ hiveTableCatalog: HiveTableCatalog): (BigInt, Seq[CatalogTablePartition]) = {
+ val sessionState = spark.sessionState
+ val startTime = System.nanoTime()
+ val (totalSize, newPartitions) = if (catalogTable.partitionColumnNames.isEmpty) {
+ (
+ calculateSingleLocationSize(
+ sessionState,
+ catalogTable.identifier,
+ catalogTable.storage.locationUri),
+ Seq())
+ } else {
+ // Calculate table size as a sum of the visible partitions. See SPARK-21079
+ val partitions = hiveTableCatalog.listPartitions(catalogTable.identifier)
+ logInfo(s"Starting to calculate sizes for ${partitions.length}
partitions.")
+ val paths = partitions.map(_.storage.locationUri)
+ val sizes = calculateMultipleLocationSizes(spark, catalogTable.identifier, paths)
+ val newPartitions = partitions.zipWithIndex.flatMap { case (p, idx) =>
+ val newStats = CommandUtils.compareAndGetNewStats(p.stats, sizes(idx), None)
+ newStats.map(_ => p.copy(stats = newStats))
+ }
+ (sizes.sum, newPartitions)
+ }
+ logInfo(s"It took ${(System.nanoTime() - startTime) / (1000 * 1000)} ms to
calculate" +
+ s" the total size for table ${catalogTable.identifier}.")
+ (totalSize, newPartitions)
+ }
+
+ def applySchemaChanges(schema: StructType, changes: Seq[TableChange]): StructType = {
+ changes.foldLeft(schema) { (schema, change) =>
+ change match {
+ case add: AddColumn =>
+ add.fieldNames match {
+ case Array(name) =>
+ val field = StructField(name, add.dataType, nullable = add.isNullable)
+ val newField = Option(add.comment).map(field.withComment).getOrElse(field)
+ addField(schema, newField, add.position())
+
+ case names =>
+ replace(
+ schema,
+ names.init,
+ parent =>
+ parent.dataType match {
+ case parentType: StructType =>
+ val field = StructField(names.last, add.dataType, nullable = add.isNullable)
+ val newField = Option(add.comment).map(field.withComment).getOrElse(field)
+ Some(parent.copy(dataType = addField(parentType, newField, add.position())))
+
+ case _ =>
+ throw new IllegalArgumentException(s"Not a struct:
${names.init.last}")
+ })
+ }
+
+ case rename: RenameColumn =>
+ replace(
+ schema,
+ rename.fieldNames,
+ field =>
+ Some(StructField(rename.newName, field.dataType, field.nullable, field.metadata)))
+
+ case update: UpdateColumnType =>
+ replace(
+ schema,
+ update.fieldNames,
+ field => Some(field.copy(dataType = update.newDataType)))
+
+ case update: UpdateColumnNullability =>
+ replace(
+ schema,
+ update.fieldNames,
+ field => Some(field.copy(nullable = update.nullable)))
+
+ case update: UpdateColumnComment =>
+ replace(
+ schema,
+ update.fieldNames,
+ field => Some(field.withComment(update.newComment)))
+
+ case update: UpdateColumnPosition =>
+ def updateFieldPos(struct: StructType, name: String): StructType = {
+ val oldField = struct.fields.find(_.name == name).getOrElse {
+ throw new IllegalArgumentException("Field not found: " + name)
+ }
+ val withFieldRemoved = StructType(struct.fields.filter(_ != oldField))
+ addField(withFieldRemoved, oldField, update.position())
+ }
+
+ update.fieldNames() match {
+ case Array(name) =>
+ updateFieldPos(schema, name)
+ case names =>
+ replace(
+ schema,
+ names.init,
+ parent =>
+ parent.dataType match {
+ case parentType: StructType =>
+ Some(parent.copy(dataType = updateFieldPos(parentType, names.last)))
+ case _ =>
+ throw new IllegalArgumentException(s"Not a struct:
${names.init.last}")
+ })
+ }
+
+ case delete: DeleteColumn =>
+ replace(schema, delete.fieldNames, _ => None, delete.ifExists)
+
+ case _ =>
+ // ignore non-schema changes
+ schema
+ }
+ }
+ }
+
+ private def addField(
+ schema: StructType,
+ field: StructField,
+ position: ColumnPosition): StructType = {
+ if (position == null) {
+ schema.add(field)
+ } else if (position.isInstanceOf[First]) {
+ StructType(field +: schema.fields)
+ } else {
+ val afterCol = position.asInstanceOf[After].column()
+ val fieldIndex = schema.fields.indexWhere(_.name == afterCol)
+ if (fieldIndex == -1) {
+ throw new IllegalArgumentException("AFTER column not found: " +
afterCol)
+ }
+ val (before, after) = schema.fields.splitAt(fieldIndex + 1)
+ StructType(before ++ (field +: after))
+ }
+ }
+
+ private def replace(
+ struct: StructType,
+ fieldNames: Seq[String],
+ update: StructField => Option[StructField],
+ ifExists: Boolean = false): StructType = {
+
+ val posOpt = fieldNames.zipWithIndex.toMap.get(fieldNames.head)
+ if (posOpt.isEmpty) {
+ if (ifExists) {
+ // We couldn't find the column to replace, but with IF EXISTS, we will silence the error
+ // Currently only DROP COLUMN may pass down the IF EXISTS parameter
+ return struct
+ } else {
+ throw new IllegalArgumentException(s"Cannot find field:
${fieldNames.head}")
+ }
+ }
+
+ val pos = posOpt.get
+ val field = struct.fields(pos)
+ val replacement: Option[StructField] = (fieldNames.tail, field.dataType) match {
+ case (Seq(), _) =>
+ update(field)
+
+ case (names, struct: StructType) =>
+ val updatedType: StructType = replace(struct, names, update, ifExists)
+ Some(StructField(field.name, updatedType, field.nullable, field.metadata))
+
+ case (Seq("key"), map @ MapType(keyType, _, _)) =>
+ val updated = update(StructField("key", keyType, nullable = false))
+ .getOrElse(throw new IllegalArgumentException(s"Cannot delete map
key"))
+ Some(field.copy(dataType = map.copy(keyType = updated.dataType)))
+
+ case (Seq("key", names @ _*), map @ MapType(keyStruct: StructType, _,
_)) =>
+ Some(field.copy(dataType = map.copy(keyType = replace(keyStruct,
names, update, ifExists))))
+
+ case (Seq("value"), map @ MapType(_, mapValueType, isNullable)) =>
+ val updated = update(StructField("value", mapValueType, nullable =
isNullable))
+ .getOrElse(throw new IllegalArgumentException(s"Cannot delete map
value"))
+ Some(field.copy(dataType = map.copy(
+ valueType = updated.dataType,
+ valueContainsNull = updated.nullable)))
+
+ case (Seq("value", names @ _*), map @ MapType(_, valueStruct:
StructType, _)) =>
+ Some(field.copy(dataType = map.copy(valueType =
+ replace(valueStruct, names, update, ifExists))))
+
+ case (Seq("element"), array @ ArrayType(elementType, isNullable)) =>
+ val updated = update(StructField("element", elementType, nullable =
isNullable))
+ .getOrElse(throw new IllegalArgumentException(s"Cannot delete array
element"))
+ Some(field.copy(dataType = array.copy(
+ elementType = updated.dataType,
+ containsNull = updated.nullable)))
+
+ case (Seq("element", names @ _*), array @ ArrayType(elementStruct:
StructType, _)) =>
+ Some(field.copy(dataType = array.copy(elementType =
+ replace(elementStruct, names, update, ifExists))))
+
+ case (names, dataType) =>
+ if (!ifExists) {
+ throw new IllegalArgumentException(
+ s"Cannot find field: ${names.head} in ${dataType.simpleString}")
+ }
+ None
+ }
+
+ val newFields = struct.fields.zipWithIndex.flatMap {
+ case (_, index) if pos == index =>
+ replacement
+ case (other, _) =>
+ Some(other)
+ }
+
+ new StructType(newFields)
+ }
+}
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
index 44057be0a..cfc78940b 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
@@ -39,8 +39,7 @@ import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.hive.HiveUDFExpressionBuilder
import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper._
-import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
-import org.apache.spark.sql.internal.StaticSQLConf.GLOBAL_TEMP_DATABASE
+import org.apache.spark.sql.internal.StaticSQLConf.{CATALOG_IMPLEMENTATION, GLOBAL_TEMP_DATABASE}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -212,7 +211,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
}
val properties =
CatalogV2Util.applyPropertiesChanges(catalogTable.properties, changes)
- val schema = CatalogV2Util.applySchemaChanges(
+ val schema = HiveConnectorUtils.applySchemaChanges(
catalogTable.schema,
changes)
val comment = properties.get(TableCatalog.PROP_COMMENT)
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/FilePartitionReader.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/FilePartitionReader.scala
index 8ac90b3fe..13b6d4c20 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/FilePartitionReader.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/FilePartitionReader.scala
@@ -26,6 +26,8 @@ import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupporte
import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.InputFileBlockHolder
import org.apache.spark.sql.internal.SQLConf
+import org.apache.kyuubi.spark.connector.hive.HiveConnectorUtils
+
// scalastyle:off line.size.limit
// copy from https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
// scalastyle:on line.size.limit
@@ -98,7 +100,10 @@ class FilePartitionReader[T](readers: Iterator[HivePartitionedFileReader[T]])
logInfo(s"Reading file $reader")
// Sets InputFileBlockHolder for the file block's information
val file = reader.file
- InputFileBlockHolder.set(file.filePath, file.start, file.length)
+ InputFileBlockHolder.set(
+ HiveConnectorUtils.partitionedFilePath(file),
+ file.start,
+ file.length)
reader
}
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionReaderFactory.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionReaderFactory.scala
index 6770f4144..2b8e2ffd8 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionReaderFactory.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionReaderFactory.scala
@@ -31,7 +31,7 @@ import org.apache.spark.TaskContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader}
+import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
import org.apache.spark.sql.execution.datasources.v2._
import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.NextIterator
@@ -40,6 +40,8 @@ import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types._
import org.apache.spark.util.SerializableConfiguration
+import org.apache.kyuubi.spark.connector.hive.HiveConnectorUtils
+
case class HivePartitionReaderFactory(
sqlConf: SQLConf,
broadcastHiveConf: Broadcast[SerializableConfiguration],
@@ -49,7 +51,7 @@ case class HivePartitionReaderFactory(
partitionSchema: StructType,
partFileToHivePart: Map[PartitionedFile, HivePartition],
pushedFilters: Array[Filter] = Array.empty)
- extends FilePartitionReaderFactory with Logging {
+ extends PartitionReaderFactory with Logging {
private val charset: String =
sqlConf.getConfString("hive.exec.default.charset", "utf-8")
@@ -57,10 +59,6 @@ case class HivePartitionReaderFactory(
val tableDesc = HiveReader.getTableDec(hiveTable)
val nonPartitionReadDataKeys = HiveReader.toAttributes(readDataSchema)
- override def buildReader(partitionedFile: PartitionedFile): PartitionReader[InternalRow] = {
- throw new UnsupportedOperationException("Cannot use buildReader directly.")
- }
-
override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
assert(partition.isInstanceOf[FilePartition])
val filePartition = partition.asInstanceOf[FilePartition]
@@ -117,7 +115,7 @@ case class HivePartitionReaderFactory(
val jobConf = new JobConf(broadcastHiveConf.value.value)
- val filePath = new Path(new URI(file.filePath))
+ val filePath = new Path(new URI(HiveConnectorUtils.partitionedFilePath(file)))
if (tableDesc != null) {
configureJobPropertiesForStorageHandler(tableDesc, jobConf, true)
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveScan.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveScan.scala
index 64fcf23f8..e71e428b7 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveScan.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveScan.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration
-import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorException
+import org.apache.kyuubi.spark.connector.hive.{HiveConnectorUtils, KyuubiHiveConnectorException}
case class HiveScan(
sparkSession: SparkSession,
@@ -88,7 +88,7 @@ case class HiveScan(
}
lazy val partitionValueProject =
GenerateUnsafeProjection.generate(readPartitionAttributes, partitionAttributes)
- val splitFiles = selectedPartitions.flatMap { partition =>
+ val splitFiles: Seq[PartitionedFile] = selectedPartitions.flatMap { partition =>
val partitionValues =
if (readPartitionAttributes != partitionAttributes) {
partitionValueProject(partition.values).copy()
@@ -115,7 +115,7 @@ case class HiveScan(
}
if (splitFiles.length == 1) {
- val path = new Path(splitFiles(0).filePath)
+ val path = new Path(HiveConnectorUtils.partitionedFilePath(splitFiles(0)))
if (!isSplitable(path) && splitFiles(0).length >
sparkSession.sparkContext.getConf.getOption("spark.io.warning.largeFileThreshold")
.getOrElse("1024000000").toLong) {
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveBatchWrite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveBatchWrite.scala
index 625d79d0c..d12fc0efc 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveBatchWrite.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveBatchWrite.scala
@@ -28,13 +28,12 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, PhysicalWriteInfo, WriterCommitMessage}
-import org.apache.spark.sql.execution.command.CommandUtils
import org.apache.spark.sql.execution.datasources.{WriteJobDescription, WriteTaskResult}
import org.apache.spark.sql.execution.datasources.v2.FileBatchWrite
import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.{hive, toSQLValue, HiveExternalCatalog}
import org.apache.spark.sql.types.StringType
-import org.apache.kyuubi.spark.connector.hive.{HiveTableCatalog, KyuubiHiveConnectorException}
+import org.apache.kyuubi.spark.connector.hive.{HiveConnectorUtils, HiveTableCatalog, KyuubiHiveConnectorException}
import org.apache.kyuubi.spark.connector.hive.write.HiveWriteHelper.getPartitionSpec
class HiveBatchWrite(
@@ -77,7 +76,8 @@ class HiveBatchWrite(
val catalog = hiveTableCatalog.catalog
if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) {
val newTable = catalog.getTableMetadata(table.identifier)
- val newSize = CommandUtils.calculateTotalSize(sparkSession, newTable)
+ val (newSize, _) =
+ HiveConnectorUtils.calculateTotalSize(sparkSession, newTable, hiveTableCatalog)
val newStats = CatalogStatistics(sizeInBytes = newSize)
catalog.alterTableStats(table.identifier, Some(newStats))
} else if (table.stats.nonEmpty) {
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveBridgeHelper.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveBridgeHelper.scala
index 305c1450e..95def8656 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveBridgeHelper.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveBridgeHelper.scala
@@ -47,6 +47,7 @@ object HiveBridgeHelper {
val HadoopTableReader = org.apache.spark.sql.hive.HadoopTableReader
val SparkHadoopUtil = org.apache.spark.deploy.SparkHadoopUtil
val Utils = org.apache.spark.util.Utils
+ val CatalogV2Implicits = org.apache.spark.sql.connector.catalog.CatalogV2Implicits
def postExternalCatalogEvent(sc: SparkContext, event: ExternalCatalogEvent): Unit = {
sc.listenerBus.post(event)
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
index 9088a6cfe..c1575018e 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
@@ -27,9 +27,10 @@ import scala.util.Try
import com.google.common.collect.Maps
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
+import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException, UnresolvedRelation}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
+import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper._
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -119,7 +120,9 @@ class HiveCatalogSuite extends KyuubiHiveTest {
val exception = intercept[AnalysisException] {
spark.table("hive.ns1.nonexistent_table")
}
- assert(exception.message === "Table or view not found: hive.ns1.nonexistent_table")
+ assert(exception.plan.exists { p =>
+ p.exists(child => child.isInstanceOf[UnresolvedRelation])
+ })
}
}
@@ -131,13 +134,13 @@ class HiveCatalogSuite extends KyuubiHiveTest {
assert(catalog.listTables(Array("ns")).isEmpty)
- catalog.createTable(ident1, schema, Array.empty, emptyProps)
+ catalog.createTable(ident1, schema, Array.empty[Transform], emptyProps)
assert(catalog.listTables(Array("ns")).toSet == Set(ident1))
assert(catalog.listTables(Array("ns2")).isEmpty)
- catalog.createTable(ident3, schema, Array.empty, emptyProps)
- catalog.createTable(ident2, schema, Array.empty, emptyProps)
+ catalog.createTable(ident3, schema, Array.empty[Transform], emptyProps)
+ catalog.createTable(ident2, schema, Array.empty[Transform], emptyProps)
assert(catalog.listTables(Array("ns")).toSet == Set(ident1, ident2))
assert(catalog.listTables(Array("ns2")).toSet == Set(ident3))
@@ -157,10 +160,11 @@ class HiveCatalogSuite extends KyuubiHiveTest {
test("createTable") {
assert(!catalog.tableExists(testIdent))
- val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ val table =
+ catalog.createTable(testIdent, schema, Array.empty[Transform], emptyProps)
val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name)
- assert(parsed == Seq("db", "test_table"))
+ assert(parsed == Seq("db", "test_table") || parsed == Seq("spark_catalog",
"db", "test_table"))
assert(table.schema == schema)
assert(filterV2TableProperties(table.properties) == Map())
@@ -174,10 +178,10 @@ class HiveCatalogSuite extends KyuubiHiveTest {
assert(!catalog.tableExists(testIdent))
- val table = catalog.createTable(testIdent, schema, Array.empty, properties)
+ val table = catalog.createTable(testIdent, schema, Array.empty[Transform], properties)
val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name)
- assert(parsed == Seq("db", "test_table"))
+ assert(parsed == Seq("db", "test_table") || parsed == Seq("spark_catalog",
"db", "test_table"))
assert(table.schema == schema)
assert(filterV2TableProperties(table.properties).asJava == properties)
@@ -188,13 +192,13 @@ class HiveCatalogSuite extends KyuubiHiveTest {
test("createTable: table already exists") {
assert(!catalog.tableExists(testIdent))
- val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ val table = catalog.createTable(testIdent, schema, Array.empty[Transform], emptyProps)
val exc = intercept[TableAlreadyExistsException] {
- catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ catalog.createTable(testIdent, schema, Array.empty[Transform], emptyProps)
}
- assert(exc.message.contains(table.name()))
+ assert(exc.message.contains(testIdent.name()))
assert(exc.message.contains("already exists"))
assert(catalog.tableExists(testIdent))
@@ -204,7 +208,7 @@ class HiveCatalogSuite extends KyuubiHiveTest {
test("tableExists") {
assert(!catalog.tableExists(testIdent))
- catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ catalog.createTable(testIdent, schema, Array.empty[Transform], emptyProps)
assert(catalog.tableExists(testIdent))
@@ -218,32 +222,48 @@ class HiveCatalogSuite extends KyuubiHiveTest {
assert(!catalog.tableExists(testIdent))
// default location
- val t1 = catalog.createTable(testIdent, schema, Array.empty, properties).asInstanceOf[HiveTable]
+ val t1 = catalog.createTable(
+ testIdent,
+ schema,
+ Array.empty[Transform],
+ properties).asInstanceOf[HiveTable]
assert(t1.catalogTable.location ===
catalog.catalog.defaultTablePath(testIdent.asTableIdentifier))
catalog.dropTable(testIdent)
// relative path
properties.put(TableCatalog.PROP_LOCATION, "relative/path")
- val t2 = catalog.createTable(testIdent, schema, Array.empty, properties).asInstanceOf[HiveTable]
+ val t2 = catalog.createTable(
+ testIdent,
+ schema,
+ Array.empty[Transform],
+ properties).asInstanceOf[HiveTable]
assert(t2.catalogTable.location ===
makeQualifiedPathWithWarehouse("db.db/relative/path"))
catalog.dropTable(testIdent)
// absolute path without scheme
properties.put(TableCatalog.PROP_LOCATION, "/absolute/path")
- val t3 = catalog.createTable(testIdent, schema, Array.empty, properties).asInstanceOf[HiveTable]
+ val t3 = catalog.createTable(
+ testIdent,
+ schema,
+ Array.empty[Transform],
+ properties).asInstanceOf[HiveTable]
assert(t3.catalogTable.location.toString === "file:/absolute/path")
catalog.dropTable(testIdent)
// absolute path with scheme
properties.put(TableCatalog.PROP_LOCATION, "file:/absolute/path")
- val t4 = catalog.createTable(testIdent, schema, Array.empty, properties).asInstanceOf[HiveTable]
+ val t4 = catalog.createTable(
+ testIdent,
+ schema,
+ Array.empty[Transform],
+ properties).asInstanceOf[HiveTable]
assert(t4.catalogTable.location.toString === "file:/absolute/path")
catalog.dropTable(testIdent)
}
test("loadTable") {
- val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ val table = catalog.createTable(testIdent, schema, Array.empty[Transform], emptyProps)
val loaded = catalog.loadTable(testIdent)
assert(table.name == loaded.name)
@@ -253,15 +273,13 @@ class HiveCatalogSuite extends KyuubiHiveTest {
}
test("loadTable: table does not exist") {
- val exc = intercept[NoSuchTableException] {
+ intercept[NoSuchTableException] {
catalog.loadTable(testIdent)
}
-
- assert(exc.message.contains("Table or view 'test_table' not found in database 'db'"))
}
test("invalidateTable") {
- val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ val table = catalog.createTable(testIdent, schema, Array.empty[Transform], emptyProps)
// Hive v2 don't cache table
catalog.invalidateTable(testIdent)
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala
index 16ea03234..d0b97676b 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala
@@ -18,6 +18,7 @@
package org.apache.kyuubi.spark.connector.hive
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
class HiveQuerySuite extends KyuubiHiveTest {
@@ -70,7 +71,10 @@ class HiveQuerySuite extends KyuubiHiveTest {
| SELECT * FROM hive.ns1.tb1
|""".stripMargin)
}
- assert(e.getMessage().contains("Table or view not found: hive.ns1.tb1"))
+
+ assert(e.plan.exists { p =>
+ p.exists(child => child.isInstanceOf[UnresolvedRelation])
+ })
}
}
}
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/CreateNamespaceSuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/CreateNamespaceSuite.scala
index 855eb0c67..e2e5b574b 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/CreateNamespaceSuite.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/CreateNamespaceSuite.scala
@@ -62,7 +62,8 @@ trait CreateNamespaceSuiteBase extends DDLCommandTestUtils {
val e = intercept[IllegalArgumentException] {
sql(s"CREATE NAMESPACE $ns LOCATION ''")
}
- assert(e.getMessage.contains("Can not create a Path from an empty
string"))
+ assert(e.getMessage.contains("Can not create a Path from an empty
string") ||
+ e.getMessage.contains("The location name cannot be empty string"))
val uri = new Path(path).toUri
sql(s"CREATE NAMESPACE $ns LOCATION '$uri'")
@@ -83,7 +84,8 @@ trait CreateNamespaceSuiteBase extends DDLCommandTestUtils {
val e = intercept[NamespaceAlreadyExistsException] {
sql(s"CREATE NAMESPACE $ns")
}
- assert(e.getMessage.contains(s"Namespace '$namespace' already exists"))
+ assert(e.getMessage.contains(s"Namespace '$namespace' already exists") ||
+ e.getMessage.contains(s"Cannot create schema `fakens` because it
already exists"))
// The following will be no-op since the namespace already exists.
Try { sql(s"CREATE NAMESPACE IF NOT EXISTS $ns") }.isSuccess
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/DropNamespaceSuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/DropNamespaceSuite.scala
index 66eb42c86..81107c24f 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/DropNamespaceSuite.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/DropNamespaceSuite.scala
@@ -60,7 +60,8 @@ trait DropNamespaceSuiteBase extends DDLCommandTestUtils {
val message = intercept[AnalysisException] {
sql(s"DROP NAMESPACE $catalogName.unknown")
}.getMessage
- assert(message.contains(s"'unknown' not found"))
+ assert(message.contains(s"'unknown' not found") ||
+ message.contains(s"The schema `unknown` cannot be found"))
}
test("drop non-empty namespace with a non-cascading mode") {