This is an automated email from the ASF dual-hosted git repository.

beliefer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new dfc8c742d9 [GLUTEN-11062][CORE] Supports mixed input format for partitioned Hive table (#11113)
dfc8c742d9 is described below

commit dfc8c742d9250a3b0c2f9f9e65b3f92f1fbcda53
Author: Jiaan Geng <[email protected]>
AuthorDate: Mon Nov 24 10:27:07 2025 +0800

    [GLUTEN-11062][CORE] Supports mixed input format for partitioned Hive table (#11113)
    
    * [Gluten-11062] Supports mixed input format for partitioned Hive table
---
 .../gluten/execution/CHRangeExecTransformer.scala  |  4 ++
 .../gluten/execution/IcebergScanTransformer.scala  | 13 +++--
 .../execution/MicroBatchScanExecTransformer.scala  |  6 +-
 .../gluten/execution/PaimonScanTransformer.scala   | 11 +++-
 .../execution/BasicScanExecTransformer.scala       | 42 ++++++++------
 .../execution/BatchScanExecTransformer.scala       |  5 +-
 .../execution/FileSourceScanExecTransformer.scala  |  3 +
 .../gluten/execution/WholeStageTransformer.scala   |  4 ++
 .../spark/sql/hive/HivePartitionConverter.scala    | 65 ++++++++++++++--------
 .../sql/hive/HiveTableScanExecTransformer.scala    | 38 ++++++++++---
 .../TestFileSourceScanExecTransformer.scala        |  4 ++
 .../hive/execution/GlutenHiveSQLQuerySuite.scala   | 34 ++++++++++-
 .../TestFileSourceScanExecTransformer.scala        |  5 ++
 .../TestFileSourceScanExecTransformer.scala        |  5 ++
 .../TestFileSourceScanExecTransformer.scala        |  5 ++
 .../TestFileSourceScanExecTransformer.scala        |  5 ++
 16 files changed, 190 insertions(+), 59 deletions(-)

diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHRangeExecTransformer.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHRangeExecTransformer.scala
index fb9fc4b4d8..317d9d12c7 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHRangeExecTransformer.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHRangeExecTransformer.scala
@@ -23,6 +23,7 @@ import org.apache.gluten.substrait.`type`._
 import org.apache.gluten.substrait.SubstraitContext
 import org.apache.gluten.substrait.extensions.ExtensionBuilder
 import org.apache.gluten.substrait.rel.{RelBuilder, SplitInfo}
+import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 
 import org.apache.spark.Partition
 import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -59,6 +60,9 @@ case class CHRangeExecTransformer(
     }
   }
 
+  override def getPartitionWithReadFileFormats: Seq[(Partition, ReadFileFormat)] =
+    getPartitions.map((_, ReadFileFormat.UnknownFormat))
+
   @transient
   override lazy val metrics: Map[String, SQLMetric] =
     Map(
diff --git a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
index f33143feb1..e5b098eaa8 100644
--- a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
+++ b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
@@ -168,14 +168,19 @@ case class IcebergScanTransformer(
 
   override lazy val fileFormat: ReadFileFormat = GlutenIcebergSourceUtil.getFileFormat(scan)
 
-  override def getSplitInfosFromPartitions(partitions: Seq[Partition]): Seq[SplitInfo] = {
-    val splitInfos = partitions.map {
+  override def getSplitInfosFromPartitions(
+      partitions: Seq[(Partition, ReadFileFormat)]): Seq[SplitInfo] = {
+    partitions.map { case (partition, _) => partitionToSplitInfo(partition) }
+  }
+
+  private def partitionToSplitInfo(partition: Partition): SplitInfo = {
+    val splitInfo = partition match {
       case p: SparkDataSourceRDDPartition =>
         GlutenIcebergSourceUtil.genSplitInfo(p, getPartitionSchema)
       case _ => throw new GlutenNotSupportException()
     }
-    numSplits.add(splitInfos.map(s => s.asInstanceOf[LocalFilesNode].getPaths.size()).sum)
-    splitInfos
+    numSplits.add(splitInfo.asInstanceOf[LocalFilesNode].getPaths.size())
+    splitInfo
   }
 
   override def doCanonicalize(): IcebergScanTransformer = {
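
The Iceberg hunk also moves the numSplits metric update from one batched sum to one update per split info; the accumulated total is unchanged. A toy sketch of that equivalence, with a plain counter standing in for the SQLMetric (hypothetical names, illustration only):

    // Toy illustration: batched vs. per-split metric updates give the same total.
    object MetricEquivalence extends App {
      final class Counter { var value: Long = 0L; def add(n: Long): Unit = value += n }

      val pathCounts = Seq(3L, 1L, 4L) // paths per split, hypothetical

      val batched = new Counter
      batched.add(pathCounts.sum) // old code: one add over the summed path counts

      val perSplit = new Counter
      pathCounts.foreach(perSplit.add) // new code: one add per split info

      assert(batched.value == perSplit.value)
    }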
diff --git a/gluten-kafka/src/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala b/gluten-kafka/src/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala
index 2ab458f6f7..2f5e277d03 100644
--- a/gluten-kafka/src/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala
+++ b/gluten-kafka/src/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala
@@ -73,6 +73,9 @@ case class MicroBatchScanExecTransformer(
     case (inputPartition, index) => new SparkDataSourceRDDPartition(index, Seq(inputPartition))
   }
 
+  override def getPartitionWithReadFileFormats: Seq[(Partition, ReadFileFormat)] =
+    getPartitions.map((_, fileFormat))
+
   /** Returns the actual schema of this data source scan. */
   override def getDataSchema: StructType = scan.readSchema()
 
@@ -84,7 +87,8 @@ case class MicroBatchScanExecTransformer(
     MicroBatchScanExecTransformer.supportsBatchScan(scan)
   }
 
-  override def getSplitInfosFromPartitions(partitions: Seq[Partition]): Seq[SplitInfo] = {
+  override def getSplitInfosFromPartitions(
+      partitions: Seq[(Partition, ReadFileFormat)]): Seq[SplitInfo] = {
     val groupedPartitions = filteredPartitions.flatten
     groupedPartitions.zipWithIndex.map {
       case (p, _) => GlutenStreamKafkaSourceUtil.genSplitInfo(p)
diff --git a/gluten-paimon/src-paimon/main/scala/org/apache/gluten/execution/PaimonScanTransformer.scala b/gluten-paimon/src-paimon/main/scala/org/apache/gluten/execution/PaimonScanTransformer.scala
index 0141bf12ce..5dbce795a9 100644
--- a/gluten-paimon/src-paimon/main/scala/org/apache/gluten/execution/PaimonScanTransformer.scala
+++ b/gluten-paimon/src-paimon/main/scala/org/apache/gluten/execution/PaimonScanTransformer.scala
@@ -120,9 +120,16 @@ case class PaimonScanTransformer(
 
   override def doExecuteColumnar(): RDD[ColumnarBatch] = throw new UnsupportedOperationException()
 
-  override def getSplitInfosFromPartitions(partitions: Seq[Partition]): Seq[SplitInfo] = {
+  override def getSplitInfosFromPartitions(
+      partitions: Seq[(Partition, ReadFileFormat)]): Seq[SplitInfo] = {
     val partitionComputer = PaimonScanTransformer.getRowDataPartitionComputer(scan)
-    partitions.map {
+    partitions.map { case (partition, _) => partitionToSplitInfo(partition, partitionComputer) }
+  }
+
+  private def partitionToSplitInfo(
+      partition: Partition,
+      partitionComputer: InternalRowPartitionComputer): SplitInfo = {
+    partition match {
       case p: SparkDataSourceRDDPartition =>
         val paths = mutable.ListBuffer.empty[String]
         val starts = mutable.ListBuffer.empty[JLong]
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
index 86b76e7a25..cb6f54dea7 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
@@ -57,26 +57,32 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
   def getProperties: Map[String, String] = Map.empty
 
   override def getSplitInfos: Seq[SplitInfo] = {
-    getSplitInfosFromPartitions(getPartitions)
+    getSplitInfosFromPartitions(getPartitionWithReadFileFormats)
   }
 
-  def getSplitInfosFromPartitions(partitions: Seq[Partition]): Seq[SplitInfo] = {
-    partitions.map(
-      p => {
-        val ps = p match {
-          case sp: SparkDataSourceRDDPartition => sp.inputPartitions.map(_.asInstanceOf[Partition])
-          case o => Seq(o)
-        }
-        BackendsApiManager.getIteratorApiInstance
-          .genSplitInfo(
-            p.index,
-            ps,
-            getPartitionSchema,
-            getDataSchema,
-            fileFormat,
-            getMetadataColumns().map(_.name),
-            getProperties)
-      })
+  def getSplitInfosFromPartitions(partitions: Seq[(Partition, ReadFileFormat)]): Seq[SplitInfo] = {
+    partitions.map {
+      case (partition, readFileFormat) => partitionToSplitInfo(partition, readFileFormat)
+    }
+  }
+
+  private def partitionToSplitInfo(
+      partition: Partition,
+      readFileFormat: ReadFileFormat): SplitInfo = {
+    val part = partition match {
+      case sp: SparkDataSourceRDDPartition => sp.inputPartitions.map(_.asInstanceOf[Partition])
+      case _ => Seq(partition)
+    }
+
+    BackendsApiManager.getIteratorApiInstance
+      .genSplitInfo(
+        partition.index,
+        part,
+        getPartitionSchema,
+        getDataSchema,
+        readFileFormat,
+        getMetadataColumns().map(_.name),
+        getProperties)
   }
 
   override protected def doValidateInternal(): ValidationResult = {
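
This hunk is the heart of the change: the default split-info path now receives a ReadFileFormat per partition and passes it to genSplitInfo, instead of stamping the single scan-wide fileFormat onto every split. A minimal self-contained sketch of the old and new shapes, using toy stand-ins for the Gluten/Spark types (the real code goes through BackendsApiManager.getIteratorApiInstance):

    // Toy stand-ins, illustration only; not the real org.apache.gluten API.
    sealed trait ReadFileFormat
    case object ParquetReadFormat extends ReadFileFormat
    case object OrcReadFormat extends ReadFileFormat
    case class Partition(index: Int)
    case class SplitInfo(partitionIndex: Int, format: ReadFileFormat)

    object SplitInfoFlow {
      // Old shape: one scan-wide format stamped onto every split.
      def oldFlow(partitions: Seq[Partition], scanWideFormat: ReadFileFormat): Seq[SplitInfo] =
        partitions.map(p => SplitInfo(p.index, scanWideFormat))

      // New shape: each partition arrives already paired with its own format,
      // so one scan can emit, e.g., Parquet splits and ORC splits side by side.
      def newFlow(partitions: Seq[(Partition, ReadFileFormat)]): Seq[SplitInfo] =
        partitions.map { case (p, fmt) => SplitInfo(p.index, fmt) }
    }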
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
index 0b0dc7c52f..1d0a38e887 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
@@ -128,7 +128,10 @@ abstract class BatchScanExecTransformerBase(
 
   override def getMetadataColumns(): Seq[AttributeReference] = Seq.empty
 
-  def getPartitions: Seq[Partition] = finalPartitions
+  override def getPartitions: Seq[Partition] = finalPartitions
+
+  override def getPartitionWithReadFileFormats: Seq[(Partition, ReadFileFormat)] =
+    finalPartitions.map((_, fileFormat))
 
   override def getPartitionSchema: StructType = scan match {
     case fileScan: FileScan => fileScan.readPartitionSchema
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
index 0584922bca..af77fa24b3 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
@@ -138,6 +138,9 @@ abstract class FileSourceScanExecTransformerBase(
       )
   }
 
+  override def getPartitionWithReadFileFormats: Seq[(Partition, ReadFileFormat)] =
+    getPartitions.map((_, fileFormat))
+
   override def getPartitionSchema: StructType = relation.partitionSchema
 
   override def getDataSchema: StructType = relation.dataSchema
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
index dc2d3897c6..859b07af69 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
@@ -25,6 +25,7 @@ import org.apache.gluten.substrait.`type`.{TypeBuilder, TypeNode}
 import org.apache.gluten.substrait.SubstraitContext
 import org.apache.gluten.substrait.plan.{PlanBuilder, PlanNode}
 import org.apache.gluten.substrait.rel.{LocalFilesNode, RelNode, SplitInfo}
+import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 import org.apache.gluten.utils.SubstraitPlanPrinterUtil
 
 import org.apache.spark._
@@ -140,6 +141,9 @@ trait LeafTransformSupport extends TransformSupport with LeafExecNode {
 
   /** Returns the partitions generated by this data source scan. */
   def getPartitions: Seq[Partition]
+
+  /** Returns the partitions generated by this data source scan and tied with ReadFileFormat. */
+  def getPartitionWithReadFileFormats: Seq[(Partition, ReadFileFormat)]
 }
 
 trait UnaryTransformSupport extends TransformSupport with UnaryExecNode {
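
getPartitionWithReadFileFormats becomes part of the LeafTransformSupport contract, which is why every scan transformer (and the per-Spark-version test doubles later in this diff) gains an override. For a single-format source the expected implementation is just the pairing seen in the hunks above; a hedged sketch against simplified stand-in traits (hypothetical names, not the real Gluten classes):

    trait Partition { def index: Int }
    sealed trait ReadFileFormat
    case object ParquetReadFormat extends ReadFileFormat

    trait LeafTransformSupport {
      def getPartitions: Seq[Partition]
      // New abstract member: partitions paired with the format each is read as.
      def getPartitionWithReadFileFormats: Seq[(Partition, ReadFileFormat)]
    }

    // A single-format scan satisfies the contract by tagging every partition
    // with its scan-wide format, as the overrides in this commit do.
    final class SingleFormatScan(parts: Seq[Partition], fileFormat: ReadFileFormat)
      extends LeafTransformSupport {
      override def getPartitions: Seq[Partition] = parts
      override def getPartitionWithReadFileFormats: Seq[(Partition, ReadFileFormat)] =
        getPartitions.map((_, fileFormat))
    }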
diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HivePartitionConverter.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HivePartitionConverter.scala
index b35f8f9d93..0f7514d44f 100644
--- a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HivePartitionConverter.scala
+++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HivePartitionConverter.scala
@@ -18,12 +18,13 @@ package org.apache.spark.sql.hive
 
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.sql.shims.SparkShimLoader
+import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
 import org.apache.spark.sql.catalyst.analysis.CastSupport
 import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionDirectory}
+import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionDirectory, PartitionedFile}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.DataType
 
@@ -130,35 +131,51 @@ class HivePartitionConverter(hadoopConf: Configuration, session: SparkSession)
     }
   }
 
-  private def createFilePartition(
-      selectedPartitions: Seq[PartitionDirectory]): Seq[FilePartition] = {
+  private def getSplitFile(
+      partitionDirectory: PartitionDirectory,
+      maxSplitBytes: Long): Seq[PartitionedFile] =
+    SparkShimLoader.getSparkShims
+      .getFileStatus(partitionDirectory)
+      .flatMap {
+        f =>
+          SparkShimLoader.getSparkShims.splitFiles(
+            session,
+            f._1,
+            f._1.getPath,
+            isSplitable = canBeSplit(f._1.getPath),
+            maxSplitBytes,
+            partitionDirectory.values
+          )
+      }
+      .sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+
+  def createFilePartition(tableLocation: URI): Seq[FilePartition] = {
+    val selectedPartitions = listFiles(Seq((tableLocation, InternalRow.empty)))
+    val maxSplitBytes = FilePartition.maxSplitBytes(session, selectedPartitions)
-    val splitFiles = selectedPartitions.flatMap {
-      partition =>
-        SparkShimLoader.getSparkShims
-          .getFileStatus(partition)
-          .flatMap {
-            f =>
-              SparkShimLoader.getSparkShims.splitFiles(
-                session,
-                f._1,
-                f._1.getPath,
-                isSplitable = canBeSplit(f._1.getPath),
-                maxSplitBytes,
-                partition.values
-              )
-          }
-          .sortBy(_.length)(implicitly[Ordering[Long]].reverse)
-    }
+    val splitFiles = selectedPartitions.flatMap(getSplitFile(_, maxSplitBytes))
     FilePartition.getFilePartitions(session, splitFiles, maxSplitBytes)
   }
 
   def createFilePartition(
       prunedPartitions: Seq[HivePartition],
-      partitionColTypes: Seq[DataType]): Seq[FilePartition] = {
-    createFilePartition(listFiles(prunedPartitions, partitionColTypes))
+      partitionColTypes: Seq[DataType],
+      readFileFormats: Seq[ReadFileFormat]): Seq[(FilePartition, ReadFileFormat)] = {
+    val selectedPartitions = listFiles(prunedPartitions, partitionColTypes)
+    val maxSplitBytes = FilePartition.maxSplitBytes(session, selectedPartitions)
+    selectedPartitions.zip(readFileFormats).flatMap {
+      case (partitionDirectory, readFileFormat) =>
+        val splitFiles = getSplitFile(partitionDirectory, maxSplitBytes)
+        val filePartitions = FilePartition.getFilePartitions(session, splitFiles, maxSplitBytes)
+        filePartitions.map((_, readFileFormat))
+    }
   }
 
-  def createFilePartition(tableLocation: URI): Seq[FilePartition] =
-    createFilePartition(listFiles(Seq((tableLocation, InternalRow.empty))))
+  def createFilePartition(
+      prunedPartitions: Seq[HivePartition],
+      partitionColTypes: Seq[DataType]): Seq[FilePartition] = {
+    val selectedPartitions = listFiles(prunedPartitions, partitionColTypes)
+    val maxSplitBytes = FilePartition.maxSplitBytes(session, selectedPartitions)
+    val splitFiles = selectedPartitions.flatMap(getSplitFile(_, maxSplitBytes))
+    FilePartition.getFilePartitions(session, splitFiles, maxSplitBytes)
+  }
 }
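
The new createFilePartition overload zips one ReadFileFormat against each selected PartitionDirectory, so every FilePartition built from a given directory carries that directory's format. The key invariant, which the caller in HiveTableScanExecTransformer upholds, is that readFileFormats aligns one-to-one with selectedPartitions. A minimal sketch of the zip-and-tag logic with toy types in place of Spark's PartitionDirectory/FilePartition (file-count grouping stands in for byte-based packing):

    sealed trait ReadFileFormat
    case object ParquetReadFormat extends ReadFileFormat
    case object OrcReadFormat extends ReadFileFormat
    case class PartitionDirectory(files: Seq[String])
    case class FilePartition(files: Seq[String])

    // Assumed invariant: readFileFormats(i) describes selectedPartitions(i).
    def tagFilePartitions(
        selectedPartitions: Seq[PartitionDirectory],
        readFileFormats: Seq[ReadFileFormat],
        maxFilesPerPartition: Int): Seq[(FilePartition, ReadFileFormat)] =
      selectedPartitions.zip(readFileFormats).flatMap {
        case (dir, fmt) =>
          // Stand-in for FilePartition.getFilePartitions: bin the directory's
          // files, each bin becoming one FilePartition tagged with fmt.
          dir.files.grouped(maxFilesPerPartition).map(fs => (FilePartition(fs), fmt))
      }

Note that splitting happens per directory rather than across the whole table, so a FilePartition never mixes files of two formats.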
diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
index 585c2a85d4..6d0b16fabb 100644
--- a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
@@ -23,7 +23,7 @@ import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 
 import org.apache.spark.Partition
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, HiveTableRelation}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSeq, Expression}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.execution.SparkPlan
@@ -69,6 +69,9 @@ case class HiveTableScanExecTransformer(
 
   override def getPartitions: Seq[Partition] = partitions
 
+  override def getPartitionWithReadFileFormats: Seq[(Partition, ReadFileFormat)] =
+    partitionWithReadFileFormats
+
   override def getPartitionSchema: StructType = relation.tableMeta.partitionSchema
 
   override def getDataSchema: StructType = relation.tableMeta.dataSchema
@@ -82,23 +85,42 @@ case class HiveTableScanExecTransformer(
   @transient private lazy val hivePartitionConverter =
     new HivePartitionConverter(session.sessionState.newHadoopConf(), session)
 
-  @transient private lazy val partitions: Seq[Partition] =
+  @transient private lazy val existsMixedInputFormat: Boolean =
+    prunedPartitions.exists(_.getInputFormatClass != tableDesc.getInputFileFormatClass)
+
+  @transient private lazy val partitionWithReadFileFormats: Seq[(Partition, ReadFileFormat)] =
     if (!relation.isPartitioned) {
       val tableLocation: URI = relation.tableMeta.storage.locationUri.getOrElse {
         throw new UnsupportedOperationException("Table path not set.")
       }
-      hivePartitionConverter.createFilePartition(tableLocation)
-    } else {
+
+      hivePartitionConverter.createFilePartition(tableLocation).map((_, fileFormat))
+    } else if (existsMixedInputFormat) {
+      val readFileFormats = prunedPartitions.map {
+        partition => getReadFileFormat(HiveClientImpl.fromHivePartition(partition).storage)
+      }
+
       hivePartitionConverter.createFilePartition(
         prunedPartitions,
-        relation.partitionCols.map(_.dataType))
+        relation.partitionCols.map(_.dataType),
+        readFileFormats)
+    } else {
+      val filePartitions = hivePartitionConverter
+        .createFilePartition(prunedPartitions, relation.partitionCols.map(_.dataType))
+
+      filePartitions.map((_, fileFormat))
     }
 
-  @transient override lazy val fileFormat: ReadFileFormat = {
-    relation.tableMeta.storage.inputFormat match {
+  @transient private lazy val partitions: Seq[Partition] = partitionWithReadFileFormats.unzip._1
+
+  @transient override lazy val fileFormat: ReadFileFormat =
+    getReadFileFormat(relation.tableMeta.storage)
+
+  private def getReadFileFormat(storage: CatalogStorageFormat): ReadFileFormat = {
+    storage.inputFormat match {
       case Some(inputFormat)
           if TEXT_INPUT_FORMAT_CLASS.isAssignableFrom(Utils.classForName(inputFormat)) =>
-        relation.tableMeta.storage.serde match {
+        storage.serde match {
           case Some("org.openx.data.jsonserde.JsonSerDe") | Some(
                 "org.apache.hive.hcatalog.data.JsonSerDe") =>
             ReadFileFormat.JsonReadFormat
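
The scan resolves formats in three steps: a non-partitioned table keeps its single table-level format; a partitioned table whose pruned partitions all match the table's input format class reuses the table-level format for each; only a genuinely mixed table pays the per-partition getReadFileFormat cost. A sketch of that decision, with plain strings standing in for Hive input-format class names (toy types, not the Hive metastore API):

    sealed trait ReadFileFormat
    case object ParquetReadFormat extends ReadFileFormat
    case object OrcReadFormat extends ReadFileFormat
    case class HivePartitionStub(inputFormatClass: String)

    def resolveFormats(
        isPartitioned: Boolean,
        tableInputFormatClass: String,
        prunedPartitions: Seq[HivePartitionStub],
        tableFormat: ReadFileFormat,
        formatOf: String => ReadFileFormat): Seq[ReadFileFormat] =
      if (!isPartitioned) {
        // Non-partitioned table: single location, single format.
        Seq(tableFormat)
      } else if (prunedPartitions.exists(_.inputFormatClass != tableInputFormatClass)) {
        // Mixed input formats: resolve a ReadFileFormat per Hive partition.
        prunedPartitions.map(p => formatOf(p.inputFormatClass))
      } else {
        // Homogeneous table: reuse the table-level format for every partition.
        prunedPartitions.map(_ => tableFormat)
      }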
diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
index 339bda2a03..884e62d5b9 100644
--- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
+++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.extension
 
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.execution.FileSourceScanExecTransformerBase
+import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 
 import org.apache.spark.Partition
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -61,6 +62,9 @@ case class TestFileSourceScanExecTransformer(
         optionalNumCoalescedBuckets,
         disableBucketedScan)
 
+  override def getPartitionWithReadFileFormats: Seq[(Partition, ReadFileFormat)] =
+    getPartitions.map((_, fileFormat))
+
   override val nodeNamePrefix: String = "TestFile"
 
   override def doCanonicalize(): TestFileSourceScanExecTransformer = {
diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
index 6a10b0781e..f3a9d85903 100644
--- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
+++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
@@ -19,7 +19,8 @@ package org.apache.spark.sql.hive.execution
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.hive.HiveTableScanExecTransformer
+import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveTableScanExecTransformer}
+import org.apache.spark.sql.hive.client.HiveClient
 
 class GlutenHiveSQLQuerySuite extends GlutenHiveSQLQuerySuiteBase {
 
@@ -69,4 +70,35 @@ class GlutenHiveSQLQuerySuite extends GlutenHiveSQLQuerySuiteBase {
       purge = false)
   }
 
+  test("GLUTEN-11062: Supports mixed input format for partitioned Hive table") 
{
+    val hiveClient: HiveClient =
+      spark.sharedState.externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client
+
+    withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "false") {
+      withTempDir {
+        dir =>
+          val parquetLoc = s"file:///$dir/test_parquet"
+          val orcLoc = s"file:///$dir/test_orc"
+          withTable("test_parquet", "test_orc") {
+            hiveClient.runSqlHive(s"""create table test_parquet(id int)
+                 partitioned by(pid int)
+                 stored as parquet location '$parquetLoc'
+                 """.stripMargin)
+            hiveClient.runSqlHive("insert into test_parquet partition(pid=1) 
select 2")
+            hiveClient.runSqlHive(s"""create table test_orc(id int)
+                 partitioned by(pid int)
+                 stored as orc location '$orcLoc'
+                 """.stripMargin)
+            hiveClient.runSqlHive("insert into test_orc partition(pid=2) 
select 2")
+            hiveClient.runSqlHive(
+              s"alter table test_parquet add partition (pid=2) location 
'$orcLoc/pid=2'")
+            hiveClient.runSqlHive("alter table test_parquet partition(pid=2) 
SET FILEFORMAT orc")
+            val df = sql("select pid, id from test_parquet order by pid")
+            checkAnswer(df, Seq(Row(1, 2), Row(2, 2)))
+            checkOperatorMatch[HiveTableScanExecTransformer](df)
+          }
+      }
+    }
+  }
+
 }
diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
index 6846f140ed..9f5d47b60b 100644
--- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
+++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.extension
 
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.execution.FileSourceScanExecTransformerBase
+import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 
 import org.apache.spark.Partition
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -47,6 +48,7 @@ case class TestFileSourceScanExecTransformer(
     dataFilters,
     tableIdentifier,
     disableBucketedScan) {
+
   override def getPartitions: Seq[Partition] =
     BackendsApiManager.getTransformerApiInstance
       .genPartitionSeq(
@@ -59,5 +61,8 @@ case class TestFileSourceScanExecTransformer(
         optionalNumCoalescedBuckets,
         disableBucketedScan)
 
+  override def getPartitionWithReadFileFormats: Seq[(Partition, ReadFileFormat)] =
+    getPartitions.map((_, fileFormat))
+
   override val nodeNamePrefix: String = "TestFile"
 }
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
index 6846f140ed..9f5d47b60b 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.extension
 
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.execution.FileSourceScanExecTransformerBase
+import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 
 import org.apache.spark.Partition
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -47,6 +48,7 @@ case class TestFileSourceScanExecTransformer(
     dataFilters,
     tableIdentifier,
     disableBucketedScan) {
+
   override def getPartitions: Seq[Partition] =
     BackendsApiManager.getTransformerApiInstance
       .genPartitionSeq(
@@ -59,5 +61,8 @@ case class TestFileSourceScanExecTransformer(
         optionalNumCoalescedBuckets,
         disableBucketedScan)
 
+  override def getPartitionWithReadFileFormats: Seq[(Partition, ReadFileFormat)] =
+    getPartitions.map((_, fileFormat))
+
   override val nodeNamePrefix: String = "TestFile"
 }
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
index 6846f140ed..9f5d47b60b 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.extension
 
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.execution.FileSourceScanExecTransformerBase
+import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 
 import org.apache.spark.Partition
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -47,6 +48,7 @@ case class TestFileSourceScanExecTransformer(
     dataFilters,
     tableIdentifier,
     disableBucketedScan) {
+
   override def getPartitions: Seq[Partition] =
     BackendsApiManager.getTransformerApiInstance
       .genPartitionSeq(
@@ -59,5 +61,8 @@ case class TestFileSourceScanExecTransformer(
         optionalNumCoalescedBuckets,
         disableBucketedScan)
 
+  override def getPartitionWithReadFileFormats: Seq[(Partition, ReadFileFormat)] =
+    getPartitions.map((_, fileFormat))
+
   override val nodeNamePrefix: String = "TestFile"
 }
diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
index 22e9929559..4ac1348e1d 100644
--- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
+++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.extension
 
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.execution.FileSourceScanExecTransformerBase
+import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 
 import org.apache.spark.Partition
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -47,6 +48,7 @@ case class TestFileSourceScanExecTransformer(
     dataFilters,
     tableIdentifier,
     disableBucketedScan) {
+
   override def getPartitions: Seq[Partition] =
     BackendsApiManager.getTransformerApiInstance.genPartitionSeq(
       relation,
@@ -58,5 +60,8 @@ case class TestFileSourceScanExecTransformer(
       optionalNumCoalescedBuckets,
       disableBucketedScan)
 
+  override def getPartitionWithReadFileFormats: Seq[(Partition, ReadFileFormat)] =
+    getPartitions.map((_, fileFormat))
+
   override val nodeNamePrefix: String = "TestFile"
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
