This is an automated email from the ASF dual-hosted git repository.
csy pushed a commit to branch branch-1.7
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.7 by this push:
new ba769ad99 [KYUUBI #5317] [Bug][1.7] Hive Connector throws NotSerializableException on reading Hive Avro partitioned table
ba769ad99 is described below
commit ba769ad9980447d6970e87cce1633051edd3e800
Author: zhaomin <[email protected]>
AuthorDate: Tue Nov 21 21:51:48 2023 +0800
[KYUUBI #5317] [Bug][1.7] Hive Connector throws NotSerializableException on reading Hive Avro partitioned table
# :mag: Description
## Issue References 🔗
This pull request fixes #5317
## Describe Your Solution 🔧
Cherry-pick the fix for #5317 to branch-1.7.
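
For reviewers unfamiliar with the original fix: DSv2 reader factories are serialized and shipped to executors, and this code previously captured Hive's non-serializable `org.apache.hadoop.hive.ql.metadata.Partition` in those objects; the cherry-picked change keeps Spark's serializable `CatalogTablePartition` instead and converts it via `HiveClientImpl.toHivePartition` only when the reader is created. A minimal, self-contained sketch of the pattern (the class names below are illustrative stand-ins, not the connector's types):

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-in for the non-serializable org.apache.hadoop.hive.ql.metadata.Partition.
class MetastorePartition(val location: String)

// Before: a (serializable) plan-side object capturing the Hive partition directly.
case class BadFactory(partition: MetastorePartition)

// After: keep only serializable metadata and rebuild the Hive partition lazily
// where it is actually consumed (mirrors CatalogTablePartition + toHivePartition).
case class PartitionDescriptor(location: String)
case class GoodFactory(descriptor: PartitionDescriptor) {
  def toMetastorePartition: MetastorePartition = new MetastorePartition(descriptor.location)
}

object SerializationDemo {
  // Java serialization round trip, which is what Spark does when shipping tasks.
  private def roundTrip(obj: AnyRef): Unit =
    new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)

  def main(args: Array[String]): Unit = {
    roundTrip(GoodFactory(PartitionDescriptor("/warehouse/emp/year=2023")))        // fine
    try roundTrip(BadFactory(new MetastorePartition("/warehouse/emp/year=2023")))
    catch { case e: NotSerializableException => println(s"reproduced: $e") }       // thrown here
  }
}
```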
## Types of changes :bookmark:
- [x] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)
## Test Plan 🧪
#### Behavior Without This Pull Request :coffin:
```
Error: ]
/home/runner/work/kyuubi/kyuubi/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala:169: too many arguments (4) for method withTempPartitionedTable: (spark: org.apache.spark.sql.SparkSession, table: String)(f: => Unit)Unit
Note that 'hiveTable' is not a parameter name of the invoked method.
```
https://github.com/apache/kyuubi/actions/runs/6903431853/job/18782106774
#### Behavior With This Pull Request :tada:
#### Related Unit Tests
org.apache.kyuubi.spark.connector.hive.HiveQuerySuite
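
For reviewers of the backported tests: `withTempPartitionedTable` / `withTempNonPartitionedTable` now accept `format` and `hiveTable` parameters, so each read test covers both Hive (`STORED AS`) and Spark (`USING`) table syntax; the missing parameters are also what the compile error above complains about. A condensed usage sketch (it leans on the suite's existing `withSparkSession` and `checkQueryResult` helpers, so it is not standalone):

```scala
import org.apache.spark.sql.Row

// Condensed from the added HiveQuerySuite cases; the AVRO + hiveTable = true
// combination is the regression reported in #5317.
test("read partitioned avro table (condensed)") {
  withSparkSession() { spark =>
    val table = "hive.default.employee"
    withTempPartitionedTable(spark, table, "AVRO", hiveTable = true) {
      spark.sql(
        s"""
           | INSERT OVERWRITE
           | $table PARTITION(year = '2023')
           | VALUES("zhao", "09")
           |""".stripMargin)
      checkQueryResult(s"select * from $table", spark, Array(Row("zhao", "2023", "09")))
    }
  }
}
```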
---
# Checklists
## 📝 Author Self Checklist
- [x] My code follows the [style guidelines](https://kyuubi.readthedocs.io/en/master/contributing/code/style.html) of this project
- [x] I have performed a self-review
- [ ] I have commented my code, particularly in hard-to-understand areas
- [ ] I have made corresponding changes to the documentation
- [ ] My changes generate no new warnings
- [ ] I have added tests that prove my fix is effective or that my feature works
- [ ] New and existing unit tests pass locally with my changes
- [ ] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)
## 📝 Committer Pre-Merge Checklist
- [ ] Pull request title is okay.
- [ ] No license issues.
- [ ] Milestone correctly set?
- [ ] Test coverage is ok
- [ ] Assignees are selected.
- [ ] Minimum number of approvals
- [ ] No changes are requested
**Be nice. Be informative.**
Closes #5741 from cxzl25/KYUUBI-5317-1.7.
Closes #5317
482b829b6 [zhaomin] [KYUUBI #5317] [Bug] Hive Connector throws NotSerializableException on reading Hive Avro partitioned table
Authored-by: zhaomin <[email protected]>
Signed-off-by: Shaoyun Chen <[email protected]>
---
.../spark/connector/hive/read/HiveFileIndex.scala | 24 +++---
.../hive/read/HivePartitionReaderFactory.scala | 35 ++++-----
.../hive/read/HivePartitionedReader.scala | 17 ++---
.../spark/connector/hive/read/HiveScan.scala | 6 +-
.../spark/connector/hive/HiveQuerySuite.scala | 87 +++++++++++++++++++++-
5 files changed, 118 insertions(+), 51 deletions(-)
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala
index a399cc302..0d79621f8 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala
@@ -23,14 +23,12 @@ import scala.collection.mutable
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{expressions, InternalRow}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition, ExternalCatalogUtils}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Expression, Predicate}
import org.apache.spark.sql.connector.catalog.CatalogPlugin
import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.HiveClientImpl
import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.spark.connector.hive.{HiveTableCatalog, KyuubiHiveConnectorException}
@@ -47,18 +45,17 @@ class HiveCatalogFileIndex(
private val table = catalogTable
- private val partPathToBindHivePart: mutable.Map[PartitionPath, HivePartition] = mutable.Map()
+ private val partPathToBindHivePart: mutable.Map[PartitionPath, CatalogTablePartition] =
+ mutable.Map()
private val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
- private lazy val hiveTable: Table = HiveClientImpl.toHiveTable(table)
-
private val baseLocation: Option[URI] = table.storage.locationUri
override def partitionSchema: StructType = table.partitionSchema
private[hive] def listHiveFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression])
- : (Seq[PartitionDirectory], Map[PartitionDirectory, HivePartition]) = {
+ : (Seq[PartitionDirectory], Map[PartitionDirectory, CatalogTablePartition]) = {
val fileIndex = filterPartitions(partitionFilters)
val partDirs = fileIndex.listFiles(partitionFilters, dataFilters)
val partDirToHivePart = fileIndex.partDirToBindHivePartMap()
@@ -79,7 +76,7 @@ class HiveCatalogFileIndex(
}
val partitions = selectedPartitions.map {
- case BindPartition(catalogTablePartition, hivePartition) =>
+ case BindPartition(catalogTablePartition) =>
val path = new Path(catalogTablePartition.location)
val fs = path.getFileSystem(hiveCatalog.hadoopConfiguration())
val partPath = PartitionPath(
@@ -87,7 +84,7 @@ class HiveCatalogFileIndex(
partitionSchema,
sparkSession.sessionState.conf.sessionLocalTimeZone),
path.makeQualified(fs.getUri, fs.getWorkingDirectory))
- partPathToBindHivePart += (partPath -> hivePartition)
+ partPathToBindHivePart += (partPath -> catalogTablePartition)
partPath
}
val partitionSpec = PartitionSpec(partitionSchema, partitions)
@@ -114,7 +111,7 @@ class HiveCatalogFileIndex(
}
private def buildBindPartition(partition: CatalogTablePartition): BindPartition =
- BindPartition(partition, HiveClientImpl.toHivePartition(partition, hiveTable))
+ BindPartition(partition)
override def partitionSpec(): PartitionSpec = {
throw notSupportOperator("partitionSpec")
@@ -142,7 +139,7 @@ class HiveInMemoryFileIndex(
rootPathsSpecified: Seq[Path],
parameters: Map[String, String],
userSpecifiedSchema: Option[StructType],
- partPathToBindHivePart: Map[PartitionPath, HivePartition] = Map.empty,
+ partPathToBindHivePart: Map[PartitionPath, CatalogTablePartition] = Map.empty,
fileStatusCache: FileStatusCache = NoopCache,
userSpecifiedPartitionSpec: Option[PartitionSpec] = None,
override val metadataOpsTimeNs: Option[Long] = None,
@@ -156,7 +153,8 @@ class HiveInMemoryFileIndex(
userSpecifiedPartitionSpec,
metadataOpsTimeNs) {
- private val partDirToBindHivePart: mutable.Map[PartitionDirectory, HivePartition] = mutable.Map()
+ private val partDirToBindHivePart: mutable.Map[PartitionDirectory, CatalogTablePartition] =
+ mutable.Map()
override def listFiles(
partitionFilters: Seq[Expression],
@@ -234,7 +232,7 @@ class HiveInMemoryFileIndex(
!((name.startsWith("_") && !name.contains("=")) || name.startsWith("."))
}
- def partDirToBindHivePartMap(): Map[PartitionDirectory, HivePartition] = {
+ def partDirToBindHivePartMap(): Map[PartitionDirectory, CatalogTablePartition] = {
partDirToBindHivePart.toMap
}
@@ -247,7 +245,7 @@ class HiveInMemoryFileIndex(
}
}
-case class BindPartition(catalogTablePartition: CatalogTablePartition, hivePartition: HivePartition)
+case class BindPartition(catalogTablePartition: CatalogTablePartition)
object HiveTableCatalogFileIndex {
implicit class CatalogHelper(plugin: CatalogPlugin) {
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionReaderFactory.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionReaderFactory.scala
index 6770f4144..144aa2e78 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionReaderFactory.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionReaderFactory.scala
@@ -31,10 +31,11 @@ import org.apache.spark.TaskContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader}
import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
import org.apache.spark.sql.execution.datasources.v2._
-import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.NextIterator
+import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.{HiveClientImpl, NextIterator}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types._
@@ -47,7 +48,7 @@ case class HivePartitionReaderFactory(
dataSchema: StructType,
readDataSchema: StructType,
partitionSchema: StructType,
- partFileToHivePart: Map[PartitionedFile, HivePartition],
+ partFileToHivePart: Map[PartitionedFile, CatalogTablePartition],
pushedFilters: Array[Filter] = Array.empty)
extends FilePartitionReaderFactory with Logging {
@@ -66,17 +67,18 @@ case class HivePartitionReaderFactory(
val filePartition = partition.asInstanceOf[FilePartition]
val iter: Iterator[HivePartitionedFileReader[InternalRow]] =
filePartition.files.toIterator.map { file =>
- val bindHivePart = partFileToHivePart.getOrElse(file, null)
+ val bindHivePart = partFileToHivePart.get(file)
+ val hivePartition = bindHivePart.map(HiveClientImpl.toHivePartition(_, hiveTable))
HivePartitionedFileReader(
file,
new PartitionReaderWithPartitionValues(
HivePartitionedReader(
file,
- buildReaderInternal(file, bindHivePart),
+ buildReaderInternal(file, hivePartition),
tableDesc,
broadcastHiveConf,
nonPartitionReadDataKeys,
- bindHivePart,
+ hivePartition,
charset),
readDataSchema,
partitionSchema,
@@ -85,9 +87,9 @@ case class HivePartitionReaderFactory(
new FilePartitionReader[InternalRow](iter)
}
- def buildReaderInternal(
+ private def buildReaderInternal(
file: PartitionedFile,
- bindPartition: HivePartition): PartitionReader[Writable] = {
+ bindPartition: Option[HivePartition]): PartitionReader[Writable] = {
val reader = createPartitionWritableReader(file, bindPartition)
val fileReader = new PartitionReader[Writable] {
override def next(): Boolean = reader.hasNext
@@ -99,21 +101,16 @@ case class HivePartitionReaderFactory(
private def createPartitionWritableReader[T](
file: PartitionedFile,
- bindPartition: HivePartition): Iterator[Writable] = {
+ bindPartition: Option[HivePartition]): Iterator[Writable] = {
// Obtain binding HivePartition from input partitioned file
- val partDesc =
- if (bindPartition != null) {
- Utilities.getPartitionDesc(bindPartition)
- } else null
-
- val ifc =
- if (partDesc == null) {
- hiveTable.getInputFormatClass
- .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
- } else {
+ val ifc = bindPartition.map(Utilities.getPartitionDesc) match {
+ case Some(partDesc) =>
partDesc.getInputFileFormatClass
.asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
- }
+ case None =>
+ hiveTable.getInputFormatClass
+ .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
+ }
val jobConf = new JobConf(broadcastHiveConf.value.value)
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionedReader.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionedReader.scala
index 4c1690524..732643eb1 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionedReader.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionedReader.scala
@@ -19,7 +19,6 @@ package org.apache.kyuubi.spark.connector.hive.read
import java.util.Properties
-import org.apache.hadoop.hive.ql.exec.Utilities
import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition}
import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.hadoop.hive.serde2.Deserializer
@@ -43,30 +42,24 @@ case class HivePartitionedReader(
tableDesc: TableDesc,
broadcastHiveConf: Broadcast[SerializableConfiguration],
nonPartitionReadDataKeys: Seq[Attribute],
- bindPartition: HivePartition,
+ bindPartitionOpt: Option[HivePartition],
charset: String = "utf-8") extends PartitionReader[InternalRow] with Logging {
- private val partDesc =
- if (bindPartition != null) {
- Utilities.getPartitionDesc(bindPartition)
- } else null
private val hiveConf = broadcastHiveConf.value.value
private val tableDeser = tableDesc.getDeserializerClass.newInstance()
tableDeser.initialize(hiveConf, tableDesc.getProperties)
- private val localDeser: Deserializer =
- if (bindPartition != null &&
- bindPartition.getDeserializer != null) {
+ private val localDeser: Deserializer = bindPartitionOpt match {
+ case Some(bindPartition) if bindPartition.getDeserializer != null =>
val tableProperties = tableDesc.getProperties
val props = new Properties(tableProperties)
val deserializer =
bindPartition.getDeserializer.getClass.asInstanceOf[Class[Deserializer]].newInstance()
deserializer.initialize(hiveConf, props)
deserializer
- } else {
- tableDeser
- }
+ case _ => tableDeser
+ }
private val internalRow = new SpecificInternalRow(nonPartitionReadDataKeys.map(_.dataType))
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveScan.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveScan.scala
index c37186f46..58d6bc599 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveScan.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveScan.scala
@@ -23,9 +23,8 @@ import scala.collection.mutable
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition}
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.connector.read.PartitionReaderFactory
@@ -52,7 +51,8 @@ case class HiveScan(
private val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
- private val partFileToHivePartMap: mutable.Map[PartitionedFile, HivePartition] = mutable.Map()
+ private val partFileToHivePartMap: mutable.Map[PartitionedFile, CatalogTablePartition] =
+ mutable.Map()
override def createReaderFactory(): PartitionReaderFactory = {
val hiveConf = new Configuration(fileIndex.hiveCatalog.hadoopConfiguration())
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala
index f564caabf..11dd49463 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala
@@ -21,23 +21,31 @@ import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
class HiveQuerySuite extends KyuubiHiveTest {
- def withTempNonPartitionedTable(spark: SparkSession, table: String)(f: => Unit): Unit = {
+ def withTempNonPartitionedTable(
+ spark: SparkSession,
+ table: String,
+ format: String = "PARQUET",
+ hiveTable: Boolean = false)(f: => Unit): Unit = {
spark.sql(
s"""
| CREATE TABLE IF NOT EXISTS
| $table (id String, date String)
- | USING PARQUET
+ | ${if (hiveTable) "STORED AS" else "USING"} $format
|""".stripMargin).collect()
try f
finally spark.sql(s"DROP TABLE $table")
}
- def withTempPartitionedTable(spark: SparkSession, table: String)(f: => Unit): Unit = {
+ def withTempPartitionedTable(
+ spark: SparkSession,
+ table: String,
+ format: String = "PARQUET",
+ hiveTable: Boolean = false)(f: => Unit): Unit = {
spark.sql(
s"""
| CREATE TABLE IF NOT EXISTS
| $table (id String, year String, month string)
- | USING PARQUET
+ | ${if (hiveTable) "STORED AS" else "USING"} $format
| PARTITIONED BY (year, month)
|""".stripMargin).collect()
try f
@@ -199,4 +207,75 @@ class HiveQuerySuite extends KyuubiHiveTest {
}
}
}
+
+ test("read partitioned avro table") {
+ readPartitionedTable("AVRO", true)
+ readPartitionedTable("AVRO", false)
+ }
+
+ test("read un-partitioned avro table") {
+ readUnPartitionedTable("AVRO", true)
+ readUnPartitionedTable("AVRO", false)
+ }
+
+ test("read partitioned textfile table") {
+ readPartitionedTable("TEXTFILE", true)
+ readPartitionedTable("TEXTFILE", false)
+ }
+
+ test("read un-partitioned textfile table") {
+ readUnPartitionedTable("TEXTFILE", true)
+ readUnPartitionedTable("TEXTFILE", false)
+ }
+
+ test("read partitioned SequenceFile table") {
+ readPartitionedTable("SequenceFile", true)
+ readPartitionedTable("SequenceFile", false)
+ }
+
+ test("read un-partitioned SequenceFile table") {
+ readUnPartitionedTable("SequenceFile", true)
+ readUnPartitionedTable("SequenceFile", false)
+ }
+
+ test("read partitioned ORC table") {
+ readPartitionedTable("ORC", true)
+ readPartitionedTable("ORC", false)
+ }
+
+ test("read un-partitioned ORC table") {
+ readUnPartitionedTable("ORC", true)
+ readUnPartitionedTable("ORC", false)
+ }
+
+ private def readPartitionedTable(format: String, hiveTable: Boolean): Unit = {
+ withSparkSession() { spark =>
+ val table = "hive.default.employee"
+ withTempPartitionedTable(spark, table, format, hiveTable) {
+ spark.sql(
+ s"""
+ | INSERT OVERWRITE
+ | $table PARTITION(year = '2023')
+ | VALUES("zhao", "09")
+ |""".stripMargin)
+ checkQueryResult(s"select * from $table", spark, Array(Row.apply("zhao", "2023", "09")))
+ }
+ }
+ }
+
+ private def readUnPartitionedTable(format: String, hiveTable: Boolean): Unit = {
+ withSparkSession() { spark =>
+ val table = "hive.default.employee"
+ withTempNonPartitionedTable(spark, table, format, hiveTable) {
+ spark.sql(
+ s"""
+ | INSERT OVERWRITE
+ | $table
+ | VALUES("zhao", "2023-09-21")
+ |""".stripMargin).collect()
+
+ checkQueryResult(s"select * from $table", spark, Array(Row.apply("zhao", "2023-09-21")))
+ }
+ }
+ }
}