This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 3ed912f5d [KYUUBI #6247] Make KSHC binary compatible with multiple Spark versions
3ed912f5d is described below
commit 3ed912f5de7857b442d541f339e36d87d4e54fcb
Author: zhouyifan279 <[email protected]>
AuthorDate: Sat Jun 1 20:13:41 2024 +0800
[KYUUBI #6247] Make KSHC binary compatible with multiple Spark versions
# :mag: Description
## Issue References 🔗
This pull request closes #6247
This also closes #6431
## Describe Your Solution 🔧
Add a job `spark-connector-cross-version-test` in GitHub Actions to:
1. Build the KSHC package with the Maven option `-Pspark-3.5`
2. Run the KSHC tests with the Maven options `-Pspark-3.3` and `-Pspark-3.4` against the KSHC package built in step 1
3. Fix binary-compatibility issues via reflection (a simplified sketch follows this list)
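For context on item 3, here is a hypothetical, simplified sketch of the reflection technique (the object and method names are made up for illustration; the actual change, visible in the `HiveConnectorUtils.scala` diff below, uses the `DynMethods` helper). The idea is to resolve a Spark API whose signature changed between releases at runtime instead of baking the compile-time signature into the connector's bytecode:

```scala
// Hypothetical sketch only: not the connector's real helper.
// PartitionedFile.filePath returns String on Spark 3.3 but SparkPath on 3.4+,
// so a call compiled against Spark 3.5 resolves to a method descriptor that a
// Spark 3.3 runtime does not provide and fails with NoSuchMethodError.
object SparkApiShim {
  // Looking the accessor up by name ignores the return type recorded in the
  // compile-time signature, so the same bytecode works on either Spark version.
  def partitionedFilePath(file: AnyRef): String =
    file.getClass.getMethod("filePath").invoke(file).toString
}
```

The patch applies the same idea via `DynMethods` for `PartitionDirectory` construction and partition file-path access, and the new GHA job exercises it by compiling against Spark 3.5 and running the tests on 3.3/3.4.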
## Types of changes :bookmark:
- [ ] Bugfix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)
## Test Plan 🧪
Pass GHA.
---
# Checklist 📝
- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)
**Be nice. Be informative.**
Closes #6436 from zhouyifan279/kshc-cross-version-test.
Closes #6247
d3ac2ef47 [zhouyifan279] Tune the KSHC code to fix binary-compatible issues
4e14edcb5 [zhouyifan279] Fix invalid unit-tests-log name
56ca45d18 [zhouyifan279] Fix invalid unit-tests-log name
4c5ab7b9e [zhouyifan279] Update test log name
8a84e8812 [zhouyifan279] Add matrix scala
17cb67155 [zhouyifan279] [KYUUBI #6247] KSHC cross-version test
Authored-by: zhouyifan279 <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.github/workflows/master.yml | 55 ++++++++++++++++++++++
.../spark/kyuubi-spark-connector-hive/pom.xml | 41 ++++++++++++++++
.../spark/connector/hive/HiveConnectorUtils.scala | 47 +++++++++++++++---
.../spark/connector/hive/read/HiveFileIndex.scala | 11 ++---
.../spark/connector/hive/read/HiveScan.scala | 4 +-
5 files changed, 143 insertions(+), 15 deletions(-)
diff --git a/.github/workflows/master.yml b/.github/workflows/master.yml
index ad7e85606..6b4cb552e 100644
--- a/.github/workflows/master.yml
+++ b/.github/workflows/master.yml
@@ -170,6 +170,61 @@ jobs:
**/kyuubi-jdbc-engine.log*
**/kyuubi-hive-sql-engine.log*
+ spark-connector-cross-version-test:
+ name: Spark Connector Cross Version Test
+ runs-on: ubuntu-22.04
+ strategy:
+ fail-fast: false
+ max-parallel: 1
+ matrix:
+ java:
+ - 17
+ scala:
+ - '2.12'
+ - '2.13'
+ spark-compile:
+ - '3.5'
+ spark-runtime:
+ - '3.4'
+ - '3.3'
+ comment: [ "normal" ]
+ env:
+ SPARK_LOCAL_IP: localhost
+ steps:
+ - uses: actions/checkout@v4
+ - name: Free up disk space
+ run: ./.github/scripts/free_disk_space.sh
+ - name: Tune Runner VM
+ uses: ./.github/actions/tune-runner-vm
+ - name: Setup JDK ${{ matrix.java }}
+ uses: actions/setup-java@v4
+ with:
+ distribution: temurin
+ java-version: ${{ matrix.java }}
+ cache: 'maven'
+ check-latest: false
+ - name: Setup Maven
+ uses: ./.github/actions/setup-maven
+      - name: Build Kyuubi Spark Hive Connector with Spark-${{ matrix.spark-compile }}
+ run: |
+          ./build/mvn clean install ${MVN_OPT} -pl extensions/spark/kyuubi-spark-connector-hive -am \
+          -Pjava-${{ matrix.java }} -Pscala-${{ matrix.scala }} -Pspark-${{ matrix.spark-compile }} \
+ -DskipTests
+      - name: Test Kyuubi Spark Hive Connector with Spark-${{ matrix.spark-runtime }}
+ run: |
+          ./build/mvn test ${MVN_OPT} -pl extensions/spark/kyuubi-spark-connector-hive \
+          -Pjava-${{ matrix.java }} -Pscala-${{ matrix.scala }} -Pspark-${{ matrix.spark-runtime }} \
+ -Pcross-version-test
+ - name: Upload test logs
+ if: failure()
+ uses: actions/upload-artifact@v3
+ with:
+          name: "unit-tests-log-java-${{ matrix.java }}-scala-${{ matrix.scala }}\
+            -spark-compile-${{ matrix.spark-compile }}-spark-runtime-${{ matrix.spark-runtime }}\
+ -${{ matrix.comment }}"
+ path: |
+ **/target/unit-tests.log
+
flink-it:
name: Flink Test
runs-on: ubuntu-22.04
diff --git a/extensions/spark/kyuubi-spark-connector-hive/pom.xml b/extensions/spark/kyuubi-spark-connector-hive/pom.xml
index 37cab8cfc..a2a783e21 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/pom.xml
+++ b/extensions/spark/kyuubi-spark-connector-hive/pom.xml
@@ -182,4 +182,45 @@
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
</build>
+
+ <profiles>
+ <profile>
+ <id>cross-version-test</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.kyuubi</groupId>
+          <artifactId>kyuubi-spark-connector-hive-local_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>system</scope>
+          <systemPath>${project.basedir}/target/kyuubi-spark-connector-hive_${scala.binary.version}-${project.version}.jar</systemPath>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-clean-plugin</artifactId>
+ <configuration>
+              <excludeDefaultDirectories>true</excludeDefaultDirectories>
+ <filesets>
+ <fileset>
+                  <directory>target/scala-${scala.binary.version}/classes</directory>
+ <includes>**/*.*</includes>
+ </fileset>
+ </filesets>
+ </configuration>
+ <executions>
+ <execution>
+                <id>clean target/scala-${scala.binary.version}/classes</id>
+ <goals>
+ <goal>clean</goal>
+ </goals>
+ <phase>process-test-classes</phase>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
</project>
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
index fd8b08f56..8eaaa0102 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.connector.catalog.TableChange
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, ColumnPosition, DeleteColumn, First, RenameColumn, UpdateColumnComment, UpdateColumnNullability, UpdateColumnPosition, UpdateColumnType}
import org.apache.spark.sql.execution.command.CommandUtils
import org.apache.spark.sql.execution.command.CommandUtils.{calculateMultipleLocationSizes, calculateSingleLocationSize}
-import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.{PartitionDirectory, PartitionedFile}
import org.apache.spark.sql.hive.execution.HiveFileFormat
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
@@ -68,8 +68,7 @@ object HiveConnectorUtils extends Logging {
.build[HiveFileFormat]()
.newInstance(shimFileSinkDesc)
} else {
- throw KyuubiHiveConnectorException(s"Spark version $SPARK_VERSION " +
- s"is not supported by Kyuubi spark hive connector.")
+ throw unsupportedSparkVersion()
}
}
@@ -80,8 +79,7 @@ object HiveConnectorUtils extends Logging {
} else if (SPARK_RUNTIME_VERSION >= "3.3") {
invokeAs[String](file, "filePath")
} else {
- throw KyuubiHiveConnectorException(s"Spark version $SPARK_VERSION " +
- s"is not supported by Kyuubi spark hive connector.")
+ throw unsupportedSparkVersion()
}
}
@@ -136,11 +134,46 @@ object HiveConnectorUtils extends Logging {
maxSplitBytes.asInstanceOf[JLong],
partitionValues)
} else {
- throw KyuubiHiveConnectorException(s"Spark version $SPARK_VERSION " +
- s"is not supported by Kyuubi spark hive connector.")
+ throw unsupportedSparkVersion()
}
}
+ def createPartitionDirectory(values: InternalRow, files: Seq[FileStatus]): PartitionDirectory = {
+ if (SPARK_RUNTIME_VERSION >= "3.5") {
+ new DynMethods.Builder("apply")
+ .impl(classOf[PartitionDirectory], classOf[InternalRow], classOf[Array[FileStatus]])
+ .buildChecked()
+ .asStatic()
+ .invoke[PartitionDirectory](values, files.toArray)
+ } else if (SPARK_RUNTIME_VERSION >= "3.3") {
+ new DynMethods.Builder("apply")
+ .impl(classOf[PartitionDirectory], classOf[InternalRow], classOf[Seq[FileStatus]])
+ .buildChecked()
+ .asStatic()
+ .invoke[PartitionDirectory](values, files)
+ } else {
+ throw unsupportedSparkVersion()
+ }
+ }
+
+ def getPartitionFilePath(file: AnyRef): Path = {
+ if (SPARK_RUNTIME_VERSION >= "3.5") {
+ new DynMethods.Builder("getPath")
+ .impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
+ .build()
+ .invoke[Path](file)
+ } else if (SPARK_RUNTIME_VERSION >= "3.3") {
+ file.asInstanceOf[FileStatus].getPath
+ } else {
+ throw unsupportedSparkVersion()
+ }
+ }
+
+ private def unsupportedSparkVersion(): KyuubiHiveConnectorException = {
+ KyuubiHiveConnectorException(s"Spark version $SPARK_VERSION " +
+ "is not supported by Kyuubi spark hive connector.")
+ }
+
def calculateTotalSize(
spark: SparkSession,
catalogTable: CatalogTable,
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala
index 5e24199f8..55c9168ed 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala
@@ -20,7 +20,6 @@ package org.apache.kyuubi.spark.connector.hive.read
import java.net.URI
import scala.collection.mutable
-import scala.language.implicitConversions
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
@@ -32,7 +31,7 @@ import org.apache.spark.sql.connector.catalog.CatalogPlugin
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.types.StructType
-import org.apache.kyuubi.spark.connector.hive.{HiveTableCatalog, KyuubiHiveConnectorException}
+import org.apache.kyuubi.spark.connector.hive.{HiveConnectorUtils, HiveTableCatalog, KyuubiHiveConnectorException}
class HiveCatalogFileIndex(
sparkSession: SparkSession,
@@ -157,8 +156,6 @@ class HiveInMemoryFileIndex(
private val partDirToBindHivePart: mutable.Map[PartitionDirectory, CatalogTablePartition] =
mutable.Map()
- implicit private def seqToArr(seq: Seq[FileStatus]): Array[FileStatus] = seq.toArray
-
override def listFiles(
partitionFilters: Seq[Expression],
dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
@@ -167,7 +164,9 @@ class HiveInMemoryFileIndex(
}
val selectedPartitions =
if (partitionSpec().partitionColumns.isEmpty) {
- PartitionDirectory(InternalRow.empty, allFiles().filter(isNonEmptyFile)) :: Nil
+ HiveConnectorUtils.createPartitionDirectory(
+ InternalRow.empty,
+ allFiles().filter(isNonEmptyFile)) :: Nil
} else {
if (recursiveFileLookup) {
throw new IllegalArgumentException(
@@ -184,7 +183,7 @@ class HiveInMemoryFileIndex(
// Directory does not exist, or has no children files
Nil
}
- val partDir = PartitionDirectory(values, files)
+ val partDir = HiveConnectorUtils.createPartitionDirectory(values, files)
// Update Partition Directory -> binding Hive part map
updatePartDirHivePartitionMapping(partDir, partPath)
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveScan.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveScan.scala
index 816c2661d..ab6cfd912 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveScan.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveScan.scala
@@ -103,8 +103,8 @@ case class HiveScan(
} else {
partition.values
}
- partition.files.flatMap { file =>
- val filePath = file.getPath
+ partition.files.asInstanceOf[Seq[AnyRef]].flatMap { file =>
+ val filePath = HiveConnectorUtils.getPartitionFilePath(file)
val partFiles = HiveConnectorUtils.splitFiles(
sparkSession = sparkSession,
file = file,