This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-1.10
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.10 by this push:
new 9482de6255 [KYUUBI #6939] KSHC supports Spark 3.5.5
9482de6255 is described below
commit 9482de62550136327443b2edc4051c3b71f20b52
Author: Cheng Pan <[email protected]>
AuthorDate: Mon Mar 3 13:42:09 2025 +0800
[KYUUBI #6939] KSHC supports Spark 3.5.5
### Why are the changes needed?

Test Spark 3.5.5

Release Notes: https://spark.apache.org/releases/spark-release-3-5-5.html

### How was this patch tested?

Pass GHA.

### Was this patch authored or co-authored using generative AI tooling?

No.
Closes #6939 from pan3793/spark-3.5.5.
Closes #6939
8c0288ae5 [Cheng Pan] ga
78b0e72db [Cheng Pan] nit
686a7b0a9 [Cheng Pan] fix
d40cc5bba [Cheng Pan] Bump Spark 3.5.5
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
(cherry picked from commit d5b01fa3e2428f4a512632ab64238d7313666f83)
Signed-off-by: Cheng Pan <[email protected]>
---
.../spark/connector/hive/HiveConnectorUtils.scala | 126 ++++++++++-----------
1 file changed, 63 insertions(+), 63 deletions(-)
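The diff below drops the SPARK_RUNTIME_VERSION string comparisons in favor of Try { ... }.recover { ... }.get chains: each known reflective binding is probed newest-first, and the next candidate is tried when binding or invocation fails. This is what lets the connector absorb SPARK-51185, which changed the PartitionedFileUtil.splitFiles signature in Spark 3.5.5 (note the added classOf[Path] parameter), without yet another version gate. A minimal, self-contained sketch of the same probe-and-fallback pattern, using plain JDK reflection instead of Kyuubi's DynMethods helpers; the accessor names are illustrative, not the Kyuubi API:

    import scala.util.Try

    object FallbackReflection {
      // Probe candidate accessors newest-first and fall back when a
      // signature is absent on the running Spark version; if every
      // candidate fails, .get rethrows the last reflection error.
      def pathOf(file: AnyRef): String =
        Try { // newer accessor name (illustrative)
          file.getClass.getMethod("urlEncodedPath").invoke(file).asInstanceOf[String]
        }.recover { case _: Exception => // older accessor name (illustrative)
          file.getClass.getMethod("filePath").invoke(file).asInstanceOf[String]
        }.get
    }

The actual change uses buildChecked()/invokeChecked() from kyuubi-util's DynMethods and DynClasses, so a missing class or method surfaces as an exception that the following recover block can catch.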
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
index 0ccfd4912e..f56aa977b6 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
@@ -19,15 +19,16 @@ package org.apache.kyuubi.spark.connector.hive
import java.lang.{Boolean => JBoolean, Long => JLong}
+import scala.util.Try
+
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
-import org.apache.spark.SPARK_VERSION
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition}
import org.apache.spark.sql.connector.catalog.TableChange
-import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, ColumnPosition, DeleteColumn, First, RenameColumn, UpdateColumnComment, UpdateColumnNullability, UpdateColumnPosition, UpdateColumnType}
+import org.apache.spark.sql.connector.catalog.TableChange._
import org.apache.spark.sql.execution.command.CommandUtils
import org.apache.spark.sql.execution.command.CommandUtils.{calculateMultipleLocationSizes, calculateSingleLocationSize}
import org.apache.spark.sql.execution.datasources.{PartitionDirectory, PartitionedFile}
@@ -35,20 +36,18 @@ import org.apache.spark.sql.hive.execution.HiveFileFormat
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
-import org.apache.kyuubi.spark.connector.common.SparkUtils.SPARK_RUNTIME_VERSION
import org.apache.kyuubi.util.reflect.{DynClasses, DynConstructors, DynMethods}
import org.apache.kyuubi.util.reflect.ReflectUtils.invokeAs
object HiveConnectorUtils extends Logging {
- // SPARK-43186
- def getHiveFileFormat(fileSinkConf: FileSinkDesc): HiveFileFormat = {
- if (SPARK_RUNTIME_VERSION >= "3.5") {
+ def getHiveFileFormat(fileSinkConf: FileSinkDesc): HiveFileFormat =
+ Try { // SPARK-43186: 3.5.0
DynConstructors.builder()
.impl(classOf[HiveFileFormat], classOf[FileSinkDesc])
.build[HiveFileFormat]()
.newInstance(fileSinkConf)
- } else if (SPARK_RUNTIME_VERSION >= "3.3") {
+ }.recover { case _: Exception =>
val shimFileSinkDescClz = DynClasses.builder()
.impl("org.apache.spark.sql.hive.HiveShim$ShimFileSinkDesc")
.build()
@@ -67,34 +66,26 @@ object HiveConnectorUtils extends Logging {
.impl(classOf[HiveFileFormat], shimFileSinkDescClz)
.build[HiveFileFormat]()
.newInstance(shimFileSinkDesc)
- } else {
- throw unsupportedSparkVersion()
- }
- }
+ }.get
- // SPARK-41970
- def partitionedFilePath(file: PartitionedFile): String = {
- if (SPARK_RUNTIME_VERSION >= "3.4") {
+ def partitionedFilePath(file: PartitionedFile): String =
+ Try { // SPARK-41970: 3.4.0
invokeAs[String](file, "urlEncodedPath")
- } else if (SPARK_RUNTIME_VERSION >= "3.3") {
+ }.recover { case _: Exception =>
invokeAs[String](file, "filePath")
- } else {
- throw unsupportedSparkVersion()
- }
- }
+ }.get
def splitFiles(
sparkSession: SparkSession,
file: AnyRef,
filePath: Path,
- isSplitable: Boolean,
- maxSplitBytes: Long,
- partitionValues: InternalRow): Seq[PartitionedFile] = {
-
- if (SPARK_RUNTIME_VERSION >= "4.0") { // SPARK-42821
+ isSplitable: JBoolean,
+ maxSplitBytes: JLong,
+ partitionValues: InternalRow): Seq[PartitionedFile] =
+ Try { // SPARK-42821: 4.0.0-preview2
val fileStatusWithMetadataClz = DynClasses.builder()
.impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
- .build()
+ .buildChecked()
DynMethods
.builder("splitFiles")
.impl(
@@ -103,35 +94,58 @@ object HiveConnectorUtils extends Logging {
classOf[Boolean],
classOf[Long],
classOf[InternalRow])
- .build()
- .invoke[Seq[PartitionedFile]](
+ .buildChecked()
+ .invokeChecked[Seq[PartitionedFile]](
null,
file,
- isSplitable.asInstanceOf[JBoolean],
- maxSplitBytes.asInstanceOf[JLong],
+ isSplitable,
+ maxSplitBytes,
partitionValues)
- } else if (SPARK_RUNTIME_VERSION >= "3.5") { // SPARK-43039
+ }.recover { case _: Exception => // SPARK-51185: Spark 3.5.5
val fileStatusWithMetadataClz = DynClasses.builder()
.impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
- .build()
+ .buildChecked()
DynMethods
.builder("splitFiles")
.impl(
"org.apache.spark.sql.execution.PartitionedFileUtil",
classOf[SparkSession],
fileStatusWithMetadataClz,
+ classOf[Path],
classOf[Boolean],
classOf[Long],
classOf[InternalRow])
- .build()
- .invoke[Seq[PartitionedFile]](
+ .buildChecked()
+ .invokeChecked[Seq[PartitionedFile]](
+ null,
+ sparkSession,
+ file,
+ filePath,
+ isSplitable,
+ maxSplitBytes,
+ partitionValues)
+ }.recover { case _: Exception => // SPARK-43039: 3.5.0
+ val fileStatusWithMetadataClz = DynClasses.builder()
+ .impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
+ .buildChecked()
+ DynMethods
+ .builder("splitFiles")
+ .impl(
+ "org.apache.spark.sql.execution.PartitionedFileUtil",
+ classOf[SparkSession],
+ fileStatusWithMetadataClz,
+ classOf[Boolean],
+ classOf[Long],
+ classOf[InternalRow])
+ .buildChecked()
+ .invokeChecked[Seq[PartitionedFile]](
null,
sparkSession,
file,
- isSplitable.asInstanceOf[JBoolean],
- maxSplitBytes.asInstanceOf[JLong],
+ isSplitable,
+ maxSplitBytes,
partitionValues)
- } else if (SPARK_RUNTIME_VERSION >= "3.3") {
+ }.recover { case _: Exception =>
DynMethods
.builder("splitFiles")
.impl(
@@ -142,55 +156,41 @@ object HiveConnectorUtils extends Logging {
classOf[Boolean],
classOf[Long],
classOf[InternalRow])
- .build()
- .invoke[Seq[PartitionedFile]](
+ .buildChecked()
+ .invokeChecked[Seq[PartitionedFile]](
null,
sparkSession,
file,
filePath,
- isSplitable.asInstanceOf[JBoolean],
- maxSplitBytes.asInstanceOf[JLong],
+ isSplitable,
+ maxSplitBytes,
partitionValues)
- } else {
- throw unsupportedSparkVersion()
- }
- }
+ }.get
- def createPartitionDirectory(values: InternalRow, files: Seq[FileStatus]): PartitionDirectory = {
- if (SPARK_RUNTIME_VERSION >= "3.5") {
+ def createPartitionDirectory(values: InternalRow, files: Seq[FileStatus]): PartitionDirectory =
+ Try { // SPARK-43039: 3.5.0
new DynMethods.Builder("apply")
.impl(classOf[PartitionDirectory], classOf[InternalRow], classOf[Array[FileStatus]])
.buildChecked()
.asStatic()
.invoke[PartitionDirectory](values, files.toArray)
- } else if (SPARK_RUNTIME_VERSION >= "3.3") {
+ }.recover { case _: Exception =>
new DynMethods.Builder("apply")
.impl(classOf[PartitionDirectory], classOf[InternalRow], classOf[Seq[FileStatus]])
.buildChecked()
.asStatic()
.invoke[PartitionDirectory](values, files)
- } else {
- throw unsupportedSparkVersion()
- }
- }
+ }.get
- def getPartitionFilePath(file: AnyRef): Path = {
- if (SPARK_RUNTIME_VERSION >= "3.5") {
+ def getPartitionFilePath(file: AnyRef): Path =
+ Try { // SPARK-43039: 3.5.0
new DynMethods.Builder("getPath")
.impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
.build()
.invoke[Path](file)
- } else if (SPARK_RUNTIME_VERSION >= "3.3") {
+ }.recover { case _: Exception =>
file.asInstanceOf[FileStatus].getPath
- } else {
- throw unsupportedSparkVersion()
- }
- }
-
- private def unsupportedSparkVersion(): KyuubiHiveConnectorException = {
- KyuubiHiveConnectorException(s"Spark version $SPARK_VERSION " +
- "is not supported by Kyuubi spark hive connector.")
- }
+ }.get
def calculateTotalSize(
spark: SparkSession,