This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 236658b4cc9 [HUDI-6157] Fix potential data loss for flink streaming 
source from table with multi writer (#8611)
236658b4cc9 is described below

commit 236658b4cc9ea28e9fa4a3a94d87315a31be4ccc
Author: Danny Chan <[email protected]>
AuthorDate: Thu May 4 20:42:05 2023 +0800

    [HUDI-6157] Fix potential data loss for flink streaming source from table 
with multi writer (#8611)
---
 .../apache/hudi/configuration/OptionsResolver.java |  11 ++
 .../sink/partitioner/profile/WriteProfiles.java    |  18 ++
 .../apache/hudi/source/IncrementalInputSplits.java | 197 +++++++++++++++------
 .../hudi/source/StreamReadMonitoringFunction.java  |  29 +--
 .../org/apache/hudi/table/HoodieTableSource.java   |   2 +-
 .../java/org/apache/hudi/util/StreamerUtil.java    |  14 ++
 .../hudi/source/TestIncrementalInputSplits.java    |  12 +-
 .../source/TestStreamReadMonitoringFunction.java   | 104 +++++++++++
 .../apache/hudi/table/format/TestInputFormat.java  | 115 ++++++++++--
 .../test/java/org/apache/hudi/utils/TestUtils.java |  18 ++
 10 files changed, 436 insertions(+), 84 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 1cceb619c54..113155f47eb 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -260,6 +260,17 @@ public class OptionsResolver {
         
.equalsIgnoreCase(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name());
   }
 
+  /**
+   * Returns whether to read the instants using completion time.
+   *
+   * <p>A Hudi instant contains both the txn start time and completion time. 
For incremental subscription
+   * of the source reader, using completion time to filter the candidate 
instants can avoid data loss
+   * in scenarios like multiple writers.
+   */
+  public static boolean isReadByTxnCompletionTime(Configuration conf) {
+    return 
conf.getBoolean(HoodieCommonConfig.READ_BY_STATE_TRANSITION_TIME.key(), 
HoodieCommonConfig.READ_BY_STATE_TRANSITION_TIME.defaultValue());
+  }
+
   /**
    * Returns the index type.
    */
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
index 7c4886852f1..2f959b241dd 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
@@ -82,6 +82,24 @@ public class WriteProfiles {
     PROFILES.remove(path);
   }
 
+  /**
+   * Returns all the incremental write file statuses with the given commits 
metadata.
+   * Only existing files are included.
+   *
+   * @param basePath           Table base path
+   * @param hadoopConf         The hadoop conf
+   * @param metadataList       The commit metadata list (should be in ascending 
order)
+   * @param tableType          The table type
+   * @return the file status array
+   */
+  public static FileStatus[] getFilesFromMetadata(
+      Path basePath,
+      Configuration hadoopConf,
+      List<HoodieCommitMetadata> metadataList,
+      HoodieTableType tableType) {
+    return getFilesFromMetadata(basePath, hadoopConf, metadataList, tableType, 
true);
+  }
+
   /**
    * Returns all the incremental write file statuses with the given commits 
metadata.
    *
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
index 2c21f40ce57..ba96b6acead 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
@@ -41,6 +41,7 @@ import org.apache.hudi.source.prune.PartitionPruners;
 import org.apache.hudi.table.format.cdc.CdcInputSplit;
 import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
 import org.apache.hudi.util.ClusteringUtil;
+import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
@@ -67,6 +68,7 @@ import java.util.stream.Stream;
 
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS;
 
 /**
@@ -125,14 +127,12 @@ public class IncrementalInputSplits implements 
Serializable {
    * Returns the incremental input splits.
    *
    * @param metaClient The meta client
-   * @param hadoopConf The hadoop configuration
    * @param cdcEnabled Whether cdc is enabled
    *
    * @return The list of incremental input splits or empty if there are no new 
instants
    */
   public Result inputSplits(
       HoodieTableMetaClient metaClient,
-      org.apache.hadoop.conf.Configuration hadoopConf,
       boolean cdcEnabled) {
     HoodieTimeline commitTimeline = getReadTimeline(metaClient);
     if (commitTimeline.empty()) {
@@ -209,7 +209,7 @@ public class IncrementalInputSplits implements Serializable 
{
         LOG.warn("No partitions found for reading in user provided path.");
         return Result.EMPTY;
       }
-      FileStatus[] files = WriteProfiles.getFilesFromMetadata(path, 
hadoopConf, metadataList, metaClient.getTableType(), false);
+      FileStatus[] files = WriteProfiles.getFilesFromMetadata(path, 
metaClient.getHadoopConf(), metadataList, metaClient.getTableType(), false);
       if (files == null) {
         LOG.warn("Found deleted files in metadata, fall back to full table 
scan.");
         // fallback to full table scan
@@ -241,16 +241,16 @@ public class IncrementalInputSplits implements 
Serializable {
    * Returns the incremental input splits.
    *
    * @param metaClient    The meta client
-   * @param hadoopConf    The hadoop configuration
    * @param issuedInstant The last issued instant, only valid in streaming read
+   * @param issuedOffset  The last issued offset, only valid in streaming read
    * @param cdcEnabled    Whether cdc is enabled
    *
    * @return The list of incremental input splits or empty if there are no new 
instants
    */
   public Result inputSplits(
       HoodieTableMetaClient metaClient,
-      @Nullable org.apache.hadoop.conf.Configuration hadoopConf,
-      String issuedInstant,
+      @Nullable String issuedInstant,
+      @Nullable String issuedOffset,
       boolean cdcEnabled) {
     metaClient.reloadActiveTimeline();
     HoodieTimeline commitTimeline = getReadTimeline(metaClient);
@@ -258,85 +258,152 @@ public class IncrementalInputSplits implements 
Serializable {
       LOG.warn("No splits found for the table under path " + path);
       return Result.EMPTY;
     }
+
+    // Assumes a timeline:
+    //    c1.inflight, c2(issued instant), c3, c4
+    // -> c1,          c2(issued instant), c3, c4
+    // c1, c3 and c4 are the candidate instants,
+    // we call c1 a 'hollow' instant which has a lower version number but a 
greater completion time,
+    // filtering the timeline using just c2 could cause data loss,
+    // check these hollow instants first.
+    Result hollowSplits = getHollowInputSplits(metaClient, 
metaClient.getHadoopConf(), issuedInstant, issuedOffset, commitTimeline, 
cdcEnabled);
+
     List<HoodieInstant> instants = filterInstantsWithRange(commitTimeline, 
issuedInstant);
     // get the latest instant that satisfies condition
-    final HoodieInstant instantToIssue = instants.size() == 0 ? null : 
instants.get(instants.size() - 1);
+    final String endInstant = instants.size() == 0 ? null : 
instants.get(instants.size() - 1).getTimestamp();
     final InstantRange instantRange;
-    if (instantToIssue != null) {
+    if (endInstant != null) {
       // when cdc is enabled, returns instant range with nullable boundary
       // to filter the reading instants on the timeline
-      instantRange = getInstantRange(issuedInstant, 
instantToIssue.getTimestamp(), cdcEnabled);
-    } else {
+      instantRange = getInstantRange(issuedInstant, endInstant, cdcEnabled);
+    } else if (hollowSplits.isEmpty()) {
       LOG.info("No new instant found for the table under path " + path + ", 
skip reading");
       return Result.EMPTY;
+    } else {
+      return hollowSplits;
     }
 
-    final Set<String> readPartitions;
-    final FileStatus[] fileStatuses;
+    // version number should be monotonically increasing
+    // fetch the instant offset by completion time
+    String offsetToIssue = 
instants.stream().map(HoodieInstant::getStateTransitionTime).max(String::compareTo).orElse(endInstant);
 
     if (instantRange == null) {
       // reading from the earliest, scans the partitions and files directly.
       FileIndex fileIndex = getFileIndex();
-      readPartitions = new TreeSet<>(fileIndex.getOrBuildPartitionPaths());
+
+      Set<String> readPartitions = new 
TreeSet<>(fileIndex.getOrBuildPartitionPaths());
       if (readPartitions.size() == 0) {
         LOG.warn("No partitions found for reading under path: " + path);
         return Result.EMPTY;
       }
-      fileStatuses = fileIndex.getFilesInPartitions();
 
+      FileStatus[] fileStatuses = fileIndex.getFilesInPartitions();
       if (fileStatuses.length == 0) {
         LOG.warn("No files found for reading under path: " + path);
         return Result.EMPTY;
       }
 
-      final String endInstant = instantToIssue.getTimestamp();
       List<MergeOnReadInputSplit> inputSplits = getInputSplits(metaClient, 
commitTimeline,
           fileStatuses, readPartitions, endInstant, null, false);
 
-      return Result.instance(inputSplits, endInstant);
+      return Result.instance(inputSplits, endInstant, offsetToIssue);
     } else {
-      // streaming read
-      if (cdcEnabled) {
-        // case1: cdc change log enabled
-        final String endInstant = instantToIssue.getTimestamp();
-        List<MergeOnReadInputSplit> inputSplits = 
getCdcInputSplits(metaClient, instantRange);
-        return Result.instance(inputSplits, endInstant);
-      }
-      // case2: normal streaming read
-      String tableName = conf.getString(FlinkOptions.TABLE_NAME);
-      List<HoodieCommitMetadata> activeMetadataList = instants.stream()
-          .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, 
instant, commitTimeline)).collect(Collectors.toList());
-      List<HoodieCommitMetadata> archivedMetadataList = 
getArchivedMetadata(metaClient, instantRange, commitTimeline, tableName);
-      if (archivedMetadataList.size() > 0) {
-        LOG.warn("\n"
-            + 
"--------------------------------------------------------------------------------\n"
-            + "---------- caution: the reader has fall behind too much from 
the writer,\n"
-            + "---------- tweak 'read.tasks' option to add parallelism of read 
tasks.\n"
-            + 
"--------------------------------------------------------------------------------");
-      }
-      List<HoodieCommitMetadata> metadataList = archivedMetadataList.size() > 0
-          // IMPORTANT: the merged metadata list must be in ascending order by 
instant time
-          ? mergeList(archivedMetadataList, activeMetadataList)
-          : activeMetadataList;
+      List<MergeOnReadInputSplit> inputSplits = getIncInputSplits(metaClient, 
metaClient.getHadoopConf(), commitTimeline, instants, instantRange, endInstant, 
cdcEnabled);
+      return Result.instance(mergeList(hollowSplits.getInputSplits(), 
inputSplits), endInstant, offsetToIssue);
+    }
+  }
 
-      readPartitions = getReadPartitions(metadataList);
-      if (readPartitions.size() == 0) {
-        LOG.warn("No partitions found for reading under path: " + path);
-        return Result.EMPTY;
-      }
-      fileStatuses = WriteProfiles.getFilesFromMetadata(path, hadoopConf, 
metadataList, metaClient.getTableType(), true);
+  /**
+   * Returns the input splits for streaming incremental read.
+   */
+  private List<MergeOnReadInputSplit> getIncInputSplits(
+      HoodieTableMetaClient metaClient,
+      org.apache.hadoop.conf.Configuration hadoopConf,
+      HoodieTimeline commitTimeline,
+      List<HoodieInstant> instants,
+      InstantRange instantRange,
+      String endInstant,
+      boolean cdcEnabled) {
+    // streaming read
+    if (cdcEnabled) {
+      // case1: cdc change log enabled
+      return getCdcInputSplits(metaClient, instantRange);
+    }
+    // case2: normal streaming read
+    String tableName = conf.getString(FlinkOptions.TABLE_NAME);
+    List<HoodieCommitMetadata> activeMetadataList = instants.stream()
+        .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, 
instant, commitTimeline)).collect(Collectors.toList());
+    List<HoodieCommitMetadata> archivedMetadataList = 
getArchivedMetadata(metaClient, instantRange, commitTimeline, tableName);
+    if (archivedMetadataList.size() > 0) {
+      LOG.warn("\n"
+          + 
"--------------------------------------------------------------------------------\n"
+          + "---------- caution: the reader has fall behind too much from the 
writer,\n"
+          + "---------- tweak 'read.tasks' option to add parallelism of read 
tasks.\n"
+          + 
"--------------------------------------------------------------------------------");
+    }
+    // IMPORTANT: the merged metadata list must be in ascending order by 
instant time
+    List<HoodieCommitMetadata> metadataList = mergeList(archivedMetadataList, 
activeMetadataList);
 
-      if (fileStatuses.length == 0) {
-        LOG.warn("No files found for reading under path: " + path);
-        return Result.EMPTY;
-      }
+    Set<String> readPartitions = getReadPartitions(metadataList);
+    if (readPartitions.size() == 0) {
+      LOG.warn("No partitions found for reading under path: " + path);
+      return Collections.emptyList();
+    }
+    FileStatus[] fileStatuses = WriteProfiles.getFilesFromMetadata(path, 
hadoopConf, metadataList, metaClient.getTableType());
 
-      final String endInstant = instantToIssue.getTimestamp();
-      List<MergeOnReadInputSplit> inputSplits = getInputSplits(metaClient, 
commitTimeline,
-          fileStatuses, readPartitions, endInstant, instantRange, 
skipCompaction);
+    if (fileStatuses.length == 0) {
+      LOG.warn("No files found for reading under path: " + path);
+      return Collections.emptyList();
+    }
+
+    return getInputSplits(metaClient, commitTimeline,
+        fileStatuses, readPartitions, endInstant, instantRange, 
skipCompaction);
+  }
 
-      return Result.instance(inputSplits, endInstant);
+  /**
+   * Returns the input splits for 'hollow' instants.
+   */
+  private Result getHollowInputSplits(
+      HoodieTableMetaClient metaClient,
+      org.apache.hadoop.conf.Configuration hadoopConf,
+      @Nullable String issuedInstant,
+      @Nullable String issuedOffset,
+      HoodieTimeline commitTimeline,
+      boolean cdcEnabled) {
+    if (issuedInstant == null || issuedOffset == null) {
+      return Result.EMPTY;
+    }
+    // find the write commit instants that finish later than the issued 
instant
+    // but have a smaller txn start time.
+    List<HoodieInstant> instants = commitTimeline.getInstantsAsStream()
+        .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), 
LESSER_THAN, issuedInstant))
+        .filter(s -> 
HoodieTimeline.compareTimestamps(s.getStateTransitionTime(), GREATER_THAN, 
issuedOffset))
+        .filter(s -> StreamerUtil.isWriteCommit(metaClient.getTableType(), s, 
commitTimeline)).collect(Collectors.toList());
+    if (instants.isEmpty()) {
+      return Result.EMPTY;
     }
+    String offsetToIssue = 
instants.stream().map(HoodieInstant::getStateTransitionTime).max(String::compareTo).orElse(issuedOffset);
+    List<MergeOnReadInputSplit> inputSplits = instants.stream().map(instant -> 
{
+      String instantTs = instant.getTimestamp();
+
+      // Assumes we consume from timeline:
+      //    c0, c1.inflight, c2(issued instant), c3, c4
+      // -> c0, c1,          c2(issued instant), c3, c4
+      // c1, c3 and c4 are the candidate instants,
+
+      // c4 data file could include overlapping records from c2,
+      // use (c2, c4] instant range for c3 and c4,
+
+      // c1 data file could include overlapping records from c0,
+      // use the [c1, c1] instant range for c1.
+      InstantRange instantRange = InstantRange.builder()
+          .startInstant(instantTs)
+          .endInstant(instantTs)
+          .nullableBoundary(cdcEnabled)
+          .rangeType(InstantRange.RangeType.CLOSE_CLOSE).build();
+      return getIncInputSplits(metaClient, hadoopConf, commitTimeline, 
Collections.singletonList(instant), instantRange, instantTs, cdcEnabled);
+    }).flatMap(Collection::stream).collect(Collectors.toList());
+    return Result.instance(inputSplits, issuedInstant, offsetToIssue);
   }
 
   @Nullable
@@ -491,12 +558,13 @@ public class IncrementalInputSplits implements 
Serializable {
    *
    * @param commitTimeline The completed commits timeline
    * @param issuedInstant  The last issued instant that has already been 
delivered to downstream
+   *
    * @return the filtered hoodie instants
    */
   @VisibleForTesting
   public List<HoodieInstant> filterInstantsWithRange(
       HoodieTimeline commitTimeline,
-      final String issuedInstant) {
+      @Nullable final String issuedInstant) {
     HoodieTimeline completedTimeline = 
commitTimeline.filterCompletedInstants();
     if (issuedInstant != null) {
       // returns early for streaming mode
@@ -546,6 +614,12 @@ public class IncrementalInputSplits implements 
Serializable {
   }
 
   private static <T> List<T> mergeList(List<T> list1, List<T> list2) {
+    if (list1.isEmpty()) {
+      return list2;
+    }
+    if (list2.isEmpty()) {
+      return list1;
+    }
     List<T> merged = new ArrayList<>(list1);
     merged.addAll(list2);
     return merged;
@@ -561,6 +635,7 @@ public class IncrementalInputSplits implements Serializable 
{
   public static class Result {
     private final List<MergeOnReadInputSplit> inputSplits; // input splits
     private final String endInstant; // end instant to consume to
+    private final String offset;     // monotonically increasing consumption offset
 
     public static final Result EMPTY = instance(Collections.emptyList(), "");
 
@@ -576,13 +651,23 @@ public class IncrementalInputSplits implements 
Serializable {
       return this.endInstant;
     }
 
-    private Result(List<MergeOnReadInputSplit> inputSplits, String endInstant) 
{
+    @Nullable
+    public String getOffset() {
+      return offset;
+    }
+
+    private Result(List<MergeOnReadInputSplit> inputSplits, String endInstant, 
@Nullable String offset) {
       this.inputSplits = inputSplits;
       this.endInstant = endInstant;
+      this.offset = offset;
     }
 
     public static Result instance(List<MergeOnReadInputSplit> inputSplits, 
String endInstant) {
-      return new Result(inputSplits, endInstant);
+      return new Result(inputSplits, endInstant, null);
+    }
+
+    public static Result instance(List<MergeOnReadInputSplit> inputSplits, 
String endInstant, String offset) {
+      return new Result(inputSplits, endInstant, offset);
     }
   }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
index 5bd7abeef4d..4d325b3a649 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
@@ -92,12 +92,12 @@ public class StreamReadMonitoringFunction
 
   private String issuedInstant;
 
+  private String issuedOffset;
+
   private transient ListState<String> instantState;
 
   private final Configuration conf;
 
-  private transient org.apache.hadoop.conf.Configuration hadoopConf;
-
   private HoodieTableMetaClient metaClient;
 
   private final IncrementalInputSplits incrementalInputSplits;
@@ -145,7 +145,7 @@ public class StreamReadMonitoringFunction
         retrievedStates.add(entry);
       }
 
-      ValidationUtils.checkArgument(retrievedStates.size() <= 1,
+      ValidationUtils.checkArgument(retrievedStates.size() <= 2,
           getClass().getSimpleName() + " retrieved invalid state.");
 
       if (retrievedStates.size() == 1 && issuedInstant != null) {
@@ -156,21 +156,23 @@ public class StreamReadMonitoringFunction
             "The " + getClass().getSimpleName() + " has already restored from 
a previous Flink version.");
 
       } else if (retrievedStates.size() == 1) {
+        // for backward compatibility: older state holds only the issued instant
         this.issuedInstant = retrievedStates.get(0);
         if (LOG.isDebugEnabled()) {
-          LOG.debug("{} retrieved a issued instant of time {} for table {} 
with path {}.",
+          LOG.debug("{} retrieved an issued instant of time {} for table {} 
with path {}.",
               getClass().getSimpleName(), issuedInstant, 
conf.get(FlinkOptions.TABLE_NAME), path);
         }
+      } else if (retrievedStates.size() == 2) {
+        this.issuedInstant = retrievedStates.get(0);
+        this.issuedOffset = retrievedStates.get(1);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("{} retrieved an issued instant of time [{}, {}] for table 
{} with path {}.",
+              getClass().getSimpleName(), issuedInstant, issuedOffset, 
conf.get(FlinkOptions.TABLE_NAME), path);
+        }
       }
     }
   }
 
-  @Override
-  public void open(Configuration parameters) throws Exception {
-    super.open(parameters);
-    this.hadoopConf = HadoopConfigurations.getHadoopConf(conf);
-  }
-
   @Override
   public void run(SourceFunction.SourceContext<MergeOnReadInputSplit> context) 
throws Exception {
     checkpointLock = context.getCheckpointLock();
@@ -187,6 +189,7 @@ public class StreamReadMonitoringFunction
     if (this.metaClient != null) {
       return this.metaClient;
     }
+    org.apache.hadoop.conf.Configuration hadoopConf = 
HadoopConfigurations.getHadoopConf(conf);
     if (StreamerUtil.tableExists(this.path.toString(), hadoopConf)) {
       this.metaClient = StreamerUtil.createMetaClient(this.path.toString(), 
hadoopConf);
       return this.metaClient;
@@ -203,7 +206,7 @@ public class StreamReadMonitoringFunction
       return;
     }
     IncrementalInputSplits.Result result =
-        incrementalInputSplits.inputSplits(metaClient, this.hadoopConf, 
this.issuedInstant, this.cdcEnabled);
+        incrementalInputSplits.inputSplits(metaClient, this.issuedInstant, 
this.issuedOffset, this.cdcEnabled);
     if (result.isEmpty()) {
       // no new instants, returns early
       return;
@@ -214,6 +217,7 @@ public class StreamReadMonitoringFunction
     }
     // update the issues instant time
     this.issuedInstant = result.getEndInstant();
+    this.issuedOffset = result.getOffset();
     LOG.info("\n"
             + "------------------------------------------------------------\n"
             + "---------- consumed to instant: {}\n"
@@ -259,5 +263,8 @@ public class StreamReadMonitoringFunction
     if (this.issuedInstant != null) {
       this.instantState.add(this.issuedInstant);
     }
+    if (this.issuedOffset != null) {
+      this.instantState.add(this.issuedOffset);
+    }
   }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index a9c72754b7a..4136c67cb28 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -398,7 +398,7 @@ public class HoodieTableSource implements
             .partitionPruner(partitionPruner)
             .build();
         final boolean cdcEnabled = 
this.conf.getBoolean(FlinkOptions.CDC_ENABLED);
-        final IncrementalInputSplits.Result result = 
incrementalInputSplits.inputSplits(metaClient, hadoopConf, cdcEnabled);
+        final IncrementalInputSplits.Result result = 
incrementalInputSplits.inputSplits(metaClient, cdcEnabled);
         if (result.isEmpty()) {
           // When there is no input splits, just return an empty source.
           LOG.warn("No input splits generate for incremental read, returns 
empty collection instead");
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 61643e68214..071646de61a 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -22,6 +22,7 @@ import 
org.apache.hudi.common.config.DFSPropertiesConfiguration;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
@@ -448,6 +449,19 @@ public class StreamerUtil {
     }
   }
 
+  /**
+   * Returns whether the given instant is a data writing commit.
+   *
+   * @param tableType The table type
+   * @param instant   The instant
+   * @param timeline  The timeline
+   */
+  public static boolean isWriteCommit(HoodieTableType tableType, HoodieInstant 
instant, HoodieTimeline timeline) {
+    return tableType == HoodieTableType.MERGE_ON_READ
+        ? !instant.getAction().equals(HoodieTimeline.COMMIT_ACTION) // not a 
compaction
+        : !ClusteringUtil.isClusteringInstant(instant, timeline);   // not a 
clustering
+  }
+
   /**
    * Returns the auxiliary path.
    */
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
index a0cdb588ae4..609e099f997 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
@@ -62,7 +62,7 @@ import static 
org.junit.jupiter.api.Assertions.assertIterableEquals;
 public class TestIncrementalInputSplits extends HoodieCommonTestHarness {
 
   @BeforeEach
-  private void init() throws IOException {
+  void init() throws IOException {
     initPath();
     initMetaClient();
   }
@@ -92,7 +92,7 @@ public class TestIncrementalInputSplits extends 
HoodieCommonTestHarness {
     assertEquals(2, instantRange2.size());
     assertIterableEquals(Arrays.asList(commit2, commit3), instantRange2);
 
-    // simulate first iteration cycle with read from LATEST commit
+    // simulate first iteration cycle with read from the LATEST commit
     List<HoodieInstant> instantRange1 = iis.filterInstantsWithRange(timeline, 
null);
     assertEquals(1, instantRange1.size());
     assertIterableEquals(Collections.singletonList(commit3), instantRange1);
@@ -161,7 +161,7 @@ public class TestIncrementalInputSplits extends 
HoodieCommonTestHarness {
         .path(new Path(basePath))
         .rowType(TestConfigurations.ROW_TYPE)
         .build();
-    IncrementalInputSplits.Result result = iis.inputSplits(metaClient, 
metaClient.getHadoopConf(), null, false);
+    IncrementalInputSplits.Result result = iis.inputSplits(metaClient, null, 
null, false);
     List<String> partitions = getFilteredPartitions(result);
     assertEquals(Arrays.asList("par1", "par2", "par3", "par4", "par5", 
"par6"), partitions);
   }
@@ -186,11 +186,15 @@ public class TestIncrementalInputSplits extends 
HoodieCommonTestHarness {
         .rowType(TestConfigurations.ROW_TYPE)
         .partitionPruner(partitionPruner)
         .build();
-    IncrementalInputSplits.Result result = iis.inputSplits(metaClient, 
metaClient.getHadoopConf(), null, false);
+    IncrementalInputSplits.Result result = iis.inputSplits(metaClient, null, 
null, false);
     List<String> partitions = getFilteredPartitions(result);
     assertEquals(expectedPartitions, partitions);
   }
 
+  // -------------------------------------------------------------------------
+  //  Utilities
+  // -------------------------------------------------------------------------
+
   private static Stream<Arguments> partitionEvaluators() {
     FieldReferenceExpression partitionFieldRef = new 
FieldReferenceExpression("partition", DataTypes.STRING(), 0, 0);
     // `partition` != 'par3'
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
index 616edc37f1c..3449f7a950f 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
@@ -18,7 +18,12 @@
 
 package org.apache.hudi.source;
 
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.InstantRange;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestConfigurations;
@@ -31,6 +36,7 @@ import 
org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.table.data.RowData;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -39,6 +45,7 @@ import java.io.File;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -199,6 +206,96 @@ public class TestStreamReadMonitoringFunction {
     }
   }
 
+  @Test
+  public void testConsumingHollowInstants() throws Exception {
+    // write 4 commits
+    conf.setString("hoodie.parquet.small.file.limit", "0"); // invalidate the 
small file strategy
+    for (int i = 0; i < 8; i += 2) {
+      List<RowData> dataset = TestData.dataSetInsert(i + 1, i + 2);
+      TestData.writeData(dataset, conf);
+    }
+
+    // we now have 4 commits on the timeline: c1, c2, c3, c4
+    // re-create the metadata files for c2 and c3 so that they have a greater completion time than c4.
+    // the completion time sequence becomes: c1, c4, c2, c3,
+    // and we will test with the same consumption sequence.
+    HoodieTableMetaClient metaClient = 
StreamerUtil.createMetaClient(tempFile.getAbsolutePath(), 
HadoopConfigurations.getHadoopConf(conf));
+    List<HoodieInstant> oriInstants = 
metaClient.getCommitsTimeline().filterCompletedInstants().getInstants();
+    assertThat(oriInstants.size(), is(4));
+    List<HoodieCommitMetadata> metadataList = new ArrayList<>();
+    // timeline: c1, c2.inflight, c3.inflight, c4
+    for (int i = 1; i <= 2; i++) {
+      HoodieInstant instant = oriInstants.get(i);
+      metadataList.add(TestUtils.deleteInstantFile(metaClient, instant));
+    }
+
+    List<HoodieInstant> instants = 
metaClient.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants().getInstants();
+    assertThat(instants.size(), is(2));
+
+    String c2 = oriInstants.get(1).getTimestamp();
+    String c3 = oriInstants.get(2).getTimestamp();
+    String c4 = instants.get(1).getTimestamp();
+
+    conf.setString(FlinkOptions.READ_START_COMMIT, 
FlinkOptions.START_COMMIT_EARLIEST);
+    StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf);
+    try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = 
createHarness(function)) {
+      harness.setup();
+      harness.open();
+
+      // timeline: c1, c2.inflight, c3.inflight, c4
+      // -> c1
+      CountDownLatch latch = new CountDownLatch(2);
+      CollectingSourceContext sourceContext = new 
CollectingSourceContext(latch);
+
+      runAsync(sourceContext, function);
+
+      assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should 
finish splits generation");
+      assertThat("Should produce the expected splits",
+          sourceContext.getPartitionPaths(), is("par1"));
+      assertTrue(sourceContext.splits.stream().noneMatch(split -> 
split.getInstantRange().isPresent()),
+          "No instants should have range limit");
+      assertTrue(sourceContext.splits.stream().allMatch(split -> 
split.getLatestCommit().equals(c4)),
+          "All the splits should be with specified instant time");
+
+      // reset the source context
+      latch = new CountDownLatch(1);
+      sourceContext.reset(latch);
+
+      // timeline: c1, c2, c3.inflight, c4
+      // c4 -> c2
+      TestUtils.saveInstantAsComplete(metaClient, oriInstants.get(1), 
metadataList.get(0)); // complete c2
+      assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should 
finish splits generation");
+      assertThat("Should produce the expected splits",
+          sourceContext.getPartitionPaths(), is("par1"));
+      assertTrue(sourceContext.splits.stream().allMatch(split -> 
split.getInstantRange().isPresent()),
+          "All instants should have range limit");
+      assertTrue(sourceContext.splits.stream().allMatch(split -> 
isPointInstantRange(split.getInstantRange().get(), c2)),
+          "All the splits should have point instant range");
+      assertTrue(sourceContext.splits.stream().allMatch(split -> 
split.getLatestCommit().equals(c2)),
+          "All the splits should be with specified instant time");
+
+      // reset the source context
+      latch = new CountDownLatch(1);
+      sourceContext.reset(latch);
+
+      // timeline: c1, c2, c3, c4
+      // c4 -> c3
+      TestUtils.saveInstantAsComplete(metaClient, oriInstants.get(2), 
metadataList.get(1)); // complete c3
+      assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should 
finish splits generation");
+      assertThat("Should produce the expected splits",
+          sourceContext.getPartitionPaths(), is("par1"));
+      assertTrue(sourceContext.splits.stream().allMatch(split -> 
split.getInstantRange().isPresent()),
+          "All instants should have range limit");
+      assertTrue(sourceContext.splits.stream().allMatch(split -> 
isPointInstantRange(split.getInstantRange().get(), c3)),
+          "All the splits should have point instant range");
+      assertTrue(sourceContext.splits.stream().allMatch(split -> 
split.getLatestCommit().equals(c3)),
+          "All the splits should be with specified instant time");
+
+      // Stop the stream task.
+      function.close();
+    }
+  }
+
   @Test
   public void testCheckpointRestore() throws Exception {
     TestData.writeData(TestData.DATA_SET_INSERT, conf);
@@ -317,6 +414,12 @@ public class TestStreamReadMonitoringFunction {
     }
   }
 
+  private static boolean isPointInstantRange(InstantRange instantRange, String 
timestamp) {
+    return instantRange != null
+        && Objects.equals(timestamp, instantRange.getStartInstant())
+        && Objects.equals(timestamp, instantRange.getEndInstant());
+  }
+
   private AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> 
createHarness(
       StreamReadMonitoringFunction function) throws Exception {
     StreamSource<MergeOnReadInputSplit, StreamReadMonitoringFunction> 
streamSource = new StreamSource<>(function);
@@ -387,6 +490,7 @@ public class TestStreamReadMonitoringFunction {
     public String getPartitionPaths() {
       return this.splits.stream()
           .map(TestUtils::getSplitPartitionPath)
+          .distinct()
           .sorted(Comparator.naturalOrder())
           .collect(Collectors.joining(","));
     }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index 56730368f6f..d1b5516d1ad 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -21,6 +21,7 @@ package org.apache.hudi.table.format;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.model.EventTimeAvroPayload;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
@@ -72,6 +73,7 @@ import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
  * Test cases for MergeOnReadInputFormat and ParquetInputFormat.
@@ -431,7 +433,7 @@ public class TestInputFormat {
 
     // default read the latest commit
     // the compaction base files are skipped
-    IncrementalInputSplits.Result splits1 = 
incrementalInputSplits.inputSplits(metaClient, hadoopConf, null, false);
+    IncrementalInputSplits.Result splits1 = 
incrementalInputSplits.inputSplits(metaClient, null, null, false);
     assertFalse(splits1.isEmpty());
     List<RowData> result1 = readData(inputFormat, 
splits1.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
 
@@ -447,7 +449,7 @@ public class TestInputFormat {
     String secondCommit = 
TestUtils.getNthCompleteInstant(metaClient.getBasePath(), 0, 
HoodieTimeline.COMMIT_ACTION);
     conf.setString(FlinkOptions.READ_START_COMMIT, secondCommit);
 
-    IncrementalInputSplits.Result splits2 = 
incrementalInputSplits.inputSplits(metaClient, hadoopConf, null, false);
+    IncrementalInputSplits.Result splits2 = 
incrementalInputSplits.inputSplits(metaClient, null, null, false);
     assertFalse(splits2.isEmpty());
     List<RowData> result2 = readData(inputFormat, 
splits2.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
     String actual2 = TestData.rowDataToString(result2);
@@ -463,7 +465,7 @@ public class TestInputFormat {
     inputFormat = this.tableSource.getInputFormat(true);
 
     // filter out the last commit by partition pruning
-    IncrementalInputSplits.Result splits3 = 
incrementalInputSplits.inputSplits(metaClient, hadoopConf, null, false);
+    IncrementalInputSplits.Result splits3 = 
incrementalInputSplits.inputSplits(metaClient, null, null, false);
     assertFalse(splits3.isEmpty());
     List<RowData> result3 = readData(inputFormat, 
splits3.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
     String actual3 = TestData.rowDataToString(result3);
@@ -475,8 +477,6 @@ public class TestInputFormat {
   void testReadSkipClustering() throws Exception {
     beforeEach(HoodieTableType.COPY_ON_WRITE);
 
-    org.apache.hadoop.conf.Configuration hadoopConf = 
HadoopConfigurations.getHadoopConf(conf);
-
     // write base first with clustering
     conf.setString(FlinkOptions.OPERATION, "insert");
     conf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true);
@@ -498,7 +498,7 @@ public class TestInputFormat {
 
     // default read the latest commit
     // the clustering files are skipped
-    IncrementalInputSplits.Result splits1 = 
incrementalInputSplits.inputSplits(metaClient, hadoopConf, null, false);
+    IncrementalInputSplits.Result splits1 = 
incrementalInputSplits.inputSplits(metaClient, null, null, false);
     assertFalse(splits1.isEmpty());
     List<RowData> result1 = readData(inputFormat, 
splits1.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
 
@@ -514,7 +514,7 @@ public class TestInputFormat {
     String secondCommit = 
TestUtils.getNthCompleteInstant(metaClient.getBasePath(), 0, 
HoodieTimeline.REPLACE_COMMIT_ACTION);
     conf.setString(FlinkOptions.READ_START_COMMIT, secondCommit);
 
-    IncrementalInputSplits.Result splits2 = 
incrementalInputSplits.inputSplits(metaClient, hadoopConf, null, false);
+    IncrementalInputSplits.Result splits2 = 
incrementalInputSplits.inputSplits(metaClient, null, null, false);
     assertFalse(splits2.isEmpty());
     List<RowData> result2 = readData(inputFormat, 
splits2.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
     String actual2 = TestData.rowDataToString(result2);
@@ -531,7 +531,7 @@ public class TestInputFormat {
     inputFormat = this.tableSource.getInputFormat(true);
 
     // filter out the last commit by partition pruning
-    IncrementalInputSplits.Result splits3 = 
incrementalInputSplits.inputSplits(metaClient, hadoopConf, null, false);
+    IncrementalInputSplits.Result splits3 = 
incrementalInputSplits.inputSplits(metaClient, null, null, false);
     assertFalse(splits3.isEmpty());
     List<RowData> result3 = readData(inputFormat, 
splits3.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
     String actual3 = TestData.rowDataToString(result3);
@@ -539,12 +539,103 @@ public class TestInputFormat {
     assertThat(actual3, is(expected3));
   }
 
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableType.class)
+  void testReadHollowInstants(HoodieTableType tableType) throws Exception {
+    Map<String, String> options = new HashMap<>();
+    options.put("hoodie.parquet.small.file.limit", "0"); // invalidate the 
small file strategy
+    beforeEach(tableType, options);
+
+    // write 4 commits
+    for (int i = 0; i < 8; i += 2) {
+      List<RowData> dataset = TestData.dataSetInsert(i + 1, i + 2);
+      TestData.writeData(dataset, conf);
+    }
+
+    // we now have 4 commits on the timeline: c1, c2, c3, c4
+    // re-create the metadata files for c2 and c3 so that they have a greater completion time than c4.
+    // the completion time sequence becomes: c1, c4, c2, c3,
+    // and we will test with the same consumption sequence.
+    HoodieTableMetaClient metaClient = 
StreamerUtil.createMetaClient(tempFile.getAbsolutePath(), 
HadoopConfigurations.getHadoopConf(conf));
+    List<HoodieInstant> oriInstants = 
metaClient.getCommitsTimeline().filterCompletedInstants().getInstants();
+    assertThat(oriInstants.size(), is(4));
+    List<HoodieCommitMetadata> metadataList = new ArrayList<>();
+    // timeline: c1, c2.inflight, c3.inflight, c4
+    for (int i = 1; i <= 2; i++) {
+      HoodieInstant instant = oriInstants.get(i);
+      metadataList.add(TestUtils.deleteInstantFile(metaClient, instant));
+    }
+
+    List<HoodieInstant> instants = 
metaClient.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants().getInstants();
+    assertThat(instants.size(), is(2));
+
+    String c4 = instants.get(1).getTimestamp();
+
+    InputFormat<RowData, ?> inputFormat = 
this.tableSource.getInputFormat(true);
+    assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class));
+
+    IncrementalInputSplits incrementalInputSplits = 
IncrementalInputSplits.builder()
+        .rowType(TestConfigurations.ROW_TYPE)
+        .conf(conf)
+        .path(FilePathUtils.toFlinkPath(metaClient.getBasePathV2()))
+        .build();
+
+    // timeline: c1, c2.inflight, c3.inflight, c4
+    // default read the latest commit
+    IncrementalInputSplits.Result splits1 = 
incrementalInputSplits.inputSplits(metaClient, null, null, false);
+    assertFalse(splits1.isEmpty());
+    List<RowData> result1 = readData(inputFormat, 
splits1.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
+    TestData.assertRowDataEquals(result1, TestData.dataSetInsert(7, 8));
+
+    // timeline: c1, c2.inflight, c3.inflight, c4
+    // -> c1
+    conf.setString(FlinkOptions.READ_START_COMMIT, 
FlinkOptions.START_COMMIT_EARLIEST);
+    IncrementalInputSplits.Result splits2 = 
incrementalInputSplits.inputSplits(metaClient, null, null, false);
+    assertFalse(splits2.isEmpty());
+    List<RowData> result2 = readData(inputFormat, 
splits2.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
+    TestData.assertRowDataEquals(result2, TestData.dataSetInsert(1, 2, 7, 8));
+
+    // timeline: c1, c2, c3.inflight, c4
+    // c4 -> c2
+    TestUtils.saveInstantAsComplete(metaClient, oriInstants.get(1), 
metadataList.get(0)); // complete c2
+    assertThat(splits2.getEndInstant(), is(c4));
+    IncrementalInputSplits.Result splits3 = 
incrementalInputSplits.inputSplits(metaClient, splits2.getEndInstant(), 
splits2.getOffset(), false);
+    assertFalse(splits3.isEmpty());
+    List<RowData> result3 = readData(inputFormat, 
splits3.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
+    TestData.assertRowDataEquals(result3, TestData.dataSetInsert(3, 4));
+
+    // test c2 and c4: c2's completion time > c1's, so c2 is not a hollow instant
+    IncrementalInputSplits.Result splits4 = 
incrementalInputSplits.inputSplits(metaClient, 
oriInstants.get(0).getTimestamp(), oriInstants.get(0).getStateTransitionTime(), 
false);
+    assertFalse(splits4.isEmpty());
+    List<RowData> result4 = readData(inputFormat, 
splits4.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
+    TestData.assertRowDataEquals(result4, TestData.dataSetInsert(3, 4, 7, 8));
+
+    // timeline: c1, c2, c3, c4
+    // c4 -> c3
+    TestUtils.saveInstantAsComplete(metaClient, oriInstants.get(2), 
metadataList.get(1)); // complete c3
+    assertThat(splits3.getEndInstant(), is(c4));
+    IncrementalInputSplits.Result splits5 = 
incrementalInputSplits.inputSplits(metaClient, splits3.getEndInstant(), 
splits3.getOffset(), false);
+    assertFalse(splits5.isEmpty());
+    List<RowData> result5 = readData(inputFormat, 
splits5.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
+    TestData.assertRowDataEquals(result5, TestData.dataSetInsert(5, 6));
+
+    // c4 ->
+    assertThat(splits5.getEndInstant(), is(c4));
+    IncrementalInputSplits.Result splits6 = 
incrementalInputSplits.inputSplits(metaClient, splits5.getEndInstant(), 
splits5.getOffset(), false);
+    assertTrue(splits6.isEmpty());
+
+    // test c2 and c4, c2 is recognized as a hollow instant
+    // the (version_number, completion_time) pair is deliberately inconsistent, just for test purposes
+    IncrementalInputSplits.Result splits7 = 
incrementalInputSplits.inputSplits(metaClient, 
oriInstants.get(2).getTimestamp(), oriInstants.get(3).getStateTransitionTime(), 
false);
+    assertFalse(splits7.isEmpty());
+    List<RowData> result7 = readData(inputFormat, 
splits7.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
+    TestData.assertRowDataEquals(result7, TestData.dataSetInsert(3, 4, 7, 8));
+  }
+
   @Test
   void testReadBaseFilesWithStartCommit() throws Exception {
     beforeEach(HoodieTableType.COPY_ON_WRITE);
 
-    org.apache.hadoop.conf.Configuration hadoopConf = 
HadoopConfigurations.getHadoopConf(conf);
-
     // write base files
     TestData.writeData(TestData.DATA_SET_INSERT, conf);
 
@@ -560,7 +651,7 @@ public class TestInputFormat {
         .build();
 
     // default read the latest commit
-    IncrementalInputSplits.Result splits1 = 
incrementalInputSplits.inputSplits(metaClient, hadoopConf, null, false);
+    IncrementalInputSplits.Result splits1 = 
incrementalInputSplits.inputSplits(metaClient, null, null, false);
     assertFalse(splits1.isEmpty());
     List<RowData> result1 = readData(inputFormat, 
splits1.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
 
@@ -575,7 +666,7 @@ public class TestInputFormat {
     String secondCommit = 
TestUtils.getNthCompleteInstant(metaClient.getBasePath(), 1, 
HoodieTimeline.COMMIT_ACTION);
     conf.setString(FlinkOptions.READ_START_COMMIT, secondCommit);
 
-    IncrementalInputSplits.Result splits2 = 
incrementalInputSplits.inputSplits(metaClient, hadoopConf, null, false);
+    IncrementalInputSplits.Result splits2 = 
incrementalInputSplits.inputSplits(metaClient, null, null, false);
     assertFalse(splits2.isEmpty());
     List<RowData> result2 = readData(inputFormat, 
splits2.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
     String actual2 = TestData.rowDataToString(result2);
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
index 46af5fa7a82..1e951dc3cb0 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
@@ -18,9 +18,14 @@
 
 package org.apache.hudi.utils;
 
+import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.source.StreamReadMonitoringFunction;
@@ -32,6 +37,8 @@ import org.apache.flink.core.fs.Path;
 
 import javax.annotation.Nullable;
 
+import java.nio.charset.StandardCharsets;
+
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
@@ -117,4 +124,15 @@ public class TestUtils {
         .countInstants();
   }
 
+  public static HoodieCommitMetadata deleteInstantFile(HoodieTableMetaClient 
metaClient, HoodieInstant instant) throws Exception {
+    ValidationUtils.checkArgument(instant.isCompleted());
+    HoodieCommitMetadata metadata = TimelineUtils.getCommitMetadata(instant, 
metaClient.getActiveTimeline());
+    HoodieActiveTimeline.deleteInstantFile(metaClient.getFs(), 
metaClient.getMetaPath(), instant);
+    return metadata;
+  }
+
+  public static void saveInstantAsComplete(HoodieTableMetaClient metaClient, 
HoodieInstant instant, HoodieCommitMetadata metadata) throws Exception {
+    metaClient.getActiveTimeline().saveAsComplete(new HoodieInstant(true, 
instant.getAction(), instant.getTimestamp()),
+        Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+  }
 }

Reply via email to