This is an automated email from the ASF dual-hosted git repository.

jonvex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d4a4d9c43e4 use parquet reader for cdc reading (#11775)
d4a4d9c43e4 is described below

commit d4a4d9c43e40a4d16b7a4859bd19b3704e5794c1
Author: Jon Vexler <[email protected]>
AuthorDate: Thu Aug 15 15:57:51 2024 -0400

    use parquet reader for cdc reading (#11775)
    
    Co-authored-by: Jonathan Vexler <=>
---
 ...odieFileGroupReaderBasedParquetFileFormat.scala | 30 +++++++---------------
 1 file changed, 9 insertions(+), 21 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
index 8dfe07055df..ec6c99d1fee 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -113,17 +113,6 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
     val isCount = requiredSchema.isEmpty && !isMOR && !isIncremental
     val augmentedStorageConf = new HadoopStorageConfiguration(hadoopConf).getInline
     setSchemaEvolutionConfigs(augmentedStorageConf)
-    val cdcFileReader = if (isCDC) {
-      super.buildReaderWithPartitionValues(
-        spark,
-        tableSchema.structTypeSchema,
-        StructType(Nil),
-        tableSchema.structTypeSchema,
-        Nil,
-        options,
-        new Configuration(hadoopConf))
-    }
-
     val requestedAvroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(requiredSchema, sanitizedTableName)
     val dataAvroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(dataSchema, sanitizedTableName)
     val parquetFileReader = spark.sparkContext.broadcast(sparkAdapter.createParquetFileReader(supportBatchResult,
@@ -174,29 +163,27 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
           }
         // CDC queries.
         case hoodiePartitionCDCFileGroupSliceMapping: HoodiePartitionCDCFileGroupMapping =>
-          val fileSplits = hoodiePartitionCDCFileGroupSliceMapping.getFileSplits().toArray
-          val fileGroupSplit: HoodieCDCFileGroupSplit = HoodieCDCFileGroupSplit(fileSplits)
-          buildCDCRecordIterator(
-            fileGroupSplit, cdcFileReader.asInstanceOf[PartitionedFile => Iterator[InternalRow]],
-            storageConf, fileIndexProps, requiredSchema)
+          buildCDCRecordIterator(hoodiePartitionCDCFileGroupSliceMapping, parquetFileReader.value, storageConf, fileIndexProps, requiredSchema)
 
         case _ => parquetFileReader.value.read(file, requiredSchema, partitionSchema, internalSchemaOpt, filters, storageConf)
       }
     }
   }
 
-  private def setSchemaEvolutionConfigs(conf: StorageConfiguration[_]): Unit = {
+  private def setSchemaEvolutionConfigs(conf: StorageConfiguration[Configuration]): Unit = {
     if (internalSchemaOpt.isPresent) {
       conf.set(SparkInternalSchemaConverter.HOODIE_TABLE_PATH, tableState.tablePath)
       conf.set(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST, validCommits)
     }
   }
 
-  protected def buildCDCRecordIterator(cdcFileGroupSplit: HoodieCDCFileGroupSplit,
-                                       cdcFileReader: PartitionedFile => Iterator[InternalRow],
-                                       storageConf: StorageConfiguration[_],
+  protected def buildCDCRecordIterator(hoodiePartitionCDCFileGroupSliceMapping: HoodiePartitionCDCFileGroupMapping,
+                                       parquetFileReader: SparkParquetReader,
+                                       storageConf: StorageConfiguration[Configuration],
                                        props: TypedProperties,
                                        requiredSchema: StructType): Iterator[InternalRow] = {
+    val fileSplits = hoodiePartitionCDCFileGroupSliceMapping.getFileSplits().toArray
+    val cdcFileGroupSplit: HoodieCDCFileGroupSplit = HoodieCDCFileGroupSplit(fileSplits)
     props.setProperty(HoodieTableConfig.HOODIE_TABLE_NAME_KEY, tableName)
     val cdcSchema = CDCRelation.FULL_CDC_SPARK_SCHEMA
     val metaClient = HoodieTableMetaClient.builder
@@ -205,7 +192,8 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
       cdcFileGroupSplit,
       metaClient,
       storageConf,
-      cdcFileReader,
+      (file: PartitionedFile) =>
+        parquetFileReader.read(file, tableSchema.structTypeSchema, new StructType(), internalSchemaOpt, Seq.empty, storageConf),
       tableSchema,
       cdcSchema,
       requiredSchema,
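
For readers skimming the diff, the shape of the change: the CDC path no longer builds a
dedicated cdcFileReader via super.buildReaderWithPartitionValues and casts it; it instead
wraps the already-broadcast SparkParquetReader in a PartitionedFile => Iterator[InternalRow]
closure. Below is a minimal, self-contained Scala sketch of that adapter pattern. The
PartitionedFile, InternalRow, and SparkParquetReader types here are simplified stand-ins,
not the real Spark/Hudi classes (the real read(...) also takes schemas, filters, and a
StorageConfiguration, as seen in the hunk above).

// Simplified stand-ins for the Spark/Hudi types named in the diff.
case class PartitionedFile(path: String)
case class InternalRow(values: Seq[Any])

trait SparkParquetReader {
  def read(file: PartitionedFile): Iterator[InternalRow]
}

object CdcReaderSketch {
  // The refactor's core move: adapt one shared parquet reader into the
  // function shape the CDC record iterator expects, instead of building
  // a second, independent file reader just for CDC queries.
  def toRowIterator(reader: SparkParquetReader): PartitionedFile => Iterator[InternalRow] =
    (file: PartitionedFile) => reader.read(file)

  def main(args: Array[String]): Unit = {
    val reader = new SparkParquetReader {
      def read(file: PartitionedFile): Iterator[InternalRow] =
        Iterator(InternalRow(Seq(file.path, 1)), InternalRow(Seq(file.path, 2)))
    }
    // The closure passed where cdcFileReader used to go in the diff.
    val readFn = toRowIterator(reader)
    readFn(PartitionedFile("base-file.parquet")).foreach(println)
  }
}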
