This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-1.10
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/branch-1.10 by this push:
     new 9482de6255 [KYUUBI #6939] KSHC supports Spark 3.5.5
9482de6255 is described below

commit 9482de62550136327443b2edc4051c3b71f20b52
Author: Cheng Pan <[email protected]>
AuthorDate: Mon Mar 3 13:42:09 2025 +0800

    [KYUUBI #6939] KSHC supports Spark 3.5.5
    
    Test Spark 3.5.5 Release Notes
    
    https://spark.apache.org/releases/spark-release-3-5-5.html
    
    Pass GHA.
    
    No.
    
    Closes #6939 from pan3793/spark-3.5.5.
    
    Closes #6939
    
    8c0288ae5 [Cheng Pan] ga
    78b0e72db [Cheng Pan] nit
    686a7b0a9 [Cheng Pan] fix
    d40cc5bba [Cheng Pan] Bump Spark 3.5.5
    
    Authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
    (cherry picked from commit d5b01fa3e2428f4a512632ab64238d7313666f83)
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../spark/connector/hive/HiveConnectorUtils.scala  | 126 ++++++++++-----------
 1 file changed, 63 insertions(+), 63 deletions(-)
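
The patch drops the SPARK_RUNTIME_VERSION string comparisons in HiveConnectorUtils and instead probes each known API shape inside scala.util.Try, recovering to the next older shape on failure, so the same KSHC binary keeps working when a maintenance release such as Spark 3.5.5 changes a method signature (the SPARK-51185 branch below passes an extra Path argument to PartitionedFileUtil.splitFiles). A minimal sketch of the probe-and-fall-back pattern, using plain Java reflection rather than Kyuubi's DynMethods helpers (illustrative only, not the committed code):

    import scala.util.Try

    object ReflectiveFallbackSketch {
      // Mirrors partitionedFilePath in the diff: try the newer accessor first,
      // fall back to the older one if it does not exist on this Spark version.
      def partitionedFilePath(file: AnyRef): String =
        Try {
          // Spark 3.4+ (SPARK-41970): PartitionedFile.urlEncodedPath
          file.getClass.getMethod("urlEncodedPath").invoke(file).toString
        }.recover { case _: Exception =>
          // Older releases: PartitionedFile.filePath
          file.getClass.getMethod("filePath").invoke(file).toString
        }.get
    }

The first branch that both resolves and invokes successfully wins; only if every branch fails does the final .get surface the last error.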

diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
index 0ccfd4912e..f56aa977b6 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
@@ -19,15 +19,16 @@ package org.apache.kyuubi.spark.connector.hive
 
 import java.lang.{Boolean => JBoolean, Long => JLong}
 
+import scala.util.Try
+
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
-import org.apache.spark.SPARK_VERSION
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition}
 import org.apache.spark.sql.connector.catalog.TableChange
-import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, ColumnPosition, DeleteColumn, First, RenameColumn, UpdateColumnComment, UpdateColumnNullability, UpdateColumnPosition, UpdateColumnType}
+import org.apache.spark.sql.connector.catalog.TableChange._
 import org.apache.spark.sql.execution.command.CommandUtils
 import org.apache.spark.sql.execution.command.CommandUtils.{calculateMultipleLocationSizes, calculateSingleLocationSize}
 import org.apache.spark.sql.execution.datasources.{PartitionDirectory, PartitionedFile}
@@ -35,20 +36,18 @@ import org.apache.spark.sql.hive.execution.HiveFileFormat
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
 
-import org.apache.kyuubi.spark.connector.common.SparkUtils.SPARK_RUNTIME_VERSION
 import org.apache.kyuubi.util.reflect.{DynClasses, DynConstructors, DynMethods}
 import org.apache.kyuubi.util.reflect.ReflectUtils.invokeAs
 
 object HiveConnectorUtils extends Logging {
 
-  // SPARK-43186
-  def getHiveFileFormat(fileSinkConf: FileSinkDesc): HiveFileFormat = {
-    if (SPARK_RUNTIME_VERSION >= "3.5") {
+  def getHiveFileFormat(fileSinkConf: FileSinkDesc): HiveFileFormat =
+    Try { // SPARK-43186: 3.5.0
       DynConstructors.builder()
         .impl(classOf[HiveFileFormat], classOf[FileSinkDesc])
         .build[HiveFileFormat]()
         .newInstance(fileSinkConf)
-    } else if (SPARK_RUNTIME_VERSION >= "3.3") {
+    }.recover { case _: Exception =>
       val shimFileSinkDescClz = DynClasses.builder()
         .impl("org.apache.spark.sql.hive.HiveShim$ShimFileSinkDesc")
         .build()
@@ -67,34 +66,26 @@ object HiveConnectorUtils extends Logging {
         .impl(classOf[HiveFileFormat], shimFileSinkDescClz)
         .build[HiveFileFormat]()
         .newInstance(shimFileSinkDesc)
-    } else {
-      throw unsupportedSparkVersion()
-    }
-  }
+    }.get
 
-  // SPARK-41970
-  def partitionedFilePath(file: PartitionedFile): String = {
-    if (SPARK_RUNTIME_VERSION >= "3.4") {
+  def partitionedFilePath(file: PartitionedFile): String =
+    Try { // SPARK-41970: 3.4.0
       invokeAs[String](file, "urlEncodedPath")
-    } else if (SPARK_RUNTIME_VERSION >= "3.3") {
+    }.recover { case _: Exception =>
       invokeAs[String](file, "filePath")
-    } else {
-      throw unsupportedSparkVersion()
-    }
-  }
+    }.get
 
   def splitFiles(
       sparkSession: SparkSession,
       file: AnyRef,
       filePath: Path,
-      isSplitable: Boolean,
-      maxSplitBytes: Long,
-      partitionValues: InternalRow): Seq[PartitionedFile] = {
-
-    if (SPARK_RUNTIME_VERSION >= "4.0") { // SPARK-42821
+      isSplitable: JBoolean,
+      maxSplitBytes: JLong,
+      partitionValues: InternalRow): Seq[PartitionedFile] =
+    Try { // SPARK-42821: 4.0.0-preview2
       val fileStatusWithMetadataClz = DynClasses.builder()
         .impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
-        .build()
+        .buildChecked()
       DynMethods
         .builder("splitFiles")
         .impl(
@@ -103,35 +94,58 @@ object HiveConnectorUtils extends Logging {
           classOf[Boolean],
           classOf[Long],
           classOf[InternalRow])
-        .build()
-        .invoke[Seq[PartitionedFile]](
+        .buildChecked()
+        .invokeChecked[Seq[PartitionedFile]](
           null,
           file,
-          isSplitable.asInstanceOf[JBoolean],
-          maxSplitBytes.asInstanceOf[JLong],
+          isSplitable,
+          maxSplitBytes,
           partitionValues)
-    } else if (SPARK_RUNTIME_VERSION >= "3.5") { // SPARK-43039
+    }.recover { case _: Exception => // SPARK-51185: Spark 3.5.5
       val fileStatusWithMetadataClz = DynClasses.builder()
         .impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
-        .build()
+        .buildChecked()
       DynMethods
         .builder("splitFiles")
         .impl(
           "org.apache.spark.sql.execution.PartitionedFileUtil",
           classOf[SparkSession],
           fileStatusWithMetadataClz,
+          classOf[Path],
           classOf[Boolean],
           classOf[Long],
           classOf[InternalRow])
-        .build()
-        .invoke[Seq[PartitionedFile]](
+        .buildChecked()
+        .invokeChecked[Seq[PartitionedFile]](
+          null,
+          sparkSession,
+          file,
+          filePath,
+          isSplitable,
+          maxSplitBytes,
+          partitionValues)
+    }.recover { case _: Exception => // SPARK-43039: 3.5.0
+      val fileStatusWithMetadataClz = DynClasses.builder()
+        .impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
+        .buildChecked()
+      DynMethods
+        .builder("splitFiles")
+        .impl(
+          "org.apache.spark.sql.execution.PartitionedFileUtil",
+          classOf[SparkSession],
+          fileStatusWithMetadataClz,
+          classOf[Boolean],
+          classOf[Long],
+          classOf[InternalRow])
+        .buildChecked()
+        .invokeChecked[Seq[PartitionedFile]](
           null,
           sparkSession,
           file,
-          isSplitable.asInstanceOf[JBoolean],
-          maxSplitBytes.asInstanceOf[JLong],
+          isSplitable,
+          maxSplitBytes,
           partitionValues)
-    } else if (SPARK_RUNTIME_VERSION >= "3.3") {
+    }.recover { case _: Exception =>
       DynMethods
         .builder("splitFiles")
         .impl(
@@ -142,55 +156,41 @@ object HiveConnectorUtils extends Logging {
           classOf[Boolean],
           classOf[Long],
           classOf[InternalRow])
-        .build()
-        .invoke[Seq[PartitionedFile]](
+        .buildChecked()
+        .invokeChecked[Seq[PartitionedFile]](
           null,
           sparkSession,
           file,
           filePath,
-          isSplitable.asInstanceOf[JBoolean],
-          maxSplitBytes.asInstanceOf[JLong],
+          isSplitable,
+          maxSplitBytes,
           partitionValues)
-    } else {
-      throw unsupportedSparkVersion()
-    }
-  }
+    }.get
 
-  def createPartitionDirectory(values: InternalRow, files: Seq[FileStatus]): PartitionDirectory = {
-    if (SPARK_RUNTIME_VERSION >= "3.5") {
+  def createPartitionDirectory(values: InternalRow, files: Seq[FileStatus]): PartitionDirectory =
+    Try { // SPARK-43039: 3.5.0
       new DynMethods.Builder("apply")
         .impl(classOf[PartitionDirectory], classOf[InternalRow], classOf[Array[FileStatus]])
         .buildChecked()
         .asStatic()
         .invoke[PartitionDirectory](values, files.toArray)
-    } else if (SPARK_RUNTIME_VERSION >= "3.3") {
+    }.recover { case _: Exception =>
       new DynMethods.Builder("apply")
         .impl(classOf[PartitionDirectory], classOf[InternalRow], classOf[Seq[FileStatus]])
         .buildChecked()
         .asStatic()
         .invoke[PartitionDirectory](values, files)
-    } else {
-      throw unsupportedSparkVersion()
-    }
-  }
+    }.get
 
-  def getPartitionFilePath(file: AnyRef): Path = {
-    if (SPARK_RUNTIME_VERSION >= "3.5") {
+  def getPartitionFilePath(file: AnyRef): Path =
+    Try { // SPARK-43039: 3.5.0
       new DynMethods.Builder("getPath")
         .impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
         .build()
         .invoke[Path](file)
-    } else if (SPARK_RUNTIME_VERSION >= "3.3") {
+    }.recover { case _: Exception =>
       file.asInstanceOf[FileStatus].getPath
-    } else {
-      throw unsupportedSparkVersion()
-    }
-  }
-
-  private def unsupportedSparkVersion(): KyuubiHiveConnectorException = {
-    KyuubiHiveConnectorException(s"Spark version $SPARK_VERSION " +
-      "is not supported by Kyuubi spark hive connector.")
-  }
+    }.get
 
   def calculateTotalSize(
       spark: SparkSession,
