This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-1.8
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/branch-1.8 by this push:
     new d09ba5072 [KYUUBI #5317] [Bug] Hive Connector throws 
NotSerializableException on reading Hive Avro partitioned table
d09ba5072 is described below

commit d09ba50720abf3685133ea45f3f7d07f5ca54216
Author: zhaomin <[email protected]>
AuthorDate: Thu Sep 21 17:05:24 2023 +0800

    [KYUUBI #5317] [Bug] Hive Connector throws NotSerializableException on 
reading Hive Avro partitioned table
    
    ### _Why are the changes needed?_
    
    close https://github.com/apache/kyuubi/issues/5317#issue-1904751001
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run 
test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests)
 locally before make a pull request
    
    ### _Was this patch authored or co-authored using generative AI tooling?_
    
    No
    
    Closes #5319 from zhaomin1423/fixhive-connector.
    
    Closes #5317
    
    02e5321dc [Cheng Pan] nit
    cadabf4ab [Cheng Pan] nit
    d38832f40 [zhaomin] improve
    ee5b62d84 [zhaomin] improve
    794473468 [zhaomin] improve
    e3eca91fb [zhaomin] add tests
    d9302e2ba [zhaomin] [KYUUBI #5317] [Bug] Hive Connector throws 
NotSerializableException on reading Hive Avro partitioned table
    0bc8ec16f [zhaomin] [KYUUBI #5317] [Bug] Hive Connector throws 
NotSerializableException on reading Hive Avro partitioned table
    
    Lead-authored-by: zhaomin <[email protected]>
    Co-authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
    (cherry picked from commit 167e6c1ca318e4415c915bd7d11d07db0ccc7447)
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../spark/connector/hive/read/HiveFileIndex.scala  | 24 +++---
 .../hive/read/HivePartitionReaderFactory.scala     | 35 ++++-----
 .../hive/read/HivePartitionedReader.scala          | 17 ++---
 .../spark/connector/hive/read/HiveScan.scala       |  6 +-
 .../spark/connector/hive/HiveQuerySuite.scala      | 87 +++++++++++++++++++++-
 5 files changed, 118 insertions(+), 51 deletions(-)

diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala
index a399cc302..0d79621f8 100644
--- 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala
@@ -23,14 +23,12 @@ import scala.collection.mutable
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.{expressions, InternalRow}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, 
CatalogTablePartition, ExternalCatalogUtils}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
BoundReference, Expression, Predicate}
 import org.apache.spark.sql.connector.catalog.CatalogPlugin
 import org.apache.spark.sql.execution.datasources._
-import 
org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.HiveClientImpl
 import org.apache.spark.sql.types.StructType
 
 import org.apache.kyuubi.spark.connector.hive.{HiveTableCatalog, 
KyuubiHiveConnectorException}
@@ -47,18 +45,17 @@ class HiveCatalogFileIndex(
 
   private val table = catalogTable
 
-  private val partPathToBindHivePart: mutable.Map[PartitionPath, 
HivePartition] = mutable.Map()
+  private val partPathToBindHivePart: mutable.Map[PartitionPath, 
CatalogTablePartition] =
+    mutable.Map()
 
   private val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
 
-  private lazy val hiveTable: Table = HiveClientImpl.toHiveTable(table)
-
   private val baseLocation: Option[URI] = table.storage.locationUri
 
   override def partitionSchema: StructType = table.partitionSchema
 
   private[hive] def listHiveFiles(partitionFilters: Seq[Expression], 
dataFilters: Seq[Expression])
-      : (Seq[PartitionDirectory], Map[PartitionDirectory, HivePartition]) = {
+      : (Seq[PartitionDirectory], Map[PartitionDirectory, 
CatalogTablePartition]) = {
     val fileIndex = filterPartitions(partitionFilters)
     val partDirs = fileIndex.listFiles(partitionFilters, dataFilters)
     val partDirToHivePart = fileIndex.partDirToBindHivePartMap()
@@ -79,7 +76,7 @@ class HiveCatalogFileIndex(
       }
 
       val partitions = selectedPartitions.map {
-        case BindPartition(catalogTablePartition, hivePartition) =>
+        case BindPartition(catalogTablePartition) =>
           val path = new Path(catalogTablePartition.location)
           val fs = path.getFileSystem(hiveCatalog.hadoopConfiguration())
           val partPath = PartitionPath(
@@ -87,7 +84,7 @@ class HiveCatalogFileIndex(
               partitionSchema,
               sparkSession.sessionState.conf.sessionLocalTimeZone),
             path.makeQualified(fs.getUri, fs.getWorkingDirectory))
-          partPathToBindHivePart += (partPath -> hivePartition)
+          partPathToBindHivePart += (partPath -> catalogTablePartition)
           partPath
       }
       val partitionSpec = PartitionSpec(partitionSchema, partitions)
@@ -114,7 +111,7 @@ class HiveCatalogFileIndex(
   }
 
   private def buildBindPartition(partition: CatalogTablePartition): 
BindPartition =
-    BindPartition(partition, HiveClientImpl.toHivePartition(partition, 
hiveTable))
+    BindPartition(partition)
 
   override def partitionSpec(): PartitionSpec = {
     throw notSupportOperator("partitionSpec")
@@ -142,7 +139,7 @@ class HiveInMemoryFileIndex(
     rootPathsSpecified: Seq[Path],
     parameters: Map[String, String],
     userSpecifiedSchema: Option[StructType],
-    partPathToBindHivePart: Map[PartitionPath, HivePartition] = Map.empty,
+    partPathToBindHivePart: Map[PartitionPath, CatalogTablePartition] = 
Map.empty,
     fileStatusCache: FileStatusCache = NoopCache,
     userSpecifiedPartitionSpec: Option[PartitionSpec] = None,
     override val metadataOpsTimeNs: Option[Long] = None,
@@ -156,7 +153,8 @@ class HiveInMemoryFileIndex(
     userSpecifiedPartitionSpec,
     metadataOpsTimeNs) {
 
-  private val partDirToBindHivePart: mutable.Map[PartitionDirectory, 
HivePartition] = mutable.Map()
+  private val partDirToBindHivePart: mutable.Map[PartitionDirectory, 
CatalogTablePartition] =
+    mutable.Map()
 
   override def listFiles(
       partitionFilters: Seq[Expression],
@@ -234,7 +232,7 @@ class HiveInMemoryFileIndex(
     !((name.startsWith("_") && !name.contains("=")) || name.startsWith("."))
   }
 
-  def partDirToBindHivePartMap(): Map[PartitionDirectory, HivePartition] = {
+  def partDirToBindHivePartMap(): Map[PartitionDirectory, 
CatalogTablePartition] = {
     partDirToBindHivePart.toMap
   }
 
@@ -247,7 +245,7 @@ class HiveInMemoryFileIndex(
   }
 }
 
-case class BindPartition(catalogTablePartition: CatalogTablePartition, 
hivePartition: HivePartition)
+case class BindPartition(catalogTablePartition: CatalogTablePartition)
 
 object HiveTableCatalogFileIndex {
   implicit class CatalogHelper(plugin: CatalogPlugin) {
diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionReaderFactory.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionReaderFactory.scala
index 0cbd4f6ef..6a2a7f1d6 100644
--- 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionReaderFactory.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionReaderFactory.scala
@@ -31,10 +31,11 @@ import org.apache.spark.TaskContext
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, 
PartitionReaderFactory}
 import org.apache.spark.sql.execution.datasources.{FilePartition, 
PartitionedFile}
 import org.apache.spark.sql.execution.datasources.v2._
-import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.NextIterator
+import 
org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.{HiveClientImpl, 
NextIterator}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types._
@@ -49,7 +50,7 @@ case class HivePartitionReaderFactory(
     dataSchema: StructType,
     readDataSchema: StructType,
     partitionSchema: StructType,
-    partFileToHivePart: Map[PartitionedFile, HivePartition],
+    partFileToHivePart: Map[PartitionedFile, CatalogTablePartition],
     pushedFilters: Array[Filter] = Array.empty)
   extends PartitionReaderFactory with Logging {
 
@@ -64,17 +65,18 @@ case class HivePartitionReaderFactory(
     val filePartition = partition.asInstanceOf[FilePartition]
     val iter: Iterator[HivePartitionedFileReader[InternalRow]] =
       filePartition.files.toIterator.map { file =>
-        val bindHivePart = partFileToHivePart.getOrElse(file, null)
+        val bindHivePart = partFileToHivePart.get(file)
+        val hivePartition = bindHivePart.map(HiveClientImpl.toHivePartition(_, 
hiveTable))
         HivePartitionedFileReader(
           file,
           new PartitionReaderWithPartitionValues(
             HivePartitionedReader(
               file,
-              buildReaderInternal(file, bindHivePart),
+              buildReaderInternal(file, hivePartition),
               tableDesc,
               broadcastHiveConf,
               nonPartitionReadDataKeys,
-              bindHivePart,
+              hivePartition,
               charset),
             readDataSchema,
             partitionSchema,
@@ -83,9 +85,9 @@ case class HivePartitionReaderFactory(
     new SparkFilePartitionReader[InternalRow](iter)
   }
 
-  def buildReaderInternal(
+  private def buildReaderInternal(
       file: PartitionedFile,
-      bindPartition: HivePartition): PartitionReader[Writable] = {
+      bindPartition: Option[HivePartition]): PartitionReader[Writable] = {
     val reader = createPartitionWritableReader(file, bindPartition)
     val fileReader = new PartitionReader[Writable] {
       override def next(): Boolean = reader.hasNext
@@ -97,21 +99,16 @@ case class HivePartitionReaderFactory(
 
   private def createPartitionWritableReader[T](
       file: PartitionedFile,
-      bindPartition: HivePartition): Iterator[Writable] = {
+      bindPartition: Option[HivePartition]): Iterator[Writable] = {
     // Obtain binding HivePartition from input partitioned file
-    val partDesc =
-      if (bindPartition != null) {
-        Utilities.getPartitionDesc(bindPartition)
-      } else null
-
-    val ifc =
-      if (partDesc == null) {
-        hiveTable.getInputFormatClass
-          .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
-      } else {
+    val ifc = bindPartition.map(Utilities.getPartitionDesc) match {
+      case Some(partDesc) =>
         partDesc.getInputFileFormatClass
           .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
-      }
+      case None =>
+        hiveTable.getInputFormatClass
+          .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
+    }
 
     val jobConf = new JobConf(broadcastHiveConf.value.value)
 
diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionedReader.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionedReader.scala
index 4c1690524..732643eb1 100644
--- 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionedReader.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionedReader.scala
@@ -19,7 +19,6 @@ package org.apache.kyuubi.spark.connector.hive.read
 
 import java.util.Properties
 
-import org.apache.hadoop.hive.ql.exec.Utilities
 import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition}
 import org.apache.hadoop.hive.ql.plan.TableDesc
 import org.apache.hadoop.hive.serde2.Deserializer
@@ -43,30 +42,24 @@ case class HivePartitionedReader(
     tableDesc: TableDesc,
     broadcastHiveConf: Broadcast[SerializableConfiguration],
     nonPartitionReadDataKeys: Seq[Attribute],
-    bindPartition: HivePartition,
+    bindPartitionOpt: Option[HivePartition],
     charset: String = "utf-8") extends PartitionReader[InternalRow] with 
Logging {
 
-  private val partDesc =
-    if (bindPartition != null) {
-      Utilities.getPartitionDesc(bindPartition)
-    } else null
   private val hiveConf = broadcastHiveConf.value.value
 
   private val tableDeser = tableDesc.getDeserializerClass.newInstance()
   tableDeser.initialize(hiveConf, tableDesc.getProperties)
 
-  private val localDeser: Deserializer =
-    if (bindPartition != null &&
-      bindPartition.getDeserializer != null) {
+  private val localDeser: Deserializer = bindPartitionOpt match {
+    case Some(bindPartition) if bindPartition.getDeserializer != null =>
       val tableProperties = tableDesc.getProperties
       val props = new Properties(tableProperties)
       val deserializer =
         
bindPartition.getDeserializer.getClass.asInstanceOf[Class[Deserializer]].newInstance()
       deserializer.initialize(hiveConf, props)
       deserializer
-    } else {
-      tableDeser
-    }
+    case _ => tableDeser
+  }
 
   private val internalRow = new 
SpecificInternalRow(nonPartitionReadDataKeys.map(_.dataType))
 
diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveScan.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveScan.scala
index 518a9cb68..0b79d7307 100644
--- 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveScan.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveScan.scala
@@ -23,9 +23,8 @@ import scala.collection.mutable
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition}
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, 
CatalogTable}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, 
CatalogTable, CatalogTablePartition}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
 import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.connector.read.PartitionReaderFactory
@@ -52,7 +51,8 @@ case class HiveScan(
 
   private val isCaseSensitive = 
sparkSession.sessionState.conf.caseSensitiveAnalysis
 
-  private val partFileToHivePartMap: mutable.Map[PartitionedFile, 
HivePartition] = mutable.Map()
+  private val partFileToHivePartMap: mutable.Map[PartitionedFile, 
CatalogTablePartition] =
+    mutable.Map()
 
   override def isSplitable(path: Path): Boolean = {
     catalogTable.provider.map(_.toUpperCase(Locale.ROOT)).exists {
diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala
index d0b97676b..1d3d5ae10 100644
--- 
a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala
@@ -22,23 +22,31 @@ import 
org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 
 class HiveQuerySuite extends KyuubiHiveTest {
 
-  def withTempNonPartitionedTable(spark: SparkSession, table: String)(f: => 
Unit): Unit = {
+  def withTempNonPartitionedTable(
+      spark: SparkSession,
+      table: String,
+      format: String = "PARQUET",
+      hiveTable: Boolean = false)(f: => Unit): Unit = {
     spark.sql(
       s"""
          | CREATE TABLE IF NOT EXISTS
          | $table (id String, date String)
-         | USING PARQUET
+         | ${if (hiveTable) "STORED AS" else "USING"} $format
          |""".stripMargin).collect()
     try f
     finally spark.sql(s"DROP TABLE $table")
   }
 
-  def withTempPartitionedTable(spark: SparkSession, table: String)(f: => 
Unit): Unit = {
+  def withTempPartitionedTable(
+      spark: SparkSession,
+      table: String,
+      format: String = "PARQUET",
+      hiveTable: Boolean = false)(f: => Unit): Unit = {
     spark.sql(
       s"""
          | CREATE TABLE IF NOT EXISTS
          | $table (id String, year String, month string)
-         | USING PARQUET
+         | ${if (hiveTable) "STORED AS" else "USING"} $format
          | PARTITIONED BY (year, month)
          |""".stripMargin).collect()
     try f
@@ -186,4 +194,75 @@ class HiveQuerySuite extends KyuubiHiveTest {
       }
     }
   }
+
+  test("read partitioned avro table") {
+    readPartitionedTable("AVRO", true)
+    readPartitionedTable("AVRO", false)
+  }
+
+  test("read un-partitioned avro table") {
+    readUnPartitionedTable("AVRO", true)
+    readUnPartitionedTable("AVRO", false)
+  }
+
+  test("read partitioned textfile table") {
+    readPartitionedTable("TEXTFILE", true)
+    readPartitionedTable("TEXTFILE", false)
+  }
+
+  test("read un-partitioned textfile table") {
+    readUnPartitionedTable("TEXTFILE", true)
+    readUnPartitionedTable("TEXTFILE", false)
+  }
+
+  test("read partitioned SequenceFile table") {
+    readPartitionedTable("SequenceFile", true)
+    readPartitionedTable("SequenceFile", false)
+  }
+
+  test("read un-partitioned SequenceFile table") {
+    readUnPartitionedTable("SequenceFile", true)
+    readUnPartitionedTable("SequenceFile", false)
+  }
+
+  test("read partitioned ORC table") {
+    readPartitionedTable("ORC", true)
+    readPartitionedTable("ORC", false)
+  }
+
+  test("read un-partitioned ORC table") {
+    readUnPartitionedTable("ORC", true)
+    readUnPartitionedTable("ORC", false)
+  }
+
+  private def readPartitionedTable(format: String, hiveTable: Boolean): Unit = 
{
+    withSparkSession() { spark =>
+      val table = "hive.default.employee"
+      withTempPartitionedTable(spark, table, format, hiveTable) {
+        spark.sql(
+          s"""
+             | INSERT OVERWRITE
+             | $table PARTITION(year = '2023')
+             | VALUES("zhao", "09")
+             |""".stripMargin)
+        checkQueryResult(s"select * from $table", spark, 
Array(Row.apply("zhao", "2023", "09")))
+      }
+    }
+  }
+
+  private def readUnPartitionedTable(format: String, hiveTable: Boolean): Unit 
= {
+    withSparkSession() { spark =>
+      val table = "hive.default.employee"
+      withTempNonPartitionedTable(spark, table, format, hiveTable) {
+        spark.sql(
+          s"""
+             | INSERT OVERWRITE
+             | $table
+             | VALUES("zhao", "2023-09-21")
+             |""".stripMargin).collect()
+
+        checkQueryResult(s"select * from $table", spark, 
Array(Row.apply("zhao", "2023-09-21")))
+      }
+    }
+  }
 }

Reply via email to