jonvex commented on code in PR #13527:
URL: https://github.com/apache/hudi/pull/13527#discussion_r2207939789


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala:
##########
@@ -319,121 +301,164 @@ class 
HoodieMergeOnReadSnapshotHadoopFsRelationFactory(override val sqlContext:
   }
 }
 
+class HoodieMergeOnReadSnapshotHadoopFsRelationFactory(override val 
sqlContext: SQLContext,
+                                                       override val 
metaClient: HoodieTableMetaClient,
+                                                       override val options: 
Map[String, String],
+                                                       override val 
schemaSpec: Option[StructType],
+                                                       isBootstrap: Boolean)
+  extends HoodieBaseHadoopFsRelationFactory(sqlContext, metaClient, options, 
schemaSpec, isBootstrap) {
+  private val fileIndex: HoodieFileIndex = new HoodieFileIndex(
+    sparkSession,
+    metaClient,
+    Some(tableStructSchema),
+    optParams,
+    fileStatusCache,
+    includeLogFiles = true,
+    shouldEmbedFileSlices = true)
+
+  override def buildFileIndex(): FileIndex = fileIndex
+
+  override protected def isMOR: Boolean = true
+
+  override protected def isIncremental: Boolean = false
+
+  override protected def getRequiredFilters: Seq[Filter] = Seq.empty
+
+  override def buildPartitionSchema(): StructType = fileIndex.partitionSchema
+
+  override def buildDataSchema(): StructType = fileIndex.dataSchema
+}
+
+abstract class 
BaseHoodieMergeOnReadIncrementalHadoopFsRelationFactory(override val 
sqlContext: SQLContext,
+                                                                       
override val metaClient: HoodieTableMetaClient,
+                                                                       
override val options: Map[String, String],
+                                                                       
override val schemaSpec: Option[StructType],
+                                                                       
isBootstrap: Boolean)
+  extends HoodieBaseHadoopFsRelationFactory(sqlContext, metaClient, options, 
schemaSpec, isBootstrap) {
+
+  override protected def getMandatoryFields: Seq[String] = 
Seq(HoodieRecord.COMMIT_TIME_METADATA_FIELD) ++ partitionColumnsToRead
+
+  override protected def isMOR: Boolean = true
+
+  override protected def isIncremental: Boolean = true
+}
+
 class HoodieMergeOnReadIncrementalHadoopFsRelationFactory(override val 
sqlContext: SQLContext,
                                                           override val 
metaClient: HoodieTableMetaClient,
                                                           override val 
options: Map[String, String],
                                                           override val 
schemaSpec: Option[StructType],
                                                           isBootstrap: Boolean)
-  extends HoodieMergeOnReadSnapshotHadoopFsRelationFactory(sqlContext, 
metaClient, options, schemaSpec, isBootstrap) {
+  extends BaseHoodieMergeOnReadIncrementalHadoopFsRelationFactory(sqlContext, 
metaClient, options, schemaSpec, isBootstrap) {
 
-  override val mandatoryFields: Seq[String] = 
Seq(HoodieRecord.COMMIT_TIME_METADATA_FIELD) ++ partitionColumnsToRead
+  private val incrementalFileIndex = new HoodieIncrementalFileIndex(
+    sparkSession, metaClient, schemaSpec, options, fileStatusCache, true, true)
 
-  override val fileIndex = new HoodieIncrementalFileIndex(
-    sparkSession, metaClient, schemaSpec, options, 
FileStatusCache.getOrCreate(sparkSession), true, true)
+  override def buildFileIndex(): HoodieFileIndex = incrementalFileIndex
 
-  override def buildFileFormat(): FileFormat = {
-    if (metaClient.getTableConfig.isMultipleBaseFileFormatsEnabled && 
!isBootstrap) {
-      new 
HoodieMultipleBaseFileFormat(sparkSession.sparkContext.broadcast(tableState),
-        
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt)),
-        metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
-        true, true, fileIndex.getRequiredFilters)
-    } else {
-      new HoodieFileGroupReaderBasedParquetFileFormat(
-        basePath.toString, HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt),
-        metaClient.getTableConfig.getTableName, queryTimestamp.get, 
mandatoryFields,
-        true, isBootstrap, true, fileIndex.isInstanceOf[HoodieCDCFileIndex],
-        validCommits, shouldUseRecordPosition, fileIndex.getRequiredFilters)
-    }
-  }
+  override protected def getRequiredFilters: Seq[Filter] = 
incrementalFileIndex.getRequiredFilters
+
+  override def buildPartitionSchema(): StructType = 
incrementalFileIndex.partitionSchema
+
+  override def buildDataSchema(): StructType = incrementalFileIndex.dataSchema
+}
+
+class HoodieMergeOnReadCDCHadoopFsRelationFactory(override val sqlContext: 
SQLContext,
+                                                  override val metaClient: 
HoodieTableMetaClient,
+                                                  override val options: 
Map[String, String],
+                                                  override val schemaSpec: 
Option[StructType],
+                                                  isBootstrap: Boolean)
+  extends BaseHoodieMergeOnReadIncrementalHadoopFsRelationFactory(sqlContext, 
metaClient, options, schemaSpec, isBootstrap) {
+  private val hoodieCDCFileIndex = new HoodieCDCFileIndex(
+    sparkSession, metaClient, schemaSpec, options, fileStatusCache, true, true)
+
+  override def buildFileIndex(): HoodieFileIndex = hoodieCDCFileIndex
+
+  override def buildDataSchema(): StructType = 
hoodieCDCFileIndex.cdcRelation.schema
+
+  override def buildPartitionSchema(): StructType = StructType(Nil)
+
+  override protected def getRequiredFilters: Seq[Filter] = Seq.empty
 }
 
 class HoodieCopyOnWriteSnapshotHadoopFsRelationFactory(override val 
sqlContext: SQLContext,
                                                         override val 
metaClient: HoodieTableMetaClient,
                                                         override val options: 
Map[String, String],
                                                         override val 
schemaSpec: Option[StructType],
                                                         isBootstrap: Boolean)
