This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 8cc9b98e2 [KYUUBI #5384][KSHC] Hive connector supports Spark 3.5
8cc9b98e2 is described below
commit 8cc9b98e25f443e6337176914e6472a29e336f62
Author: Cheng Pan <[email protected]>
AuthorDate: Thu Mar 7 17:56:30 2024 +0800
[KYUUBI #5384][KSHC] Hive connector supports Spark 3.5
# :mag: Description
## Issue References 🔗
This pull request fixes #5384
## Describe Your Solution 🔧
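Gate on `SPARK_RUNTIME_VERSION` and construct the version-specific Spark classes reflectively via `DynClasses` / `DynConstructors` / `DynMethods`, so a single connector build works with Spark 3.3/3.4/3.5 across SPARK-43186 (`HiveFileFormat` now takes Hive's `FileSinkDesc` instead of `ShimFileSinkDesc`) and SPARK-43039 (`PartitionedFileUtil.splitFiles` now takes `FileStatusWithMetadata`). A minimal sketch of that dispatch pattern, distilled from `HiveConnectorUtils` in the diff below (the `CompatSketch` object name is illustrative only, not part of the patch):

```scala
// Illustrative sketch only; the committed code is HiveConnectorUtils.getHiveFileFormat below.
import org.apache.hadoop.hive.ql.plan.FileSinkDesc
import org.apache.spark.sql.hive.execution.HiveFileFormat

import org.apache.kyuubi.spark.connector.common.SparkUtils.SPARK_RUNTIME_VERSION
import org.apache.kyuubi.util.reflect.DynConstructors

object CompatSketch {
  def hiveFileFormat(fileSinkConf: FileSinkDesc): HiveFileFormat =
    if (SPARK_RUNTIME_VERSION >= "3.5") {
      // SPARK-43186: Spark 3.5 removed ShimFileSinkDesc, so HiveFileFormat is
      // built reflectively from Hive's own FileSinkDesc.
      DynConstructors.builder()
        .impl(classOf[HiveFileFormat], classOf[FileSinkDesc])
        .build[HiveFileFormat]()
        .newInstance(fileSinkConf)
    } else {
      // Spark 3.3/3.4 keep ShimFileSinkDesc; that branch is spelled out in
      // HiveConnectorUtils.getHiveFileFormat in this diff.
      throw new UnsupportedOperationException("see HiveConnectorUtils.getHiveFileFormat")
    }
}
```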
## Types of changes :bookmark:
- [ ] Bugfix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)
## Test Plan 🧪
#### Behavior Without This Pull Request :coffin:
#### Behavior With This Pull Request :tada:
#### Related Unit Tests
---
# Checklist 📝
- [ ] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)
**Be nice. Be informative.**
Closes #6133 from Kwafoor/kyuubi_6073.
Closes #5384
9234e35ad [Cheng Pan] fix
7766dfda5 [Cheng Pan] nit
e9da162f8 [Cheng Pan] nit
676bfb26e [Cheng Pan] pretty
c241859af [Cheng Pan] pretty
0eedcf82c [wangjunbo] compat with spark 3.3
3d866546c [wangjunbo] format code
a0898f50f [wangjunbo] delete Unused import
9577f7fe8 [wangjunbo] [KYUUBI #5384] kyuubi-spark-connector-hive supports Spark 3.5
Lead-authored-by: Cheng Pan <[email protected]>
Co-authored-by: wangjunbo <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../spark/connector/hive/HiveConnectorUtils.scala | 96 ++++++++++++++++++++++
.../spark/connector/hive/read/HiveFileIndex.scala | 3 +
.../spark/connector/hive/read/HiveScan.scala | 3 +-
.../spark/connector/hive/write/HiveWrite.scala | 11 +--
.../hive/kyuubi/connector/HiveBridgeHelper.scala | 5 +-
.../spark/connector/hive/HiveCatalogSuite.scala | 8 +-
.../spark/connector/hive/HiveQuerySuite.scala | 8 +-
pom.xml | 1 +
8 files changed, 117 insertions(+), 18 deletions(-)
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
index 615093186..fd8b08f56 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
@@ -17,23 +17,63 @@
package org.apache.kyuubi.spark.connector.hive
+import java.lang.{Boolean => JBoolean, Long => JLong}
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
import org.apache.spark.SPARK_VERSION
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition}
import org.apache.spark.sql.connector.catalog.TableChange
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, ColumnPosition, DeleteColumn, First, RenameColumn, UpdateColumnComment, UpdateColumnNullability, UpdateColumnPosition, UpdateColumnType}
import org.apache.spark.sql.execution.command.CommandUtils
import org.apache.spark.sql.execution.command.CommandUtils.{calculateMultipleLocationSizes, calculateSingleLocationSize}
import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.hive.execution.HiveFileFormat
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
import org.apache.kyuubi.spark.connector.common.SparkUtils.SPARK_RUNTIME_VERSION
+import org.apache.kyuubi.util.reflect.{DynClasses, DynConstructors, DynMethods}
import org.apache.kyuubi.util.reflect.ReflectUtils.invokeAs
object HiveConnectorUtils extends Logging {
+ // SPARK-43186
+ def getHiveFileFormat(fileSinkConf: FileSinkDesc): HiveFileFormat = {
+ if (SPARK_RUNTIME_VERSION >= "3.5") {
+ DynConstructors.builder()
+ .impl(classOf[HiveFileFormat], classOf[FileSinkDesc])
+ .build[HiveFileFormat]()
+ .newInstance(fileSinkConf)
+ } else if (SPARK_RUNTIME_VERSION >= "3.3") {
+ val shimFileSinkDescClz = DynClasses.builder()
+ .impl("org.apache.spark.sql.hive.HiveShim$ShimFileSinkDesc")
+ .build()
+ val shimFileSinkDesc = DynConstructors.builder()
+ .impl(
+ "org.apache.spark.sql.hive.HiveShim$ShimFileSinkDesc",
+ classOf[String],
+ classOf[TableDesc],
+ classOf[Boolean])
+ .build[AnyRef]()
+ .newInstance(
+ fileSinkConf.getDirName.toString,
+ fileSinkConf.getTableInfo,
+ fileSinkConf.getCompressed.asInstanceOf[JBoolean])
+ DynConstructors.builder()
+ .impl(classOf[HiveFileFormat], shimFileSinkDescClz)
+ .build[HiveFileFormat]()
+ .newInstance(shimFileSinkDesc)
+ } else {
+ throw KyuubiHiveConnectorException(s"Spark version $SPARK_VERSION " +
+ s"is not supported by Kyuubi spark hive connector.")
+ }
+ }
+
+ // SPARK-41970
def partitionedFilePath(file: PartitionedFile): String = {
if (SPARK_RUNTIME_VERSION >= "3.4") {
invokeAs[String](file, "urlEncodedPath")
@@ -45,6 +85,62 @@ object HiveConnectorUtils extends Logging {
}
}
+ // SPARK-43039
+ def splitFiles(
+ sparkSession: SparkSession,
+ file: AnyRef,
+ filePath: Path,
+ isSplitable: Boolean,
+ maxSplitBytes: Long,
+ partitionValues: InternalRow): Seq[PartitionedFile] = {
+
+ if (SPARK_RUNTIME_VERSION >= "3.5") {
+ val fileStatusWithMetadataClz = DynClasses.builder()
+ .impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
+ .build()
+ DynMethods
+ .builder("splitFiles")
+ .impl(
+ "org.apache.spark.sql.execution.PartitionedFileUtil",
+ classOf[SparkSession],
+ fileStatusWithMetadataClz,
+ classOf[Boolean],
+ classOf[Long],
+ classOf[InternalRow])
+ .build()
+ .invoke[Seq[PartitionedFile]](
+ null,
+ sparkSession,
+ file,
+ isSplitable.asInstanceOf[JBoolean],
+ maxSplitBytes.asInstanceOf[JLong],
+ partitionValues)
+ } else if (SPARK_RUNTIME_VERSION >= "3.3") {
+ DynMethods
+ .builder("splitFiles")
+ .impl(
+ "org.apache.spark.sql.execution.PartitionedFileUtil",
+ classOf[SparkSession],
+ classOf[FileStatus],
+ classOf[Path],
+ classOf[Boolean],
+ classOf[Long],
+ classOf[InternalRow])
+ .build()
+ .invoke[Seq[PartitionedFile]](
+ null,
+ sparkSession,
+ file,
+ filePath,
+ isSplitable.asInstanceOf[JBoolean],
+ maxSplitBytes.asInstanceOf[JLong],
+ partitionValues)
+ } else {
+ throw KyuubiHiveConnectorException(s"Spark version $SPARK_VERSION " +
+ s"is not supported by Kyuubi spark hive connector.")
+ }
+ }
+
def calculateTotalSize(
spark: SparkSession,
catalogTable: CatalogTable,
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala
index 0d79621f8..5e24199f8 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala
@@ -20,6 +20,7 @@ package org.apache.kyuubi.spark.connector.hive.read
import java.net.URI
import scala.collection.mutable
+import scala.language.implicitConversions
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
@@ -156,6 +157,8 @@ class HiveInMemoryFileIndex(
private val partDirToBindHivePart: mutable.Map[PartitionDirectory, CatalogTablePartition] =
mutable.Map()
+ implicit private def seqToArr(seq: Seq[FileStatus]): Array[FileStatus] = seq.toArray
+
override def listFiles(
partitionFilters: Seq[Expression],
dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveScan.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveScan.scala
index ecdfc76c5..816c2661d 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveScan.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveScan.scala
@@ -28,7 +28,6 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.connector.read.PartitionReaderFactory
-import org.apache.spark.sql.execution.PartitionedFileUtil
import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
import org.apache.spark.sql.execution.datasources.v2.FileScan
import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.HiveClientImpl
@@ -106,7 +105,7 @@ case class HiveScan(
}
partition.files.flatMap { file =>
val filePath = file.getPath
- val partFiles = PartitionedFileUtil.splitFiles(
+ val partFiles = HiveConnectorUtils.splitFiles(
sparkSession = sparkSession,
file = file,
filePath = filePath,
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWrite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWrite.scala
index 957c19582..df18f1f8b 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWrite.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWrite.scala
@@ -23,7 +23,7 @@ import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hive.ql.plan.TableDesc
+import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.spark.internal.Logging
@@ -36,11 +36,12 @@ import org.apache.spark.sql.connector.write.{BatchWrite, LogicalWriteInfo, Write
import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, WriteJobDescription}
import org.apache.spark.sql.execution.datasources.v2.FileBatchWrite
import org.apache.spark.sql.execution.metric.SQLMetric
-import org.apache.spark.sql.hive.execution.{HiveFileFormat, HiveOptions}
-import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.{FileSinkDesc, HiveClientImpl, StructTypeHelper}
+import org.apache.spark.sql.hive.execution.HiveOptions
+import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.{HiveClientImpl, StructTypeHelper}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration
+import org.apache.kyuubi.spark.connector.hive.HiveConnectorUtils.getHiveFileFormat
import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog
case class HiveWrite(
@@ -75,7 +76,7 @@ case class HiveWrite(
override def toBatch: BatchWrite = {
val tmpLocation = HiveWriteHelper.getExternalTmpPath(externalCatalog, hadoopConf, tableLocation)
- val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
+ val fileSinkConf = new FileSinkDesc(tmpLocation, tableDesc, false)
handleCompression(fileSinkConf, hadoopConf)
val committer = FileCommitProtocol.instantiate(
@@ -118,7 +119,7 @@ case class HiveWrite(
pathName: String,
customPartitionLocations: Map[TablePartitionSpec, String],
options: Map[String, String]): WriteJobDescription = {
- val hiveFileFormat = new HiveFileFormat(fileSinkConf)
+ val hiveFileFormat = getHiveFileFormat(fileSinkConf)
val dataSchema = StructType(info.schema().fields.take(dataColumns.length))
val outputWriterFactory = hiveFileFormat.prepareWrite(sparkSession, job, options, dataSchema)
val metrics: Map[String, SQLMetric] = BasicWriteJobStatsTracker.metrics
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveBridgeHelper.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveBridgeHelper.scala
index 8e3a9cd3d..2993a0f12 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveBridgeHelper.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveBridgeHelper.scala
@@ -32,7 +32,6 @@ object HiveBridgeHelper {
type HiveMetastoreCatalog = org.apache.spark.sql.hive.HiveMetastoreCatalog
type HiveExternalCatalog = org.apache.spark.sql.hive.HiveExternalCatalog
type NextIterator[U] = org.apache.spark.util.NextIterator[U]
- type FileSinkDesc = org.apache.spark.sql.hive.HiveShim.ShimFileSinkDesc
type HiveVersion = org.apache.spark.sql.hive.client.HiveVersion
type InsertIntoHiveTable = org.apache.spark.sql.hive.execution.InsertIntoHiveTable
@@ -97,7 +96,9 @@ object HiveBridgeHelper {
}
implicit class StructTypeHelper(structType: StructType) {
- def toAttributes: Seq[AttributeReference] = structType.toAttributes
+ def toAttributes: Seq[AttributeReference] = structType.map { field =>
+ AttributeReference(field.name, field.dataType, field.nullable, field.metadata)()
+ }
}
def toSQLValue(v: Any, t: DataType): String = Literal.create(v, t) match {
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
index f43dafd11..0485087bf 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
@@ -27,7 +27,7 @@ import scala.util.Try
import com.google.common.collect.Maps
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
import org.apache.spark.sql.connector.expressions.Transform
@@ -121,9 +121,9 @@ class HiveCatalogSuite extends KyuubiHiveTest {
val exception = intercept[AnalysisException] {
spark.table("hive.ns1.nonexistent_table")
}
- assert(exception.plan.exists { p =>
- p.exists(child => child.isInstanceOf[UnresolvedRelation])
- })
+ assert(exception.message.contains("[TABLE_OR_VIEW_NOT_FOUND] " +
+ "The table or view `hive`.`ns1`.`nonexistent_table` cannot be found.")
+ || exception.message.contains("Table or view not found: hive.ns1.nonexistent_table"))
}
}
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala
index b217d00b4..65ad81d95 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala
@@ -18,7 +18,6 @@
package org.apache.kyuubi.spark.connector.hive
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
class HiveQuerySuite extends KyuubiHiveTest {
@@ -79,10 +78,9 @@ class HiveQuerySuite extends KyuubiHiveTest {
| SELECT * FROM hive.ns1.tb1
|""".stripMargin)
}
-
- assert(e.plan.exists { p =>
- p.exists(child => child.isInstanceOf[UnresolvedRelation])
- })
+ assert(e.message.contains(
+ "[TABLE_OR_VIEW_NOT_FOUND] The table or view `hive`.`ns1`.`tb1`
cannot be found.") ||
+ e.message.contains("Table or view not found: hive.ns1.tb1"))
}
}
}
diff --git a/pom.xml b/pom.xml
index f7d63d209..b70d5310a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2274,6 +2274,7 @@
<id>spark-3.5</id>
<modules>
<module>extensions/spark/kyuubi-extension-spark-3-5</module>
+ <module>extensions/spark/kyuubi-spark-connector-hive</module>
</modules>
<properties>
<delta.artifact>delta-spark</delta.artifact>