This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 03dcedd89 [KYUUBI #6453] Make KSHC support Spark 4.0 and enable CI for Spark 4.0
03dcedd89 is described below

commit 03dcedd89ed26fb21d0475b974a637e805e06d3b
Author: Cheng Pan <[email protected]>
AuthorDate: Fri Jun 7 11:01:24 2024 +0800

    [KYUUBI #6453] Make KSHC support Spark 4.0 and enable CI for Spark 4.0
    
    # :mag: Description
    
    This PR makes KSHC support Spark 4.0, and also makes sure that the KSHC jar compiled against Spark 3.5 is binary compatible with Spark 4.0.
    
    We are ready to enable CI for Spark 4.0, except for the authz module.
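
    Binary compatibility is kept largely by resolving version-sensitive Spark internals
    through runtime reflection rather than direct linkage. As a rough, self-contained
    sketch of the pattern used in `HiveConnectorUtils.splitFiles` for the Spark 4.0 code
    path (the `DynClasses`/`DynMethods` import path below is an assumption based on the
    kyuubi-util reflect helpers, not part of this diff):

    ```scala
    import java.lang.{Boolean => JBoolean, Long => JLong}

    import org.apache.kyuubi.util.reflect.{DynClasses, DynMethods}
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.execution.datasources.PartitionedFile

    object SplitFilesSketch {
      def splitFiles(
          file: AnyRef, // a FileStatusWithMetadata instance at runtime
          isSplitable: Boolean,
          maxSplitBytes: Long,
          partitionValues: InternalRow): Seq[PartitionedFile] = {
        // Resolve the parameter type that only exists on Spark 3.5+.
        val fileStatusWithMetadataClz = DynClasses.builder()
          .impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
          .build()
        // Bind PartitionedFileUtil.splitFiles by name and argument types, then invoke
        // it as a static method (null receiver), so the compiled bytecode never
        // references a signature that differs between Spark versions.
        DynMethods.builder("splitFiles")
          .impl(
            "org.apache.spark.sql.execution.PartitionedFileUtil",
            fileStatusWithMetadataClz,
            classOf[Boolean],
            classOf[Long],
            classOf[InternalRow])
          .build()
          .invoke[Seq[PartitionedFile]](
            null,
            file,
            isSplitable.asInstanceOf[JBoolean],
            maxSplitBytes.asInstanceOf[JLong],
            partitionValues)
      }
    }
    ```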
    
    ## Types of changes :bookmark:
    
    - [ ] Bugfix (non-breaking change which fixes an issue)
    - [x] New feature (non-breaking change which adds functionality)
    - [ ] Breaking change (fix or feature that would cause existing functionality to change)
    
    ## Test Plan 🧪
    
    Pass GHA.
    
    ---
    
    # Checklist ๐Ÿ“
    
    - [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)
    
    **Be nice. Be informative.**
    
    Closes #6453 from pan3793/spark4-ci.
    
    Closes #6453
    
    695e3d7f7 [Cheng Pan] Update pom.xml
    2eaa0f88a [Cheng Pan] Update .github/workflows/master.yml
    b1f540a34 [Cheng Pan] cross test
    562839982 [Cheng Pan] fix
    9f0c2e1be [Cheng Pan] fix
    45f182462 [Cheng Pan] kshc
    227ef5bae [Cheng Pan] fix
    690a3b8b2 [Cheng Pan] Revert "fix"
    87fe7678b [Cheng Pan] fix
    60f55dbed [Cheng Pan] CI for Spark 4.
    
    Authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .github/workflows/master.yml                       | 11 +++++++
 dev/kyuubi-codecov/pom.xml                         | 36 ++++++++++++++++++----
 .../spark/connector/hive/HiveConnectorUtils.scala  | 30 ++++++++++++++++--
 .../spark/connector/hive/HiveTableCatalog.scala    | 26 ++++++++--------
 .../connector/hive/write/HiveWriteHelper.scala     | 34 ++++++--------------
 pom.xml                                            | 23 ++++++++++++++
 6 files changed, 113 insertions(+), 47 deletions(-)

diff --git a/.github/workflows/master.yml b/.github/workflows/master.yml
index 503e03147..23edb0afd 100644
--- a/.github/workflows/master.yml
+++ b/.github/workflows/master.yml
@@ -56,6 +56,11 @@ jobs:
         exclude-tags: [""]
         comment: ["normal"]
         include:
+          - java: 17
+            spark: '4.0'
+            spark-archive: '-Pscala-2.13'
+            exclude-tags: ''
+            comment: 'normal'
           - java: 8
             spark: '3.5'
            spark-archive: '-Dspark.archive.mirror=https://archive.apache.org/dist/spark/spark-3.2.4 -Dspark.archive.name=spark-3.2.4-bin-hadoop3.2.tgz -Pzookeeper-3.6'
@@ -193,6 +198,12 @@ jobs:
             - '3.4'
             - '3.3'
         comment: [ "normal" ]
+        include:
+          - java: 17
+            scala: "2.13"
+            spark-compile: "3.5"
+            spark-runtime: "4.0"
+            comment: "normal"
     env:
       SPARK_LOCAL_IP: localhost
       TEST_MODULES: "extensions/spark/kyuubi-spark-connector-hive,\
diff --git a/dev/kyuubi-codecov/pom.xml b/dev/kyuubi-codecov/pom.xml
index f22cbe186..f224b8cff 100644
--- a/dev/kyuubi-codecov/pom.xml
+++ b/dev/kyuubi-codecov/pom.xml
@@ -121,12 +121,6 @@
             <version>${project.version}</version>
         </dependency>
 
-        <dependency>
-            <groupId>org.apache.kyuubi</groupId>
-            <artifactId>kyuubi-spark-authz_${scala.binary.version}</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-
         <dependency>
            <groupId>org.apache.kyuubi</groupId>
            <artifactId>kyuubi-spark-connector-tpcds_${scala.binary.version}</artifactId>
@@ -169,6 +163,11 @@
                    <artifactId>kyuubi-extension-spark-3-2_${scala.binary.version}</artifactId>
                     <version>${project.version}</version>
                 </dependency>
+                <dependency>
+                    <groupId>org.apache.kyuubi</groupId>
+                    <artifactId>kyuubi-spark-authz_${scala.binary.version}</artifactId>
+                    <version>${project.version}</version>
+                </dependency>
             </dependencies>
         </profile>
         <profile>
@@ -184,6 +183,11 @@
                    <artifactId>kyuubi-spark-connector-hive_${scala.binary.version}</artifactId>
                     <version>${project.version}</version>
                 </dependency>
+                <dependency>
+                    <groupId>org.apache.kyuubi</groupId>
+                    <artifactId>kyuubi-spark-authz_${scala.binary.version}</artifactId>
+                    <version>${project.version}</version>
+                </dependency>
             </dependencies>
         </profile>
         <profile>
@@ -209,6 +213,26 @@
                    <artifactId>kyuubi-extension-spark-3-5_${scala.binary.version}</artifactId>
                     <version>${project.version}</version>
                 </dependency>
+                <dependency>
+                    <groupId>org.apache.kyuubi</groupId>
+                    <artifactId>kyuubi-spark-connector-hive_${scala.binary.version}</artifactId>
+                    <version>${project.version}</version>
+                </dependency>
+                <dependency>
+                    <groupId>org.apache.kyuubi</groupId>
+                    <artifactId>kyuubi-spark-authz_${scala.binary.version}</artifactId>
+                    <version>${project.version}</version>
+                </dependency>
+            </dependencies>
+        </profile>
+        <profile>
+            <id>spark-4.0</id>
+            <dependencies>
+                <dependency>
+                    <groupId>org.apache.kyuubi</groupId>
+                    <artifactId>kyuubi-spark-connector-hive_${scala.binary.version}</artifactId>
+                    <version>${project.version}</version>
+                </dependency>
             </dependencies>
         </profile>
         <profile>
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
index 8eaaa0102..0ccfd4912 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
@@ -83,7 +83,6 @@ object HiveConnectorUtils extends Logging {
     }
   }
 
-  // SPARK-43039
   def splitFiles(
       sparkSession: SparkSession,
       file: AnyRef,
@@ -92,7 +91,26 @@ object HiveConnectorUtils extends Logging {
       maxSplitBytes: Long,
       partitionValues: InternalRow): Seq[PartitionedFile] = {
 
-    if (SPARK_RUNTIME_VERSION >= "3.5") {
+    if (SPARK_RUNTIME_VERSION >= "4.0") { // SPARK-42821
+      val fileStatusWithMetadataClz = DynClasses.builder()
+        .impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
+        .build()
+      DynMethods
+        .builder("splitFiles")
+        .impl(
+          "org.apache.spark.sql.execution.PartitionedFileUtil",
+          fileStatusWithMetadataClz,
+          classOf[Boolean],
+          classOf[Long],
+          classOf[InternalRow])
+        .build()
+        .invoke[Seq[PartitionedFile]](
+          null,
+          file,
+          isSplitable.asInstanceOf[JBoolean],
+          maxSplitBytes.asInstanceOf[JLong],
+          partitionValues)
+    } else if (SPARK_RUNTIME_VERSION >= "3.5") { // SPARK-43039
      val fileStatusWithMetadataClz = DynClasses.builder()
        .impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
         .build()
@@ -384,7 +402,13 @@ object HiveConnectorUtils extends Logging {
     new StructType(newFields)
   }
 
-  def withSQLConf[T](pairs: (String, String)*)(f: => T): T = {
+  // This is a fork of Spark's withSQLConf; we use a different name to avoid linkage
+  // issues in cross-version cases.
+  // For example, SPARK-46227 (4.0.0) moves `withSQLConf` from SQLHelper to SQLConfHelper,
+  // so classes that extend SQLConfHelper link against the superclass's method when
+  // compiled with Spark 4.0, and a linkage error then occurs when the jar runs on lower
+  // Spark versions.
+  def withSparkSQLConf[T](pairs: (String, String)*)(f: => T): T = {
     val conf = SQLConf.get
     val (keys, values) = pairs.unzip
     val currentValues = keys.map { key =>
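
The hunk above shows only the head of `withSparkSQLConf`. The remainder (unchanged apart from the rename) follows the usual save/set/restore pattern of Spark's `SQLHelper.withSQLConf`; a rough sketch, not the verbatim Kyuubi code:

```scala
import org.apache.spark.sql.internal.SQLConf

object WithSparkSQLConfSketch {
  // Apply the given SQLConf overrides, run the block, then restore the previous values.
  def withSparkSQLConf[T](pairs: (String, String)*)(f: => T): T = {
    val conf = SQLConf.get
    val (keys, values) = pairs.unzip
    // Remember the current value (if any) of every key about to be overridden.
    val currentValues = keys.map { key =>
      if (conf.contains(key)) Some(conf.getConfString(key)) else None
    }
    keys.zip(values).foreach { case (k, v) => conf.setConfString(k, v) }
    try f
    finally {
      // Put back the original values, unsetting keys that were not set before.
      keys.zip(currentValues).foreach {
        case (key, Some(value)) => conf.setConfString(key, value)
        case (key, None) => conf.unsetConf(key)
      }
    }
  }
}
```
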
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
index c128d67f1..91088d787 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
@@ -44,7 +44,7 @@ import org.apache.spark.sql.internal.StaticSQLConf.{CATALOG_IMPLEMENTATION, GLOB
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
-import org.apache.kyuubi.spark.connector.hive.HiveConnectorUtils.withSQLConf
+import org.apache.kyuubi.spark.connector.hive.HiveConnectorUtils.withSparkSQLConf
 import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog.{getStorageFormatAndProvider, toCatalogDatabase, CatalogDatabaseHelper, IdentifierHelper, NamespaceHelper}
 import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorDelegationTokenProvider.metastoreTokenSignature
 
@@ -148,7 +148,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
   override val defaultNamespace: Array[String] = Array("default")
 
   override def listTables(namespace: Array[String]): Array[Identifier] =
-    withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+    withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
       namespace match {
         case Array(db) =>
           catalog
@@ -162,7 +162,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
     }
 
   override def loadTable(ident: Identifier): Table =
-    withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+    withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
      HiveTable(sparkSession, catalog.getTableMetadata(ident.asTableIdentifier), this)
     }
 
@@ -171,7 +171,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
       schema: StructType,
       partitions: Array[Transform],
       properties: util.Map[String, String]): Table =
-    withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+    withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
      import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.TransformHelper
      val (partitionColumns, maybeBucketSpec) = partitions.toSeq.convertTransforms
       val location = Option(properties.get(TableCatalog.PROP_LOCATION))
@@ -213,7 +213,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
     }
 
   override def alterTable(ident: Identifier, changes: TableChange*): Table =
-    withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+    withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
       val catalogTable =
         try {
           catalog.getTableMetadata(ident.asTableIdentifier)
@@ -253,7 +253,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
     }
 
   override def dropTable(ident: Identifier): Boolean =
-    withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+    withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
       try {
         if (loadTable(ident) != null) {
           catalog.dropTable(
@@ -271,7 +271,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
     }
 
   override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit =
-    withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+    withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
       if (tableExists(newIdent)) {
         throw new TableAlreadyExistsException(newIdent)
       }
@@ -288,12 +288,12 @@ class HiveTableCatalog(sparkSession: SparkSession)
   }
 
   override def listNamespaces(): Array[Array[String]] =
-    withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+    withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
       catalog.listDatabases().map(Array(_)).toArray
     }
 
   override def listNamespaces(namespace: Array[String]): Array[Array[String]] =
-    withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+    withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
       namespace match {
         case Array() =>
           listNamespaces()
@@ -305,7 +305,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
     }
 
   override def loadNamespaceMetadata(namespace: Array[String]): util.Map[String, String] =
-    withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+    withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
       namespace match {
         case Array(db) =>
           try {
@@ -323,7 +323,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
   override def createNamespace(
       namespace: Array[String],
       metadata: util.Map[String, String]): Unit =
-    withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+    withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
       namespace match {
         case Array(db) if !catalog.databaseExists(db) =>
           catalog.createDatabase(
@@ -339,7 +339,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
     }
 
   override def alterNamespace(namespace: Array[String], changes: NamespaceChange*): Unit =
-    withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+    withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
       namespace match {
         case Array(db) =>
           // validate that this catalog's reserved properties are not removed
@@ -379,7 +379,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
   override def dropNamespace(
       namespace: Array[String],
       cascade: Boolean): Boolean =
-    withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+    withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
       namespace match {
         case Array(db) if catalog.databaseExists(db) =>
           catalog.dropDatabase(db, ignoreIfNotExists = false, cascade)
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWriteHelper.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWriteHelper.scala
index c3e73c011..558ac8ee9 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWriteHelper.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWriteHelper.scala
@@ -28,7 +28,9 @@ import org.apache.hadoop.hive.common.FileUtils
 import org.apache.hadoop.hive.ql.exec.TaskRunner
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener
-import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.{hive, HiveExternalCatalog, HiveVersion}
+import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.HiveExternalCatalog
+
+import org.apache.kyuubi.util.SemanticVersion
 
 // scalastyle:off line.size.limit
 /**
@@ -48,8 +50,6 @@ object HiveWriteHelper extends Logging {
       hadoopConf: Configuration,
       path: Path): Path = {
 
-    import hive._
-
    // Before Hive 1.1, when inserting into a table, Hive will create the staging directory under
    // a common scratch directory. After the writing is finished, Hive will simply empty the table
    // directory and move the staging directory to it.
@@ -59,24 +59,15 @@ object HiveWriteHelper extends Logging {
    // We have to follow the Hive behavior here, to avoid troubles. For example, if we create
    // staging directory under the table director for Hive prior to 1.1, the staging directory will
    // be removed by Hive when Hive is trying to empty the table directory.
-    val hiveVersionsUsingOldExternalTempPath: Set[HiveVersion] = Set(v12, v13, v14, v1_0)
-    val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] =
-      Set(v1_1, v1_2, v2_0, v2_1, v2_2, v2_3, v3_0, v3_1)
-
-    // Ensure all the supported versions are considered here.
-    assert(hiveVersionsUsingNewExternalTempPath ++ hiveVersionsUsingOldExternalTempPath ==
-      allSupportedHiveVersions)
-
-    val hiveVersion = externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client.version
+    val hiveVersion = SemanticVersion(
+      externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client.version.fullVersion)
     val stagingDir = hadoopConf.get(hiveStagingDir, ".hive-staging")
     val scratchDir = hadoopConf.get(hiveScratchDir, "/tmp/hive")
 
-    if (hiveVersionsUsingOldExternalTempPath.contains(hiveVersion)) {
+    if (hiveVersion < "1.1") {
       oldVersionExternalTempPath(path, hadoopConf, scratchDir)
-    } else if (hiveVersionsUsingNewExternalTempPath.contains(hiveVersion)) {
-      newVersionExternalTempPath(path, hadoopConf, stagingDir)
     } else {
-      throw new IllegalStateException("Unsupported hive version: " + hiveVersion.fullVersion)
+      newVersionExternalTempPath(path, hadoopConf, stagingDir)
     }
   }
 
@@ -96,7 +87,7 @@ object HiveWriteHelper extends Logging {
     var dirPath = new Path(
       extURI.getScheme,
       extURI.getAuthority,
-      scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID())
+      scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID)
 
     try {
       val fs: FileSystem = dirPath.getFileSystem(hadoopConf)
@@ -120,19 +111,12 @@ object HiveWriteHelper extends Logging {
       stagingDir: String): Path = {
     val extURI: URI = path.toUri
     if (extURI.getScheme == "viewfs") {
-      getExtTmpPathRelTo(path, hadoopConf, stagingDir)
+      new Path(getStagingDir(path, hadoopConf, stagingDir), "-ext-10000") // Hive uses 10000
     } else {
      new Path(getExternalScratchDir(extURI, hadoopConf, stagingDir), "-ext-10000")
     }
   }
 
-  private def getExtTmpPathRelTo(
-      path: Path,
-      hadoopConf: Configuration,
-      stagingDir: String): Path = {
-    new Path(getStagingDir(path, hadoopConf, stagingDir), "-ext-10000") // Hive uses 10000
-  }
-
   private def getExternalScratchDir(
       extURI: URI,
       hadoopConf: Configuration,
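
The staging-path branch above replaces the hard-coded `HiveVersion` sets with a plain semantic-version comparison. A minimal sketch of that check, using Kyuubi's `SemanticVersion` in the same way as the hunk (the object and method names here are illustrative only):

```scala
import org.apache.kyuubi.util.SemanticVersion

object HiveStagingPathSketch {
  // Hive < 1.1 creates staging directories under a shared scratch dir; 1.1 and later
  // keep them under the table directory, so a single comparison covers all versions.
  def usesOldExternalTempPath(hiveClientFullVersion: String): Boolean =
    SemanticVersion(hiveClientFullVersion) < "1.1"
}

// For example: usesOldExternalTempPath("1.0.1") is true, usesOldExternalTempPath("2.3.9") is false.
```
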
diff --git a/pom.xml b/pom.xml
index cc0460304..3268a1b00 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2036,6 +2036,29 @@
             </properties>
         </profile>
 
+        <profile>
+            <id>spark-4.0</id>
+            <modules>
+                <module>extensions/spark/kyuubi-spark-connector-hive</module>
+            </modules>
+            <properties>
+                <spark.version>4.0.0-preview1</spark.version>
+                <spark.binary.version>4.0</spark.binary.version>
+                <antlr4.version>4.13.1</antlr4.version>
+                <!-- TODO: update once Delta support Spark 4.0 -->
+                <delta.version>3.2.0</delta.version>
+                <delta.artifact>delta-spark_${scala.binary.version}</delta.artifact>
+                <!-- TODO: update once Hudi support Spark 4.0 -->
+                <hudi.artifact>hudi-spark3.5-bundle_${scala.binary.version}</hudi.artifact>
+                <!-- TODO: update once Iceberg support Spark 4.0 -->
+                <iceberg.artifact>iceberg-spark-runtime-3.5_${scala.binary.version}</iceberg.artifact>
+                <!-- TODO: update once Paimon support Spark 4.0 -->
+                <paimon.artifact>paimon-spark-3.5</paimon.artifact>
+                <maven.plugin.scalatest.exclude.tags>org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest,org.apache.kyuubi.tags.PaimonTest,org.apache.kyuubi.tags.HudiTest</maven.plugin.scalatest.exclude.tags>
+                <spark.archive.name>spark-${spark.version}-bin-hadoop3.tgz</spark.archive.name>
+            </properties>
+        </profile>
+
         <profile>
             <id>spark-master</id>
             <properties>
