This is an automated email from the ASF dual-hosted git repository.
jonvex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d4a4d9c43e4 use parquet reader for cdc reading (#11775)
d4a4d9c43e4 is described below
commit d4a4d9c43e40a4d16b7a4859bd19b3704e5794c1
Author: Jon Vexler <[email protected]>
AuthorDate: Thu Aug 15 15:57:51 2024 -0400
use parquet reader for cdc reading (#11775)
Co-authored-by: Jonathan Vexler <=>
---
...odieFileGroupReaderBasedParquetFileFormat.scala | 30 +++++++---------------
1 file changed, 9 insertions(+), 21 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
index 8dfe07055df..ec6c99d1fee 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -113,17 +113,6 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
val isCount = requiredSchema.isEmpty && !isMOR && !isIncremental
val augmentedStorageConf = new
HadoopStorageConfiguration(hadoopConf).getInline
setSchemaEvolutionConfigs(augmentedStorageConf)
- val cdcFileReader = if (isCDC) {
- super.buildReaderWithPartitionValues(
- spark,
- tableSchema.structTypeSchema,
- StructType(Nil),
- tableSchema.structTypeSchema,
- Nil,
- options,
- new Configuration(hadoopConf))
- }
-
val requestedAvroSchema =
AvroConversionUtils.convertStructTypeToAvroSchema(requiredSchema,
sanitizedTableName)
val dataAvroSchema =
AvroConversionUtils.convertStructTypeToAvroSchema(dataSchema,
sanitizedTableName)
val parquetFileReader =
spark.sparkContext.broadcast(sparkAdapter.createParquetFileReader(supportBatchResult,
@@ -174,29 +163,27 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
}
// CDC queries.
case hoodiePartitionCDCFileGroupSliceMapping:
HoodiePartitionCDCFileGroupMapping =>
- val fileSplits =
hoodiePartitionCDCFileGroupSliceMapping.getFileSplits().toArray
- val fileGroupSplit: HoodieCDCFileGroupSplit =
HoodieCDCFileGroupSplit(fileSplits)
- buildCDCRecordIterator(
- fileGroupSplit, cdcFileReader.asInstanceOf[PartitionedFile =>
Iterator[InternalRow]],
- storageConf, fileIndexProps, requiredSchema)
+ buildCDCRecordIterator(hoodiePartitionCDCFileGroupSliceMapping,
parquetFileReader.value, storageConf, fileIndexProps, requiredSchema)
case _ => parquetFileReader.value.read(file, requiredSchema,
partitionSchema, internalSchemaOpt, filters, storageConf)
}
}
}
- private def setSchemaEvolutionConfigs(conf: StorageConfiguration[_]): Unit =
{
+ private def setSchemaEvolutionConfigs(conf:
StorageConfiguration[Configuration]): Unit = {
if (internalSchemaOpt.isPresent) {
conf.set(SparkInternalSchemaConverter.HOODIE_TABLE_PATH,
tableState.tablePath)
conf.set(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST,
validCommits)
}
}
- protected def buildCDCRecordIterator(cdcFileGroupSplit:
HoodieCDCFileGroupSplit,
- cdcFileReader: PartitionedFile =>
Iterator[InternalRow],
- storageConf: StorageConfiguration[_],
+ protected def
buildCDCRecordIterator(hoodiePartitionCDCFileGroupSliceMapping:
HoodiePartitionCDCFileGroupMapping,
+ parquetFileReader: SparkParquetReader,
+ storageConf:
StorageConfiguration[Configuration],
props: TypedProperties,
requiredSchema: StructType):
Iterator[InternalRow] = {
+ val fileSplits =
hoodiePartitionCDCFileGroupSliceMapping.getFileSplits().toArray
+ val cdcFileGroupSplit: HoodieCDCFileGroupSplit =
HoodieCDCFileGroupSplit(fileSplits)
props.setProperty(HoodieTableConfig.HOODIE_TABLE_NAME_KEY, tableName)
val cdcSchema = CDCRelation.FULL_CDC_SPARK_SCHEMA
val metaClient = HoodieTableMetaClient.builder
@@ -205,7 +192,8 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
cdcFileGroupSplit,
metaClient,
storageConf,
- cdcFileReader,
+ (file: PartitionedFile) =>
+ parquetFileReader.read(file, tableSchema.structTypeSchema, new
StructType(), internalSchemaOpt, Seq.empty, storageConf),
tableSchema,
cdcSchema,
requiredSchema,