This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 31eea3b9a85 [HUDI-8079] Get rid of other count case for fg reader (#11774)
31eea3b9a85 is described below

commit 31eea3b9a85103f1c19e9b4e3fefed7e40e651f0
Author: Jon Vexler <[email protected]>
AuthorDate: Thu Aug 15 13:36:23 2024 -0400

    [HUDI-8079] Get rid of other count case for fg reader (#11774)
    
    Co-authored-by: Jonathan Vexler <=>
---
 ...odieFileGroupReaderBasedParquetFileFormat.scala | 64 ++++++++++------------
 1 file changed, 28 insertions(+), 36 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
index 9c9a4cca7be..8dfe07055df 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.datasources.parquet
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.{AvroConversionUtils, HoodieFileIndex, 
HoodiePartitionCDCFileGroupMapping, HoodiePartitionFileSliceMapping, 
HoodieTableSchema, HoodieTableState, SparkAdapterSupport, 
SparkFileFormatInternalRowReaderContext}
-import org.apache.hudi.MergeOnReadSnapshotRelation.createPartitionedFile
 import org.apache.hudi.avro.AvroSchemaUtils
 import org.apache.hudi.cdc.{CDCFileGroupIterator, CDCRelation, 
HoodieCDCFileGroupSplit}
 import org.apache.hudi.client.utils.SparkInternalSchemaConverter
@@ -114,8 +113,6 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
     val isCount = requiredSchema.isEmpty && !isMOR && !isIncremental
     val augmentedStorageConf = new 
HadoopStorageConfiguration(hadoopConf).getInline
     setSchemaEvolutionConfigs(augmentedStorageConf)
-    val baseFileReader = super.buildReaderWithPartitionValues(spark, 
dataSchema, partitionSchema, requiredSchema,
-      filters ++ requiredFilters, options, augmentedStorageConf.unwrapCopy())
     val cdcFileReader = if (isCDC) {
       super.buildReaderWithPartitionValues(
         spark,
@@ -144,39 +141,34 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
           val filegroupName = FSUtils.getFileIdFromFilePath(sparkAdapter
             .getSparkPartitionedFileUtils.getPathFromPartitionedFile(file))
           fileSliceMapping.getSlice(filegroupName) match {
-            case Some(fileSlice) if !isCount =>
-              if (requiredSchema.isEmpty && 
!fileSlice.getLogFiles.findAny().isPresent) {
-                val hoodieBaseFile = fileSlice.getBaseFile.get()
-                
baseFileReader(createPartitionedFile(fileSliceMapping.getPartitionValues, 
hoodieBaseFile.getStoragePath, 0, hoodieBaseFile.getFileLen))
-              } else {
-                val readerContext = new 
SparkFileFormatInternalRowReaderContext(parquetFileReader.value, 
tableState.recordKeyField, filters, requiredFilters)
-                val metaClient: HoodieTableMetaClient = HoodieTableMetaClient
-                  
.builder().setConf(storageConf).setBasePath(tableState.tablePath).build
-                val props = metaClient.getTableConfig.getProps
-                options.foreach(kv => props.setProperty(kv._1, kv._2))
-                val reader = new HoodieFileGroupReader[InternalRow](
-                  readerContext,
-                  new HoodieHadoopStorage(metaClient.getBasePath, storageConf),
-                  tableState.tablePath,
-                  tableState.latestCommitTimestamp.get,
-                  fileSlice,
-                  broadcastedDataSchema.value,
-                  broadcastedRequestedSchema.value,
-                  internalSchemaOpt,
-                  metaClient,
-                  props,
-                  file.start,
-                  file.length,
-                  shouldUseRecordPosition)
-                reader.initRecordIterators()
-                // Append partition values to rows and project to output schema
-                appendPartitionAndProject(
-                  reader.getClosableIterator,
-                  requiredSchema,
-                  partitionSchema,
-                  outputSchema,
-                  fileSliceMapping.getPartitionValues)
-              }
+            case Some(fileSlice) if !isCount && (requiredSchema.nonEmpty || 
fileSlice.getLogFiles.findAny().isPresent) =>
+              val readerContext = new 
SparkFileFormatInternalRowReaderContext(parquetFileReader.value, 
tableState.recordKeyField, filters, requiredFilters)
+              val metaClient: HoodieTableMetaClient = HoodieTableMetaClient
+                
.builder().setConf(storageConf).setBasePath(tableState.tablePath).build
+              val props = metaClient.getTableConfig.getProps
+              options.foreach(kv => props.setProperty(kv._1, kv._2))
+              val reader = new HoodieFileGroupReader[InternalRow](
+                readerContext,
+                new HoodieHadoopStorage(metaClient.getBasePath, storageConf),
+                tableState.tablePath,
+                tableState.latestCommitTimestamp.get,
+                fileSlice,
+                broadcastedDataSchema.value,
+                broadcastedRequestedSchema.value,
+                internalSchemaOpt,
+                metaClient,
+                props,
+                file.start,
+                file.length,
+                shouldUseRecordPosition)
+              reader.initRecordIterators()
+              // Append partition values to rows and project to output schema
+              appendPartitionAndProject(
+                reader.getClosableIterator,
+                requiredSchema,
+                partitionSchema,
+                outputSchema,
+                fileSliceMapping.getPartitionValues)
 
             case _ => parquetFileReader.value.read(file, requiredSchema, 
partitionSchema, internalSchemaOpt, filters, storageConf)
           }

Reply via email to