This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-1.9
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/branch-1.9 by this push:
     new 126ff4ef2 [KYUUBI #6247] Make KSHC binary compatible with multiple 
Spark versions
126ff4ef2 is described below

commit 126ff4ef221a83ac106101bb7baee74dc50f0a7e
Author: zhouyifan279 <[email protected]>
AuthorDate: Sat Jun 1 20:13:41 2024 +0800

    [KYUUBI #6247] Make KSHC binary compatible with multiple Spark versions
    
    # :mag: Description
    ## Issue References 🔗
    
    This pull request closes #6247
    
    This also closes #6431
    
    ## Describe Your Solution 🔧
    Add a job `spark-connector-cross-version-test` in GitHub Actions to:
    1. Build KSHC package with maven opt `-Pspark-3.5`
    2. Run KSHC tests with maven opt `-Pspark-3.3` and `-Pspark-3.4` and KSHC package built in step 1
    3. Fix the binary-compatible issue via reflection.
    
    ## Types of changes :bookmark:
    
    - [ ] Bugfix (non-breaking change which fixes an issue)
    - [x] New feature (non-breaking change which adds functionality)
    - [ ] Breaking change (fix or feature that would cause existing functionality to change)
    
    ## Test Plan 🧪
    
    Pass GHA.
    
    ---
    
    # Checklist 📝
    
    - [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)
    
    **Be nice. Be informative.**
    
    Closes #6436 from zhouyifan279/kshc-cross-version-test.
    
    Closes #6247
    
    d3ac2ef47 [zhouyifan279] Tune the KSHC code to fix binary-compatible issues
    4e14edcb5 [zhouyifan279] Fix invalid unit-tests-log name
    56ca45d18 [zhouyifan279] Fix invalid unit-tests-log name
    4c5ab7b9e [zhouyifan279] Update test log name
    8a84e8812 [zhouyifan279] Add matrix scala
    17cb67155 [zhouyifan279] [KYUUBI #6247] KSHC cross-version test
    
    Authored-by: zhouyifan279 <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
    (cherry picked from commit 3ed912f5de7857b442d541f339e36d87d4e54fcb)
    Signed-off-by: Cheng Pan <[email protected]>
---
 .github/workflows/master.yml                       | 55 ++++++++++++++++++++++
 .../spark/kyuubi-spark-connector-hive/pom.xml      | 41 ++++++++++++++++
 .../spark/connector/hive/HiveConnectorUtils.scala  | 47 +++++++++++++++---
 .../spark/connector/hive/read/HiveFileIndex.scala  | 11 ++---
 .../spark/connector/hive/read/HiveScan.scala       |  4 +-
 5 files changed, 143 insertions(+), 15 deletions(-)

diff --git a/.github/workflows/master.yml b/.github/workflows/master.yml
index 95288b089..8658aaf25 100644
--- a/.github/workflows/master.yml
+++ b/.github/workflows/master.yml
@@ -175,6 +175,61 @@ jobs:
             **/kyuubi-jdbc-engine.log*
             **/kyuubi-hive-sql-engine.log*
 
+  spark-connector-cross-version-test:
+    name: Spark Connector Cross Version Test
+    runs-on: ubuntu-22.04
+    strategy:
+      fail-fast: false
+      max-parallel: 1
+      matrix:
+        java:
+          - 17
+        scala:
+          - '2.12'
+          - '2.13'
+        spark-compile:
+            - '3.5'
+        spark-runtime:
+            - '3.4'
+            - '3.3'
+        comment: [ "normal" ]
+    env:
+      SPARK_LOCAL_IP: localhost
+    steps:
+      - uses: actions/checkout@v4
+      - name: Free up disk space
+        run: ./.github/scripts/free_disk_space.sh
+      - name: Tune Runner VM
+        uses: ./.github/actions/tune-runner-vm
+      - name: Setup JDK ${{ matrix.java }}
+        uses: actions/setup-java@v4
+        with:
+          distribution: temurin
+          java-version: ${{ matrix.java }}
+          cache: 'maven'
+          check-latest: false
+      - name: Setup Maven
+        uses: ./.github/actions/setup-maven
+      - name: Build Kyuubi Spark Hive Connector with Spark-${{ 
matrix.spark-compile }}
+        run: |
+          ./build/mvn clean install ${MVN_OPT} -pl 
extensions/spark/kyuubi-spark-connector-hive -am \
+          -Pjava-${{ matrix.java }} -Pscala-${{ matrix.scala }} -Pspark-${{ 
matrix.spark-compile }} \
+          -DskipTests
+      - name: Test Kyuubi Spark Hive Connector with Spark-${{ 
matrix.spark-runtime }}
+        run: |
+          ./build/mvn test ${MVN_OPT} -pl 
extensions/spark/kyuubi-spark-connector-hive \
+          -Pjava-${{ matrix.java }} -Pscala-${{ matrix.scala }} -Pspark-${{ 
matrix.spark-runtime }} \
+          -Pcross-version-test
+      - name: Upload test logs
+        if: failure()
+        uses: actions/upload-artifact@v3
+        with:
+          name: "unit-tests-log-java-${{ matrix.java }}-scala-${{ matrix.scala 
}}\
+            -spark-compile-${{ matrix.spark-compile }}-spark-runtime-${{ 
matrix.spark-runtime }}\
+            -${{ matrix.comment }}"
+          path: |
+            **/target/unit-tests.log
+
   flink-it:
     name: Flink Test
     runs-on: ubuntu-22.04
diff --git a/extensions/spark/kyuubi-spark-connector-hive/pom.xml 
b/extensions/spark/kyuubi-spark-connector-hive/pom.xml
index 3a7bcee73..a80d4893e 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/pom.xml
+++ b/extensions/spark/kyuubi-spark-connector-hive/pom.xml
@@ -182,4 +182,45 @@
         
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
         
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
     </build>
+
+    <profiles>
+        <profile>
+            <id>cross-version-test</id>
+            <dependencies>
+                <dependency>
+                    <groupId>org.apache.kyuubi</groupId>
+                    
<artifactId>kyuubi-spark-connector-hive-local_${scala.binary.version}</artifactId>
+                    <version>${project.version}</version>
+                    <scope>system</scope>
+                    
<systemPath>${project.basedir}/target/kyuubi-spark-connector-hive_${scala.binary.version}-${project.version}.jar</systemPath>
+                </dependency>
+            </dependencies>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-clean-plugin</artifactId>
+                        <configuration>
+                            
<excludeDefaultDirectories>true</excludeDefaultDirectories>
+                            <filesets>
+                                <fileset>
+                                    
<directory>target/scala-${scala.binary.version}/classes</directory>
+                                    <includes>**/*.*</includes>
+                                </fileset>
+                            </filesets>
+                        </configuration>
+                        <executions>
+                            <execution>
+                                <id>clean 
target/scala-${scala.binary.version}/classes</id>
+                                <goals>
+                                    <goal>clean</goal>
+                                </goals>
+                                <phase>process-test-classes</phase>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
 </project>
diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
index fd8b08f56..8eaaa0102 100644
--- 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.connector.catalog.TableChange
 import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, 
ColumnPosition, DeleteColumn, First, RenameColumn, UpdateColumnComment, 
UpdateColumnNullability, UpdateColumnPosition, UpdateColumnType}
 import org.apache.spark.sql.execution.command.CommandUtils
 import 
org.apache.spark.sql.execution.command.CommandUtils.{calculateMultipleLocationSizes,
 calculateSingleLocationSize}
-import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.{PartitionDirectory, 
PartitionedFile}
 import org.apache.spark.sql.hive.execution.HiveFileFormat
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
@@ -68,8 +68,7 @@ object HiveConnectorUtils extends Logging {
         .build[HiveFileFormat]()
         .newInstance(shimFileSinkDesc)
     } else {
-      throw KyuubiHiveConnectorException(s"Spark version $SPARK_VERSION " +
-        s"is not supported by Kyuubi spark hive connector.")
+      throw unsupportedSparkVersion()
     }
   }
 
@@ -80,8 +79,7 @@ object HiveConnectorUtils extends Logging {
     } else if (SPARK_RUNTIME_VERSION >= "3.3") {
       invokeAs[String](file, "filePath")
     } else {
-      throw KyuubiHiveConnectorException(s"Spark version $SPARK_VERSION " +
-        s"is not supported by Kyuubi spark hive connector.")
+      throw unsupportedSparkVersion()
     }
   }
 
@@ -136,11 +134,46 @@ object HiveConnectorUtils extends Logging {
           maxSplitBytes.asInstanceOf[JLong],
           partitionValues)
     } else {
-      throw KyuubiHiveConnectorException(s"Spark version $SPARK_VERSION " +
-        s"is not supported by Kyuubi spark hive connector.")
+      throw unsupportedSparkVersion()
     }
   }
 
+  def createPartitionDirectory(values: InternalRow, files: Seq[FileStatus]): 
PartitionDirectory = {
+    if (SPARK_RUNTIME_VERSION >= "3.5") {
+      new DynMethods.Builder("apply")
+        .impl(classOf[PartitionDirectory], classOf[InternalRow], 
classOf[Array[FileStatus]])
+        .buildChecked()
+        .asStatic()
+        .invoke[PartitionDirectory](values, files.toArray)
+    } else if (SPARK_RUNTIME_VERSION >= "3.3") {
+      new DynMethods.Builder("apply")
+        .impl(classOf[PartitionDirectory], classOf[InternalRow], 
classOf[Seq[FileStatus]])
+        .buildChecked()
+        .asStatic()
+        .invoke[PartitionDirectory](values, files)
+    } else {
+      throw unsupportedSparkVersion()
+    }
+  }
+
+  def getPartitionFilePath(file: AnyRef): Path = {
+    if (SPARK_RUNTIME_VERSION >= "3.5") {
+      new DynMethods.Builder("getPath")
+        
.impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
+        .build()
+        .invoke[Path](file)
+    } else if (SPARK_RUNTIME_VERSION >= "3.3") {
+      file.asInstanceOf[FileStatus].getPath
+    } else {
+      throw unsupportedSparkVersion()
+    }
+  }
+
+  private def unsupportedSparkVersion(): KyuubiHiveConnectorException = {
+    KyuubiHiveConnectorException(s"Spark version $SPARK_VERSION " +
+      "is not supported by Kyuubi spark hive connector.")
+  }
+
   def calculateTotalSize(
       spark: SparkSession,
       catalogTable: CatalogTable,
diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala
index 5e24199f8..55c9168ed 100644
--- 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala
@@ -20,7 +20,6 @@ package org.apache.kyuubi.spark.connector.hive.read
 import java.net.URI
 
 import scala.collection.mutable
-import scala.language.implicitConversions
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
@@ -32,7 +31,7 @@ import org.apache.spark.sql.connector.catalog.CatalogPlugin
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.types.StructType
 
-import org.apache.kyuubi.spark.connector.hive.{HiveTableCatalog, 
KyuubiHiveConnectorException}
+import org.apache.kyuubi.spark.connector.hive.{HiveConnectorUtils, 
HiveTableCatalog, KyuubiHiveConnectorException}
 
 class HiveCatalogFileIndex(
     sparkSession: SparkSession,
@@ -157,8 +156,6 @@ class HiveInMemoryFileIndex(
   private val partDirToBindHivePart: mutable.Map[PartitionDirectory, 
CatalogTablePartition] =
     mutable.Map()
 
-  implicit private def seqToArr(seq: Seq[FileStatus]): Array[FileStatus] = 
seq.toArray
-
   override def listFiles(
       partitionFilters: Seq[Expression],
       dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
@@ -167,7 +164,9 @@ class HiveInMemoryFileIndex(
     }
     val selectedPartitions =
       if (partitionSpec().partitionColumns.isEmpty) {
-        PartitionDirectory(InternalRow.empty, 
allFiles().filter(isNonEmptyFile)) :: Nil
+        HiveConnectorUtils.createPartitionDirectory(
+          InternalRow.empty,
+          allFiles().filter(isNonEmptyFile)) :: Nil
       } else {
         if (recursiveFileLookup) {
           throw new IllegalArgumentException(
@@ -184,7 +183,7 @@ class HiveInMemoryFileIndex(
                 // Directory does not exist, or has no children files
                 Nil
             }
-            val partDir = PartitionDirectory(values, files)
+            val partDir = HiveConnectorUtils.createPartitionDirectory(values, 
files)
             // Update Partition Directory -> binding Hive part map
             updatePartDirHivePartitionMapping(partDir, partPath)
 
diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveScan.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveScan.scala
index 816c2661d..ab6cfd912 100644
--- 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveScan.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveScan.scala
@@ -103,8 +103,8 @@ case class HiveScan(
         } else {
           partition.values
         }
-      partition.files.flatMap { file =>
-        val filePath = file.getPath
+      partition.files.asInstanceOf[Seq[AnyRef]].flatMap { file =>
+        val filePath = HiveConnectorUtils.getPartitionFilePath(file)
         val partFiles = HiveConnectorUtils.splitFiles(
           sparkSession = sparkSession,
           file = file,

Reply via email to