This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 31eea3b9a85 [HUDI-8079] Get rid of other count case for fg reader (#11774)
31eea3b9a85 is described below
commit 31eea3b9a85103f1c19e9b4e3fefed7e40e651f0
Author: Jon Vexler <[email protected]>
AuthorDate: Thu Aug 15 13:36:23 2024 -0400
[HUDI-8079] Get rid of other count case for fg reader (#11774)
Co-authored-by: Jonathan Vexler <=>
---
...odieFileGroupReaderBasedParquetFileFormat.scala | 64 ++++++++++------------
1 file changed, 28 insertions(+), 36 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
index 9c9a4cca7be..8dfe07055df 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.datasources.parquet
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.{AvroConversionUtils, HoodieFileIndex,
HoodiePartitionCDCFileGroupMapping, HoodiePartitionFileSliceMapping,
HoodieTableSchema, HoodieTableState, SparkAdapterSupport,
SparkFileFormatInternalRowReaderContext}
-import org.apache.hudi.MergeOnReadSnapshotRelation.createPartitionedFile
import org.apache.hudi.avro.AvroSchemaUtils
import org.apache.hudi.cdc.{CDCFileGroupIterator, CDCRelation,
HoodieCDCFileGroupSplit}
import org.apache.hudi.client.utils.SparkInternalSchemaConverter
@@ -114,8 +113,6 @@ class
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
val isCount = requiredSchema.isEmpty && !isMOR && !isIncremental
val augmentedStorageConf = new
HadoopStorageConfiguration(hadoopConf).getInline
setSchemaEvolutionConfigs(augmentedStorageConf)
- val baseFileReader = super.buildReaderWithPartitionValues(spark,
dataSchema, partitionSchema, requiredSchema,
- filters ++ requiredFilters, options, augmentedStorageConf.unwrapCopy())
val cdcFileReader = if (isCDC) {
super.buildReaderWithPartitionValues(
spark,
@@ -144,39 +141,34 @@ class
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
val filegroupName = FSUtils.getFileIdFromFilePath(sparkAdapter
.getSparkPartitionedFileUtils.getPathFromPartitionedFile(file))
fileSliceMapping.getSlice(filegroupName) match {
- case Some(fileSlice) if !isCount =>
- if (requiredSchema.isEmpty &&
!fileSlice.getLogFiles.findAny().isPresent) {
- val hoodieBaseFile = fileSlice.getBaseFile.get()
-
baseFileReader(createPartitionedFile(fileSliceMapping.getPartitionValues,
hoodieBaseFile.getStoragePath, 0, hoodieBaseFile.getFileLen))
- } else {
- val readerContext = new
SparkFileFormatInternalRowReaderContext(parquetFileReader.value,
tableState.recordKeyField, filters, requiredFilters)
- val metaClient: HoodieTableMetaClient = HoodieTableMetaClient
-
.builder().setConf(storageConf).setBasePath(tableState.tablePath).build
- val props = metaClient.getTableConfig.getProps
- options.foreach(kv => props.setProperty(kv._1, kv._2))
- val reader = new HoodieFileGroupReader[InternalRow](
- readerContext,
- new HoodieHadoopStorage(metaClient.getBasePath, storageConf),
- tableState.tablePath,
- tableState.latestCommitTimestamp.get,
- fileSlice,
- broadcastedDataSchema.value,
- broadcastedRequestedSchema.value,
- internalSchemaOpt,
- metaClient,
- props,
- file.start,
- file.length,
- shouldUseRecordPosition)
- reader.initRecordIterators()
- // Append partition values to rows and project to output schema
- appendPartitionAndProject(
- reader.getClosableIterator,
- requiredSchema,
- partitionSchema,
- outputSchema,
- fileSliceMapping.getPartitionValues)
- }
+ case Some(fileSlice) if !isCount && (requiredSchema.nonEmpty ||
fileSlice.getLogFiles.findAny().isPresent) =>
+ val readerContext = new
SparkFileFormatInternalRowReaderContext(parquetFileReader.value,
tableState.recordKeyField, filters, requiredFilters)
+ val metaClient: HoodieTableMetaClient = HoodieTableMetaClient
+
.builder().setConf(storageConf).setBasePath(tableState.tablePath).build
+ val props = metaClient.getTableConfig.getProps
+ options.foreach(kv => props.setProperty(kv._1, kv._2))
+ val reader = new HoodieFileGroupReader[InternalRow](
+ readerContext,
+ new HoodieHadoopStorage(metaClient.getBasePath, storageConf),
+ tableState.tablePath,
+ tableState.latestCommitTimestamp.get,
+ fileSlice,
+ broadcastedDataSchema.value,
+ broadcastedRequestedSchema.value,
+ internalSchemaOpt,
+ metaClient,
+ props,
+ file.start,
+ file.length,
+ shouldUseRecordPosition)
+ reader.initRecordIterators()
+ // Append partition values to rows and project to output schema
+ appendPartitionAndProject(
+ reader.getClosableIterator,
+ requiredSchema,
+ partitionSchema,
+ outputSchema,
+ fileSliceMapping.getPartitionValues)
case _ => parquetFileReader.value.read(file, requiredSchema,
partitionSchema, internalSchemaOpt, filters, storageConf)
}