This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 03dcedd89 [KYUUBI #6453] Make KSHC support Spark 4.0 and enable CI for Spark 4.0
03dcedd89 is described below
commit 03dcedd89ed26fb21d0475b974a637e805e06d3b
Author: Cheng Pan <[email protected]>
AuthorDate: Fri Jun 7 11:01:24 2024 +0800
[KYUUBI #6453] Make KSHC support Spark 4.0 and enable CI for Spark 4.0
# :mag: Description
This PR makes KSHC support Spark 4.0, and also ensures that the KSHC jar compiled against Spark 3.5 is binary compatible with Spark 4.0.
We are ready to enable CI for Spark 4.0, except for the authz module.
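Below is a minimal, illustrative sketch (not this patch's code) of the binary-compatibility approach: gate on the Spark version detected at runtime and look up version-specific classes reflectively instead of linking to them at compile time, so a single jar built against Spark 3.5 also runs on Spark 4.0. The Spark class names are the ones this patch touches; `CrossVersionSketch` and its version helper are made-up stand-ins for KSHC's `SPARK_RUNTIME_VERSION` checks.

```scala
// Illustrative sketch only: reflective, version-gated access to Spark internals.
object CrossVersionSketch {

  // Runtime Spark version, e.g. "3.5.1" or "4.0.0-preview1".
  private val versionParts = org.apache.spark.SPARK_VERSION.split('.')
  private val sparkMajor = versionParts(0).takeWhile(_.isDigit).toInt
  private val sparkMinor = versionParts(1).takeWhile(_.isDigit).toInt

  private def sparkAtLeast(major: Int, minor: Int): Boolean =
    sparkMajor > major || (sparkMajor == major && sparkMinor >= minor)

  // SPARK-43039 (3.5) introduced FileStatusWithMetadata, and SPARK-42821 (4.0) changed the
  // signature of PartitionedFileUtil.splitFiles. Loading such classes by name keeps the jar
  // free of compile-time references to signatures that differ across Spark versions.
  def fileStatusClass(): Class[_] =
    if (sparkAtLeast(3, 5)) {
      Class.forName("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
    } else {
      classOf[org.apache.hadoop.fs.FileStatus]
    }
}
```

The actual patch applies the same idea to the changed `splitFiles` overload via the `DynClasses`/`DynMethods` builders shown in the `HiveConnectorUtils.scala` hunk below.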
## Types of changes :bookmark:
- [ ] Bugfix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)
## Test Plan 🧪
Pass GHA.
---
# Checklist 📝
- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)
**Be nice. Be informative.**
Closes #6453 from pan3793/spark4-ci.
Closes #6453
695e3d7f7 [Cheng Pan] Update pom.xml
2eaa0f88a [Cheng Pan] Update .github/workflows/master.yml
b1f540a34 [Cheng Pan] cross test
562839982 [Cheng Pan] fix
9f0c2e1be [Cheng Pan] fix
45f182462 [Cheng Pan] kshc
227ef5bae [Cheng Pan] fix
690a3b8b2 [Cheng Pan] Revert "fix"
87fe7678b [Cheng Pan] fix
60f55dbed [Cheng Pan] CI for Spark 4.
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.github/workflows/master.yml | 11 +++++++
dev/kyuubi-codecov/pom.xml | 36 ++++++++++++++++++----
.../spark/connector/hive/HiveConnectorUtils.scala | 30 ++++++++++++++++--
.../spark/connector/hive/HiveTableCatalog.scala | 26 ++++++++--------
.../connector/hive/write/HiveWriteHelper.scala | 34 ++++++--------------
pom.xml | 23 ++++++++++++++
6 files changed, 113 insertions(+), 47 deletions(-)
diff --git a/.github/workflows/master.yml b/.github/workflows/master.yml
index 503e03147..23edb0afd 100644
--- a/.github/workflows/master.yml
+++ b/.github/workflows/master.yml
@@ -56,6 +56,11 @@ jobs:
exclude-tags: [""]
comment: ["normal"]
include:
+ - java: 17
+ spark: '4.0'
+ spark-archive: '-Pscala-2.13'
+ exclude-tags: ''
+ comment: 'normal'
- java: 8
spark: '3.5'
spark-archive: '-Dspark.archive.mirror=https://archive.apache.org/dist/spark/spark-3.2.4 -Dspark.archive.name=spark-3.2.4-bin-hadoop3.2.tgz -Pzookeeper-3.6'
@@ -193,6 +198,12 @@ jobs:
- '3.4'
- '3.3'
comment: [ "normal" ]
+ include:
+ - java: 17
+ scala: "2.13"
+ spark-compile: "3.5"
+ spark-runtime: "4.0"
+ comment: "normal"
env:
SPARK_LOCAL_IP: localhost
TEST_MODULES: "extensions/spark/kyuubi-spark-connector-hive,\
diff --git a/dev/kyuubi-codecov/pom.xml b/dev/kyuubi-codecov/pom.xml
index f22cbe186..f224b8cff 100644
--- a/dev/kyuubi-codecov/pom.xml
+++ b/dev/kyuubi-codecov/pom.xml
@@ -121,12 +121,6 @@
<version>${project.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.kyuubi</groupId>
- <artifactId>kyuubi-spark-authz_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- </dependency>
-
<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-spark-connector-tpcds_${scala.binary.version}</artifactId>
@@ -169,6 +163,11 @@
<artifactId>kyuubi-extension-spark-3-2_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.kyuubi</groupId>
+ <artifactId>kyuubi-spark-authz_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
</profile>
<profile>
@@ -184,6 +183,11 @@
<artifactId>kyuubi-spark-connector-hive_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.kyuubi</groupId>
+ <artifactId>kyuubi-spark-authz_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
</profile>
<profile>
@@ -209,6 +213,26 @@
<artifactId>kyuubi-extension-spark-3-5_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.kyuubi</groupId>
+ <artifactId>kyuubi-spark-connector-hive_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kyuubi</groupId>
+ <artifactId>kyuubi-spark-authz_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+ </profile>
+ <profile>
+ <id>spark-4.0</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.kyuubi</groupId>
+ <artifactId>kyuubi-spark-connector-hive_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
</profile>
<profile>
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
index 8eaaa0102..0ccfd4912 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
@@ -83,7 +83,6 @@ object HiveConnectorUtils extends Logging {
}
}
- // SPARK-43039
def splitFiles(
sparkSession: SparkSession,
file: AnyRef,
@@ -92,7 +91,26 @@ object HiveConnectorUtils extends Logging {
maxSplitBytes: Long,
partitionValues: InternalRow): Seq[PartitionedFile] = {
- if (SPARK_RUNTIME_VERSION >= "3.5") {
+ if (SPARK_RUNTIME_VERSION >= "4.0") { // SPARK-42821
+ val fileStatusWithMetadataClz = DynClasses.builder()
+ .impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
+ .build()
+ DynMethods
+ .builder("splitFiles")
+ .impl(
+ "org.apache.spark.sql.execution.PartitionedFileUtil",
+ fileStatusWithMetadataClz,
+ classOf[Boolean],
+ classOf[Long],
+ classOf[InternalRow])
+ .build()
+ .invoke[Seq[PartitionedFile]](
+ null,
+ file,
+ isSplitable.asInstanceOf[JBoolean],
+ maxSplitBytes.asInstanceOf[JLong],
+ partitionValues)
+ } else if (SPARK_RUNTIME_VERSION >= "3.5") { // SPARK-43039
val fileStatusWithMetadataClz = DynClasses.builder()
.impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
.build()
@@ -384,7 +402,13 @@ object HiveConnectorUtils extends Logging {
new StructType(newFields)
}
- def withSQLConf[T](pairs: (String, String)*)(f: => T): T = {
+ // This is a fork of Spark's withSQLConf. We use a different name to avoid linkage
+ // issues in cross-version cases.
+ // For example, SPARK-46227 (4.0.0) moves `withSQLConf` from SQLHelper to SQLConfHelper,
+ // so classes that extend SQLConfHelper prefer to link to the superclass's method when
+ // compiled with Spark 4.0, and a linkage error then occurs when the jar runs on lower
+ // Spark versions.
+ def withSparkSQLConf[T](pairs: (String, String)*)(f: => T): T = {
val conf = SQLConf.get
val (keys, values) = pairs.unzip
val currentValues = keys.map { key =>
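A hedged, hypothetical sketch of the linkage hazard that the `withSparkSQLConf` comment above describes, assuming Spark 4.0's `SQLConfHelper` exposes `withSQLConf` per SPARK-46227; `MyHelpers` and `MyCatalog` are made-up names, not part of this patch:

```scala
import org.apache.spark.sql.catalyst.SQLConfHelper

object MyHelpers {
  // A helper with the same name and shape as the method SPARK-46227 adds to SQLConfHelper in 4.0.
  def withSQLConf[T](pairs: (String, String)*)(f: => T): T = f
}

class MyCatalog extends SQLConfHelper {
  import MyHelpers._

  // Compiled against Spark 4.0, this call binds to the inherited SQLConfHelper.withSQLConf
  // (inherited members take precedence over imported names in Scala), so running the same jar
  // on Spark 3.x fails with NoSuchMethodError. Renaming the Kyuubi helper to withSparkSQLConf
  // avoids binding to the Spark 4.0-only method.
  def listSomething(): Seq[String] = withSQLConf("some.conf" -> "true") {
    Seq.empty
  }
}
```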
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
index c128d67f1..91088d787 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
@@ -44,7 +44,7 @@ import org.apache.spark.sql.internal.StaticSQLConf.{CATALOG_IMPLEMENTATION, GLOB
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
-import org.apache.kyuubi.spark.connector.hive.HiveConnectorUtils.withSQLConf
+import org.apache.kyuubi.spark.connector.hive.HiveConnectorUtils.withSparkSQLConf
import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog.{getStorageFormatAndProvider, toCatalogDatabase, CatalogDatabaseHelper, IdentifierHelper, NamespaceHelper}
import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorDelegationTokenProvider.metastoreTokenSignature
@@ -148,7 +148,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
override val defaultNamespace: Array[String] = Array("default")
override def listTables(namespace: Array[String]): Array[Identifier] =
- withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+ withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
namespace match {
case Array(db) =>
catalog
@@ -162,7 +162,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
}
override def loadTable(ident: Identifier): Table =
- withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+ withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
HiveTable(sparkSession, catalog.getTableMetadata(ident.asTableIdentifier), this)
}
@@ -171,7 +171,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
schema: StructType,
partitions: Array[Transform],
properties: util.Map[String, String]): Table =
- withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+ withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.TransformHelper
val (partitionColumns, maybeBucketSpec) = partitions.toSeq.convertTransforms
val location = Option(properties.get(TableCatalog.PROP_LOCATION))
@@ -213,7 +213,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
}
override def alterTable(ident: Identifier, changes: TableChange*): Table =
- withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+ withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
val catalogTable =
try {
catalog.getTableMetadata(ident.asTableIdentifier)
@@ -253,7 +253,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
}
override def dropTable(ident: Identifier): Boolean =
- withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+ withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
try {
if (loadTable(ident) != null) {
catalog.dropTable(
@@ -271,7 +271,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
}
override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit =
- withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+ withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
if (tableExists(newIdent)) {
throw new TableAlreadyExistsException(newIdent)
}
@@ -288,12 +288,12 @@ class HiveTableCatalog(sparkSession: SparkSession)
}
override def listNamespaces(): Array[Array[String]] =
- withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+ withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
catalog.listDatabases().map(Array(_)).toArray
}
override def listNamespaces(namespace: Array[String]): Array[Array[String]] =
- withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+ withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
namespace match {
case Array() =>
listNamespaces()
@@ -305,7 +305,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
}
override def loadNamespaceMetadata(namespace: Array[String]): util.Map[String, String] =
- withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+ withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
namespace match {
case Array(db) =>
try {
@@ -323,7 +323,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
override def createNamespace(
namespace: Array[String],
metadata: util.Map[String, String]): Unit =
- withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+ withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
namespace match {
case Array(db) if !catalog.databaseExists(db) =>
catalog.createDatabase(
@@ -339,7 +339,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
}
override def alterNamespace(namespace: Array[String], changes: NamespaceChange*): Unit =
- withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+ withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
namespace match {
case Array(db) =>
// validate that this catalog's reserved properties are not removed
@@ -379,7 +379,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
override def dropNamespace(
namespace: Array[String],
cascade: Boolean): Boolean =
- withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+ withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
namespace match {
case Array(db) if catalog.databaseExists(db) =>
catalog.dropDatabase(db, ignoreIfNotExists = false, cascade)
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWriteHelper.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWriteHelper.scala
index c3e73c011..558ac8ee9 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWriteHelper.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWriteHelper.scala
@@ -28,7 +28,9 @@ import org.apache.hadoop.hive.common.FileUtils
import org.apache.hadoop.hive.ql.exec.TaskRunner
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener
-import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.{hive, HiveExternalCatalog, HiveVersion}
+import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.HiveExternalCatalog
+
+import org.apache.kyuubi.util.SemanticVersion
// scalastyle:off line.size.limit
/**
@@ -48,8 +50,6 @@ object HiveWriteHelper extends Logging {
hadoopConf: Configuration,
path: Path): Path = {
- import hive._
-
// Before Hive 1.1, when inserting into a table, Hive will create the staging directory under
// a common scratch directory. After the writing is finished, Hive will simply empty the table
// directory and move the staging directory to it.
@@ -59,24 +59,15 @@ object HiveWriteHelper extends Logging {
// We have to follow the Hive behavior here, to avoid troubles. For example, if we create the
// staging directory under the table directory for Hive prior to 1.1, the staging directory will
// be removed by Hive when Hive is trying to empty the table directory.
- val hiveVersionsUsingOldExternalTempPath: Set[HiveVersion] = Set(v12, v13, v14, v1_0)
- val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] =
- Set(v1_1, v1_2, v2_0, v2_1, v2_2, v2_3, v3_0, v3_1)
-
- // Ensure all the supported versions are considered here.
- assert(hiveVersionsUsingNewExternalTempPath ++ hiveVersionsUsingOldExternalTempPath ==
- allSupportedHiveVersions)
-
- val hiveVersion = externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client.version
+ val hiveVersion = SemanticVersion(
+ externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client.version.fullVersion)
val stagingDir = hadoopConf.get(hiveStagingDir, ".hive-staging")
val scratchDir = hadoopConf.get(hiveScratchDir, "/tmp/hive")
- if (hiveVersionsUsingOldExternalTempPath.contains(hiveVersion)) {
+ if (hiveVersion < "1.1") {
oldVersionExternalTempPath(path, hadoopConf, scratchDir)
- } else if (hiveVersionsUsingNewExternalTempPath.contains(hiveVersion)) {
- newVersionExternalTempPath(path, hadoopConf, stagingDir)
} else {
- throw new IllegalStateException("Unsupported hive version: " + hiveVersion.fullVersion)
+ newVersionExternalTempPath(path, hadoopConf, stagingDir)
}
}
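For illustration, a hedged sketch of the simplified version gate introduced above, reusing `org.apache.kyuubi.util.SemanticVersion` as this diff does; the object, method, config keys, and path layout are simplified placeholders rather than the connector's real helpers:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.kyuubi.util.SemanticVersion

object StagingDirSketch {
  // Hive before 1.1 creates the staging directory under a shared scratch directory, while
  // 1.1+ creates it under the table directory, so the writer must match whichever layout
  // the connected metastore client uses.
  def externalTempPath(tablePath: Path, hadoopConf: Configuration, hiveVersion: String): Path = {
    val version = SemanticVersion(hiveVersion)
    if (version < "1.1") {
      new Path(hadoopConf.get("hive.exec.scratchdir", "/tmp/hive"), "hypothetical-staging")
    } else {
      new Path(tablePath, hadoopConf.get("hive.exec.stagingdir", ".hive-staging"))
    }
  }
}
```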
@@ -96,7 +87,7 @@ object HiveWriteHelper extends Logging {
var dirPath = new Path(
extURI.getScheme,
extURI.getAuthority,
- scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID())
+ scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID)
try {
val fs: FileSystem = dirPath.getFileSystem(hadoopConf)
@@ -120,19 +111,12 @@ object HiveWriteHelper extends Logging {
stagingDir: String): Path = {
val extURI: URI = path.toUri
if (extURI.getScheme == "viewfs") {
- getExtTmpPathRelTo(path, hadoopConf, stagingDir)
+ new Path(getStagingDir(path, hadoopConf, stagingDir), "-ext-10000") // Hive uses 10000
} else {
new Path(getExternalScratchDir(extURI, hadoopConf, stagingDir), "-ext-10000")
}
}
- private def getExtTmpPathRelTo(
- path: Path,
- hadoopConf: Configuration,
- stagingDir: String): Path = {
- new Path(getStagingDir(path, hadoopConf, stagingDir), "-ext-10000") // Hive uses 10000
- }
-
private def getExternalScratchDir(
extURI: URI,
hadoopConf: Configuration,
diff --git a/pom.xml b/pom.xml
index cc0460304..3268a1b00 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2036,6 +2036,29 @@
</properties>
</profile>
+ <profile>
+ <id>spark-4.0</id>
+ <modules>
+ <module>extensions/spark/kyuubi-spark-connector-hive</module>
+ </modules>
+ <properties>
+ <spark.version>4.0.0-preview1</spark.version>
+ <spark.binary.version>4.0</spark.binary.version>
+ <antlr4.version>4.13.1</antlr4.version>
+ <!-- TODO: update once Delta support Spark 4.0 -->
+ <delta.version>3.2.0</delta.version>
+ <delta.artifact>delta-spark_${scala.binary.version}</delta.artifact>
+ <!-- TODO: update once Hudi support Spark 4.0 -->
+ <hudi.artifact>hudi-spark3.5-bundle_${scala.binary.version}</hudi.artifact>
+ <!-- TODO: update once Iceberg support Spark 4.0 -->
+ <iceberg.artifact>iceberg-spark-runtime-3.5_${scala.binary.version}</iceberg.artifact>
+ <!-- TODO: update once Paimon support Spark 4.0 -->
+ <paimon.artifact>paimon-spark-3.5</paimon.artifact>
+ <maven.plugin.scalatest.exclude.tags>org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest,org.apache.kyuubi.tags.PaimonTest,org.apache.kyuubi.tags.HudiTest</maven.plugin.scalatest.exclude.tags>
+ <spark.archive.name>spark-${spark.version}-bin-hadoop3.tgz</spark.archive.name>
+ </properties>
+ </profile>
+
<profile>
<id>spark-master</id>
<properties>