This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8cc9b98e2 [KYUUBI #5384][KSCH] Hive connector supports Spark 3.5
8cc9b98e2 is described below

commit 8cc9b98e25f443e6337176914e6472a29e336f62
Author: Cheng Pan <[email protected]>
AuthorDate: Thu Mar 7 17:56:30 2024 +0800

    [KYUUBI #5384][KSCH] Hive connector supports Spark 3.5
    
    # :mag: Description
    ## Issue References 🔗
    
    This pull request fixes #5384
    
    ## Describe Your Solution 🔧
    
    ## Types of changes :bookmark:
    
    - [ ] Bugfix (non-breaking change which fixes an issue)
    - [x] New feature (non-breaking change which adds functionality)
    - [ ] Breaking change (fix or feature that would cause existing 
functionality to change)
    
    ## Test Plan 🧪
    
    #### Behavior Without This Pull Request :coffin:
    
    #### Behavior With This Pull Request :tada:
    
    #### Related Unit Tests
    
    ---
    
    # Checklist 📝
    
    - [ ] This patch was not authored or co-authored using [Generative 
Tooling](https://www.apache.org/legal/generative-tooling.html)
    
    **Be nice. Be informative.**
    
    Closes #6133 from Kwafoor/kyuubi_6073.
    
    Closes #5384
    
    9234e35ad [Cheng Pan] fix
    7766dfda5 [Cheng Pan] nit
    e9da162f8 [Cheng Pan] nit
    676bfb26e [Cheng Pan] pretty
    c241859af [Cheng Pan] pretty
    0eedcf82c [wangjunbo] compat with spark 3.3
    3d866546c [wangjunbo] format code
    a0898f50f [wangjunbo] delete Unused import
    9577f7fe8 [wangjunbo] [KYUUBI #5384] kyuubi-spark-connector-hive supports 
Spark 3.5
    
    Lead-authored-by: Cheng Pan <[email protected]>
    Co-authored-by: wangjunbo <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../spark/connector/hive/HiveConnectorUtils.scala  | 96 ++++++++++++++++++++++
 .../spark/connector/hive/read/HiveFileIndex.scala  |  3 +
 .../spark/connector/hive/read/HiveScan.scala       |  3 +-
 .../spark/connector/hive/write/HiveWrite.scala     | 11 +--
 .../hive/kyuubi/connector/HiveBridgeHelper.scala   |  5 +-
 .../spark/connector/hive/HiveCatalogSuite.scala    |  8 +-
 .../spark/connector/hive/HiveQuerySuite.scala      |  8 +-
 pom.xml                                            |  1 +
 8 files changed, 117 insertions(+), 18 deletions(-)

diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
index 615093186..fd8b08f56 100644
--- 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
@@ -17,23 +17,63 @@
 
 package org.apache.kyuubi.spark.connector.hive
 
+import java.lang.{Boolean => JBoolean, Long => JLong}
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
 import org.apache.spark.SPARK_VERSION
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, 
CatalogTablePartition}
 import org.apache.spark.sql.connector.catalog.TableChange
 import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, 
ColumnPosition, DeleteColumn, First, RenameColumn, UpdateColumnComment, 
UpdateColumnNullability, UpdateColumnPosition, UpdateColumnType}
 import org.apache.spark.sql.execution.command.CommandUtils
 import 
org.apache.spark.sql.execution.command.CommandUtils.{calculateMultipleLocationSizes,
 calculateSingleLocationSize}
 import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.hive.execution.HiveFileFormat
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
 
 import 
org.apache.kyuubi.spark.connector.common.SparkUtils.SPARK_RUNTIME_VERSION
+import org.apache.kyuubi.util.reflect.{DynClasses, DynConstructors, DynMethods}
 import org.apache.kyuubi.util.reflect.ReflectUtils.invokeAs
 
 object HiveConnectorUtils extends Logging {
 
+  // SPARK-43186
+  def getHiveFileFormat(fileSinkConf: FileSinkDesc): HiveFileFormat = {
+    if (SPARK_RUNTIME_VERSION >= "3.5") {
+      DynConstructors.builder()
+        .impl(classOf[HiveFileFormat], classOf[FileSinkDesc])
+        .build[HiveFileFormat]()
+        .newInstance(fileSinkConf)
+    } else if (SPARK_RUNTIME_VERSION >= "3.3") {
+      val shimFileSinkDescClz = DynClasses.builder()
+        .impl("org.apache.spark.sql.hive.HiveShim$ShimFileSinkDesc")
+        .build()
+      val shimFileSinkDesc = DynConstructors.builder()
+        .impl(
+          "org.apache.spark.sql.hive.HiveShim$ShimFileSinkDesc",
+          classOf[String],
+          classOf[TableDesc],
+          classOf[Boolean])
+        .build[AnyRef]()
+        .newInstance(
+          fileSinkConf.getDirName.toString,
+          fileSinkConf.getTableInfo,
+          fileSinkConf.getCompressed.asInstanceOf[JBoolean])
+      DynConstructors.builder()
+        .impl(classOf[HiveFileFormat], shimFileSinkDescClz)
+        .build[HiveFileFormat]()
+        .newInstance(shimFileSinkDesc)
+    } else {
+      throw KyuubiHiveConnectorException(s"Spark version $SPARK_VERSION " +
+        s"is not supported by Kyuubi spark hive connector.")
+    }
+  }
+
+  // SPARK-41970
   def partitionedFilePath(file: PartitionedFile): String = {
     if (SPARK_RUNTIME_VERSION >= "3.4") {
       invokeAs[String](file, "urlEncodedPath")
@@ -45,6 +85,62 @@ object HiveConnectorUtils extends Logging {
     }
   }
 
+  // SPARK-43039
+  def splitFiles(
+      sparkSession: SparkSession,
+      file: AnyRef,
+      filePath: Path,
+      isSplitable: Boolean,
+      maxSplitBytes: Long,
+      partitionValues: InternalRow): Seq[PartitionedFile] = {
+
+    if (SPARK_RUNTIME_VERSION >= "3.5") {
+      val fileStatusWithMetadataClz = DynClasses.builder()
+        
.impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
+        .build()
+      DynMethods
+        .builder("splitFiles")
+        .impl(
+          "org.apache.spark.sql.execution.PartitionedFileUtil",
+          classOf[SparkSession],
+          fileStatusWithMetadataClz,
+          classOf[Boolean],
+          classOf[Long],
+          classOf[InternalRow])
+        .build()
+        .invoke[Seq[PartitionedFile]](
+          null,
+          sparkSession,
+          file,
+          isSplitable.asInstanceOf[JBoolean],
+          maxSplitBytes.asInstanceOf[JLong],
+          partitionValues)
+    } else if (SPARK_RUNTIME_VERSION >= "3.3") {
+      DynMethods
+        .builder("splitFiles")
+        .impl(
+          "org.apache.spark.sql.execution.PartitionedFileUtil",
+          classOf[SparkSession],
+          classOf[FileStatus],
+          classOf[Path],
+          classOf[Boolean],
+          classOf[Long],
+          classOf[InternalRow])
+        .build()
+        .invoke[Seq[PartitionedFile]](
+          null,
+          sparkSession,
+          file,
+          filePath,
+          isSplitable.asInstanceOf[JBoolean],
+          maxSplitBytes.asInstanceOf[JLong],
+          partitionValues)
+    } else {
+      throw KyuubiHiveConnectorException(s"Spark version $SPARK_VERSION " +
+        s"is not supported by Kyuubi spark hive connector.")
+    }
+  }
+
   def calculateTotalSize(
       spark: SparkSession,
       catalogTable: CatalogTable,
diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala
index 0d79621f8..5e24199f8 100644
--- 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala
@@ -20,6 +20,7 @@ package org.apache.kyuubi.spark.connector.hive.read
 import java.net.URI
 
 import scala.collection.mutable
+import scala.language.implicitConversions
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
@@ -156,6 +157,8 @@ class HiveInMemoryFileIndex(
   private val partDirToBindHivePart: mutable.Map[PartitionDirectory, 
CatalogTablePartition] =
     mutable.Map()
 
+  implicit private def seqToArr(seq: Seq[FileStatus]): Array[FileStatus] = 
seq.toArray
+
   override def listFiles(
       partitionFilters: Seq[Expression],
       dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveScan.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveScan.scala
index ecdfc76c5..816c2661d 100644
--- 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveScan.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveScan.scala
@@ -28,7 +28,6 @@ import 
org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
 import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.connector.read.PartitionReaderFactory
-import org.apache.spark.sql.execution.PartitionedFileUtil
 import org.apache.spark.sql.execution.datasources.{FilePartition, 
PartitionedFile}
 import org.apache.spark.sql.execution.datasources.v2.FileScan
 import 
org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.HiveClientImpl
@@ -106,7 +105,7 @@ case class HiveScan(
         }
       partition.files.flatMap { file =>
         val filePath = file.getPath
-        val partFiles = PartitionedFileUtil.splitFiles(
+        val partFiles = HiveConnectorUtils.splitFiles(
           sparkSession = sparkSession,
           file = file,
           filePath = filePath,
diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWrite.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWrite.scala
index 957c19582..df18f1f8b 100644
--- 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWrite.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWrite.scala
@@ -23,7 +23,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hive.ql.plan.TableDesc
+import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
 import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
 import org.apache.spark.internal.Logging
@@ -36,11 +36,12 @@ import org.apache.spark.sql.connector.write.{BatchWrite, 
LogicalWriteInfo, Write
 import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, 
WriteJobDescription}
 import org.apache.spark.sql.execution.datasources.v2.FileBatchWrite
 import org.apache.spark.sql.execution.metric.SQLMetric
-import org.apache.spark.sql.hive.execution.{HiveFileFormat, HiveOptions}
-import 
org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.{FileSinkDesc, 
HiveClientImpl, StructTypeHelper}
+import org.apache.spark.sql.hive.execution.HiveOptions
+import 
org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.{HiveClientImpl, 
StructTypeHelper}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
 
+import 
org.apache.kyuubi.spark.connector.hive.HiveConnectorUtils.getHiveFileFormat
 import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog
 
 case class HiveWrite(
@@ -75,7 +76,7 @@ case class HiveWrite(
   override def toBatch: BatchWrite = {
     val tmpLocation = HiveWriteHelper.getExternalTmpPath(externalCatalog, 
hadoopConf, tableLocation)
 
-    val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
+    val fileSinkConf = new FileSinkDesc(tmpLocation, tableDesc, false)
     handleCompression(fileSinkConf, hadoopConf)
 
     val committer = FileCommitProtocol.instantiate(
@@ -118,7 +119,7 @@ case class HiveWrite(
       pathName: String,
       customPartitionLocations: Map[TablePartitionSpec, String],
       options: Map[String, String]): WriteJobDescription = {
-    val hiveFileFormat = new HiveFileFormat(fileSinkConf)
+    val hiveFileFormat = getHiveFileFormat(fileSinkConf)
     val dataSchema = StructType(info.schema().fields.take(dataColumns.length))
     val outputWriterFactory = hiveFileFormat.prepareWrite(sparkSession, job, 
options, dataSchema)
     val metrics: Map[String, SQLMetric] = BasicWriteJobStatsTracker.metrics
diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveBridgeHelper.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveBridgeHelper.scala
index 8e3a9cd3d..2993a0f12 100644
--- 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveBridgeHelper.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveBridgeHelper.scala
@@ -32,7 +32,6 @@ object HiveBridgeHelper {
   type HiveMetastoreCatalog = org.apache.spark.sql.hive.HiveMetastoreCatalog
   type HiveExternalCatalog = org.apache.spark.sql.hive.HiveExternalCatalog
   type NextIterator[U] = org.apache.spark.util.NextIterator[U]
-  type FileSinkDesc = org.apache.spark.sql.hive.HiveShim.ShimFileSinkDesc
   type HiveVersion = org.apache.spark.sql.hive.client.HiveVersion
   type InsertIntoHiveTable = 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable
 
@@ -97,7 +96,9 @@ object HiveBridgeHelper {
   }
 
   implicit class StructTypeHelper(structType: StructType) {
-    def toAttributes: Seq[AttributeReference] = structType.toAttributes
+    def toAttributes: Seq[AttributeReference] = structType.map { field =>
+      AttributeReference(field.name, field.dataType, field.nullable, 
field.metadata)()
+    }
   }
 
   def toSQLValue(v: Any, t: DataType): String = Literal.create(v, t) match {
diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
index f43dafd11..0485087bf 100644
--- 
a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
@@ -27,7 +27,7 @@ import scala.util.Try
 import com.google.common.collect.Maps
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, 
NoSuchTableException, TableAlreadyExistsException, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, 
NoSuchTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
 import org.apache.spark.sql.connector.expressions.Transform
@@ -121,9 +121,9 @@ class HiveCatalogSuite extends KyuubiHiveTest {
       val exception = intercept[AnalysisException] {
         spark.table("hive.ns1.nonexistent_table")
       }
-      assert(exception.plan.exists { p =>
-        p.exists(child => child.isInstanceOf[UnresolvedRelation])
-      })
+      assert(exception.message.contains("[TABLE_OR_VIEW_NOT_FOUND] " +
+        "The table or view `hive`.`ns1`.`nonexistent_table` cannot be found.")
+        || exception.message.contains("Table or view not found: 
hive.ns1.nonexistent_table"))
     }
   }
 
diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala
index b217d00b4..65ad81d95 100644
--- 
a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala
@@ -18,7 +18,6 @@
 package org.apache.kyuubi.spark.connector.hive
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 
 class HiveQuerySuite extends KyuubiHiveTest {
 
@@ -79,10 +78,9 @@ class HiveQuerySuite extends KyuubiHiveTest {
                | SELECT * FROM hive.ns1.tb1
                |""".stripMargin)
         }
-
-        assert(e.plan.exists { p =>
-          p.exists(child => child.isInstanceOf[UnresolvedRelation])
-        })
+        assert(e.message.contains(
+          "[TABLE_OR_VIEW_NOT_FOUND] The table or view `hive`.`ns1`.`tb1` 
cannot be found.") ||
+          e.message.contains("Table or view not found: hive.ns1.tb1"))
       }
     }
   }
diff --git a/pom.xml b/pom.xml
index f7d63d209..b70d5310a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2274,6 +2274,7 @@
             <id>spark-3.5</id>
             <modules>
                 <module>extensions/spark/kyuubi-extension-spark-3-5</module>
+                <module>extensions/spark/kyuubi-spark-connector-hive</module>
             </modules>
             <properties>
                 <delta.artifact>delta-spark</delta.artifact>

Reply via email to