yihua commented on code in PR #10144:
URL: https://github.com/apache/hudi/pull/10144#discussion_r1414533367
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala:
##########
@@ -310,6 +317,15 @@ class
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
_: PartitionedFile => Iterator.empty
}
+ // Note that for CDC reader, the underlying data schema is stored in the
'options' to separate from the CDC schema.
+ val rawDataSchemaStr = options.getOrElse(rawDataSchema, "")
Review Comment:
`rawDataSchemaStr` is the table schema. Can the table schema be read
directly here instead of being passed in through `options`? Also, does
`tableSchema` already represent the actual data schema?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCDCFileIndex.scala:
##########
@@ -42,29 +42,34 @@ class HoodieCDCFileIndex (override val spark: SparkSession,
extends HoodieIncrementalFileIndex(
spark, metaClient, schemaSpec, options, fileStatusCache, includeLogFiles,
shouldEmbedFileSlices
) with FileIndex {
+ private val emptyPartitionPath: String = "empty_partition_path";
val cdcRelation: CDCRelation = CDCRelation.getCDCRelation(spark.sqlContext,
metaClient, options)
val cdcExtractor: HoodieCDCExtractor = cdcRelation.cdcExtractor
override def listFiles(partitionFilters: Seq[Expression], dataFilters:
Seq[Expression]): Seq[PartitionDirectory] = {
- val partitionToFileGroups =
cdcExtractor.extractCDCFileSplits().asScala.groupBy(_._1.getPartitionPath).toSeq
- partitionToFileGroups.map {
- case (partitionPath, fileGroups) =>
- val fileGroupIds: List[FileStatus] = fileGroups.map { fileGroup => {
- // We create a fake FileStatus to wrap the information of
HoodieFileGroupId, which are used
- // later to retrieve the corresponding CDC file group splits.
- val fileGroupId: HoodieFileGroupId = fileGroup._1
- new FileStatus(0, true, 0, 0, 0,
- 0, null, "", "", null,
- new Path(fileGroupId.getPartitionPath, fileGroupId.getFileId))
- }}.toList
- val partitionValues: InternalRow = new
GenericInternalRow(doParsePartitionColumnValues(
- metaClient.getTableConfig.getPartitionFields.get(),
partitionPath).asInstanceOf[Array[Any]])
+ cdcExtractor.extractCDCFileSplits().asScala.map {
+ case (fileGroupId, fileSplits) =>
+ val partitionPath = if (fileGroupId.getPartitionPath.isEmpty)
emptyPartitionPath else fileGroupId.getPartitionPath
Review Comment:
Could an empty String be used here instead for non-partitioned tables?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]