This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-1.8
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.8 by this push:
new d09ba5072 [KYUUBI #5317] [Bug] Hive Connector throws NotSerializableException on reading Hive Avro partitioned table
d09ba5072 is described below
commit d09ba50720abf3685133ea45f3f7d07f5ca54216
Author: zhaomin <[email protected]>
AuthorDate: Thu Sep 21 17:05:24 2023 +0800
[KYUUBI #5317] [Bug] Hive Connector throws NotSerializableException on reading Hive Avro partitioned table
### _Why are the changes needed?_
close https://github.com/apache/kyuubi/issues/5317#issue-1904751001
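
The `NotSerializableException` comes from carrying Hive ql-layer `Partition` objects inside `HivePartitionReaderFactory`, which Spark serializes and ships to executors; for Avro partitioned tables this surfaces as the reported failure. The fix keeps only Spark's serializable `CatalogTablePartition` in the driver-side maps and rebuilds the Hive `Partition` lazily where the reader consumes it, via `HiveClientImpl.toHivePartition`. A minimal sketch of the resulting pattern (the object and method names below are illustrative, not connector code):

```scala
// Illustrative sketch only: the map carried by the serialized reader factory
// holds Spark's serializable CatalogTablePartition; the Hive ql-layer Partition
// is rebuilt per file on the executor, as the patched factory does.
import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.HiveClientImpl

object PartitionResolutionSketch {
  def resolveHivePartition(
      file: PartitionedFile,
      partFileToHivePart: Map[PartitionedFile, CatalogTablePartition],
      hiveTable: HiveTable): Option[HivePartition] = {
    // Look up the serializable partition metadata shipped with the task and
    // convert it to a Hive Partition only at the point of use.
    partFileToHivePart.get(file).map(HiveClientImpl.toHivePartition(_, hiveTable))
  }
}
```

Because the conversion runs per task, nothing non-serializable has to travel with the scan, and files without a matching partition entry naturally fall back to the table-level input format and deserializer.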
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request
### _Was this patch authored or co-authored using generative AI tooling?_
No
Closes #5319 from zhaomin1423/fixhive-connector.
Closes #5317
02e5321dc [Cheng Pan] nit
cadabf4ab [Cheng Pan] nit
d38832f40 [zhaomin] improve
ee5b62d84 [zhaomin] improve
794473468 [zhaomin] improve
e3eca91fb [zhaomin] add tests
d9302e2ba [zhaomin] [KYUUBI #5317] [Bug] Hive Connector throws NotSerializableException on reading Hive Avro partitioned table
0bc8ec16f [zhaomin] [KYUUBI #5317] [Bug] Hive Connector throws NotSerializableException on reading Hive Avro partitioned table
Lead-authored-by: zhaomin <[email protected]>
Co-authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
(cherry picked from commit 167e6c1ca318e4415c915bd7d11d07db0ccc7447)
Signed-off-by: Cheng Pan <[email protected]>
---
.../spark/connector/hive/read/HiveFileIndex.scala | 24 +++---
.../hive/read/HivePartitionReaderFactory.scala | 35 ++++-----
.../hive/read/HivePartitionedReader.scala | 17 ++---
.../spark/connector/hive/read/HiveScan.scala | 6 +-
.../spark/connector/hive/HiveQuerySuite.scala | 87 +++++++++++++++++++++-
5 files changed, 118 insertions(+), 51 deletions(-)
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala
index a399cc302..0d79621f8 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala
@@ -23,14 +23,12 @@ import scala.collection.mutable
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{expressions, InternalRow}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition, ExternalCatalogUtils}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Expression, Predicate}
import org.apache.spark.sql.connector.catalog.CatalogPlugin
import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.HiveClientImpl
import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.spark.connector.hive.{HiveTableCatalog, KyuubiHiveConnectorException}
@@ -47,18 +45,17 @@ class HiveCatalogFileIndex(
private val table = catalogTable
- private val partPathToBindHivePart: mutable.Map[PartitionPath, HivePartition] = mutable.Map()
+ private val partPathToBindHivePart: mutable.Map[PartitionPath, CatalogTablePartition] =
+ mutable.Map()
private val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
- private lazy val hiveTable: Table = HiveClientImpl.toHiveTable(table)
-
private val baseLocation: Option[URI] = table.storage.locationUri
override def partitionSchema: StructType = table.partitionSchema
private[hive] def listHiveFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression])
- : (Seq[PartitionDirectory], Map[PartitionDirectory, HivePartition]) = {
+ : (Seq[PartitionDirectory], Map[PartitionDirectory, CatalogTablePartition]) = {
val fileIndex = filterPartitions(partitionFilters)
val partDirs = fileIndex.listFiles(partitionFilters, dataFilters)
val partDirToHivePart = fileIndex.partDirToBindHivePartMap()
@@ -79,7 +76,7 @@ class HiveCatalogFileIndex(
}
val partitions = selectedPartitions.map {
- case BindPartition(catalogTablePartition, hivePartition) =>
+ case BindPartition(catalogTablePartition) =>
val path = new Path(catalogTablePartition.location)
val fs = path.getFileSystem(hiveCatalog.hadoopConfiguration())
val partPath = PartitionPath(
@@ -87,7 +84,7 @@ class HiveCatalogFileIndex(
partitionSchema,
sparkSession.sessionState.conf.sessionLocalTimeZone),
path.makeQualified(fs.getUri, fs.getWorkingDirectory))
- partPathToBindHivePart += (partPath -> hivePartition)
+ partPathToBindHivePart += (partPath -> catalogTablePartition)
partPath
}
val partitionSpec = PartitionSpec(partitionSchema, partitions)
@@ -114,7 +111,7 @@ class HiveCatalogFileIndex(
}
private def buildBindPartition(partition: CatalogTablePartition): BindPartition =
- BindPartition(partition, HiveClientImpl.toHivePartition(partition, hiveTable))
+ BindPartition(partition)
override def partitionSpec(): PartitionSpec = {
throw notSupportOperator("partitionSpec")
@@ -142,7 +139,7 @@ class HiveInMemoryFileIndex(
rootPathsSpecified: Seq[Path],
parameters: Map[String, String],
userSpecifiedSchema: Option[StructType],
- partPathToBindHivePart: Map[PartitionPath, HivePartition] = Map.empty,
+ partPathToBindHivePart: Map[PartitionPath, CatalogTablePartition] = Map.empty,
fileStatusCache: FileStatusCache = NoopCache,
userSpecifiedPartitionSpec: Option[PartitionSpec] = None,
override val metadataOpsTimeNs: Option[Long] = None,
@@ -156,7 +153,8 @@ class HiveInMemoryFileIndex(
userSpecifiedPartitionSpec,
metadataOpsTimeNs) {
- private val partDirToBindHivePart: mutable.Map[PartitionDirectory, HivePartition] = mutable.Map()
+ private val partDirToBindHivePart: mutable.Map[PartitionDirectory, CatalogTablePartition] =
+ mutable.Map()
override def listFiles(
partitionFilters: Seq[Expression],
@@ -234,7 +232,7 @@ class HiveInMemoryFileIndex(
!((name.startsWith("_") && !name.contains("=")) || name.startsWith("."))
}
- def partDirToBindHivePartMap(): Map[PartitionDirectory, HivePartition] = {
+ def partDirToBindHivePartMap(): Map[PartitionDirectory, CatalogTablePartition] = {
partDirToBindHivePart.toMap
}
@@ -247,7 +245,7 @@ class HiveInMemoryFileIndex(
}
}
-case class BindPartition(catalogTablePartition: CatalogTablePartition, hivePartition: HivePartition)
+case class BindPartition(catalogTablePartition: CatalogTablePartition)
object HiveTableCatalogFileIndex {
implicit class CatalogHelper(plugin: CatalogPlugin) {
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionReaderFactory.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionReaderFactory.scala
index 0cbd4f6ef..6a2a7f1d6 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionReaderFactory.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionReaderFactory.scala
@@ -31,10 +31,11 @@ import org.apache.spark.TaskContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
import org.apache.spark.sql.execution.datasources.v2._
-import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.NextIterator
+import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.{HiveClientImpl, NextIterator}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types._
@@ -49,7 +50,7 @@ case class HivePartitionReaderFactory(
dataSchema: StructType,
readDataSchema: StructType,
partitionSchema: StructType,
- partFileToHivePart: Map[PartitionedFile, HivePartition],
+ partFileToHivePart: Map[PartitionedFile, CatalogTablePartition],
pushedFilters: Array[Filter] = Array.empty)
extends PartitionReaderFactory with Logging {
@@ -64,17 +65,18 @@ case class HivePartitionReaderFactory(
val filePartition = partition.asInstanceOf[FilePartition]
val iter: Iterator[HivePartitionedFileReader[InternalRow]] =
filePartition.files.toIterator.map { file =>
- val bindHivePart = partFileToHivePart.getOrElse(file, null)
+ val bindHivePart = partFileToHivePart.get(file)
+ val hivePartition = bindHivePart.map(HiveClientImpl.toHivePartition(_, hiveTable))
HivePartitionedFileReader(
file,
new PartitionReaderWithPartitionValues(
HivePartitionedReader(
file,
- buildReaderInternal(file, bindHivePart),
+ buildReaderInternal(file, hivePartition),
tableDesc,
broadcastHiveConf,
nonPartitionReadDataKeys,
- bindHivePart,
+ hivePartition,
charset),
readDataSchema,
partitionSchema,
@@ -83,9 +85,9 @@ case class HivePartitionReaderFactory(
new SparkFilePartitionReader[InternalRow](iter)
}
- def buildReaderInternal(
+ private def buildReaderInternal(
file: PartitionedFile,
- bindPartition: HivePartition): PartitionReader[Writable] = {
+ bindPartition: Option[HivePartition]): PartitionReader[Writable] = {
val reader = createPartitionWritableReader(file, bindPartition)
val fileReader = new PartitionReader[Writable] {
override def next(): Boolean = reader.hasNext
@@ -97,21 +99,16 @@ case class HivePartitionReaderFactory(
private def createPartitionWritableReader[T](
file: PartitionedFile,
- bindPartition: HivePartition): Iterator[Writable] = {
+ bindPartition: Option[HivePartition]): Iterator[Writable] = {
// Obtain binding HivePartition from input partitioned file
- val partDesc =
- if (bindPartition != null) {
- Utilities.getPartitionDesc(bindPartition)
- } else null
-
- val ifc =
- if (partDesc == null) {
- hiveTable.getInputFormatClass
- .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
- } else {
+ val ifc = bindPartition.map(Utilities.getPartitionDesc) match {
+ case Some(partDesc) =>
partDesc.getInputFileFormatClass
.asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
- }
+ case None =>
+ hiveTable.getInputFormatClass
+ .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
+ }
val jobConf = new JobConf(broadcastHiveConf.value.value)
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionedReader.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionedReader.scala
index 4c1690524..732643eb1 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionedReader.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionedReader.scala
@@ -19,7 +19,6 @@ package org.apache.kyuubi.spark.connector.hive.read
import java.util.Properties
-import org.apache.hadoop.hive.ql.exec.Utilities
import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition}
import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.hadoop.hive.serde2.Deserializer
@@ -43,30 +42,24 @@ case class HivePartitionedReader(
tableDesc: TableDesc,
broadcastHiveConf: Broadcast[SerializableConfiguration],
nonPartitionReadDataKeys: Seq[Attribute],
- bindPartition: HivePartition,
+ bindPartitionOpt: Option[HivePartition],
charset: String = "utf-8") extends PartitionReader[InternalRow] with
Logging {
- private val partDesc =
- if (bindPartition != null) {
- Utilities.getPartitionDesc(bindPartition)
- } else null
private val hiveConf = broadcastHiveConf.value.value
private val tableDeser = tableDesc.getDeserializerClass.newInstance()
tableDeser.initialize(hiveConf, tableDesc.getProperties)
- private val localDeser: Deserializer =
- if (bindPartition != null &&
- bindPartition.getDeserializer != null) {
+ private val localDeser: Deserializer = bindPartitionOpt match {
+ case Some(bindPartition) if bindPartition.getDeserializer != null =>
val tableProperties = tableDesc.getProperties
val props = new Properties(tableProperties)
val deserializer =
bindPartition.getDeserializer.getClass.asInstanceOf[Class[Deserializer]].newInstance()
deserializer.initialize(hiveConf, props)
deserializer
- } else {
- tableDeser
- }
+ case _ => tableDeser
+ }
private val internalRow = new SpecificInternalRow(nonPartitionReadDataKeys.map(_.dataType))
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveScan.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveScan.scala
index 518a9cb68..0b79d7307 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveScan.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveScan.scala
@@ -23,9 +23,8 @@ import scala.collection.mutable
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition}
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTablePartition}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.connector.read.PartitionReaderFactory
@@ -52,7 +51,8 @@ case class HiveScan(
private val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
- private val partFileToHivePartMap: mutable.Map[PartitionedFile, HivePartition] = mutable.Map()
+ private val partFileToHivePartMap: mutable.Map[PartitionedFile, CatalogTablePartition] =
+ mutable.Map()
override def isSplitable(path: Path): Boolean = {
catalogTable.provider.map(_.toUpperCase(Locale.ROOT)).exists {
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala
index d0b97676b..1d3d5ae10 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala
@@ -22,23 +22,31 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
class HiveQuerySuite extends KyuubiHiveTest {
- def withTempNonPartitionedTable(spark: SparkSession, table: String)(f: => Unit): Unit = {
+ def withTempNonPartitionedTable(
+ spark: SparkSession,
+ table: String,
+ format: String = "PARQUET",
+ hiveTable: Boolean = false)(f: => Unit): Unit = {
spark.sql(
s"""
| CREATE TABLE IF NOT EXISTS
| $table (id String, date String)
- | USING PARQUET
+ | ${if (hiveTable) "STORED AS" else "USING"} $format
|""".stripMargin).collect()
try f
finally spark.sql(s"DROP TABLE $table")
}
- def withTempPartitionedTable(spark: SparkSession, table: String)(f: => Unit): Unit = {
+ def withTempPartitionedTable(
+ spark: SparkSession,
+ table: String,
+ format: String = "PARQUET",
+ hiveTable: Boolean = false)(f: => Unit): Unit = {
spark.sql(
s"""
| CREATE TABLE IF NOT EXISTS
| $table (id String, year String, month string)
- | USING PARQUET
+ | ${if (hiveTable) "STORED AS" else "USING"} $format
| PARTITIONED BY (year, month)
|""".stripMargin).collect()
try f
@@ -186,4 +194,75 @@ class HiveQuerySuite extends KyuubiHiveTest {
}
}
}
+
+ test("read partitioned avro table") {
+ readPartitionedTable("AVRO", true)
+ readPartitionedTable("AVRO", false)
+ }
+
+ test("read un-partitioned avro table") {
+ readUnPartitionedTable("AVRO", true)
+ readUnPartitionedTable("AVRO", false)
+ }
+
+ test("read partitioned textfile table") {
+ readPartitionedTable("TEXTFILE", true)
+ readPartitionedTable("TEXTFILE", false)
+ }
+
+ test("read un-partitioned textfile table") {
+ readUnPartitionedTable("TEXTFILE", true)
+ readUnPartitionedTable("TEXTFILE", false)
+ }
+
+ test("read partitioned SequenceFile table") {
+ readPartitionedTable("SequenceFile", true)
+ readPartitionedTable("SequenceFile", false)
+ }
+
+ test("read un-partitioned SequenceFile table") {
+ readUnPartitionedTable("SequenceFile", true)
+ readUnPartitionedTable("SequenceFile", false)
+ }
+
+ test("read partitioned ORC table") {
+ readPartitionedTable("ORC", true)
+ readPartitionedTable("ORC", false)
+ }
+
+ test("read un-partitioned ORC table") {
+ readUnPartitionedTable("ORC", true)
+ readUnPartitionedTable("ORC", false)
+ }
+
+ private def readPartitionedTable(format: String, hiveTable: Boolean): Unit = {
+ withSparkSession() { spark =>
+ val table = "hive.default.employee"
+ withTempPartitionedTable(spark, table, format, hiveTable) {
+ spark.sql(
+ s"""
+ | INSERT OVERWRITE
+ | $table PARTITION(year = '2023')
+ | VALUES("zhao", "09")
+ |""".stripMargin)
+ checkQueryResult(s"select * from $table", spark,
Array(Row.apply("zhao", "2023", "09")))
+ }
+ }
+ }
+
+ private def readUnPartitionedTable(format: String, hiveTable: Boolean): Unit = {
+ withSparkSession() { spark =>
+ val table = "hive.default.employee"
+ withTempNonPartitionedTable(spark, table, format, hiveTable) {
+ spark.sql(
+ s"""
+ | INSERT OVERWRITE
+ | $table
+ | VALUES("zhao", "2023-09-21")
+ |""".stripMargin).collect()
+
+ checkQueryResult(s"select * from $table", spark, Array(Row.apply("zhao", "2023-09-21")))
+ }
+ }
+ }
}