-  extends HoodieMergeOnReadSnapshotHadoopFsRelationFactory(sqlContext, 
metaClient, options, schemaSpec, isBootstrap) {
-
-  override val mandatoryFields: Seq[String] = partitionColumnsToRead
+  extends HoodieBaseHadoopFsRelationFactory(sqlContext, metaClient, options, 
schemaSpec, isBootstrap) {
 
-  override val fileIndex = new HoodieFileIndex(
+  val fileIndex: HoodieFileIndex = new HoodieFileIndex(
     sparkSession,
     metaClient,
     Some(tableStructSchema),
     optParams,
-    FileStatusCache.getOrCreate(sparkSession),
+    fileStatusCache,
     shouldEmbedFileSlices = true)
 
-  override def buildFileFormat(): FileFormat = {
-    if (metaClient.getTableConfig.isMultipleBaseFileFormatsEnabled && 
!isBootstrap) {
-      new 
HoodieMultipleBaseFileFormat(sparkSession.sparkContext.broadcast(tableState),
-        
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt)),
-        metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, 
false, false, Seq.empty)
-    } else {
-      new HoodieFileGroupReaderBasedParquetFileFormat(
-        basePath.toString, HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt),
-        metaClient.getTableConfig.getTableName, queryTimestamp.get, 
mandatoryFields,
-        false, isBootstrap, false, fileIndex.isInstanceOf[HoodieCDCFileIndex], 
validCommits,
-        shouldUseRecordPosition, Seq.empty)
-    }
-  }
+  override def buildFileIndex(): HoodieFileIndex = fileIndex
+
+  override protected def isMOR: Boolean = false
+
+  override protected def isIncremental: Boolean = false
+
+  override protected def getRequiredFilters: Seq[Filter] = Seq.empty
+
+  override def buildPartitionSchema(): StructType = fileIndex.partitionSchema
+
+  override def buildDataSchema(): StructType = fileIndex.dataSchema
+}
+
+abstract class 
BaseHoodieCopyOnWriteIncrementalHadoopFsRelationFactory(override val 
sqlContext: SQLContext,
+                                                                       
override val metaClient: HoodieTableMetaClient,
+                                                                       
override val options: Map[String, String],
+                                                                       
override val schemaSpec: Option[StructType],
+                                                                       
isBootstrap: Boolean)
+  extends HoodieBaseHadoopFsRelationFactory(sqlContext, metaClient, options, 
schemaSpec, isBootstrap) {
+
+  override protected def getMandatoryFields(): Seq[String] = 
Seq(HoodieRecord.RECORD_KEY_METADATA_FIELD, 
HoodieRecord.COMMIT_TIME_METADATA_FIELD) ++
+    preCombineFieldOpt.map(Seq(_)).getOrElse(Seq()) ++ partitionColumnsToRead
+
+  override protected def isMOR: Boolean = false
+
+  override protected def isIncremental: Boolean = true
+
 }
 
 class HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(override val 
sqlContext: SQLContext,
                                                           override val 
metaClient: HoodieTableMetaClient,
                                                           override val 
options: Map[String, String],
                                                           override val 
schemaSpec: Option[StructType],
                                                           isBootstrap: Boolean)
-  extends HoodieCopyOnWriteSnapshotHadoopFsRelationFactory(sqlContext, 
metaClient, options, schemaSpec, isBootstrap) {
+  extends BaseHoodieCopyOnWriteIncrementalHadoopFsRelationFactory(sqlContext, 
metaClient, options, schemaSpec, isBootstrap) {
 
-  override val mandatoryFields: Seq[String] = 
Seq(HoodieRecord.RECORD_KEY_METADATA_FIELD, 
HoodieRecord.COMMIT_TIME_METADATA_FIELD) ++
-    preCombineFieldOpt.map(Seq(_)).getOrElse(Seq()) ++ partitionColumnsToRead
-
-  override val fileIndex = new HoodieIncrementalFileIndex(
-    sparkSession, metaClient, schemaSpec, options, 
FileStatusCache.getOrCreate(sparkSession), false, true)
+  private val incrementalFileIndex = new HoodieIncrementalFileIndex(

Review Comment:
   It doesn't matter because we always use it right away



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to