This is an automated email from the ASF dual-hosted git repository.
beliefer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new dfc8c742d9 [GLUTEN-11062][CORE] Supports mixed input format for partitioned Hive table (#11113)
dfc8c742d9 is described below
commit dfc8c742d9250a3b0c2f9f9e65b3f92f1fbcda53
Author: Jiaan Geng <[email protected]>
AuthorDate: Mon Nov 24 10:27:07 2025 +0800
[GLUTEN-11062][CORE] Supports mixed input format for partitioned Hive table (#11113)
* [Gluten-11062] Supports mixed input format for partitioned Hive table
---
.../gluten/execution/CHRangeExecTransformer.scala | 4 ++
.../gluten/execution/IcebergScanTransformer.scala | 13 +++--
.../execution/MicroBatchScanExecTransformer.scala | 6 +-
.../gluten/execution/PaimonScanTransformer.scala | 11 +++-
.../execution/BasicScanExecTransformer.scala | 42 ++++++++------
.../execution/BatchScanExecTransformer.scala | 5 +-
.../execution/FileSourceScanExecTransformer.scala | 3 +
.../gluten/execution/WholeStageTransformer.scala | 4 ++
.../spark/sql/hive/HivePartitionConverter.scala | 65 ++++++++++++++--------
.../sql/hive/HiveTableScanExecTransformer.scala | 38 ++++++++++---
.../TestFileSourceScanExecTransformer.scala | 4 ++
.../hive/execution/GlutenHiveSQLQuerySuite.scala | 34 ++++++++++-
.../TestFileSourceScanExecTransformer.scala | 5 ++
.../TestFileSourceScanExecTransformer.scala | 5 ++
.../TestFileSourceScanExecTransformer.scala | 5 ++
.../TestFileSourceScanExecTransformer.scala | 5 ++
16 files changed, 190 insertions(+), 59 deletions(-)
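The heart of this change: a leaf scan used to report a flat Seq[Partition] plus a single table-wide fileFormat, but a Hive table altered with SET FILEFORMAT can hold partitions in different formats. Each scan now reports every partition paired with the format it should be read as. A minimal, self-contained sketch of the idea (the types below are simplified stand-ins, not Gluten's actual classes):

    sealed trait ReadFileFormat
    case object ParquetReadFormat extends ReadFileFormat
    case object OrcReadFormat extends ReadFileFormat

    final case class Partition(index: Int, path: String)

    // Hypothetical analogue of getPartitionWithReadFileFormats: pair each
    // partition with the format resolved for it.
    def pairWithFormats(
        partitions: Seq[Partition],
        formatOf: Partition => ReadFileFormat): Seq[(Partition, ReadFileFormat)] =
      partitions.map(p => (p, formatOf(p)))

    // pid=2 of a Parquet table was altered to ORC, so the formats differ.
    val pairs = pairWithFormats(
      Seq(Partition(0, "/warehouse/t/pid=1"), Partition(1, "/warehouse/t/pid=2")),
      p => if (p.path.endsWith("pid=2")) OrcReadFormat else ParquetReadFormat)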
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHRangeExecTransformer.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHRangeExecTransformer.scala
index fb9fc4b4d8..317d9d12c7 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHRangeExecTransformer.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHRangeExecTransformer.scala
@@ -23,6 +23,7 @@ import org.apache.gluten.substrait.`type`._
import org.apache.gluten.substrait.SubstraitContext
import org.apache.gluten.substrait.extensions.ExtensionBuilder
import org.apache.gluten.substrait.rel.{RelBuilder, SplitInfo}
+import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.spark.Partition
import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -59,6 +60,9 @@ case class CHRangeExecTransformer(
}
}
+ override def getPartitionWithReadFileFormats: Seq[(Partition, ReadFileFormat)] =
+ getPartitions.map((_, ReadFileFormat.UnknownFormat))
+
@transient
override lazy val metrics: Map[String, SQLMetric] =
Map(
diff --git a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
index f33143feb1..e5b098eaa8 100644
--- a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
+++ b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
@@ -168,14 +168,19 @@ case class IcebergScanTransformer(
override lazy val fileFormat: ReadFileFormat = GlutenIcebergSourceUtil.getFileFormat(scan)
- override def getSplitInfosFromPartitions(partitions: Seq[Partition]): Seq[SplitInfo] = {
- val splitInfos = partitions.map {
+ override def getSplitInfosFromPartitions(
+ partitions: Seq[(Partition, ReadFileFormat)]): Seq[SplitInfo] = {
+ partitions.map { case (partition, _) => partitionToSplitInfo(partition) }
+ }
+
+ private def partitionToSplitInfo(partition: Partition): SplitInfo = {
+ val splitInfo = partition match {
case p: SparkDataSourceRDDPartition =>
GlutenIcebergSourceUtil.genSplitInfo(p, getPartitionSchema)
case _ => throw new GlutenNotSupportException()
}
- numSplits.add(splitInfos.map(s => s.asInstanceOf[LocalFilesNode].getPaths.size()).sum)
- splitInfos
+ numSplits.add(splitInfo.asInstanceOf[LocalFilesNode].getPaths.size())
+ splitInfo
}
override def doCanonicalize(): IcebergScanTransformer = {
diff --git a/gluten-kafka/src/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala b/gluten-kafka/src/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala
index 2ab458f6f7..2f5e277d03 100644
--- a/gluten-kafka/src/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala
+++ b/gluten-kafka/src/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala
@@ -73,6 +73,9 @@ case class MicroBatchScanExecTransformer(
case (inputPartition, index) => new SparkDataSourceRDDPartition(index, Seq(inputPartition))
}
+ override def getPartitionWithReadFileFormats: Seq[(Partition, ReadFileFormat)] =
+ getPartitions.map((_, fileFormat))
+
/** Returns the actual schema of this data source scan. */
override def getDataSchema: StructType = scan.readSchema()
@@ -84,7 +87,8 @@ case class MicroBatchScanExecTransformer(
MicroBatchScanExecTransformer.supportsBatchScan(scan)
}
- override def getSplitInfosFromPartitions(partitions: Seq[Partition]): Seq[SplitInfo] = {
+ override def getSplitInfosFromPartitions(
+ partitions: Seq[(Partition, ReadFileFormat)]): Seq[SplitInfo] = {
val groupedPartitions = filteredPartitions.flatten
groupedPartitions.zipWithIndex.map {
case (p, _) => GlutenStreamKafkaSourceUtil.genSplitInfo(p)
diff --git a/gluten-paimon/src-paimon/main/scala/org/apache/gluten/execution/PaimonScanTransformer.scala b/gluten-paimon/src-paimon/main/scala/org/apache/gluten/execution/PaimonScanTransformer.scala
index 0141bf12ce..5dbce795a9 100644
--- a/gluten-paimon/src-paimon/main/scala/org/apache/gluten/execution/PaimonScanTransformer.scala
+++ b/gluten-paimon/src-paimon/main/scala/org/apache/gluten/execution/PaimonScanTransformer.scala
@@ -120,9 +120,16 @@ case class PaimonScanTransformer(
override def doExecuteColumnar(): RDD[ColumnarBatch] = throw new UnsupportedOperationException()
- override def getSplitInfosFromPartitions(partitions: Seq[Partition]): Seq[SplitInfo] = {
+ override def getSplitInfosFromPartitions(
+ partitions: Seq[(Partition, ReadFileFormat)]): Seq[SplitInfo] = {
val partitionComputer = PaimonScanTransformer.getRowDataPartitionComputer(scan)
- partitions.map {
+ partitions.map { case (partition, _) => partitionToSplitInfo(partition, partitionComputer) }
+ }
+
+ private def partitionToSplitInfo(
+ partition: Partition,
+ partitionComputer: InternalRowPartitionComputer): SplitInfo = {
+ partition match {
case p: SparkDataSourceRDDPartition =>
val paths = mutable.ListBuffer.empty[String]
val starts = mutable.ListBuffer.empty[JLong]
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
index 86b76e7a25..cb6f54dea7 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
@@ -57,26 +57,32 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
def getProperties: Map[String, String] = Map.empty
override def getSplitInfos: Seq[SplitInfo] = {
- getSplitInfosFromPartitions(getPartitions)
+ getSplitInfosFromPartitions(getPartitionWithReadFileFormats)
}
- def getSplitInfosFromPartitions(partitions: Seq[Partition]): Seq[SplitInfo] = {
- partitions.map(
- p => {
- val ps = p match {
- case sp: SparkDataSourceRDDPartition => sp.inputPartitions.map(_.asInstanceOf[Partition])
- case o => Seq(o)
- }
- BackendsApiManager.getIteratorApiInstance
- .genSplitInfo(
- p.index,
- ps,
- getPartitionSchema,
- getDataSchema,
- fileFormat,
- getMetadataColumns().map(_.name),
- getProperties)
- })
+ def getSplitInfosFromPartitions(partitions: Seq[(Partition, ReadFileFormat)]): Seq[SplitInfo] = {
+ partitions.map {
+ case (partition, readFileFormat) => partitionToSplitInfo(partition, readFileFormat)
+ }
+ }
+
+ private def partitionToSplitInfo(
+ partition: Partition,
+ readFileFormat: ReadFileFormat): SplitInfo = {
+ val part = partition match {
+ case sp: SparkDataSourceRDDPartition => sp.inputPartitions.map(_.asInstanceOf[Partition])
+ case _ => Seq(partition)
+ }
+
+ BackendsApiManager.getIteratorApiInstance
+ .genSplitInfo(
+ partition.index,
+ part,
+ getPartitionSchema,
+ getDataSchema,
+ readFileFormat,
+ getMetadataColumns().map(_.name),
+ getProperties)
}
override protected def doValidateInternal(): ValidationResult = {
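With the pairs in hand, BasicScanExecTransformer can hand a per-partition format to the backend instead of the scan-wide fileFormat, which is exactly what the refactor above does. Roughly, the new shape (toy types; genSplitInfo here is a hypothetical stand-in for BackendsApiManager.getIteratorApiInstance.genSplitInfo):

    final case class Partition(index: Int)
    sealed trait ReadFileFormat
    final case class SplitInfo(partitionIndex: Int, format: ReadFileFormat)

    // Hypothetical stand-in for the backend's genSplitInfo call.
    def genSplitInfo(index: Int, format: ReadFileFormat): SplitInfo =
      SplitInfo(index, format)

    // Mirrors the new getSplitInfosFromPartitions: one SplitInfo per pair,
    // each carrying its own read format.
    def getSplitInfosFromPartitions(
        partitions: Seq[(Partition, ReadFileFormat)]): Seq[SplitInfo] =
      partitions.map { case (p, fmt) => genSplitInfo(p.index, fmt) }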
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
index 0b0dc7c52f..1d0a38e887 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
@@ -128,7 +128,10 @@ abstract class BatchScanExecTransformerBase(
override def getMetadataColumns(): Seq[AttributeReference] = Seq.empty
- def getPartitions: Seq[Partition] = finalPartitions
+ override def getPartitions: Seq[Partition] = finalPartitions
+
+ override def getPartitionWithReadFileFormats: Seq[(Partition, ReadFileFormat)] =
+ finalPartitions.map((_, fileFormat))
override def getPartitionSchema: StructType = scan match {
case fileScan: FileScan => fileScan.readPartitionSchema
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
index 0584922bca..af77fa24b3 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
@@ -138,6 +138,9 @@ abstract class FileSourceScanExecTransformerBase(
)
}
+ override def getPartitionWithReadFileFormats: Seq[(Partition, ReadFileFormat)] =
+ getPartitions.map((_, fileFormat))
+
override def getPartitionSchema: StructType = relation.partitionSchema
override def getDataSchema: StructType = relation.dataSchema
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
index dc2d3897c6..859b07af69 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
@@ -25,6 +25,7 @@ import org.apache.gluten.substrait.`type`.{TypeBuilder, TypeNode}
import org.apache.gluten.substrait.SubstraitContext
import org.apache.gluten.substrait.plan.{PlanBuilder, PlanNode}
import org.apache.gluten.substrait.rel.{LocalFilesNode, RelNode, SplitInfo}
+import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.gluten.utils.SubstraitPlanPrinterUtil
import org.apache.spark._
@@ -140,6 +141,9 @@ trait LeafTransformSupport extends TransformSupport with LeafExecNode {
/** Returns the partitions generated by this data source scan. */
def getPartitions: Seq[Partition]
+
+ /** Returns the partitions generated by this data source scan, each paired with its ReadFileFormat. */
+ def getPartitionWithReadFileFormats: Seq[(Partition, ReadFileFormat)]
}
trait UnaryTransformSupport extends TransformSupport with UnaryExecNode {
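Every leaf transformer now has to satisfy this contract; for single-format sources the implementation is the one-liner seen throughout this commit. A toy rendering of that pattern (simplified stand-in types and a hypothetical TableScan class, not Gluten's real ones):

    final case class Partition(index: Int)
    sealed trait ReadFileFormat
    case object ParquetReadFormat extends ReadFileFormat

    trait LeafTransformSupport {
      def getPartitions: Seq[Partition]
      // Each partition paired with the format it should be read as.
      def getPartitionWithReadFileFormats: Seq[(Partition, ReadFileFormat)]
    }

    // Single-format scans just tag every partition with the table-wide format.
    final class TableScan(parts: Seq[Partition], fileFormat: ReadFileFormat)
        extends LeafTransformSupport {
      override def getPartitions: Seq[Partition] = parts
      override def getPartitionWithReadFileFormats: Seq[(Partition, ReadFileFormat)] =
        getPartitions.map((_, fileFormat))
    }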
diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HivePartitionConverter.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HivePartitionConverter.scala
index b35f8f9d93..0f7514d44f 100644
--- a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HivePartitionConverter.scala
+++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HivePartitionConverter.scala
@@ -18,12 +18,13 @@ package org.apache.spark.sql.hive
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.sql.shims.SparkShimLoader
+import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
import org.apache.spark.sql.catalyst.analysis.CastSupport
import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionDirectory}
+import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionDirectory, PartitionedFile}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.DataType
@@ -130,35 +131,51 @@ class HivePartitionConverter(hadoopConf: Configuration, session: SparkSession)
}
}
- private def createFilePartition(
- selectedPartitions: Seq[PartitionDirectory]): Seq[FilePartition] = {
+ private def getSplitFile(
+ partitionDirectory: PartitionDirectory,
+ maxSplitBytes: Long): Seq[PartitionedFile] =
+ SparkShimLoader.getSparkShims
+ .getFileStatus(partitionDirectory)
+ .flatMap {
+ f =>
+ SparkShimLoader.getSparkShims.splitFiles(
+ session,
+ f._1,
+ f._1.getPath,
+ isSplitable = canBeSplit(f._1.getPath),
+ maxSplitBytes,
+ partitionDirectory.values
+ )
+ }
+ .sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+
+ def createFilePartition(tableLocation: URI): Seq[FilePartition] = {
+ val selectedPartitions = listFiles(Seq((tableLocation, InternalRow.empty)))
val maxSplitBytes = FilePartition.maxSplitBytes(session, selectedPartitions)
- val splitFiles = selectedPartitions.flatMap {
- partition =>
- SparkShimLoader.getSparkShims
- .getFileStatus(partition)
- .flatMap {
- f =>
- SparkShimLoader.getSparkShims.splitFiles(
- session,
- f._1,
- f._1.getPath,
- isSplitable = canBeSplit(f._1.getPath),
- maxSplitBytes,
- partition.values
- )
- }
- .sortBy(_.length)(implicitly[Ordering[Long]].reverse)
- }
+ val splitFiles = selectedPartitions.flatMap(getSplitFile(_, maxSplitBytes))
FilePartition.getFilePartitions(session, splitFiles, maxSplitBytes)
}
def createFilePartition(
prunedPartitions: Seq[HivePartition],
- partitionColTypes: Seq[DataType]): Seq[FilePartition] = {
- createFilePartition(listFiles(prunedPartitions, partitionColTypes))
+ partitionColTypes: Seq[DataType],
+ readFileFormats: Seq[ReadFileFormat]): Seq[(FilePartition, ReadFileFormat)] = {
+ val selectedPartitions = listFiles(prunedPartitions, partitionColTypes)
+ val maxSplitBytes = FilePartition.maxSplitBytes(session, selectedPartitions)
+ selectedPartitions.zip(readFileFormats).flatMap {
+ case (partitionDirectory, readFileFormat) =>
+ val splitFiles = getSplitFile(partitionDirectory, maxSplitBytes)
+ val filePartitions = FilePartition.getFilePartitions(session, splitFiles, maxSplitBytes)
+ filePartitions.map((_, readFileFormat))
+ }
}
- def createFilePartition(tableLocation: URI): Seq[FilePartition] =
- createFilePartition(listFiles(Seq((tableLocation, InternalRow.empty))))
+ def createFilePartition(
+ prunedPartitions: Seq[HivePartition],
+ partitionColTypes: Seq[DataType]): Seq[FilePartition] = {
+ val selectedPartitions = listFiles(prunedPartitions, partitionColTypes)
+ val maxSplitBytes = FilePartition.maxSplitBytes(session, selectedPartitions)
+ val splitFiles = selectedPartitions.flatMap(getSplitFile(_, maxSplitBytes))
+ FilePartition.getFilePartitions(session, splitFiles, maxSplitBytes)
+ }
}
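Note the invariant the new overload preserves: splitting and grouping happen per PartitionDirectory before the format tag is attached, so a FilePartition never mixes files of two formats. A standalone sketch of that grouping (toy types; the trivial getFilePartitions below stands in for FilePartition.getFilePartitions):

    final case class PartitionDirectory(files: Seq[String])
    final case class FilePartition(files: Seq[String])
    sealed trait ReadFileFormat

    // Trivial grouping for the sketch; Spark actually packs by maxSplitBytes.
    def getFilePartitions(files: Seq[String], maxSplitBytes: Long): Seq[FilePartition] =
      Seq(FilePartition(files))

    // Mirrors the new createFilePartition overload: group per directory,
    // then tag every resulting FilePartition with that directory's format.
    def createFilePartition(
        dirs: Seq[PartitionDirectory],
        formats: Seq[ReadFileFormat],
        maxSplitBytes: Long): Seq[(FilePartition, ReadFileFormat)] =
      dirs.zip(formats).flatMap {
        case (dir, fmt) => getFilePartitions(dir.files, maxSplitBytes).map((_, fmt))
      }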
diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
index 585c2a85d4..6d0b16fabb 100644
--- a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
@@ -23,7 +23,7 @@ import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.spark.Partition
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, HiveTableRelation}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSeq, Expression}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.execution.SparkPlan
@@ -69,6 +69,9 @@ case class HiveTableScanExecTransformer(
override def getPartitions: Seq[Partition] = partitions
+ override def getPartitionWithReadFileFormats: Seq[(Partition, ReadFileFormat)] =
+ partitionWithReadFileFormats
+
override def getPartitionSchema: StructType = relation.tableMeta.partitionSchema
override def getDataSchema: StructType = relation.tableMeta.dataSchema
@@ -82,23 +85,42 @@ case class HiveTableScanExecTransformer(
@transient private lazy val hivePartitionConverter =
new HivePartitionConverter(session.sessionState.newHadoopConf(), session)
- @transient private lazy val partitions: Seq[Partition] =
+ @transient private lazy val existsMixedInputFormat: Boolean =
+ prunedPartitions.exists(_.getInputFormatClass != tableDesc.getInputFileFormatClass)
+
+ @transient private lazy val partitionWithReadFileFormats: Seq[(Partition, ReadFileFormat)] =
if (!relation.isPartitioned) {
val tableLocation: URI =
relation.tableMeta.storage.locationUri.getOrElse {
throw new UnsupportedOperationException("Table path not set.")
}
- hivePartitionConverter.createFilePartition(tableLocation)
- } else {
+
+ hivePartitionConverter.createFilePartition(tableLocation).map((_, fileFormat))
+ } else if (existsMixedInputFormat) {
+ val readFileFormats = prunedPartitions.map {
+ partition => getReadFileFormat(HiveClientImpl.fromHivePartition(partition).storage)
+ }
+
hivePartitionConverter.createFilePartition(
prunedPartitions,
- relation.partitionCols.map(_.dataType))
+ relation.partitionCols.map(_.dataType),
+ readFileFormats)
+ } else {
+ val filePartitions = hivePartitionConverter
+ .createFilePartition(prunedPartitions, relation.partitionCols.map(_.dataType))
+
+ filePartitions.map((_, fileFormat))
}
- @transient override lazy val fileFormat: ReadFileFormat = {
- relation.tableMeta.storage.inputFormat match {
+ @transient private lazy val partitions: Seq[Partition] = partitionWithReadFileFormats.unzip._1
+
+ @transient override lazy val fileFormat: ReadFileFormat =
+ getReadFileFormat(relation.tableMeta.storage)
+
+ private def getReadFileFormat(storage: CatalogStorageFormat): ReadFileFormat = {
+ storage.inputFormat match {
case Some(inputFormat)
if TEXT_INPUT_FORMAT_CLASS.isAssignableFrom(Utils.classForName(inputFormat)) =>
- relation.tableMeta.storage.serde match {
+ storage.serde match {
case Some("org.openx.data.jsonserde.JsonSerDe") | Some(
"org.apache.hive.hcatalog.data.JsonSerDe") =>
ReadFileFormat.JsonReadFormat
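The scan only pays the per-partition lookup when formats actually differ: existsMixedInputFormat compares each pruned partition's input format class with the table-level one, and the uniform case keeps the old single-format path. The branch logic, roughly (toy types; formatOf is a hypothetical stand-in for getReadFileFormat applied to HiveClientImpl.fromHivePartition(...).storage):

    sealed trait ReadFileFormat
    final case class HivePartition(inputFormatClass: String)

    def resolveFormats(
        parts: Seq[HivePartition],
        tableInputFormatClass: String,
        tableFormat: ReadFileFormat,
        formatOf: HivePartition => ReadFileFormat): Seq[(HivePartition, ReadFileFormat)] = {
      val mixed = parts.exists(_.inputFormatClass != tableInputFormatClass)
      if (mixed) parts.map(p => (p, formatOf(p))) // per-partition resolution
      else parts.map(p => (p, tableFormat)) // cheap uniform path
    }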
diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
index 339bda2a03..884e62d5b9 100644
--- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
+++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.extension
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.execution.FileSourceScanExecTransformerBase
+import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.spark.Partition
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -61,6 +62,9 @@ case class TestFileSourceScanExecTransformer(
optionalNumCoalescedBuckets,
disableBucketedScan)
+ override def getPartitionWithReadFileFormats: Seq[(Partition, ReadFileFormat)] =
+ getPartitions.map((_, fileFormat))
+
override val nodeNamePrefix: String = "TestFile"
override def doCanonicalize(): TestFileSourceScanExecTransformer = {
diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
index 6a10b0781e..f3a9d85903 100644
--- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
+++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
@@ -19,7 +19,8 @@ package org.apache.spark.sql.hive.execution
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.hive.HiveTableScanExecTransformer
+import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveTableScanExecTransformer}
+import org.apache.spark.sql.hive.client.HiveClient
class GlutenHiveSQLQuerySuite extends GlutenHiveSQLQuerySuiteBase {
@@ -69,4 +70,35 @@ class GlutenHiveSQLQuerySuite extends GlutenHiveSQLQuerySuiteBase {
purge = false)
}
+ test("GLUTEN-11062: Supports mixed input format for partitioned Hive table")
{
+ val hiveClient: HiveClient =
+ spark.sharedState.externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client
+
+ withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "false") {
+ withTempDir {
+ dir =>
+ val parquetLoc = s"file:///$dir/test_parquet"
+ val orcLoc = s"file:///$dir/test_orc"
+ withTable("test_parquet", "test_orc") {
+ hiveClient.runSqlHive(s"""create table test_parquet(id int)
+ partitioned by(pid int)
+ stored as parquet location '$parquetLoc'
+ """.stripMargin)
+ hiveClient.runSqlHive("insert into test_parquet partition(pid=1)
select 2")
+ hiveClient.runSqlHive(s"""create table test_orc(id int)
+ partitioned by(pid int)
+ stored as orc location '$orcLoc'
+ """.stripMargin)
+ hiveClient.runSqlHive("insert into test_orc partition(pid=2)
select 2")
+ hiveClient.runSqlHive(
+ s"alter table test_parquet add partition (pid=2) location
'$orcLoc/pid=2'")
+ hiveClient.runSqlHive("alter table test_parquet partition(pid=2)
SET FILEFORMAT orc")
+ val df = sql("select pid, id from test_parquet order by pid")
+ checkAnswer(df, Seq(Row(1, 2), Row(2, 2)))
+ checkOperatorMatch[HiveTableScanExecTransformer](df)
+ }
+ }
+ }
+ }
+
}
diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
index 6846f140ed..9f5d47b60b 100644
--- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
+++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.extension
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.execution.FileSourceScanExecTransformerBase
+import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.spark.Partition
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -47,6 +48,7 @@ case class TestFileSourceScanExecTransformer(
dataFilters,
tableIdentifier,
disableBucketedScan) {
+
override def getPartitions: Seq[Partition] =
BackendsApiManager.getTransformerApiInstance
.genPartitionSeq(
@@ -59,5 +61,8 @@ case class TestFileSourceScanExecTransformer(
optionalNumCoalescedBuckets,
disableBucketedScan)
+ override def getPartitionWithReadFileFormats: Seq[(Partition, ReadFileFormat)] =
+ getPartitions.map((_, fileFormat))
+
override val nodeNamePrefix: String = "TestFile"
}
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
index 6846f140ed..9f5d47b60b 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.extension
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.execution.FileSourceScanExecTransformerBase
+import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.spark.Partition
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -47,6 +48,7 @@ case class TestFileSourceScanExecTransformer(
dataFilters,
tableIdentifier,
disableBucketedScan) {
+
override def getPartitions: Seq[Partition] =
BackendsApiManager.getTransformerApiInstance
.genPartitionSeq(
@@ -59,5 +61,8 @@ case class TestFileSourceScanExecTransformer(
optionalNumCoalescedBuckets,
disableBucketedScan)
+ override def getPartitionWithReadFileFormats: Seq[(Partition, ReadFileFormat)] =
+ getPartitions.map((_, fileFormat))
+
override val nodeNamePrefix: String = "TestFile"
}
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
index 6846f140ed..9f5d47b60b 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.extension
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.execution.FileSourceScanExecTransformerBase
+import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.spark.Partition
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -47,6 +48,7 @@ case class TestFileSourceScanExecTransformer(
dataFilters,
tableIdentifier,
disableBucketedScan) {
+
override def getPartitions: Seq[Partition] =
BackendsApiManager.getTransformerApiInstance
.genPartitionSeq(
@@ -59,5 +61,8 @@ case class TestFileSourceScanExecTransformer(
optionalNumCoalescedBuckets,
disableBucketedScan)
+ override def getPartitionWithReadFileFormats: Seq[(Partition, ReadFileFormat)] =
+ getPartitions.map((_, fileFormat))
+
override val nodeNamePrefix: String = "TestFile"
}
diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
index 22e9929559..4ac1348e1d 100644
--- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
+++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.extension
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.execution.FileSourceScanExecTransformerBase
+import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.spark.Partition
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -47,6 +48,7 @@ case class TestFileSourceScanExecTransformer(
dataFilters,
tableIdentifier,
disableBucketedScan) {
+
override def getPartitions: Seq[Partition] =
BackendsApiManager.getTransformerApiInstance.genPartitionSeq(
relation,
@@ -58,5 +60,8 @@ case class TestFileSourceScanExecTransformer(
optionalNumCoalescedBuckets,
disableBucketedScan)
+ override def getPartitionWithReadFileFormats: Seq[(Partition, ReadFileFormat)] =
+ getPartitions.map((_, fileFormat))
+
override val nodeNamePrefix: String = "TestFile"
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]