yihua commented on code in PR #10144:
URL: https://github.com/apache/hudi/pull/10144#discussion_r1414533367


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala:
##########
@@ -310,6 +317,15 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
       _: PartitionedFile => Iterator.empty
     }
 
+    // Note that for CDC reader, the underlying data schema is stored in the 
'options' to separate from the CDC schema.
+    val rawDataSchemaStr = options.getOrElse(rawDataSchema, "")

Review Comment:
   `rawDataSchemaStr` is the table schema.  Can the table schema be read 
directly here instead of being passed in through the options? Also, does 
`tableSchema` represent the actual data schema?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCDCFileIndex.scala:
##########
@@ -42,29 +42,34 @@ class HoodieCDCFileIndex (override val spark: SparkSession,
   extends HoodieIncrementalFileIndex(
     spark, metaClient, schemaSpec, options, fileStatusCache, includeLogFiles, 
shouldEmbedFileSlices
   ) with FileIndex {
+  private val emptyPartitionPath: String = "empty_partition_path";
   val cdcRelation: CDCRelation = CDCRelation.getCDCRelation(spark.sqlContext, 
metaClient, options)
   val cdcExtractor: HoodieCDCExtractor = cdcRelation.cdcExtractor
 
   override def listFiles(partitionFilters: Seq[Expression], dataFilters: 
Seq[Expression]): Seq[PartitionDirectory] = {
-    val partitionToFileGroups = 
cdcExtractor.extractCDCFileSplits().asScala.groupBy(_._1.getPartitionPath).toSeq
-    partitionToFileGroups.map {
-      case (partitionPath, fileGroups) =>
-        val fileGroupIds: List[FileStatus] = fileGroups.map { fileGroup => {
-          // We create a fake FileStatus to wrap the information of 
HoodieFileGroupId, which are used
-          // later to retrieve the corresponding CDC file group splits.
-          val fileGroupId: HoodieFileGroupId = fileGroup._1
-          new FileStatus(0, true, 0, 0, 0,
-            0, null, "", "", null,
-            new Path(fileGroupId.getPartitionPath, fileGroupId.getFileId))
-        }}.toList
-        val partitionValues: InternalRow = new 
GenericInternalRow(doParsePartitionColumnValues(
-          metaClient.getTableConfig.getPartitionFields.get(), 
partitionPath).asInstanceOf[Array[Any]])
+    cdcExtractor.extractCDCFileSplits().asScala.map {
+      case (fileGroupId, fileSplits) =>
+        val partitionPath = if (fileGroupId.getPartitionPath.isEmpty) 
emptyPartitionPath else fileGroupId.getPartitionPath

Review Comment:
   Should an empty String be used here instead for a non-partitioned table?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to