This is an automated email from the ASF dual-hosted git repository.

jonvex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 668d86d948e6 [HUDI-9609] Add support for V1 incremental queries with fg reader (#13529)
668d86d948e6 is described below

commit 668d86d948e632ea17f540a18f123a4904473ae5
Author: Jon Vexler <[email protected]>
AuthorDate: Thu Jul 24 13:21:27 2025 -0400

    [HUDI-9609] Add support for V1 incremental queries with fg reader (#13529)
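
    For illustration, a pre-table-version-8 (V1) incremental read that now
    routes through the file group reader might be issued as in the sketch
    below; the base path, instant times, and version value are placeholders,
    not taken from this patch:

        // Sketch only. "hoodie.datasource.read.incr.table.version" appears in
        // this patch; the other option keys are standard Hudi read options.
        val df = spark.read.format("hudi")
          .option("hoodie.datasource.query.type", "incremental")
          .option("hoodie.datasource.read.begin.instanttime", "20250101000000") // placeholder instant
          .option("hoodie.datasource.read.end.instanttime", "20250102000000")   // placeholder instant
          .option("hoodie.datasource.read.incr.table.version", "6")             // pin pre-V8 (V1) semantics
          .load("/tmp/hudi_trips")                                              // placeholder path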
---
 .../org/apache/hudi/BaseHoodieTableFileIndex.java  | 47 +++++++++----
 .../main/scala/org/apache/hudi/DefaultSource.scala | 66 ++++++-------------
 .../scala/org/apache/hudi/HoodieCDCFileIndex.scala | 16 ++---
 .../hudi/HoodieHadoopFsRelationFactory.scala       | 76 ++++++++++++++++------
 .../apache/hudi/HoodieIncrementalFileIndex.scala   |  6 +-
 .../hudi/MergeOnReadIncrementalRelationV1.scala    |  6 +-
 .../hudi/MergeOnReadIncrementalRelationV2.scala    | 11 +++-
 7 files changed, 130 insertions(+), 98 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
index 9919e0bde3cb..096a11ccf5dd 100644
--- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
+++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieTableQueryType;
 import org.apache.hudi.common.serialization.HoodieFileSliceSerializer;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineUtils;
@@ -97,8 +98,8 @@ public abstract class BaseHoodieTableFileIndex implements AutoCloseable {
   private final TypedProperties configProperties;
   private final HoodieTableQueryType queryType;
   private final Option<String> specifiedQueryInstant;
-  private final Option<String> startCompletionTime;
-  private final Option<String> endCompletionTime;
+  private final Option<String> incrementalQueryStartTime;
+  private final Option<String> incrementalQueryEndTime;
   private final List<StoragePath> queryPaths;
 
   private final boolean shouldIncludePendingCommits;
@@ -117,6 +118,7 @@ public abstract class BaseHoodieTableFileIndex implements AutoCloseable {
 
   private final HoodieTableMetaClient metaClient;
   private final HoodieEngineContext engineContext;
+  private final boolean isCompletionTimeBasedQuery;
 
   private final transient FileStatusCache fileStatusCache;
 
@@ -139,8 +141,8 @@ public abstract class BaseHoodieTableFileIndex implements AutoCloseable {
   * @param shouldIncludePendingCommits  flags whether file-index should exclude any pending operations
   * @param shouldValidateInstant        flags to validate whether query instant is present in the timeline
   * @param fileStatusCache              transient cache of fetched [[FileStatus]]es
-   * @param startCompletionTime          start completion time for incremental query (optional)
-   * @param endCompletionTime            end completion time for incremental query (optional)
+   * @param incrementalQueryStartTime          start completion time for incremental query (optional)
+   * @param incrementalQueryEndTime            end completion time for incremental query (optional)
    */
   public BaseHoodieTableFileIndex(HoodieEngineContext engineContext,
                                   HoodieTableMetaClient metaClient,
@@ -152,8 +154,8 @@ public abstract class BaseHoodieTableFileIndex implements AutoCloseable {
                                   boolean shouldValidateInstant,
                                   FileStatusCache fileStatusCache,
                                   boolean shouldListLazily,
-                                  Option<String> startCompletionTime,
-                                  Option<String> endCompletionTime) {
+                                  Option<String> incrementalQueryStartTime,
+                                  Option<String> incrementalQueryEndTime) {
     this.partitionColumns = metaClient.getTableConfig().getPartitionFields()
         .orElseGet(() -> new String[0]);
 
@@ -169,8 +171,8 @@ public abstract class BaseHoodieTableFileIndex implements AutoCloseable {
     this.shouldIncludePendingCommits = shouldIncludePendingCommits;
     this.shouldValidateInstant = shouldValidateInstant;
     this.shouldListLazily = shouldListLazily;
-    this.startCompletionTime = startCompletionTime;
-    this.endCompletionTime = endCompletionTime;
+    this.incrementalQueryStartTime = incrementalQueryStartTime;
+    this.incrementalQueryEndTime = incrementalQueryEndTime;
 
     this.basePath = metaClient.getBasePath();
 
@@ -178,6 +180,9 @@ public abstract class BaseHoodieTableFileIndex implements AutoCloseable {
     this.engineContext = engineContext;
     this.fileStatusCache = fileStatusCache;
     this.configProperties = configProperties;
+    HoodieTableVersion tableVersion = HoodieTableVersion.fromVersionCode(configProperties.getInteger("hoodie.datasource.read.incr.table.version",
+        metaClient.getTableConfig().getTableVersion().versionCode()));
+    this.isCompletionTimeBasedQuery = tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT);
     doRefresh();
   }
 
@@ -324,12 +329,8 @@ public abstract class BaseHoodieTableFileIndex implements AutoCloseable {
     List<String> matchedPartitionPaths;
     try {
       if (isPartitionedTable()) {
-        if (queryType == HoodieTableQueryType.INCREMENTAL && startCompletionTime.isPresent()
-            && !metaClient.getActiveTimeline().isBeforeTimelineStartsByCompletionTime(startCompletionTime.get())) {
-          HoodieTimeline timelineToQuery = metaClient.getActiveTimeline().getWriteTimeline()
-              .findInstantsInRangeByCompletionTime(
-                  startCompletionTime.get(),
-                  endCompletionTime.orElse(String.valueOf(Long.MAX_VALUE)));
+        if (queryType == HoodieTableQueryType.INCREMENTAL && incrementalQueryStartTime.isPresent() && !isBeforeTimelineStarts()) {
+          HoodieTimeline timelineToQuery = findInstantsInRange();
           matchedPartitionPaths = TimelineUtils.getWrittenPartitions(timelineToQuery);
         } else {
           matchedPartitionPaths = tableMetadata.getPartitionPathWithPathPrefixes(relativePartitionPaths);
@@ -347,6 +348,24 @@ public abstract class BaseHoodieTableFileIndex implements AutoCloseable {
         .collect(Collectors.toList());
   }
 
+  private boolean isBeforeTimelineStarts() {
+    if (isCompletionTimeBasedQuery) {
+      return metaClient.getActiveTimeline().isBeforeTimelineStartsByCompletionTime(incrementalQueryStartTime.get());
+    } else {
+      return metaClient.getActiveTimeline().isBeforeTimelineStarts(incrementalQueryStartTime.get());
+    }
+  }
+
+  private HoodieTimeline findInstantsInRange() {
+    if (isCompletionTimeBasedQuery) {
+      return metaClient.getActiveTimeline().getWriteTimeline()
+          .findInstantsInRangeByCompletionTime(incrementalQueryStartTime.get(), incrementalQueryEndTime.orElse(String.valueOf(Long.MAX_VALUE)));
+    } else {
+      return metaClient.getActiveTimeline().getWriteTimeline()
+          .findInstantsInRange(incrementalQueryStartTime.get(), incrementalQueryEndTime.orElse(String.valueOf(Long.MAX_VALUE)));
+    }
+  }
+
   protected void refresh() {
     fileStatusCache.invalidate();
     doRefresh();
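
Note on behavior: the new isCompletionTimeBasedQuery flag is the pivot in this
file. Tables at version 8 or later resolve the incremental window by
completion time; older tables fall back to instant-time ranges. A condensed
sketch of that dispatch (the HoodieTimeline calls are the ones used above; the
wrapper function itself is illustrative, not part of the patch):

    // Illustrative Scala condensation of the two Java helpers above.
    def instantsInRange(completionTimeBased: Boolean, start: String,
                        end: Option[String], timeline: HoodieTimeline): HoodieTimeline = {
      val endTime = end.getOrElse(Long.MaxValue.toString)
      if (completionTimeBased) timeline.findInstantsInRangeByCompletionTime(start, endTime) // V8+ semantics
      else timeline.findInstantsInRange(start, endTime)                                     // pre-V8 semantics
    }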
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index a0d7ed059ddf..75eaa7e8a00d 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -307,6 +307,12 @@ object DefaultSource {
       val useNewParquetFileFormat = parameters.getOrElse(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(),
         HoodieReaderConfig.FILE_GROUP_READER_ENABLED.defaultValue().toString).toBoolean &&
         !metaClient.isMetadataTable && (globPaths == null || globPaths.isEmpty)
+      lazy val tableVersion = if (SparkConfigUtils.containsConfigProperty(parameters, INCREMENTAL_READ_TABLE_VERSION)) {
+        Integer.parseInt(parameters(INCREMENTAL_READ_TABLE_VERSION.key))
+      } else {
+        metaClient.getTableConfig.getTableVersion.versionCode()
+      }
+      lazy val hoodieTableSupportsCompletionTime = tableVersion >= HoodieTableVersion.EIGHT.versionCode()
       if (metaClient.getCommitsTimeline.filterCompletedInstants.countInstants() == 0) {
         new EmptyRelation(sqlContext, resolveSchema(metaClient, parameters, Some(schema)))
       } else if (isCdcQuery) {
@@ -334,29 +340,13 @@ object DefaultSource {
              resolveBaseFileOnlyRelation(sqlContext, globPaths, userSchema, metaClient, parameters)
            }
          case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
-            if (SparkConfigUtils.containsConfigProperty(parameters, INCREMENTAL_READ_TABLE_VERSION)) {
-              val writeTableVersion = Integer.parseInt(parameters(INCREMENTAL_READ_TABLE_VERSION.key))
-              if (writeTableVersion >= HoodieTableVersion.EIGHT.versionCode()) {
-                if (useNewParquetFileFormat) {
-                  new HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(
-                    sqlContext, metaClient, parameters, userSchema, isBootstrappedTable).build()
-                } else {
-                  new IncrementalRelationV2(sqlContext, parameters, userSchema, metaClient, RangeType.CLOSED_CLOSED)
-                }
-              } else {
-                new IncrementalRelationV1(sqlContext, parameters, userSchema, metaClient)
-              }
-            } else {
-              if (metaClient.getTableConfig.getTableVersion.versionCode() >= HoodieTableVersion.EIGHT.versionCode()) {
-                if (useNewParquetFileFormat) {
-                  new HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(
-                    sqlContext, metaClient, parameters, userSchema, isBootstrappedTable).build()
-                } else {
-                  new IncrementalRelationV2(sqlContext, parameters, userSchema, metaClient, RangeType.CLOSED_CLOSED)
-                }
-              } else {
-                new IncrementalRelationV1(sqlContext, parameters, userSchema, metaClient)
-              }
+            (hoodieTableSupportsCompletionTime, useNewParquetFileFormat) match {
+              case (true, true) => new HoodieCopyOnWriteIncrementalHadoopFsRelationFactoryV2(
+                sqlContext, metaClient, parameters, userSchema, isBootstrappedTable, RangeType.CLOSED_CLOSED).build()
+              case (true, false) => new IncrementalRelationV2(sqlContext, parameters, userSchema, metaClient, RangeType.CLOSED_CLOSED)
+              case (false, true) => new HoodieCopyOnWriteIncrementalHadoopFsRelationFactoryV1(
+                sqlContext, metaClient, parameters, userSchema, isBootstrappedTable).build()
+              case (false, false) => new IncrementalRelationV1(sqlContext, parameters, userSchema, metaClient)
             }
 
           case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) =>
@@ -376,29 +366,13 @@ object DefaultSource {
             }
 
           case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
-            if (SparkConfigUtils.containsConfigProperty(parameters, INCREMENTAL_READ_TABLE_VERSION)) {
-              val writeTableVersion = Integer.parseInt(parameters(INCREMENTAL_READ_TABLE_VERSION.key))
-              if (writeTableVersion >= HoodieTableVersion.EIGHT.versionCode()) {
-                if (useNewParquetFileFormat) {
-                  new HoodieMergeOnReadIncrementalHadoopFsRelationFactory(
-                    sqlContext, metaClient, parameters, userSchema, isBootstrappedTable).build()
-                } else {
-                  MergeOnReadIncrementalRelationV2(sqlContext, parameters, metaClient, userSchema)
-                }
-              } else {
-                MergeOnReadIncrementalRelationV1(sqlContext, parameters, metaClient, userSchema)
-              }
-            } else {
-              if (metaClient.getTableConfig.getTableVersion.versionCode() >= HoodieTableVersion.EIGHT.versionCode()) {
-                if (useNewParquetFileFormat) {
-                  new HoodieMergeOnReadIncrementalHadoopFsRelationFactory(
-                    sqlContext, metaClient, parameters, userSchema, isBootstrappedTable).build()
-                } else {
-                  MergeOnReadIncrementalRelationV2(sqlContext, parameters, metaClient, userSchema)
-                }
-              } else {
-                MergeOnReadIncrementalRelationV1(sqlContext, parameters, metaClient, userSchema)
-              }
+            (hoodieTableSupportsCompletionTime, useNewParquetFileFormat) match {
+              case (true, true) => new HoodieMergeOnReadIncrementalHadoopFsRelationFactoryV2(
+                sqlContext, metaClient, parameters, userSchema, isBootstrappedTable).build()
+              case (true, false) => MergeOnReadIncrementalRelationV2(sqlContext, parameters, metaClient, userSchema)
+              case (false, true) => new HoodieMergeOnReadIncrementalHadoopFsRelationFactoryV1(
+                sqlContext, metaClient, parameters, userSchema, isBootstrappedTable).build()
+              case (false, false) => MergeOnReadIncrementalRelationV1(sqlContext, parameters, metaClient, userSchema)
             }
 
           case (_, _, true) =>
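
The refactor above collapses two identical copies of a nested if/else into one
match on the pair (completion-time support, file group reader enabled).
Restated as a sketch (class names as in the patch; the function is
illustrative and returns names rather than relations):

    def pickMorRelation(supportsCompletionTime: Boolean, fgReaderEnabled: Boolean): String =
      (supportsCompletionTime, fgReaderEnabled) match {
        case (true, true)   => "HoodieMergeOnReadIncrementalHadoopFsRelationFactoryV2" // V8+, fg reader
        case (true, false)  => "MergeOnReadIncrementalRelationV2"                      // V8+, legacy reader
        case (false, true)  => "HoodieMergeOnReadIncrementalHadoopFsRelationFactoryV1" // pre-V8, fg reader: new in this patch
        case (false, false) => "MergeOnReadIncrementalRelationV1"                      // pre-V8, legacy reader
      }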
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCDCFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCDCFileIndex.scala
index 07d8b224d977..c2413f33d427 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCDCFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCDCFileIndex.scala
@@ -33,21 +33,21 @@ import org.apache.spark.sql.types.StructType
 
 import scala.collection.JavaConverters._
 
-class HoodieCDCFileIndex (override val spark: SparkSession,
-                          override val metaClient: HoodieTableMetaClient,
-                          override val schemaSpec: Option[StructType],
-                          override val options: Map[String, String],
-                          @transient override val fileStatusCache: FileStatusCache = NoopCache,
-                          override val includeLogFiles: Boolean,
-                          override val shouldEmbedFileSlices: Boolean)
+class HoodieCDCFileIndex(override val spark: SparkSession,
+                         override val metaClient: HoodieTableMetaClient,
+                         override val schemaSpec: Option[StructType],
+                         override val options: Map[String, String],
+                         @transient override val fileStatusCache: FileStatusCache = NoopCache,
+                         override val includeLogFiles: Boolean)
   extends HoodieFileIndex(
-    spark, metaClient, schemaSpec, options, fileStatusCache, includeLogFiles, shouldEmbedFileSlices
+    spark, metaClient, schemaSpec, options, fileStatusCache, includeLogFiles, shouldEmbedFileSlices = true
   ) with FileIndex  {
   private val emptyPartitionPath: String = "empty_partition_path";
   val cdcRelation: CDCRelation = CDCRelation.getCDCRelation(spark.sqlContext, metaClient, options)
   val cdcExtractor: HoodieCDCExtractor = cdcRelation.cdcExtractor
 
   override def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
+    hasPushedDownPartitionPredicates = true
     cdcExtractor.extractCDCFileSplits().asScala.map {
       case (fileGroupId, fileSplits) =>
         val partitionPath = if (fileGroupId.getPartitionPath.isEmpty) emptyPartitionPath else fileGroupId.getPartitionPath
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
index cacc8fc36c6b..ff6322dc6fad 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
@@ -25,6 +25,7 @@ import org.apache.hudi.common.config.{ConfigProperty, HoodieMetadataConfig, Hood
 import org.apache.hudi.common.config.HoodieMetadataConfig.{DEFAULT_METADATA_ENABLE_FOR_READERS, ENABLE}
 import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.table.log.InstantRange.RangeType
 import org.apache.hudi.common.table.timeline.HoodieTimeline
 import org.apache.hudi.common.util.{ConfigUtils, StringUtils}
 import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
@@ -329,7 +330,7 @@ class HoodieMergeOnReadSnapshotHadoopFsRelationFactory(override val sqlContext:
   override def buildDataSchema(): StructType = fileIndex.dataSchema
 }
 
-abstract class BaseHoodieMergeOnReadIncrementalHadoopFsRelationFactory(override val sqlContext: SQLContext,
+abstract class HoodieBaseMergeOnReadIncrementalHadoopFsRelationFactory(override val sqlContext: SQLContext,
                                                                        override val metaClient: HoodieTableMetaClient,
                                                                        override val options: Map[String, String],
                                                                        override val schemaSpec: Option[StructType],
@@ -343,15 +344,16 @@ abstract class BaseHoodieMergeOnReadIncrementalHadoopFsRelationFactory(override
   override protected def isIncremental: Boolean = true
 }
 
-class HoodieMergeOnReadIncrementalHadoopFsRelationFactory(override val sqlContext: SQLContext,
-                                                          override val metaClient: HoodieTableMetaClient,
-                                                          override val options: Map[String, String],
-                                                          override val schemaSpec: Option[StructType],
-                                                          isBootstrap: Boolean)
-  extends BaseHoodieMergeOnReadIncrementalHadoopFsRelationFactory(sqlContext, metaClient, options, schemaSpec, isBootstrap) {
+abstract class HoodieMergeOnReadIncrementalHadoopFsRelationFactory(override val sqlContext: SQLContext,
+                                                                   override val metaClient: HoodieTableMetaClient,
+                                                                   override val options: Map[String, String],
+                                                                   override val schemaSpec: Option[StructType],
+                                                                   isBootstrap: Boolean,
+                                                                   mergeOnReadIncrementalRelation: MergeOnReadIncrementalRelation)
+  extends HoodieBaseMergeOnReadIncrementalHadoopFsRelationFactory(sqlContext, metaClient, options, schemaSpec, isBootstrap) {
 
   private val incrementalFileIndex = new HoodieIncrementalFileIndex(
-    sparkSession, metaClient, schemaSpec, options, fileStatusCache, true, true)
+    sparkSession, metaClient, schemaSpec, options, fileStatusCache, true, mergeOnReadIncrementalRelation)
 
   override def buildFileIndex(): HoodieFileIndex = incrementalFileIndex
 
@@ -362,14 +364,30 @@ class HoodieMergeOnReadIncrementalHadoopFsRelationFactory(override val sqlContex
   override def buildDataSchema(): StructType = incrementalFileIndex.dataSchema
 }
 
+class HoodieMergeOnReadIncrementalHadoopFsRelationFactoryV1(override val sqlContext: SQLContext,
+                                                            override val metaClient: HoodieTableMetaClient,
+                                                            override val options: Map[String, String],
+                                                            override val schemaSpec: Option[StructType],
+                                                            isBootstrap: Boolean)
+  extends HoodieMergeOnReadIncrementalHadoopFsRelationFactory(sqlContext, metaClient, options, schemaSpec, isBootstrap,
+    MergeOnReadIncrementalRelationV1(sqlContext, options, metaClient, schemaSpec))
+
+class HoodieMergeOnReadIncrementalHadoopFsRelationFactoryV2(override val sqlContext: SQLContext,
+                                                            override val metaClient: HoodieTableMetaClient,
+                                                            override val options: Map[String, String],
+                                                            override val schemaSpec: Option[StructType],
+                                                            isBootstrap: Boolean)
+  extends HoodieMergeOnReadIncrementalHadoopFsRelationFactory(sqlContext, metaClient, options, schemaSpec, isBootstrap,
+    MergeOnReadIncrementalRelationV2(sqlContext, options, metaClient, schemaSpec))
+
 class HoodieMergeOnReadCDCHadoopFsRelationFactory(override val sqlContext: SQLContext,
                                                   override val metaClient: HoodieTableMetaClient,
                                                   override val options: Map[String, String],
                                                   override val schemaSpec: Option[StructType],
                                                   isBootstrap: Boolean)
-  extends BaseHoodieMergeOnReadIncrementalHadoopFsRelationFactory(sqlContext, metaClient, options, schemaSpec, isBootstrap) {
+  extends HoodieBaseMergeOnReadIncrementalHadoopFsRelationFactory(sqlContext, metaClient, options, schemaSpec, isBootstrap) {
   private val hoodieCDCFileIndex = new HoodieCDCFileIndex(
-    sparkSession, metaClient, schemaSpec, options, fileStatusCache, true, true)
+    sparkSession, metaClient, schemaSpec, options, fileStatusCache, true)
 
   override def buildFileIndex(): HoodieFileIndex = hoodieCDCFileIndex
 
@@ -408,7 +426,7 @@ class HoodieCopyOnWriteSnapshotHadoopFsRelationFactory(override val sqlContext:
   override def buildDataSchema(): StructType = fileIndex.dataSchema
 }
 
-abstract class BaseHoodieCopyOnWriteIncrementalHadoopFsRelationFactory(override val sqlContext: SQLContext,
+abstract class HoodieBaseCopyOnWriteIncrementalHadoopFsRelationFactory(override val sqlContext: SQLContext,
                                                                        override val metaClient: HoodieTableMetaClient,
                                                                        override val options: Map[String, String],
                                                                        override val schemaSpec: Option[StructType],
@@ -424,15 +442,16 @@ abstract class BaseHoodieCopyOnWriteIncrementalHadoopFsRelationFactory(override
 
 }
 
-class HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(override val sqlContext: SQLContext,
-                                                          override val metaClient: HoodieTableMetaClient,
-                                                          override val options: Map[String, String],
-                                                          override val schemaSpec: Option[StructType],
-                                                          isBootstrap: Boolean)
-  extends BaseHoodieCopyOnWriteIncrementalHadoopFsRelationFactory(sqlContext, metaClient, options, schemaSpec, isBootstrap) {
+abstract class HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(override val sqlContext: SQLContext,
+                                                                       override val metaClient: HoodieTableMetaClient,
+                                                                       override val options: Map[String, String],
+                                                                       override val schemaSpec: Option[StructType],
+                                                                       isBootstrap: Boolean,
+                                                                       mergeOnReadIncrementalRelation: MergeOnReadIncrementalRelation)
+  extends HoodieBaseCopyOnWriteIncrementalHadoopFsRelationFactory(sqlContext, metaClient, options, schemaSpec, isBootstrap) {
 
   private val incrementalFileIndex = new HoodieIncrementalFileIndex(
-    sparkSession, metaClient, schemaSpec, options, fileStatusCache, false, true)
+    sparkSession, metaClient, schemaSpec, options, fileStatusCache, false, mergeOnReadIncrementalRelation)
 
   override def buildFileIndex(): HoodieFileIndex = incrementalFileIndex
 
@@ -443,15 +462,32 @@ class HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(override val sqlContex
   override def buildDataSchema(): StructType = incrementalFileIndex.dataSchema
 }
 
+class HoodieCopyOnWriteIncrementalHadoopFsRelationFactoryV1(override val sqlContext: SQLContext,
+                                                            override val metaClient: HoodieTableMetaClient,
+                                                            override val options: Map[String, String],
+                                                            override val schemaSpec: Option[StructType],
+                                                            isBootstrap: Boolean)
+extends HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(sqlContext, metaClient, options, schemaSpec, isBootstrap,
+  MergeOnReadIncrementalRelationV1(sqlContext, options, metaClient, schemaSpec))
+
+class HoodieCopyOnWriteIncrementalHadoopFsRelationFactoryV2(override val sqlContext: SQLContext,
+                                                            override val metaClient: HoodieTableMetaClient,
+                                                            override val options: Map[String, String],
+                                                            override val schemaSpec: Option[StructType],
+                                                            isBootstrap: Boolean,
+                                                            rangeType: RangeType)
+  extends HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(sqlContext, metaClient, options, schemaSpec, isBootstrap,
+    MergeOnReadIncrementalRelationV2(sqlContext, options, metaClient, schemaSpec, None, rangeType))
+
 class HoodieCopyOnWriteCDCHadoopFsRelationFactory(override val sqlContext: SQLContext,
                                                   override val metaClient: HoodieTableMetaClient,
                                                   override val options: Map[String, String],
                                                   override val schemaSpec: Option[StructType],
                                                   isBootstrap: Boolean)
-  extends BaseHoodieCopyOnWriteIncrementalHadoopFsRelationFactory(sqlContext, metaClient, options, schemaSpec, isBootstrap) {
+  extends HoodieBaseCopyOnWriteIncrementalHadoopFsRelationFactory(sqlContext, metaClient, options, schemaSpec, isBootstrap) {
 
   private val hoodieCDCFileIndex = new HoodieCDCFileIndex(
-    sparkSession, metaClient, schemaSpec, options, fileStatusCache, false, true)
+    sparkSession, metaClient, schemaSpec, options, fileStatusCache, false)
   override def buildFileIndex(): HoodieFileIndex = hoodieCDCFileIndex
 
   override def buildDataSchema(): StructType = hoodieCDCFileIndex.cdcRelation.schema
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieIncrementalFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieIncrementalFileIndex.scala
index d5717aaafff8..4c2091e8feda 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieIncrementalFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieIncrementalFileIndex.scala
@@ -38,12 +38,10 @@ class HoodieIncrementalFileIndex(override val spark: SparkSession,
                                  override val options: Map[String, String],
                                  @transient override val fileStatusCache: FileStatusCache = NoopCache,
                                  override val includeLogFiles: Boolean,
-                                 override val shouldEmbedFileSlices: Boolean)
+                                 mergeOnReadIncrementalRelation: MergeOnReadIncrementalRelation)
   extends HoodieFileIndex(
-    spark, metaClient, schemaSpec, options, fileStatusCache, includeLogFiles, shouldEmbedFileSlices
+    spark, metaClient, schemaSpec, options, fileStatusCache, includeLogFiles, shouldEmbedFileSlices = true
   ) with FileIndex {
-  val mergeOnReadIncrementalRelation: MergeOnReadIncrementalRelationV2 = MergeOnReadIncrementalRelationV2(
-    spark.sqlContext, options, metaClient, schemaSpec, schemaSpec)
 
   override def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
     val fileSlices = mergeOnReadIncrementalRelation.listFileSplits(partitionFilters, dataFilters).toSeq.flatMap(
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV1.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV1.scala
index 4538d7c31821..2e0b14ecd968 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV1.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV1.scala
@@ -51,7 +51,7 @@ case class MergeOnReadIncrementalRelationV1(override val sqlContext: SQLContext,
                                             private val userSchema: Option[StructType],
                                             private val prunedDataSchema: Option[StructType] = None)
   extends BaseMergeOnReadSnapshotRelation(sqlContext, optParams, metaClient, Seq(), userSchema, prunedDataSchema)
-    with HoodieIncrementalRelationV1Trait {
+    with HoodieIncrementalRelationV1Trait with MergeOnReadIncrementalRelation {
 
   override type Relation = MergeOnReadIncrementalRelationV1
 
@@ -123,7 +123,7 @@ case class MergeOnReadIncrementalRelationV1(override val sqlContext: SQLContext,
     }
   }
 
-  def listFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Map[InternalRow, Seq[FileSlice]] = {
+  override def listFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Map[InternalRow, Seq[FileSlice]] = {
     val slices = if (includedCommits.isEmpty) {
       List()
     } else {
@@ -152,7 +152,7 @@ case class MergeOnReadIncrementalRelationV1(override val sqlContext: SQLContext,
     })
   }
 
-  def getRequiredFilters: Seq[Filter] = {
+  override def getRequiredFilters: Seq[Filter] = {
     if (includedCommits.isEmpty) {
       Seq.empty
     } else {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala
index 216ead57f8f3..9946e11d707b 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala
@@ -42,6 +42,11 @@ import org.apache.spark.sql.types.StructType
 import scala.collection.JavaConverters._
 import scala.collection.immutable
 
+trait MergeOnReadIncrementalRelation {
+  def listFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Map[InternalRow, Seq[FileSlice]]
+  def getRequiredFilters: Seq[Filter]
+}
+
 case class MergeOnReadIncrementalRelationV2(override val sqlContext: SQLContext,
                                             override val optParams: Map[String, String],
                                             override val metaClient: HoodieTableMetaClient,
@@ -49,7 +54,7 @@ case class MergeOnReadIncrementalRelationV2(override val sqlContext: SQLContext,
                                             private val prunedDataSchema: Option[StructType] = None,
                                             override val rangeType: RangeType = RangeType.CLOSED_CLOSED)
   extends BaseMergeOnReadSnapshotRelation(sqlContext, optParams, metaClient, Seq(), userSchema, prunedDataSchema)
-    with HoodieIncrementalRelationV2Trait {
+    with HoodieIncrementalRelationV2Trait with MergeOnReadIncrementalRelation {
 
   override type Relation = MergeOnReadIncrementalRelationV2
 
@@ -120,7 +125,7 @@ case class MergeOnReadIncrementalRelationV2(override val sqlContext: SQLContext,
     }
   }
 
-  def listFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Map[InternalRow, Seq[FileSlice]] = {
+  override def listFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Map[InternalRow, Seq[FileSlice]] = {
     val slices = if (includedCommits.isEmpty) {
       List()
     } else {
@@ -149,7 +154,7 @@ case class MergeOnReadIncrementalRelationV2(override val sqlContext: SQLContext,
     })
   }
 
-  def getRequiredFilters: Seq[Filter] = {
+  override def getRequiredFilters: Seq[Filter] = {
     if (includedCommits.isEmpty) {
       Seq.empty
     } else {
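
The extracted MergeOnReadIncrementalRelation trait above is the linchpin of
the patch: HoodieIncrementalFileIndex now accepts a V1 or V2 relation instead
of constructing a V2 relation internally, which is what gives the file group
reader a V1 path. A toy model of the pattern (types reduced to strings; not
the real signatures):

    trait IncrementalRelation { def listSplits(): Seq[String] }
    class V1Relation extends IncrementalRelation { def listSplits(): Seq[String] = Seq("v1-slice") }
    class V2Relation extends IncrementalRelation { def listSplits(): Seq[String] = Seq("v2-slice") }
    // The index delegates file listing to whichever relation it is given.
    class IncrementalIndex(relation: IncrementalRelation) {
      def listFiles(): Seq[String] = relation.listSplits()
    }
    new IncrementalIndex(new V1Relation).listFiles() // Seq("v1-slice")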
