This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c6639fe2d46 [HUDI-6629] - Changes for s3/gcs IncrSource job to take 
sourceLimit into account during ingestion (#9336)
c6639fe2d46 is described below

commit c6639fe2d46e7d41c1f2fad3ab3cc7847e92ddc9
Author: lokesh-lingarajan-0310 
<[email protected]>
AuthorDate: Mon Aug 7 18:41:37 2023 -0700

    [HUDI-6629] - Changes for s3/gcs IncrSource job to take sourceLimit 
into account during ingestion (#9336)
    
    - Change s3 and gcs incremental job to batch within a commit based on the 
source limit
    - Refactor s3 incr job to lend itself to more testing
    - Added test cases for both s3 and gcs incr jobs
    - Checkpoint format => "commitTime#Key"; the sorted order of these columns 
helps resume ingestion
    - Added a few timeline apis to support fetching commit data from current 
commit
    
    ---------
    
    Co-authored-by: Lokesh Lingarajan 
<[email protected]>
    Co-authored-by: Sagar Sumit <[email protected]>
---
 .../table/timeline/HoodieDefaultTimeline.java      |  14 +
 .../hudi/common/table/timeline/HoodieTimeline.java |  18 ++
 .../table/timeline/TestHoodieActiveTimeline.java   |   4 +
 .../sources/GcsEventsHoodieIncrSource.java         |  81 +++---
 .../hudi/utilities/sources/HoodieIncrSource.java   |  31 +-
 .../sources/S3EventsHoodieIncrSource.java          | 161 ++++++-----
 ...bjectDataFetcher.java => CloudDataFetcher.java} |  17 +-
 .../sources/helpers/CloudObjectIncrCheckpoint.java |  70 +++++
 .../sources/helpers/IncrSourceHelper.java          | 144 ++++++++--
 .../hudi/utilities/sources/helpers/QueryInfo.java  | 112 ++++++++
 .../utilities/sources/helpers/QueryRunner.java     |  93 ++++++
 .../utilities/sources/helpers/gcs/QueryInfo.java   | 101 -------
 .../sources/TestGcsEventsHoodieIncrSource.java     | 155 +++++++++-
 .../sources/TestS3EventsHoodieIncrSource.java      | 320 +++++++++++++++++++++
 .../sources/helpers/TestIncrSourceHelper.java      | 249 ++++++++++++++++
 15 files changed, 1292 insertions(+), 278 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
index a5d56c91d5e..e504e401739 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
@@ -28,6 +28,7 @@ import java.io.Serializable;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Set;
 import java.util.function.Function;
@@ -203,6 +204,12 @@ public class HoodieDefaultTimeline implements 
HoodieTimeline {
         getInstantsAsStream().filter(s -> 
HoodieTimeline.isInRange(s.getTimestamp(), startTs, endTs)), details);
   }
 
+  @Override
+  public HoodieDefaultTimeline findInstantsInClosedRange(String startTs, 
String endTs) {
+    return new HoodieDefaultTimeline(
+        instants.stream().filter(instant -> 
HoodieTimeline.isInClosedRange(instant.getTimestamp(), startTs, endTs)), 
details);
+  }
+
   @Override
   public HoodieDefaultTimeline findInstantsInRangeByStateTransitionTime(String 
startTs, String endTs) {
     return new HoodieDefaultTimeline(
@@ -244,6 +251,13 @@ public class HoodieDefaultTimeline implements 
HoodieTimeline {
             details);
   }
 
+  @Override
+  public Option<HoodieInstant> findInstantBefore(String instantTime) {
+    return Option.fromJavaOptional(instants.stream()
+        .filter(instant -> compareTimestamps(instant.getTimestamp(), 
LESSER_THAN, instantTime))
+        .max(Comparator.comparing(HoodieInstant::getTimestamp)));
+  }
+
   @Override
   public HoodieDefaultTimeline findInstantsBeforeOrEquals(String instantTime) {
     return new HoodieDefaultTimeline(getInstantsAsStream()
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
index 291dc5c1e59..a1e70c2e22e 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
@@ -241,6 +241,11 @@ public interface HoodieTimeline extends Serializable {
    */
   HoodieTimeline findInstantsInRange(String startTs, String endTs);
 
+  /**
+   * Create a new Timeline with instants at or after startTs and at or before 
endTs, i.e. within the closed range [startTs, endTs].
+   */
+  HoodieTimeline findInstantsInClosedRange(String startTs, String endTs);
+
   /**`
    * Create a new Timeline with instants after startTs and before or on endTs
    * by state transition timestamp of actions.
@@ -267,6 +272,11 @@ public interface HoodieTimeline extends Serializable {
    */
   HoodieTimeline findInstantsBefore(String instantTime);
 
+  /**
+   * Finds the instant before specified time.
+   */
+  Option<HoodieInstant> findInstantBefore(String instantTime);
+
   /**
    * Create new timeline with all instants before or equals specified time.
    */
@@ -413,6 +423,14 @@ public interface HoodieTimeline extends Serializable {
             && HoodieTimeline.compareTimestamps(timestamp, 
LESSER_THAN_OR_EQUALS, endTs);
   }
 
+  /**
+   * Return true if specified timestamp is in range [startTs, endTs].
+   */
+  static boolean isInClosedRange(String timestamp, String startTs, String 
endTs) {
+    return HoodieTimeline.compareTimestamps(timestamp, GREATER_THAN_OR_EQUALS, 
startTs)
+        && HoodieTimeline.compareTimestamps(timestamp, LESSER_THAN_OR_EQUALS, 
endTs);
+  }
+
   static HoodieInstant getCompletedInstant(final HoodieInstant instant) {
     return new HoodieInstant(State.COMPLETED, instant.getAction(), 
instant.getTimestamp());
   }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
index 99525883dc3..06afc6fd5d3 100755
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
@@ -171,6 +171,10 @@ public class TestHoodieActiveTimeline extends 
HoodieCommonTestHarness {
         
timeline.getCommitTimeline().filterCompletedInstants().findInstantsInRange("04",
 "11")
             .getInstantsAsStream().map(HoodieInstant::getTimestamp),
         "findInstantsInRange should return 4 instants");
+    assertStreamEquals(Stream.of("03", "05", "07", "09", "11"),
+        
timeline.getCommitTimeline().filterCompletedInstants().findInstantsInClosedRange("03",
 "11")
+            .getInstantsAsStream().map(HoodieInstant::getTimestamp),
+        "findInstantsInClosedRange should return 5 instants");
     assertStreamEquals(Stream.of("09", "11"),
         
timeline.getCommitTimeline().filterCompletedInstants().findInstantsAfter("07", 
2)
             .getInstantsAsStream().map(HoodieInstant::getTimestamp),
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
index 96d0464509e..d47f8420513 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
@@ -20,15 +20,19 @@ package org.apache.hudi.utilities.sources;
 
 import org.apache.hudi.DataSourceUtils;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
 import 
org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.CloudObjectIncrCheckpoint;
 import org.apache.hudi.utilities.sources.helpers.CloudObjectMetadata;
+import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
 import 
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy;
-import org.apache.hudi.utilities.sources.helpers.gcs.GcsObjectDataFetcher;
+import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
 import org.apache.hudi.utilities.sources.helpers.gcs.GcsObjectMetadataFetcher;
-import org.apache.hudi.utilities.sources.helpers.gcs.QueryInfo;
+import org.apache.hudi.utilities.sources.helpers.QueryInfo;
+import org.apache.hudi.utilities.sources.helpers.QueryRunner;
 
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
@@ -46,7 +50,7 @@ import static 
org.apache.hudi.utilities.config.CloudSourceConfig.ENABLE_EXISTS_C
 import static 
org.apache.hudi.utilities.config.HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH;
 import static 
org.apache.hudi.utilities.config.HoodieIncrSourceConfig.NUM_INSTANTS_PER_FETCH;
 import static 
org.apache.hudi.utilities.config.HoodieIncrSourceConfig.SOURCE_FILE_FORMAT;
-import static 
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.calculateBeginAndEndInstants;
+import static 
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.generateQueryInfo;
 import static 
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getHollowCommitHandleMode;
 import static 
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getMissingCheckpointStrategy;
 
@@ -104,7 +108,11 @@ public class GcsEventsHoodieIncrSource extends 
HoodieIncrSource {
 
   private final MissingCheckpointStrategy missingCheckpointStrategy;
   private final GcsObjectMetadataFetcher gcsObjectMetadataFetcher;
-  private final GcsObjectDataFetcher gcsObjectDataFetcher;
+  private final CloudDataFetcher gcsObjectDataFetcher;
+  private final QueryRunner queryRunner;
+
+  public static final String GCS_OBJECT_KEY = "name";
+  public static final String GCS_OBJECT_SIZE = "size";
 
   private static final Logger LOG = 
LoggerFactory.getLogger(GcsEventsHoodieIncrSource.class);
 
@@ -113,12 +121,13 @@ public class GcsEventsHoodieIncrSource extends 
HoodieIncrSource {
 
     this(props, jsc, spark, schemaProvider,
         new GcsObjectMetadataFetcher(props, getSourceFileFormat(props)),
-        new GcsObjectDataFetcher(props, props.getString(DATAFILE_FORMAT.key(), 
DATAFILE_FORMAT.defaultValue()))
+        new CloudDataFetcher(props, props.getString(DATAFILE_FORMAT.key(), 
DATAFILE_FORMAT.defaultValue())),
+        new QueryRunner(spark, props)
     );
   }
 
   GcsEventsHoodieIncrSource(TypedProperties props, JavaSparkContext jsc, 
SparkSession spark,
-                            SchemaProvider schemaProvider, 
GcsObjectMetadataFetcher gcsObjectMetadataFetcher, GcsObjectDataFetcher 
gcsObjectDataFetcher) {
+                            SchemaProvider schemaProvider, 
GcsObjectMetadataFetcher gcsObjectMetadataFetcher, CloudDataFetcher 
gcsObjectDataFetcher, QueryRunner queryRunner) {
     super(props, jsc, spark, schemaProvider);
 
     DataSourceUtils.checkRequiredProperties(props, 
Collections.singletonList(HOODIE_SRC_BASE_PATH.key()));
@@ -129,6 +138,7 @@ public class GcsEventsHoodieIncrSource extends 
HoodieIncrSource {
 
     this.gcsObjectMetadataFetcher = gcsObjectMetadataFetcher;
     this.gcsObjectDataFetcher = gcsObjectDataFetcher;
+    this.queryRunner = queryRunner;
 
     LOG.info("srcPath: " + srcPath);
     LOG.info("missingCheckpointStrategy: " + missingCheckpointStrategy);
@@ -137,59 +147,48 @@ public class GcsEventsHoodieIncrSource extends 
HoodieIncrSource {
   }
 
   @Override
-  public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> 
lastCkptStr, long sourceLimit) {
-    QueryInfo queryInfo = getQueryInfo(lastCkptStr);
+  public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> 
lastCheckpoint, long sourceLimit) {
+    CloudObjectIncrCheckpoint cloudObjectIncrCheckpoint = 
CloudObjectIncrCheckpoint.fromString(lastCheckpoint);
+    HollowCommitHandling handlingMode = getHollowCommitHandleMode(props);
+
+    QueryInfo queryInfo = generateQueryInfo(
+        sparkContext, srcPath, numInstantsPerFetch,
+        Option.of(cloudObjectIncrCheckpoint.getCommit()),
+        missingCheckpointStrategy, handlingMode, 
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+        GCS_OBJECT_KEY, GCS_OBJECT_SIZE, true,
+        Option.ofNullable(cloudObjectIncrCheckpoint.getKey()));
+    LOG.info("Querying GCS with:" + cloudObjectIncrCheckpoint + " and 
queryInfo:" + queryInfo);
 
-    if (queryInfo.areStartAndEndInstantsEqual()) {
-      LOG.info("Already caught up. Begin Checkpoint was: " + 
queryInfo.getStartInstant());
+    if (isNullOrEmpty(cloudObjectIncrCheckpoint.getKey()) && 
queryInfo.areStartAndEndInstantsEqual()) {
+      LOG.info("Source of file names is empty. Returning empty result and 
endInstant: "
+          + queryInfo.getStartInstant());
       return Pair.of(Option.empty(), queryInfo.getStartInstant());
     }
 
-    Dataset<Row> cloudObjectMetadataDF = 
queryInfo.initCloudObjectMetadata(srcPath, sparkSession);
-
+    Dataset<Row> cloudObjectMetadataDF = queryRunner.run(queryInfo);
     if (cloudObjectMetadataDF.isEmpty()) {
       LOG.info("Source of file names is empty. Returning empty result and 
endInstant: "
           + queryInfo.getEndInstant());
       return Pair.of(Option.empty(), queryInfo.getEndInstant());
     }
 
-    return extractData(queryInfo, cloudObjectMetadataDF);
+    LOG.info("Adjusting end checkpoint:" + queryInfo.getEndInstant() + " based 
on sourceLimit :" + sourceLimit);
+    Pair<CloudObjectIncrCheckpoint, Dataset<Row>> checkPointAndDataset =
+        IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
+            cloudObjectMetadataDF, sourceLimit, queryInfo, 
cloudObjectIncrCheckpoint);
+    LOG.info("Adjusted end checkpoint :" + checkPointAndDataset.getLeft());
+
+    Pair<Option<Dataset<Row>>, String> extractedCheckPointAndDataset = 
extractData(queryInfo, checkPointAndDataset.getRight());
+    return Pair.of(extractedCheckPointAndDataset.getLeft(), 
checkPointAndDataset.getLeft().toString());
   }
 
   private Pair<Option<Dataset<Row>>, String> extractData(QueryInfo queryInfo, 
Dataset<Row> cloudObjectMetadataDF) {
     List<CloudObjectMetadata> cloudObjectMetadata = 
gcsObjectMetadataFetcher.getGcsObjectMetadata(sparkContext, 
cloudObjectMetadataDF, checkIfFileExists);
+    LOG.info("Total number of files to process :" + 
cloudObjectMetadata.size());
     Option<Dataset<Row>> fileDataRows = 
gcsObjectDataFetcher.getCloudObjectDataDF(sparkSession, cloudObjectMetadata, 
props);
     return Pair.of(fileDataRows, queryInfo.getEndInstant());
   }
 
-  private QueryInfo getQueryInfo(Option<String> lastCkptStr) {
-    Option<String> beginInstant = getBeginInstant(lastCkptStr);
-
-    HollowCommitHandling handlingMode = getHollowCommitHandleMode(props);
-    Pair<String, Pair<String, String>> queryInfoPair = 
calculateBeginAndEndInstants(
-        sparkContext, srcPath, numInstantsPerFetch, beginInstant, 
missingCheckpointStrategy, handlingMode);
-
-    QueryInfo queryInfo = new QueryInfo(
-        queryInfoPair.getLeft(),
-        queryInfoPair.getRight().getLeft(),
-        queryInfoPair.getRight().getRight(),
-        handlingMode);
-
-    if (LOG.isDebugEnabled()) {
-      queryInfo.logDetails();
-    }
-
-    return queryInfo;
-  }
-
-  private Option<String> getBeginInstant(Option<String> lastCheckpoint) {
-    if (lastCheckpoint.isPresent() && !isNullOrEmpty(lastCheckpoint.get())) {
-      return lastCheckpoint;
-    }
-
-    return Option.empty();
-  }
-
   private static String getSourceFileFormat(TypedProperties props) {
     return props.getString(SOURCE_FILE_FORMAT.key(), 
SOURCE_FILE_FORMAT.defaultValue());
   }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
index 7791273ae22..4836aeb34cc 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.utilities.config.HoodieIncrSourceConfig;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
+import org.apache.hudi.utilities.sources.helpers.QueryInfo;
 
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
@@ -45,7 +46,7 @@ import static 
org.apache.hudi.DataSourceReadOptions.INCREMENTAL_READ_HANDLE_HOLL
 import static org.apache.hudi.DataSourceReadOptions.QUERY_TYPE;
 import static 
org.apache.hudi.DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL;
 import static org.apache.hudi.utilities.UtilHelpers.createRecordMerger;
-import static 
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.calculateBeginAndEndInstants;
+import static 
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.generateQueryInfo;
 import static 
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getHollowCommitHandleMode;
 
 public class HoodieIncrSource extends RowSource {
@@ -58,7 +59,7 @@ public class HoodieIncrSource extends RowSource {
      * {@link #HOODIE_SRC_BASE_PATH} is the base-path for the source Hoodie 
table.
      */
     @Deprecated
-    static final String HOODIE_SRC_BASE_PATH = 
HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH.key();
+    public static final String HOODIE_SRC_BASE_PATH = 
HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH.key();
 
     /**
      * {@link #NUM_INSTANTS_PER_FETCH} allows the max number of instants whose 
changes can be incrementally fetched.
@@ -154,21 +155,23 @@ public class HoodieIncrSource extends RowSource {
         lastCkptStr.isPresent() ? lastCkptStr.get().isEmpty() ? Option.empty() 
: lastCkptStr : Option.empty();
 
     HollowCommitHandling handlingMode = getHollowCommitHandleMode(props);
-    Pair<String, Pair<String, String>> queryTypeAndInstantEndpts = 
calculateBeginAndEndInstants(sparkContext, srcPath,
-        numInstantsPerFetch, beginInstant, missingCheckpointStrategy, 
handlingMode);
-
-    if 
(queryTypeAndInstantEndpts.getValue().getKey().equals(queryTypeAndInstantEndpts.getValue().getValue()))
 {
-      LOG.warn("Already caught up. Begin Checkpoint was :" + 
queryTypeAndInstantEndpts.getValue().getKey());
-      return Pair.of(Option.empty(), 
queryTypeAndInstantEndpts.getValue().getKey());
+    QueryInfo queryInfo = generateQueryInfo(sparkContext, srcPath,
+        numInstantsPerFetch, beginInstant, missingCheckpointStrategy, 
handlingMode,
+        HoodieRecord.COMMIT_TIME_METADATA_FIELD, 
HoodieRecord.RECORD_KEY_METADATA_FIELD,
+        null, false, Option.empty());
+
+    if (queryInfo.areStartAndEndInstantsEqual()) {
+      LOG.info("Already caught up. No new data to process");
+      return Pair.of(Option.empty(), queryInfo.getEndInstant());
     }
 
     Dataset<Row> source;
     // Do Incr pull. Set end instant if available
-    if 
(queryTypeAndInstantEndpts.getKey().equals(QUERY_TYPE_INCREMENTAL_OPT_VAL())) {
+    if (queryInfo.isIncremental()) {
       source = sparkSession.read().format("org.apache.hudi")
           .option(QUERY_TYPE().key(), QUERY_TYPE_INCREMENTAL_OPT_VAL())
-          .option(BEGIN_INSTANTTIME().key(), 
queryTypeAndInstantEndpts.getValue().getLeft())
-          .option(END_INSTANTTIME().key(), 
queryTypeAndInstantEndpts.getValue().getRight())
+          .option(BEGIN_INSTANTTIME().key(), queryInfo.getStartInstant())
+          .option(END_INSTANTTIME().key(), queryInfo.getEndInstant())
           
.option(INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().key(),
               
props.getString(INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().key(),
                   
INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().defaultValue()))
@@ -181,9 +184,9 @@ public class HoodieIncrSource extends RowSource {
           .load(srcPath)
           // add filtering so that only interested records are returned.
           .filter(String.format("%s > '%s'", 
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
-              queryTypeAndInstantEndpts.getRight().getLeft()))
+              queryInfo.getStartInstant()))
           .filter(String.format("%s <= '%s'", 
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
-              queryTypeAndInstantEndpts.getRight().getRight()));
+              queryInfo.getEndInstant()));
     }
 
     HoodieRecord.HoodieRecordType recordType = 
createRecordMerger(props).getRecordType();
@@ -199,6 +202,6 @@ public class HoodieIncrSource extends RowSource {
     String[] colsToDrop = shouldDropMetaFields ? 
HoodieRecord.HOODIE_META_COLUMNS.stream().toArray(String[]::new) :
         HoodieRecord.HOODIE_META_COLUMNS.stream().filter(x -> 
!x.equals(HoodieRecord.PARTITION_PATH_METADATA_FIELD)).toArray(String[]::new);
     final Dataset<Row> src = source.drop(colsToDrop);
-    return Pair.of(Option.of(src), 
queryTypeAndInstantEndpts.getRight().getRight());
+    return Pair.of(Option.of(src), queryInfo.getEndInstant());
   }
 }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
index 99d8ae36951..40abe47189d 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
@@ -26,11 +26,14 @@ import 
org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.utilities.config.HoodieIncrSourceConfig;
 import org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig;
 import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
+import org.apache.hudi.utilities.sources.helpers.CloudObjectIncrCheckpoint;
 import org.apache.hudi.utilities.sources.helpers.CloudObjectMetadata;
 import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
+import org.apache.hudi.utilities.sources.helpers.QueryInfo;
+import org.apache.hudi.utilities.sources.helpers.QueryRunner;
 
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
@@ -43,20 +46,15 @@ import org.slf4j.LoggerFactory;
 import java.util.Collections;
 import java.util.List;
 
-import static org.apache.hudi.DataSourceReadOptions.BEGIN_INSTANTTIME;
-import static org.apache.hudi.DataSourceReadOptions.END_INSTANTTIME;
-import static 
org.apache.hudi.DataSourceReadOptions.INCREMENTAL_READ_HANDLE_HOLLOW_COMMIT;
-import static org.apache.hudi.DataSourceReadOptions.QUERY_TYPE;
-import static 
org.apache.hudi.DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL;
-import static 
org.apache.hudi.DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL;
+import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
 import static 
org.apache.hudi.utilities.config.HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH;
 import static 
org.apache.hudi.utilities.config.HoodieIncrSourceConfig.NUM_INSTANTS_PER_FETCH;
-import static 
org.apache.hudi.utilities.config.HoodieIncrSourceConfig.READ_LATEST_INSTANT_ON_MISSING_CKPT;
 import static 
org.apache.hudi.utilities.config.HoodieIncrSourceConfig.SOURCE_FILE_FORMAT;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_SOURCE_FILE_FORMAT;
 import static 
org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon.getCloudObjectMetadataPerPartition;
-import static 
org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon.loadAsDataset;
-import static 
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.calculateBeginAndEndInstants;
+import static 
org.apache.hudi.utilities.sources.helpers.CloudStoreIngestionConfig.DATAFILE_FORMAT;
 import static 
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getHollowCommitHandleMode;
+import static 
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getMissingCheckpointStrategy;
 
 /**
  * This source will use the S3 events meta information from hoodie table 
generate by {@link S3EventsSource}.
@@ -64,6 +62,13 @@ import static 
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getHoll
 public class S3EventsHoodieIncrSource extends HoodieIncrSource {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(S3EventsHoodieIncrSource.class);
+  private final String srcPath;
+  private final int numInstantsPerFetch;
+  private final boolean checkIfFileExists;
+  private final String fileFormat;
+  private final IncrSourceHelper.MissingCheckpointStrategy 
missingCheckpointStrategy;
+  private final QueryRunner queryRunner;
+  private final CloudDataFetcher cloudDataFetcher;
 
   public static class Config {
     // control whether we do existence check for files before consuming them
@@ -93,94 +98,100 @@ public class S3EventsHoodieIncrSource extends 
HoodieIncrSource {
     public static final String SPARK_DATASOURCE_OPTIONS = 
S3EventsHoodieIncrSourceConfig.SPARK_DATASOURCE_OPTIONS.key();
   }
 
+  public static final String S3_OBJECT_KEY = "s3.object.key";
+  public static final String S3_OBJECT_SIZE = "s3.object.size";
+  public static final String S3_BUCKET_NAME = "s3.bucket.name";
+
   public S3EventsHoodieIncrSource(
       TypedProperties props,
       JavaSparkContext sparkContext,
       SparkSession sparkSession,
       SchemaProvider schemaProvider) {
-    super(props, sparkContext, sparkSession, schemaProvider);
+    this(props, sparkContext, sparkSession, schemaProvider, new 
QueryRunner(sparkSession, props),
+        new CloudDataFetcher(props, props.getString(DATAFILE_FORMAT, 
DEFAULT_SOURCE_FILE_FORMAT)));
   }
 
-  @Override
-  public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> 
lastCkptStr, long sourceLimit) {
+  public S3EventsHoodieIncrSource(
+      TypedProperties props,
+      JavaSparkContext sparkContext,
+      SparkSession sparkSession,
+      SchemaProvider schemaProvider,
+      QueryRunner queryRunner,
+      CloudDataFetcher cloudDataFetcher) {
+    super(props, sparkContext, sparkSession, schemaProvider);
     DataSourceUtils.checkRequiredProperties(props, 
Collections.singletonList(HOODIE_SRC_BASE_PATH.key()));
-    String srcPath = props.getString(HOODIE_SRC_BASE_PATH.key());
-    int numInstantsPerFetch = props.getInteger(NUM_INSTANTS_PER_FETCH.key(), 
NUM_INSTANTS_PER_FETCH.defaultValue());
-    boolean readLatestOnMissingCkpt = props.getBoolean(
-        READ_LATEST_INSTANT_ON_MISSING_CKPT.key(), 
READ_LATEST_INSTANT_ON_MISSING_CKPT.defaultValue());
-    IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy = 
(props.containsKey(HoodieIncrSourceConfig.MISSING_CHECKPOINT_STRATEGY.key()))
-        ? 
IncrSourceHelper.MissingCheckpointStrategy.valueOf(props.getString(HoodieIncrSourceConfig.MISSING_CHECKPOINT_STRATEGY.key()))
 : null;
-    if (readLatestOnMissingCkpt) {
-      missingCheckpointStrategy = 
IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST;
-    }
-    String fileFormat = props.getString(SOURCE_FILE_FORMAT.key(), 
SOURCE_FILE_FORMAT.defaultValue());
-
-    // Use begin Instant if set and non-empty
-    Option<String> beginInstant =
-        lastCkptStr.isPresent()
-            ? lastCkptStr.get().isEmpty() ? Option.empty() : lastCkptStr
-            : Option.empty();
+    this.srcPath = props.getString(HOODIE_SRC_BASE_PATH.key());
+    this.numInstantsPerFetch = props.getInteger(NUM_INSTANTS_PER_FETCH.key(), 
NUM_INSTANTS_PER_FETCH.defaultValue());
+    this.checkIfFileExists = props.getBoolean(Config.ENABLE_EXISTS_CHECK, 
Config.DEFAULT_ENABLE_EXISTS_CHECK);
+    this.fileFormat = props.getString(SOURCE_FILE_FORMAT.key(), 
SOURCE_FILE_FORMAT.defaultValue());
+    this.missingCheckpointStrategy = getMissingCheckpointStrategy(props);
+    this.queryRunner = queryRunner;
+    this.cloudDataFetcher = cloudDataFetcher;
+  }
 
+  @Override
+  public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> 
lastCheckpoint, long sourceLimit) {
+    CloudObjectIncrCheckpoint cloudObjectIncrCheckpoint = 
CloudObjectIncrCheckpoint.fromString(lastCheckpoint);
     HollowCommitHandling handlingMode = getHollowCommitHandleMode(props);
-    Pair<String, Pair<String, String>> queryTypeAndInstantEndpts = 
calculateBeginAndEndInstants(sparkContext, srcPath,
-        numInstantsPerFetch, beginInstant, missingCheckpointStrategy, 
handlingMode);
-
-    if 
(queryTypeAndInstantEndpts.getValue().getKey().equals(queryTypeAndInstantEndpts.getValue().getValue()))
 {
-      LOG.warn("Already caught up. Begin Checkpoint was :" + 
queryTypeAndInstantEndpts.getValue().getKey());
-      return Pair.of(Option.empty(), 
queryTypeAndInstantEndpts.getValue().getKey());
-    }
-
-    Dataset<Row> source;
-    // Do incremental pull. Set end instant if available.
-    if 
(queryTypeAndInstantEndpts.getKey().equals(QUERY_TYPE_INCREMENTAL_OPT_VAL())) {
-      source = sparkSession.read().format("org.apache.hudi")
-          .option(QUERY_TYPE().key(), QUERY_TYPE_INCREMENTAL_OPT_VAL())
-          .option(BEGIN_INSTANTTIME().key(), 
queryTypeAndInstantEndpts.getRight().getLeft())
-          .option(END_INSTANTTIME().key(), 
queryTypeAndInstantEndpts.getRight().getRight())
-          .option(INCREMENTAL_READ_HANDLE_HOLLOW_COMMIT().key(), 
handlingMode.name())
-          .load(srcPath);
-    } else {
-      // if checkpoint is missing from source table, and if strategy is set to 
READ_UPTO_LATEST_COMMIT, we have to issue snapshot query
-      source = sparkSession.read().format("org.apache.hudi")
-          .option(QUERY_TYPE().key(), 
QUERY_TYPE_SNAPSHOT_OPT_VAL()).load(srcPath)
-          // add filtering so that only interested records are returned.
-          .filter(String.format("%s > '%s'", 
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
-              queryTypeAndInstantEndpts.getRight().getLeft()))
-          .filter(String.format("%s <= '%s'", 
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
-              queryTypeAndInstantEndpts.getRight().getRight()));
+    QueryInfo queryInfo =
+        IncrSourceHelper.generateQueryInfo(
+            sparkContext, srcPath, numInstantsPerFetch,
+            Option.of(cloudObjectIncrCheckpoint.getCommit()),
+            missingCheckpointStrategy, handlingMode,
+            HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+            S3_OBJECT_KEY, S3_OBJECT_SIZE, true,
+            Option.ofNullable(cloudObjectIncrCheckpoint.getKey()));
+    LOG.info("Querying S3 with:" + cloudObjectIncrCheckpoint + ", queryInfo:" 
+ queryInfo);
+
+    if (isNullOrEmpty(cloudObjectIncrCheckpoint.getKey()) && 
queryInfo.areStartAndEndInstantsEqual()) {
+      LOG.warn("Already caught up. No new data to process");
+      return Pair.of(Option.empty(), queryInfo.getEndInstant());
     }
 
+    Dataset<Row> source = queryRunner.run(queryInfo);
     if (source.isEmpty()) {
-      return Pair.of(Option.empty(), 
queryTypeAndInstantEndpts.getRight().getRight());
+      LOG.info("Source of file names is empty. Returning empty result and 
endInstant: "
+          + queryInfo.getEndInstant());
+      return Pair.of(Option.empty(), queryInfo.getEndInstant());
     }
 
-    String filter = "s3.object.size > 0";
-    if 
(!StringUtils.isNullOrEmpty(props.getString(S3EventsHoodieIncrSourceConfig.S3_KEY_PREFIX.key(),
 null))) {
-      filter = filter + " and s3.object.key like '" + 
props.getString(S3EventsHoodieIncrSourceConfig.S3_KEY_PREFIX.key()) + "%'";
-    }
-    if 
(!StringUtils.isNullOrEmpty(props.getString(S3EventsHoodieIncrSourceConfig.S3_IGNORE_KEY_PREFIX.key(),
 null))) {
-      filter = filter + " and s3.object.key not like '" + 
props.getString(S3EventsHoodieIncrSourceConfig.S3_IGNORE_KEY_PREFIX.key()) + 
"%'";
-    }
-    if 
(!StringUtils.isNullOrEmpty(props.getString(S3EventsHoodieIncrSourceConfig.S3_IGNORE_KEY_SUBSTRING.key(),
 null))) {
-      filter = filter + " and s3.object.key not like '%" + 
props.getString(S3EventsHoodieIncrSourceConfig.S3_IGNORE_KEY_SUBSTRING.key()) + 
"%'";
-    }
-    // add file format filtering by default
-    filter = filter + " and s3.object.key like '%" + fileFormat + "%'";
+    Dataset<Row> filteredSourceData = applyFilter(source, fileFormat);
+
+    LOG.info("Adjusting end checkpoint:" + queryInfo.getEndInstant() + " based 
on sourceLimit :" + sourceLimit);
+    Pair<CloudObjectIncrCheckpoint, Dataset<Row>> checkPointAndDataset =
+        IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
+            filteredSourceData, sourceLimit, queryInfo, 
cloudObjectIncrCheckpoint);
+    LOG.info("Adjusted end checkpoint :" + checkPointAndDataset.getLeft());
 
     String s3FS = 
props.getString(S3EventsHoodieIncrSourceConfig.S3_FS_PREFIX.key(), 
S3EventsHoodieIncrSourceConfig.S3_FS_PREFIX.defaultValue()).toLowerCase();
     String s3Prefix = s3FS + "://";
 
     // Create S3 paths
-    final boolean checkExists = 
props.getBoolean(S3EventsHoodieIncrSourceConfig.S3_INCR_ENABLE_EXISTS_CHECK.key(),
 S3EventsHoodieIncrSourceConfig.S3_INCR_ENABLE_EXISTS_CHECK.defaultValue());
     SerializableConfiguration serializableHadoopConf = new 
SerializableConfiguration(sparkContext.hadoopConfiguration());
-    List<CloudObjectMetadata> cloudObjectMetadata = source
-        .filter(filter)
-        .select("s3.bucket.name", "s3.object.key", "s3.object.size")
+    List<CloudObjectMetadata> cloudObjectMetadata = 
checkPointAndDataset.getRight()
+        .select(S3_BUCKET_NAME, S3_OBJECT_KEY, S3_OBJECT_SIZE)
         .distinct()
-        .mapPartitions(getCloudObjectMetadataPerPartition(s3Prefix, 
serializableHadoopConf, checkExists), Encoders.kryo(CloudObjectMetadata.class))
+        .mapPartitions(getCloudObjectMetadataPerPartition(s3Prefix, 
serializableHadoopConf, checkIfFileExists), 
Encoders.kryo(CloudObjectMetadata.class))
         .collectAsList();
+    LOG.info("Total number of files to process :" + 
cloudObjectMetadata.size());
+
+    Option<Dataset<Row>> datasetOption = 
cloudDataFetcher.getCloudObjectDataDF(sparkSession, cloudObjectMetadata, props);
+    return Pair.of(datasetOption, checkPointAndDataset.getLeft().toString());
+  }
 
-    Option<Dataset<Row>> datasetOption = loadAsDataset(sparkSession, 
cloudObjectMetadata, props, fileFormat);
-    return Pair.of(datasetOption, 
queryTypeAndInstantEndpts.getRight().getRight());
+  Dataset<Row> applyFilter(Dataset<Row> source, String fileFormat) {
+    String filter = S3_OBJECT_SIZE + " > 0";
+    if 
(!StringUtils.isNullOrEmpty(props.getString(S3EventsHoodieIncrSourceConfig.S3_KEY_PREFIX.key(),
 null))) {
+      filter = filter + " and " + S3_OBJECT_KEY + " like '" + 
props.getString(S3EventsHoodieIncrSourceConfig.S3_KEY_PREFIX.key()) + "%'";
+    }
+    if 
(!StringUtils.isNullOrEmpty(props.getString(S3EventsHoodieIncrSourceConfig.S3_IGNORE_KEY_PREFIX.key(),
 null))) {
+      filter = filter + " and " + S3_OBJECT_KEY + " not like '" + 
props.getString(S3EventsHoodieIncrSourceConfig.S3_IGNORE_KEY_PREFIX.key()) + 
"%'";
+    }
+    if 
(!StringUtils.isNullOrEmpty(props.getString(S3EventsHoodieIncrSourceConfig.S3_IGNORE_KEY_SUBSTRING.key(),
 null))) {
+      filter = filter + " and " + S3_OBJECT_KEY + " not like '%" + 
props.getString(S3EventsHoodieIncrSourceConfig.S3_IGNORE_KEY_SUBSTRING.key()) + 
"%'";
+    }
+    // add file format filtering by default
+    filter = filter + " and " + S3_OBJECT_KEY + " like '%" + fileFormat + "%'";
+    return source.filter(filter);
   }
 }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsObjectDataFetcher.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java
similarity index 75%
rename from 
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsObjectDataFetcher.java
rename to 
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java
index 26005d74f31..dfa6c68ec6f 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsObjectDataFetcher.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java
@@ -16,17 +16,16 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.utilities.sources.helpers.gcs;
+package org.apache.hudi.utilities.sources.helpers;
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.utilities.sources.helpers.CloudObjectMetadata;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.util.List;
@@ -34,19 +33,19 @@ import java.util.List;
 import static 
org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon.loadAsDataset;
 
 /**
- * Connects to GCS from Spark and downloads data from a given list of files.
- * Assumes SparkContext is already configured with GCS options through 
GcsEventsHoodieIncrSource.addGcsAccessConfs().
+ * Connects to S3/GCS from Spark and downloads data from a given list of files.
+ * Assumes SparkContext is already configured.
  */
-public class GcsObjectDataFetcher implements Serializable {
+public class CloudDataFetcher implements Serializable {
 
   private final String fileFormat;
   private TypedProperties props;
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(GcsObjectDataFetcher.class);
+  private static final Logger LOG = 
LoggerFactory.getLogger(CloudDataFetcher.class);
 
   private static final long serialVersionUID = 1L;
 
-  public GcsObjectDataFetcher(TypedProperties props, String fileFormat) {
+  public CloudDataFetcher(TypedProperties props, String fileFormat) {
     this.fileFormat = fileFormat;
     this.props = props;
   }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectIncrCheckpoint.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectIncrCheckpoint.java
new file mode 100644
index 00000000000..cafef4aa145
--- /dev/null
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectIncrCheckpoint.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hudi.common.util.Option;
+
+import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
+import static 
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.DEFAULT_BEGIN_TIMESTAMP;
+
+/**
+ * This POJO is used to craft checkpoints that support size-based batching.
+ * This object will be used by object-based Hudi incr sources (s3/gcs).
+ */
+public class CloudObjectIncrCheckpoint {
+
+  private final String commit;
+  private final String key;
+
+  public CloudObjectIncrCheckpoint(String commit, String key) {
+    this.commit = commit;
+    this.key = key;
+  }
+
+  public String getCommit() {
+    return commit;
+  }
+
+  public String getKey() {
+    return key;
+  }
+
+  public static CloudObjectIncrCheckpoint fromString(Option<String> 
lastCheckpoint) {
+    if (lastCheckpoint.isPresent()) {
+      Option<String[]> splitResult = lastCheckpoint.map(str -> str.split("#", 
2));
+      if (splitResult.isPresent() && splitResult.get().length == 2) {
+        String[] split = splitResult.get();
+        return new CloudObjectIncrCheckpoint(split[0], split[1]);
+      } else {
+        return new CloudObjectIncrCheckpoint(lastCheckpoint.get(), null);
+      }
+    }
+    return new CloudObjectIncrCheckpoint(DEFAULT_BEGIN_TIMESTAMP, null);
+  }
+
+  @Override
+  public String toString() {
+    if (isNullOrEmpty(commit) && isNullOrEmpty(key)) {
+      return DEFAULT_BEGIN_TIMESTAMP;
+    } else if (isNullOrEmpty(key)) {
+      return commit;
+    }
+    return commit + "#" + key;
+  }
+}
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
index 9097dc3dd6f..73b7afbf753 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
@@ -31,19 +31,29 @@ import 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
 import org.apache.hudi.utilities.sources.HoodieIncrSource;
 
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.expressions.Window;
+import org.apache.spark.sql.expressions.WindowSpec;
+import org.apache.spark.sql.functions;
+import org.apache.spark.storage.StorageLevel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.util.Objects;
 import java.util.function.Function;
 
 import static 
org.apache.hudi.DataSourceReadOptions.INCREMENTAL_READ_HANDLE_HOLLOW_COMMIT;
 import static 
org.apache.hudi.common.table.timeline.TimelineUtils.handleHollowCommitIfNeeded;
 import static 
org.apache.hudi.utilities.config.HoodieIncrSourceConfig.MISSING_CHECKPOINT_STRATEGY;
 import static 
org.apache.hudi.utilities.config.HoodieIncrSourceConfig.READ_LATEST_INSTANT_ON_MISSING_CKPT;
+import static org.apache.spark.sql.functions.col;
+import static org.apache.spark.sql.functions.sum;
 
 public class IncrSourceHelper {
 
-  private static final String DEFAULT_BEGIN_TIMESTAMP = "000";
+  private static final Logger LOG = 
LoggerFactory.getLogger(IncrSourceHelper.class);
+  public static final String DEFAULT_BEGIN_TIMESTAMP = 
HoodieTimeline.INIT_INSTANT_TS;
+  private static final String CUMULATIVE_COLUMN_NAME = "cumulativeSize";
 
   /**
    * Get a timestamp which is the next value in a descending sequence.
@@ -74,20 +84,31 @@ public class IncrSourceHelper {
   /**
    * Find begin and end instants to be set for the next fetch.
    *
-   * @param jssc                            Java Spark Context
-   * @param srcBasePath                     Base path of Hudi source table
-   * @param numInstantsPerFetch             Max Instants per fetch
-   * @param beginInstant                    Last Checkpoint String
+   * @param jssc                      Java Spark Context
+   * @param srcBasePath               Base path of Hudi source table
+   * @param numInstantsPerFetch       Max Instants per fetch
+   * @param beginInstant              Last Checkpoint String
    * @param missingCheckpointStrategy when begin instant is missing, allow 
reading based on missing checkpoint strategy
-   * @return begin and end instants along with query type.
+   * @param handlingMode              Hollow Commit Handling Mode
+   * @param orderColumn               Column to order by (used for size based 
incr source)
+   * @param keyColumn                 Key column (used for size based incr 
source)
+   * @param limitColumn               Limit column (used for size based incr 
source)
+   * @param sourceLimitBasedBatching  When sourceLimit based batching is used, 
we need to fetch the current commit as well,
+   *                                  this flag is used to indicate that.
+   * @param lastCheckpointKey         Last checkpoint key (used in the upgrade 
code path)
+   * @return begin and end instants along with query type and other 
information.
    */
-  public static Pair<String, Pair<String, String>> 
calculateBeginAndEndInstants(JavaSparkContext jssc, String srcBasePath,
-                                                                               
 int numInstantsPerFetch, Option<String> beginInstant,
-                                                                               
 MissingCheckpointStrategy missingCheckpointStrategy,
-                                                                               
 HollowCommitHandling handlingMode) {
+  public static QueryInfo generateQueryInfo(JavaSparkContext jssc, String 
srcBasePath,
+                                            int numInstantsPerFetch, 
Option<String> beginInstant,
+                                            MissingCheckpointStrategy 
missingCheckpointStrategy,
+                                            HollowCommitHandling handlingMode,
+                                            String orderColumn, String 
keyColumn, String limitColumn,
+                                            boolean sourceLimitBasedBatching,
+                                            Option<String> lastCheckpointKey) {
     ValidationUtils.checkArgument(numInstantsPerFetch > 0,
         "Make sure the config 
hoodie.deltastreamer.source.hoodieincr.num_instants is set to a positive 
value");
     HoodieTableMetaClient srcMetaClient = 
HoodieTableMetaClient.builder().setConf(jssc.hadoopConfiguration()).setBasePath(srcBasePath).setLoadActiveTimelineOnLoad(true).build();
+
     HoodieTimeline completedCommitTimeline = 
srcMetaClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
     final HoodieTimeline activeCommitTimeline = 
handleHollowCommitIfNeeded(completedCommitTimeline, srcMetaClient, 
handlingMode);
     Function<HoodieInstant, String> timestampForLastInstant = instant -> 
handlingMode == HollowCommitHandling.USE_TRANSITION_TIME
@@ -106,45 +127,108 @@ public class IncrSourceHelper {
       }
     });
 
+    String previousInstantTime = beginInstantTime;
+    if (!beginInstantTime.equals(DEFAULT_BEGIN_TIMESTAMP)) {
+      Option<HoodieInstant> previousInstant = 
activeCommitTimeline.findInstantBefore(beginInstantTime);
+      if (previousInstant.isPresent()) {
+        previousInstantTime = previousInstant.get().getTimestamp();
+      }
+    }
+
     if (missingCheckpointStrategy == MissingCheckpointStrategy.READ_LATEST || 
!activeCommitTimeline.isBeforeTimelineStarts(beginInstantTime)) {
-      Option<HoodieInstant> nthInstant = 
Option.fromJavaOptional(activeCommitTimeline
-          .findInstantsAfter(beginInstantTime, 
numInstantsPerFetch).getInstantsAsStream().reduce((x, y) -> y));
-      return Pair.of(DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL(), 
Pair.of(beginInstantTime, 
nthInstant.map(HoodieInstant::getTimestamp).orElse(beginInstantTime)));
+      Option<HoodieInstant> nthInstant;
+      // When we are in the upgrade code path from non-sourcelimit-based 
batching to sourcelimit-based batching, we need to avoid fetching the commit
+      // that is read already. Else we will have duplicates in append-only use 
case if we use "findInstantsAfterOrEquals".
+      // As soon as we have a new format of checkpoint and a key we will move 
to the new code of fetching the current commit as well.
+      if (sourceLimitBasedBatching && lastCheckpointKey.isPresent()) {
+        nthInstant = Option.fromJavaOptional(activeCommitTimeline
+            .findInstantsAfterOrEquals(beginInstantTime, 
numInstantsPerFetch).getInstantsAsStream().reduce((x, y) -> y));
+      } else {
+        nthInstant = Option.fromJavaOptional(activeCommitTimeline
+            .findInstantsAfter(beginInstantTime, 
numInstantsPerFetch).getInstantsAsStream().reduce((x, y) -> y));
+      }
+      return new 
QueryInfo(DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL(), 
previousInstantTime,
+          beginInstantTime, 
nthInstant.map(HoodieInstant::getTimestamp).orElse(beginInstantTime),
+          orderColumn, keyColumn, limitColumn);
     } else {
       // when MissingCheckpointStrategy is set to read everything until 
latest, trigger snapshot query.
       Option<HoodieInstant> lastInstant = activeCommitTimeline.lastInstant();
-      return Pair.of(DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL(), 
Pair.of(beginInstantTime, timestampForLastInstant.apply(lastInstant.get())));
+      return new QueryInfo(DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL(),
+          previousInstantTime, beginInstantTime, 
lastInstant.get().getTimestamp(),
+          orderColumn, keyColumn, limitColumn);
     }
   }
 
   /**
-   * Validate instant time seen in the incoming row.
+   * Trim the source dataset to a size-based batch that resumes after the last 
checkpoint key.
    *
-   * @param row          Input Row
-   * @param instantTime  Hoodie Instant time of the row
-   * @param sinceInstant begin instant of the batch
-   * @param endInstant   end instant of the batch
+   * @param sourceData  Source dataset
+   * @param sourceLimit Max number of bytes to be read from source
+   * @param queryInfo   Query Info
+   * @return the end checkpoint (commit and key) along with the filtered rows.
    */
-  public static void validateInstantTime(Row row, String instantTime, String 
sinceInstant, String endInstant) {
-    Objects.requireNonNull(instantTime);
-    
ValidationUtils.checkArgument(HoodieTimeline.compareTimestamps(instantTime, 
HoodieTimeline.GREATER_THAN, sinceInstant),
-        "Instant time(_hoodie_commit_time) in row (" + row + ") was : " + 
instantTime + "but expected to be between "
-            + sinceInstant + "(excl) - " + endInstant + "(incl)");
-    ValidationUtils.checkArgument(
-        HoodieTimeline.compareTimestamps(instantTime, 
HoodieTimeline.LESSER_THAN_OR_EQUALS, endInstant),
-        "Instant time(_hoodie_commit_time) in row (" + row + ") was : " + 
instantTime + "but expected to be between "
-            + sinceInstant + "(excl) - " + endInstant + "(incl)");
+  public static Pair<CloudObjectIncrCheckpoint, Dataset<Row>> 
filterAndGenerateCheckpointBasedOnSourceLimit(Dataset<Row> sourceData,
+                                                                               
                             long sourceLimit, QueryInfo queryInfo,
+                                                                               
                             CloudObjectIncrCheckpoint 
cloudObjectIncrCheckpoint) {
+    if (sourceData.isEmpty()) {
+      LOG.info("Empty source, returning endpoint:" + 
queryInfo.getEndInstant());
+      return Pair.of(cloudObjectIncrCheckpoint, sourceData);
+    }
+    // Let's persist the dataset to avoid triggering the dag repeatedly
+    sourceData.persist(StorageLevel.MEMORY_AND_DISK());
+    // Set ordering in query to enable batching
+    Dataset<Row> orderedDf = QueryRunner.applyOrdering(sourceData, 
queryInfo.getOrderByColumns());
+    Option<String> lastCheckpoint = 
Option.of(cloudObjectIncrCheckpoint.getCommit());
+    Option<String> lastCheckpointKey = 
Option.ofNullable(cloudObjectIncrCheckpoint.getKey());
+    Option<String> concatenatedKey = lastCheckpoint.flatMap(checkpoint -> 
lastCheckpointKey.map(key -> checkpoint + key));
+
+    // Filter until last checkpoint key
+    if (concatenatedKey.isPresent()) {
+      orderedDf = orderedDf.withColumn("commit_key",
+          functions.concat(functions.col(queryInfo.getOrderColumn()), 
functions.col(queryInfo.getKeyColumn())));
+      // Apply incremental filter
+      orderedDf = 
orderedDf.filter(functions.col("commit_key").gt(concatenatedKey.get())).drop("commit_key");
+      // We could be just at the end of the commit, so return empty
+      if (orderedDf.isEmpty()) {
+        LOG.info("Empty ordered source, returning endpoint:" + 
queryInfo.getEndInstant());
+        sourceData.unpersist();
+        return Pair.of(new 
CloudObjectIncrCheckpoint(queryInfo.getEndInstant(), lastCheckpointKey.get()), 
orderedDf);
+      }
+    }
+
+    // Limit based on sourceLimit
+    WindowSpec windowSpec = Window.orderBy(col(queryInfo.getOrderColumn()), 
col(queryInfo.getKeyColumn()));
+    // Add the 'cumulativeSize' column with running sum of 'limitColumn'
+    Dataset<Row> aggregatedData = orderedDf.withColumn(CUMULATIVE_COLUMN_NAME,
+        sum(col(queryInfo.getLimitColumn())).over(windowSpec));
+    Dataset<Row> collectedRows = 
aggregatedData.filter(col(CUMULATIVE_COLUMN_NAME).leq(sourceLimit));
+
+    Row row = null;
+    if (collectedRows.isEmpty()) {
+      // If the first element itself exceeds limits then return first element
+      LOG.info("First object exceeding source limit: " + sourceLimit + " 
bytes");
+      row = aggregatedData.select(queryInfo.getOrderColumn(), 
queryInfo.getKeyColumn(), CUMULATIVE_COLUMN_NAME).first();
+      collectedRows = aggregatedData.limit(1);
+    } else {
+      // Get the last row and form composite key
+      row = collectedRows.select(queryInfo.getOrderColumn(), 
queryInfo.getKeyColumn(), CUMULATIVE_COLUMN_NAME).orderBy(
+          col(queryInfo.getOrderColumn()).desc(), 
col(queryInfo.getKeyColumn()).desc()).first();
+    }
+    LOG.info("Processed batch size: " + row.getLong(2) + " bytes");
+    sourceData.unpersist();
+    return Pair.of(new CloudObjectIncrCheckpoint(row.getString(0), 
row.getString(1)), collectedRows);
   }
 
   /**
    * Determine the policy to choose if a checkpoint is missing (detected by 
the absence of a beginInstant),
    * during a run of a {@link HoodieIncrSource}.
+   *
    * @param props the usual Hudi props object
    * @return
    */
   public static MissingCheckpointStrategy 
getMissingCheckpointStrategy(TypedProperties props) {
     boolean readLatestOnMissingCkpt = props.getBoolean(
-            READ_LATEST_INSTANT_ON_MISSING_CKPT.key(), 
READ_LATEST_INSTANT_ON_MISSING_CKPT.defaultValue());
+        READ_LATEST_INSTANT_ON_MISSING_CKPT.key(), 
READ_LATEST_INSTANT_ON_MISSING_CKPT.defaultValue());
 
     if (readLatestOnMissingCkpt) {
       return MissingCheckpointStrategy.READ_LATEST;
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryInfo.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryInfo.java
new file mode 100644
index 00000000000..4e4ee275829
--- /dev/null
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryInfo.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static 
org.apache.hudi.DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL;
+import static 
org.apache.hudi.DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL;
+
+/**
+ * This class is used to prepare query information for s3 and gcs incr source.
+ * Some of the information in this class is used for batching based on 
sourceLimit.
+ */
+public class QueryInfo {
+  private final String queryType;
+  private final String previousInstant;
+  private final String startInstant;
+  private final String endInstant;
+  private final String orderColumn;
+  private final String keyColumn;
+  private final String limitColumn;
+  private final List<String> orderByColumns;
+
+  public QueryInfo(
+      String queryType, String previousInstant,
+      String startInstant, String endInstant,
+      String orderColumn, String keyColumn,
+      String limitColumn) {
+    this.queryType = queryType;
+    this.previousInstant = previousInstant;
+    this.startInstant = startInstant;
+    this.endInstant = endInstant;
+    this.orderColumn = orderColumn;
+    this.keyColumn = keyColumn;
+    this.limitColumn = limitColumn;
+    this.orderByColumns = Arrays.asList(orderColumn, keyColumn);
+  }
+
+  public boolean areStartAndEndInstantsEqual() {
+    return getStartInstant().equals(getEndInstant());
+  }
+
+  public boolean isIncremental() {
+    return QUERY_TYPE_INCREMENTAL_OPT_VAL().equals(queryType);
+  }
+
+  public boolean isSnapshot() {
+    return QUERY_TYPE_SNAPSHOT_OPT_VAL().equals(queryType);
+  }
+
+  public String getQueryType() {
+    return queryType;
+  }
+
+  public String getPreviousInstant() {
+    return previousInstant;
+  }
+
+  public String getStartInstant() {
+    return startInstant;
+  }
+
+  public String getEndInstant() {
+    return endInstant;
+  }
+
+  public String getOrderColumn() {
+    return orderColumn;
+  }
+
+  public String getKeyColumn() {
+    return keyColumn;
+  }
+
+  public String getLimitColumn() {
+    return limitColumn;
+  }
+
+  public List<String> getOrderByColumns() {
+    return orderByColumns;
+  }
+
+  @Override
+  public String toString() {
+    return ("Query information for Incremental Source "
+        + "queryType: " + queryType
+        + ", previousInstant: " + previousInstant
+        + ", startInstant: " + startInstant
+        + ", endInstant: " + endInstant
+        + ", orderColumn: " + orderColumn
+        + ", keyColumn: " + keyColumn
+        + ", limitColumn: " + limitColumn
+        + ", orderByColumns: " + orderByColumns);
+  }
+}
\ No newline at end of file
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java
new file mode 100644
index 00000000000..06f082aff7a
--- /dev/null
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hudi.DataSourceReadOptions;
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.utilities.config.HoodieIncrSourceConfig;
+
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.functions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * This class is currently used only by s3 and gcs incr sources that support 
size-based batching.
+ * This class will fetch committed files from the current commit to support 
size-based batching.
+ */
+public class QueryRunner {
+  private final SparkSession sparkSession;
+  private final String sourcePath;
+
+  private static final Logger LOG = LoggerFactory.getLogger(QueryRunner.class);
+
+  public QueryRunner(SparkSession sparkSession, TypedProperties props) {
+    this.sparkSession = sparkSession;
+    DataSourceUtils.checkRequiredProperties(props, 
Collections.singletonList(HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH.key()));
+    this.sourcePath = 
props.getString(HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH.key());
+  }
+
+  public Dataset<Row> run(QueryInfo queryInfo) {
+    Dataset<Row> dataset = null;
+    if (queryInfo.isIncremental()) {
+      dataset = runIncrementalQuery(queryInfo);
+    } else if (queryInfo.isSnapshot()) {
+      dataset = runSnapshotQuery(queryInfo);
+    } else {
+      throw new HoodieException("Unknown query type " + 
queryInfo.getQueryType());
+    }
+    return dataset;
+  }
+
+  public static Dataset<Row> applyOrdering(Dataset<Row> dataset, List<String> 
orderByColumns) {
+    if (orderByColumns != null && !orderByColumns.isEmpty()) {
+      LOG.debug("Applying ordering " + orderByColumns);
+      return 
dataset.orderBy(orderByColumns.stream().map(functions::col).toArray(Column[]::new));
+    }
+    return dataset;
+  }
+
+  public Dataset<Row> runIncrementalQuery(QueryInfo queryInfo) {
+    LOG.info("Running incremental query");
+    return sparkSession.read().format("org.apache.hudi")
+        .option(DataSourceReadOptions.QUERY_TYPE().key(), 
queryInfo.getQueryType())
+        .option(DataSourceReadOptions.BEGIN_INSTANTTIME().key(), 
queryInfo.getPreviousInstant())
+        .option(DataSourceReadOptions.END_INSTANTTIME().key(), 
queryInfo.getEndInstant()).load(sourcePath);
+  }
+
+  public Dataset<Row> runSnapshotQuery(QueryInfo queryInfo) {
+    LOG.info("Running snapshot query");
+    return sparkSession.read().format("org.apache.hudi")
+        .option(DataSourceReadOptions.QUERY_TYPE().key(), 
queryInfo.getQueryType()).load(sourcePath)
+        // add filtering so that only interested records are returned.
+        .filter(String.format("%s >= '%s'", 
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+            queryInfo.getStartInstant()))
+        .filter(String.format("%s <= '%s'", 
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+            queryInfo.getEndInstant()));
+  }
+}
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/QueryInfo.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/QueryInfo.java
deleted file mode 100644
index ed172da8f29..00000000000
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/QueryInfo.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.utilities.sources.helpers.gcs;
-
-import org.apache.hudi.common.model.HoodieRecord;
-import 
org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
-
-import org.apache.spark.sql.DataFrameReader;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SparkSession;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.hudi.DataSourceReadOptions.BEGIN_INSTANTTIME;
-import static org.apache.hudi.DataSourceReadOptions.END_INSTANTTIME;
-import static 
org.apache.hudi.DataSourceReadOptions.INCREMENTAL_READ_HANDLE_HOLLOW_COMMIT;
-import static org.apache.hudi.DataSourceReadOptions.QUERY_TYPE;
-import static 
org.apache.hudi.DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL;
-import static 
org.apache.hudi.DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL;
-
-/**
- * Uses the start and end instants of a Hudi Streamer Source to help construct 
the right kind
- * of query for subsequent requests.
- */
-public class QueryInfo {
-
-  private final String queryType;
-  private final String startInstant;
-  private final String endInstant;
-  private final HollowCommitHandling handlingMode;
-  private static final Logger LOG = LoggerFactory.getLogger(QueryInfo.class);
-
-  public QueryInfo(String queryType, String startInstant, String endInstant, 
HollowCommitHandling handlingMode) {
-    this.queryType = queryType;
-    this.startInstant = startInstant;
-    this.endInstant = endInstant;
-    this.handlingMode = handlingMode;
-  }
-
-  public Dataset<Row> initCloudObjectMetadata(String srcPath, SparkSession 
sparkSession) {
-    if (isIncremental()) {
-      return incrementalQuery(sparkSession).load(srcPath);
-    }
-
-    // Issue a snapshot query.
-    return snapshotQuery(sparkSession).load(srcPath)
-        .filter(String.format("%s > '%s'", 
HoodieRecord.COMMIT_TIME_METADATA_FIELD, getStartInstant()))
-        .filter(String.format("%s <= '%s'", 
HoodieRecord.COMMIT_TIME_METADATA_FIELD, getEndInstant()));
-  }
-
-  public boolean areStartAndEndInstantsEqual() {
-    return getStartInstant().equals(getEndInstant());
-  }
-
-  private DataFrameReader snapshotQuery(SparkSession sparkSession) {
-    return sparkSession.read().format("org.apache.hudi")
-        .option(QUERY_TYPE().key(), QUERY_TYPE_SNAPSHOT_OPT_VAL());
-  }
-
-  private DataFrameReader incrementalQuery(SparkSession sparkSession) {
-    return sparkSession.read().format("org.apache.hudi")
-        .option(QUERY_TYPE().key(), QUERY_TYPE_INCREMENTAL_OPT_VAL())
-        .option(BEGIN_INSTANTTIME().key(), getStartInstant())
-        .option(END_INSTANTTIME().key(), getEndInstant())
-        .option(INCREMENTAL_READ_HANDLE_HOLLOW_COMMIT().key(), 
handlingMode.name());
-  }
-
-  public boolean isIncremental() {
-    return QUERY_TYPE_INCREMENTAL_OPT_VAL().equals(queryType);
-  }
-
-  public String getStartInstant() {
-    return startInstant;
-  }
-
-  public String getEndInstant() {
-    return endInstant;
-  }
-
-  public void logDetails() {
-    LOG.debug("queryType: " + queryType + ", startInstant: " + startInstant + 
", endInstant: " + endInstant);
-  }
-
-}
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
index f1701654212..9414bbec4fd 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
@@ -31,6 +31,7 @@ import 
org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.testutils.SchemaTestUtil;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.collection.Triple;
 import org.apache.hudi.config.HoodieArchivalConfig;
 import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -38,13 +39,17 @@ import 
org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.CloudObjectMetadata;
 import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
-import org.apache.hudi.utilities.sources.helpers.gcs.GcsObjectDataFetcher;
+import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
+import org.apache.hudi.utilities.sources.helpers.QueryRunner;
 import org.apache.hudi.utilities.sources.helpers.gcs.GcsObjectMetadataFetcher;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.expressions.GenericRow;
@@ -62,9 +67,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
+import java.util.stream.Collectors;
 
 import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
 import static 
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT;
@@ -81,6 +90,8 @@ public class TestGcsEventsHoodieIncrSource extends 
SparkClientFunctionalTestHarn
   private static final Schema GCS_METADATA_SCHEMA = 
SchemaTestUtil.getSchemaFromResource(
       TestGcsEventsHoodieIncrSource.class, 
"/streamer-config/gcs-metadata.avsc", true);
 
+  private ObjectMapper mapper = new ObjectMapper();
+
   @TempDir
   protected java.nio.file.Path tempDir;
 
@@ -88,16 +99,21 @@ public class TestGcsEventsHoodieIncrSource extends 
SparkClientFunctionalTestHarn
   GcsObjectMetadataFetcher gcsObjectMetadataFetcher;
 
   @Mock
-  GcsObjectDataFetcher gcsObjectDataFetcher;
+  CloudDataFetcher gcsObjectDataFetcher;
+
+  @Mock
+  QueryRunner queryRunner;
 
   protected FilebasedSchemaProvider schemaProvider;
   private HoodieTableMetaClient metaClient;
+  private JavaSparkContext jsc;
 
   private static final Logger LOG = 
LoggerFactory.getLogger(TestGcsEventsHoodieIncrSource.class);
 
   @BeforeEach
   public void setUp() throws IOException {
     metaClient = getHoodieMetaClient(hadoopConf(), basePath());
+    jsc = JavaSparkContext.fromSparkContext(spark().sparkContext());
     MockitoAnnotations.initMocks(this);
   }
 
@@ -113,7 +129,7 @@ public class TestGcsEventsHoodieIncrSource extends 
SparkClientFunctionalTestHarn
 
     Pair<String, List<HoodieRecord>> inserts = 
writeGcsMetadataRecords(commitTimeForWrites);
 
-    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 0, 
inserts.getKey());
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 
100L, 0, inserts.getKey());
 
     verify(gcsObjectMetadataFetcher, 
times(0)).getGcsObjectMetadata(Mockito.any(), Mockito.any(),
             anyBoolean());
@@ -143,10 +159,17 @@ public class TestGcsEventsHoodieIncrSource extends 
SparkClientFunctionalTestHarn
         DataTypes.createStructField("text", DataTypes.StringType, true)
     });
     Dataset<Row> rows = spark().createDataFrame(recs, schema);
+    List<Triple<String, Long, String>> filePathSizeAndCommitTime = new 
ArrayList<>();
+    // Add (file path, size, commit time) triples to the list
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file1.json", 100L, "1"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file2.json", 150L, "1"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file3.json", 200L, "1"));
+    Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
 
     when(gcsObjectDataFetcher.getCloudObjectDataDF(Mockito.any(), 
eq(cloudObjectMetadataList), Mockito.any())).thenReturn(Option.of(rows));
+    when(queryRunner.run(Mockito.any())).thenReturn(inputDs);
 
-    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 4, 
inserts.getKey());
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 
100L, 4, "1#path/to/file1.json");
 
     verify(gcsObjectMetadataFetcher, 
times(1)).getGcsObjectMetadata(Mockito.any(), Mockito.any(),
             anyBoolean());
@@ -154,14 +177,103 @@ public class TestGcsEventsHoodieIncrSource extends 
SparkClientFunctionalTestHarn
             eq(cloudObjectMetadataList), Mockito.any());
   }
 
+  @Test
+  public void testTwoFilesAndContinueInSameCommit() throws IOException {
+    String commitTimeForWrites = "2";
+    String commitTimeForReads = "1";
+
+    Pair<String, List<HoodieRecord>> inserts = 
writeGcsMetadataRecords(commitTimeForWrites);
+    List<CloudObjectMetadata> cloudObjectMetadataList = Arrays.asList(
+        new CloudObjectMetadata("data-file-1.json", 1),
+        new CloudObjectMetadata("data-file-2.json", 1));
+    when(gcsObjectMetadataFetcher.getGcsObjectMetadata(Mockito.any(), 
Mockito.any(), anyBoolean())).thenReturn(cloudObjectMetadataList);
+
+    List<Row> recs = Arrays.asList(
+        new GenericRow(new String[] {"1", "Hello 1"}),
+        new GenericRow(new String[] {"2", "Hello 2"}),
+        new GenericRow(new String[] {"3", "Hello 3"}),
+        new GenericRow(new String[] {"4", "Hello 4"})
+    );
+    StructType schema = new StructType(new StructField[] {
+        DataTypes.createStructField("id", DataTypes.StringType, true),
+        DataTypes.createStructField("text", DataTypes.StringType, true)
+    });
+    Dataset<Row> rows = spark().createDataFrame(recs, schema);
+
+    List<Triple<String, Long, String>> filePathSizeAndCommitTime = new 
ArrayList<>();
+    // Add (file path, size, commit time) triples to the list
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file1.json", 100L, "1"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file2.json", 150L, "1"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file3.json", 200L, "1"));
+
+    Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
+
+    when(gcsObjectDataFetcher.getCloudObjectDataDF(Mockito.any(), 
eq(cloudObjectMetadataList), Mockito.any())).thenReturn(Option.of(rows));
+    when(queryRunner.run(Mockito.any())).thenReturn(inputDs);
+
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 
250L, 4, "1#path/to/file2.json");
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2.json"), 
250L, 4, "1#path/to/file3.json");
+
+    verify(gcsObjectMetadataFetcher, 
times(2)).getGcsObjectMetadata(Mockito.any(), Mockito.any(),
+        anyBoolean());
+    verify(gcsObjectDataFetcher, times(2)).getCloudObjectDataDF(Mockito.any(),
+        eq(cloudObjectMetadataList), Mockito.any());
+  }
+
+  @Test
+  public void testTwoFilesAndContinueAcrossCommits() throws IOException {
+    String commitTimeForWrites = "2";
+    String commitTimeForReads = "1";
+
+    Pair<String, List<HoodieRecord>> inserts = 
writeGcsMetadataRecords(commitTimeForWrites);
+    List<CloudObjectMetadata> cloudObjectMetadataList = Arrays.asList(
+        new CloudObjectMetadata("data-file-1.json", 1),
+        new CloudObjectMetadata("data-file-2.json", 1));
+    when(gcsObjectMetadataFetcher.getGcsObjectMetadata(Mockito.any(), 
Mockito.any(), anyBoolean())).thenReturn(cloudObjectMetadataList);
+
+    List<Row> recs = Arrays.asList(
+        new GenericRow(new String[] {"1", "Hello 1"}),
+        new GenericRow(new String[] {"2", "Hello 2"}),
+        new GenericRow(new String[] {"3", "Hello 3"}),
+        new GenericRow(new String[] {"4", "Hello 4"})
+    );
+    StructType schema = new StructType(new StructField[] {
+        DataTypes.createStructField("id", DataTypes.StringType, true),
+        DataTypes.createStructField("text", DataTypes.StringType, true)
+    });
+    Dataset<Row> rows = spark().createDataFrame(recs, schema);
+
+    List<Triple<String, Long, String>> filePathSizeAndCommitTime = new 
ArrayList<>();
+    // Add (file path, size, commit time) triples to the list
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file1.json", 100L, "1"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file3.json", 200L, "1"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file2.json", 150L, "1"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file4.json", 50L, "2"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file5.json", 150L, "2"));
+
+    Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
+
+    when(gcsObjectDataFetcher.getCloudObjectDataDF(Mockito.any(), 
eq(cloudObjectMetadataList), Mockito.any())).thenReturn(Option.of(rows));
+    when(queryRunner.run(Mockito.any())).thenReturn(inputDs);
+
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 
100L, 4, "1#path/to/file1.json");
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file1.json"), 
100L, 4, "1#path/to/file2.json");
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2.json"), 
1000L, 4, "2#path/to/file5.json");
+
+    verify(gcsObjectMetadataFetcher, 
times(3)).getGcsObjectMetadata(Mockito.any(), Mockito.any(),
+        anyBoolean());
+    verify(gcsObjectDataFetcher, times(3)).getCloudObjectDataDF(Mockito.any(),
+        eq(cloudObjectMetadataList), Mockito.any());
+  }
+
   private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy 
missingCheckpointStrategy,
-                             Option<String> checkpointToPull, int 
expectedCount, String expectedCheckpoint) {
+                             Option<String> checkpointToPull, long 
sourceLimit, int expectedCount, String expectedCheckpoint) {
     TypedProperties typedProperties = setProps(missingCheckpointStrategy);
 
     GcsEventsHoodieIncrSource incrSource = new 
GcsEventsHoodieIncrSource(typedProperties, jsc(),
-            spark(), schemaProvider, gcsObjectMetadataFetcher, 
gcsObjectDataFetcher);
+            spark(), schemaProvider, gcsObjectMetadataFetcher, 
gcsObjectDataFetcher, queryRunner);
 
-    Pair<Option<Dataset<Row>>, String> dataAndCheckpoint = 
incrSource.fetchNextBatch(checkpointToPull, 100);
+    Pair<Option<Dataset<Row>>, String> dataAndCheckpoint = 
incrSource.fetchNextBatch(checkpointToPull, sourceLimit);
 
     Option<Dataset<Row>> datasetOpt = dataAndCheckpoint.getLeft();
     String nextCheckPoint = dataAndCheckpoint.getRight();
@@ -174,7 +286,7 @@ public class TestGcsEventsHoodieIncrSource extends 
SparkClientFunctionalTestHarn
       assertEquals(datasetOpt.get().count(), expectedCount);
     }
 
-    Assertions.assertEquals(nextCheckPoint, expectedCheckpoint);
+    Assertions.assertEquals(expectedCheckpoint, nextCheckPoint);
   }
 
   private HoodieRecord getGcsMetadataRecord(String commitTime, String 
filename, String bucketName, String generation) {
@@ -260,4 +372,31 @@ public class TestGcsEventsHoodieIncrSource extends 
SparkClientFunctionalTestHarn
             .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
             .forTable(metaClient.getTableConfig().getTableName());
   }
+
+  private String generateGCSEventMetadata(Long objectSize, String bucketName, 
String objectKey, String commitTime)
+      throws JsonProcessingException {
+    Map<String, Object> objectMetadata = new HashMap<>();
+    objectMetadata.put("bucket", bucketName);
+    objectMetadata.put("name", objectKey);
+    objectMetadata.put("size", objectSize);
+    objectMetadata.put("_hoodie_commit_time", commitTime);
+    return mapper.writeValueAsString(objectMetadata);
+  }
+
+  private List<String> getSampleGCSObjectKeys(List<Triple<String, Long, 
String>> filePathSizeAndCommitTime) {
+    return filePathSizeAndCommitTime.stream().map(f -> {
+      try {
+        return generateGCSEventMetadata(f.getMiddle(), "bucket-1", 
f.getLeft(), f.getRight());
+      } catch (JsonProcessingException e) {
+        throw new RuntimeException(e);
+      }
+    }).collect(Collectors.toList());
+  }
+
+  private Dataset<Row> generateDataset(List<Triple<String, Long, String>> 
filePathSizeAndCommitTime) {
+    JavaRDD<String> testRdd = 
jsc.parallelize(getSampleGCSObjectKeys(filePathSizeAndCommitTime), 2);
+    Dataset<Row> inputDs = spark().read().json(testRdd);
+    return inputDs;
+  }
+
 }
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
new file mode 100644
index 00000000000..8bd345626e7
--- /dev/null
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.testutils.SchemaTestUtil;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.collection.Triple;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
+import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
+import org.apache.hudi.utilities.sources.helpers.QueryRunner;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static 
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+public class TestS3EventsHoodieIncrSource extends 
SparkClientFunctionalTestHarness {
+  private static final Schema S3_METADATA_SCHEMA = 
SchemaTestUtil.getSchemaFromResource(
+      TestS3EventsHoodieIncrSource.class, "/streamer-config/s3-metadata.avsc", 
true);
+
+  private ObjectMapper mapper = new ObjectMapper();
+
+  private static final String MY_BUCKET = "some-bucket";
+
+  @Mock
+  private SchemaProvider mockSchemaProvider;
+  @Mock
+  QueryRunner mockQueryRunner;
+  @Mock
+  CloudDataFetcher mockCloudDataFetcher;
+  private JavaSparkContext jsc;
+  private HoodieTableMetaClient metaClient;
+
+  @BeforeEach
+  public void setUp() throws IOException {
+    jsc = JavaSparkContext.fromSparkContext(spark().sparkContext());
+    metaClient = getHoodieMetaClient(hadoopConf(), basePath());
+  }
+
+  private List<String> getSampleS3ObjectKeys(List<Triple<String, Long, 
String>> filePathSizeAndCommitTime) {
+    return filePathSizeAndCommitTime.stream().map(f -> {
+      try {
+        return generateS3EventMetadata(f.getMiddle(), MY_BUCKET, f.getLeft(), 
f.getRight());
+      } catch (JsonProcessingException e) {
+        throw new RuntimeException(e);
+      }
+    }).collect(Collectors.toList());
+  }
+
+  private Dataset<Row> generateDataset(List<Triple<String, Long, String>> 
filePathSizeAndCommitTime) {
+    JavaRDD<String> testRdd = 
jsc.parallelize(getSampleS3ObjectKeys(filePathSizeAndCommitTime), 2);
+    Dataset<Row> inputDs = spark().read().json(testRdd);
+    return inputDs;
+  }
+
+  /**
+   * Generates a simple JSON structure like the one below.
+   * <p>
+   * s3 : {
+   *   object : {
+   *     size:
+   *     key:
+   *   }
+   *   bucket : { name: }
+   * }
+   * _hoodie_commit_time:
+   */
+  private String generateS3EventMetadata(Long objectSize, String bucketName, 
String objectKey, String commitTime)
+      throws JsonProcessingException {
+    Map<String, Object> objectMetadata = new HashMap<>();
+    objectMetadata.put("size", objectSize);
+    objectMetadata.put("key", objectKey);
+    Map<String, String> bucketMetadata = new HashMap<>();
+    bucketMetadata.put("name", bucketName);
+    Map<String, Object> s3Metadata = new HashMap<>();
+    s3Metadata.put("object", objectMetadata);
+    s3Metadata.put("bucket", bucketMetadata);
+    Map<String, Object> eventMetadata = new HashMap<>();
+    eventMetadata.put("s3", s3Metadata);
+    eventMetadata.put("_hoodie_commit_time", commitTime);
+    return mapper.writeValueAsString(eventMetadata);
+  }
+
+  private HoodieRecord generateS3EventMetadata(String commitTime, String 
bucketName, String objectKey, Long objectSize) {
+    String partitionPath = bucketName;
+    Schema schema = S3_METADATA_SCHEMA;
+    GenericRecord rec = new GenericData.Record(schema);
+    Schema.Field s3Field = schema.getField("s3");
+    Schema s3Schema = s3Field.schema().getTypes().get(1); // Assuming the 
record schema is the second type
+    // Create a generic record for the "s3" field
+    GenericRecord s3Record = new GenericData.Record(s3Schema);
+
+    Schema.Field s3BucketField = s3Schema.getField("bucket");
+    Schema s3Bucket = s3BucketField.schema().getTypes().get(1); // Assuming 
the record schema is the second type
+    GenericRecord s3BucketRec = new GenericData.Record(s3Bucket);
+    s3BucketRec.put("name", bucketName);
+
+
+    Schema.Field s3ObjectField = s3Schema.getField("object");
+    Schema s3Object = s3ObjectField.schema().getTypes().get(1); // Assuming 
the record schema is the second type
+    GenericRecord s3ObjectRec = new GenericData.Record(s3Object);
+    s3ObjectRec.put("key", objectKey);
+    s3ObjectRec.put("size", objectSize);
+
+    s3Record.put("bucket", s3BucketRec);
+    s3Record.put("object", s3ObjectRec);
+    rec.put("s3", s3Record);
+    rec.put("_hoodie_commit_time", commitTime);
+
+    HoodieAvroPayload payload = new HoodieAvroPayload(Option.of(rec));
+    return new HoodieAvroRecord(new HoodieKey(objectKey, partitionPath), 
payload);
+  }
+
+  private TypedProperties setProps(IncrSourceHelper.MissingCheckpointStrategy 
missingCheckpointStrategy) {
+    Properties properties = new Properties();
+    properties.setProperty("hoodie.deltastreamer.source.hoodieincr.path", 
basePath());
+    
properties.setProperty("hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy",
+        missingCheckpointStrategy.name());
+    
properties.setProperty("hoodie.deltastreamer.source.hoodieincr.file.format", 
"json");
+    return new TypedProperties(properties);
+  }
+
+  private HoodieWriteConfig.Builder getConfigBuilder(String basePath, 
HoodieTableMetaClient metaClient) {
+    return HoodieWriteConfig.newBuilder()
+        .withPath(basePath)
+        .withSchema(S3_METADATA_SCHEMA.toString())
+        .withParallelism(2, 2)
+        .withBulkInsertParallelism(2)
+        .withFinalizeWriteParallelism(2).withDeleteParallelism(2)
+        .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
+        .forTable(metaClient.getTableConfig().getTableName());
+  }
+
+  private HoodieWriteConfig getWriteConfig() {
+    return getConfigBuilder(basePath(), metaClient)
+        
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 
3).build())
+        
.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .withMaxNumDeltaCommitsBeforeCompaction(1).build())
+        .build();
+  }
+
+  private Pair<String, List<HoodieRecord>> writeS3MetadataRecords(String 
commitTime) throws IOException {
+    HoodieWriteConfig writeConfig = getWriteConfig();
+    SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig);
+
+    writeClient.startCommitWithTime(commitTime);
+    List<HoodieRecord> s3MetadataRecords = Arrays.asList(
+        generateS3EventMetadata(commitTime, "bucket-1", "data-file-1.json", 1L)
+    );
+    JavaRDD<WriteStatus> result = 
writeClient.upsert(jsc().parallelize(s3MetadataRecords, 1), commitTime);
+
+    List<WriteStatus> statuses = result.collect();
+    assertNoWriteErrors(statuses);
+
+    return Pair.of(commitTime, s3MetadataRecords);
+  }
+
+  @Test
+  public void testEmptyCheckpoint() throws IOException {
+    String commitTimeForWrites = "1";
+    String commitTimeForReads = commitTimeForWrites;
+
+    Pair<String, List<HoodieRecord>> inserts = 
writeS3MetadataRecords(commitTimeForWrites);
+
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 0L, 
inserts.getKey());
+  }
+
+  @Test
+  public void testOneFileInCommit() throws IOException {
+    String commitTimeForWrites1 = "2";
+    String commitTimeForReads = "1";
+
+    Pair<String, List<HoodieRecord>> inserts = 
writeS3MetadataRecords(commitTimeForReads);
+    inserts = writeS3MetadataRecords(commitTimeForWrites1);
+
+    List<Triple<String, Long, String>> filePathSizeAndCommitTime = new 
ArrayList<>();
+    // Add (file path, size, commit time) triples to the list
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file1.json", 100L, "1"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file2.json", 150L, "1"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file3.json", 200L, "1"));
+
+    Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
+
+    when(mockQueryRunner.run(Mockito.any())).thenReturn(inputDs);
+    when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), 
Mockito.any(), Mockito.any()))
+        .thenReturn(Option.empty());
+
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 
100L, "1#path/to/file1.json");
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file1.json"), 
200L, "1#path/to/file2.json");
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2.json"), 
200L, "1#path/to/file3.json");
+  }
+
+  @Test
+  public void testTwoFilesAndContinueInSameCommit() throws IOException {
+    String commitTimeForWrites = "2";
+    String commitTimeForReads = "1";
+
+    Pair<String, List<HoodieRecord>> inserts = 
writeS3MetadataRecords(commitTimeForReads);
+    inserts = writeS3MetadataRecords(commitTimeForWrites);
+
+    List<Triple<String, Long, String>> filePathSizeAndCommitTime = new 
ArrayList<>();
+    // Add (file path, size, commit time) triples to the list
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file1.json", 100L, "1"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file2.json", 150L, "1"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file3.json", 200L, "1"));
+
+    Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
+
+    when(mockQueryRunner.run(Mockito.any())).thenReturn(inputDs);
+    when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), 
Mockito.any(), Mockito.any()))
+        .thenReturn(Option.empty());
+
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 
250L, "1#path/to/file2.json");
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2.json"), 
250L, "1#path/to/file3.json");
+
+  }
+
+  @Test
+  public void testTwoFilesAndContinueAcrossCommits() throws IOException {
+    String commitTimeForWrites = "2";
+    String commitTimeForReads = "1";
+
+    Pair<String, List<HoodieRecord>> inserts = 
writeS3MetadataRecords(commitTimeForReads);
+    inserts = writeS3MetadataRecords(commitTimeForWrites);
+
+
+    List<Triple<String, Long, String>> filePathSizeAndCommitTime = new 
ArrayList<>();
+    // Add (file path, size, commit time) triples to the list
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file1.json", 100L, "1"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file3.json", 200L, "1"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file2.json", 150L, "1"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file4.json", 50L, "2"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file5.json", 150L, "2"));
+
+    Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
+
+    when(mockQueryRunner.run(Mockito.any())).thenReturn(inputDs);
+    when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), 
Mockito.any(), Mockito.any()))
+        .thenReturn(Option.empty());
+
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1"), 100L, 
"1#path/to/file1.json");
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file1.json"), 
100L, "1#path/to/file2.json");
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2.json"), 
1000L, "2#path/to/file5.json");
+  }
+
+  private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy 
missingCheckpointStrategy,
+                             Option<String> checkpointToPull, long 
sourceLimit, String expectedCheckpoint) {
+    TypedProperties typedProperties = setProps(missingCheckpointStrategy);
+
+    S3EventsHoodieIncrSource incrSource = new 
S3EventsHoodieIncrSource(typedProperties, jsc(),
+        spark(), mockSchemaProvider, mockQueryRunner, mockCloudDataFetcher);
+
+    Pair<Option<Dataset<Row>>, String> dataAndCheckpoint = 
incrSource.fetchNextBatch(checkpointToPull, sourceLimit);
+
+    Option<Dataset<Row>> datasetOpt = dataAndCheckpoint.getLeft();
+    String nextCheckPoint = dataAndCheckpoint.getRight();
+
+    Assertions.assertNotNull(nextCheckPoint);
+    Assertions.assertEquals(expectedCheckpoint, nextCheckPoint);
+  }
+}
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestIncrSourceHelper.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestIncrSourceHelper.java
new file mode 100644
index 00000000000..3c0b5ee23c8
--- /dev/null
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestIncrSourceHelper.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.collection.Triple;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.INIT_INSTANT_TS;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class TestIncrSourceHelper extends SparkClientFunctionalTestHarness {
+
+  // Jackson mapper used to serialize the synthetic S3 event metadata into JSON strings.
+  private ObjectMapper mapper = new ObjectMapper();
+  // Java wrapper around the Spark context; assigned in setUp() before each test.
+  private JavaSparkContext jsc;
+
+  /**
+   * Wraps the SparkContext of the session returned by {@code spark()} in a {@link JavaSparkContext}
+   * before each test, so that the fixtures can parallelize their sample records.
+   */
+  @BeforeEach
+  public void setUp() throws IOException {
+    jsc = JavaSparkContext.fromSparkContext(spark().sparkContext());
+  }
+
+  /**
+   * Serializes one synthetic S3 event record to a JSON string.
+   * The object's size and key and the bucket's name are nested under "s3", and the commit time is
+   * stored under "_hoodie_commit_time" at the top level.
+   *
+   * @throws JsonProcessingException if the mapper cannot serialize the record
+   */
+  private String generateS3EventMetadata(Long objectSize, String bucketName, String objectKey, String commitTime)
+      throws JsonProcessingException {
+    Map<String, String> bucketFields = new HashMap<>();
+    bucketFields.put("name", bucketName);
+
+    Map<String, Object> objectFields = new HashMap<>();
+    objectFields.put("size", objectSize);
+    objectFields.put("key", objectKey);
+
+    Map<String, Object> s3Fields = new HashMap<>();
+    s3Fields.put("object", objectFields);
+    s3Fields.put("bucket", bucketFields);
+
+    Map<String, Object> event = new HashMap<>();
+    event.put("s3", s3Fields);
+    event.put("_hoodie_commit_time", commitTime);
+    return mapper.writeValueAsString(event);
+  }
+
+  /**
+   * Converts each (object key, size, commit time) triple into the JSON event string produced by
+   * {@code generateS3EventMetadata}, always using the bucket name "bucket-1".
+   */
+  private List<String> getSampleS3ObjectKeys(List<Triple<String, Long, String>> filePathSizeAndCommitTime) {
+    return filePathSizeAndCommitTime.stream()
+        .map(this::toS3EventJson)
+        .collect(Collectors.toList());
+  }
+
+  // Serializes a single triple; the checked Jackson exception is rethrown unchecked so this can be used in a stream.
+  private String toS3EventJson(Triple<String, Long, String> keySizeAndCommit) {
+    try {
+      return generateS3EventMetadata(keySizeAndCommit.getMiddle(), "bucket-1",
+          keySizeAndCommit.getLeft(), keySizeAndCommit.getRight());
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Builds a Dataset by parallelizing the sample JSON events into two partitions and reading them
+   * through Spark's JSON reader.
+   */
+  private Dataset<Row> generateDataset(List<Triple<String, Long, String>> filePathSizeAndCommitTime) {
+    List<String> eventJsons = getSampleS3ObjectKeys(filePathSizeAndCommitTime);
+    JavaRDD<String> eventRdd = jsc.parallelize(eventJsons, 2);
+    return spark().read().json(eventRdd);
+  }
+
+  /**
+   * Feeds an empty, schema-less dataset and a checkpoint built from two nulls to the helper and expects
+   * the checkpoint to render as INIT_INSTANT_TS and the same dataset to be handed back.
+   */
+  @Test
+  void testEmptySource() {
+    Dataset<Row> noRows = spark().createDataFrame(new ArrayList<Row>(), new StructType());
+    QueryInfo queryInfo = new QueryInfo(
+        QUERY_TYPE_INCREMENTAL_OPT_VAL(), "commit1", "commit1",
+        "commit2", "_hoodie_commit_time",
+        "s3.object.key", "s3.object.size");
+    CloudObjectIncrCheckpoint noCheckpoint = new CloudObjectIncrCheckpoint(null, null);
+
+    Pair<CloudObjectIncrCheckpoint, Dataset<Row>> checkpointAndData =
+        IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(noRows, 50L, queryInfo, noCheckpoint);
+
+    assertEquals(INIT_INSTANT_TS, checkpointAndData.getKey().toString());
+    assertEquals(noRows, checkpointAndData.getRight());
+  }
+
+  /**
+   * Passes a source limit of 50 although the first expected object (file1.json) is 100 in size, and
+   * expects exactly that one row back together with the checkpoint "commit1#path/to/file1.json".
+   */
+  @Test
+  void testSingleObjectExceedingSourceLimit() {
+    // Each triple is (object key, object size, commit time).
+    List<Triple<String, Long, String>> objects = new ArrayList<>();
+    objects.add(Triple.of("path/to/file1.json", 100L, "commit1"));
+    objects.add(Triple.of("path/to/file3.json", 200L, "commit1"));
+    objects.add(Triple.of("path/to/file2.json", 150L, "commit1"));
+    objects.add(Triple.of("path/to/file4.json", 50L, "commit2"));
+    objects.add(Triple.of("path/to/file5.json", 150L, "commit2"));
+    Dataset<Row> inputDs = generateDataset(objects);
+
+    QueryInfo queryInfo = new QueryInfo(
+        QUERY_TYPE_INCREMENTAL_OPT_VAL(), "commit1", "commit1",
+        "commit2", "_hoodie_commit_time",
+        "s3.object.key", "s3.object.size");
+    CloudObjectIncrCheckpoint lastCheckpoint = new CloudObjectIncrCheckpoint("commit1", null);
+    Pair<CloudObjectIncrCheckpoint, Dataset<Row>> result =
+        IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(inputDs, 50L, queryInfo, lastCheckpoint);
+    Dataset<Row> batch = result.getRight();
+
+    // Pick the cumulativeSize value of the final row in the returned batch.
+    List<Row> cumulativeSizes = batch.select("cumulativeSize").collectAsList();
+    Row lastCumulativeSize = cumulativeSizes.get((int) batch.count() - 1);
+
+    assertEquals("commit1#path/to/file1.json", result.getKey().toString());
+    List<Row> rows = batch.collectAsList();
+    assertEquals(1, rows.size());
+    assertEquals("[[commit1,[[bucket-1],[path/to/file1.json,100]],100]]", rows.toString());
+    assertEquals(100L, lastCumulativeSize.get(0));
+  }
+
+  /**
+   * Runs the helper twice over the same fixture, with source limits of 350 and 550, and checks for
+   * each run the returned checkpoint, the rows kept and the cumulative size of the last kept row.
+   */
+  @Test
+  void testMultipleObjectExceedingSourceLimit() {
+    // Each triple is (object key, object size, commit time).
+    List<Triple<String, Long, String>> objects = new ArrayList<>();
+    objects.add(Triple.of("path/to/file1.json", 100L, "commit1"));
+    objects.add(Triple.of("path/to/file3.json", 200L, "commit1"));
+    objects.add(Triple.of("path/to/file2.json", 150L, "commit1"));
+    objects.add(Triple.of("path/to/file4.json", 50L, "commit2"));
+    objects.add(Triple.of("path/to/file5.json", 150L, "commit2"));
+    // Distinct keys within commit3 (the fixture previously listed file7.json twice);
+    // none of the rows asserted below belong to commit3.
+    objects.add(Triple.of("path/to/file6.json", 100L, "commit3"));
+    objects.add(Triple.of("path/to/file7.json", 250L, "commit3"));
+    Dataset<Row> inputDs = generateDataset(objects);
+
+    QueryInfo queryInfo = new QueryInfo(
+        QUERY_TYPE_INCREMENTAL_OPT_VAL(), "commit1", "commit1",
+        "commit2", "_hoodie_commit_time",
+        "s3.object.key", "s3.object.size");
+
+    // First run: source limit 350, two rows expected.
+    Pair<CloudObjectIncrCheckpoint, Dataset<Row>> result =
+        IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
+            inputDs, 350L, queryInfo, new CloudObjectIncrCheckpoint("commit1", null));
+    Dataset<Row> batch = result.getRight();
+    List<Row> cumulativeSizes = batch.select("cumulativeSize").collectAsList();
+    Row lastCumulativeSize = cumulativeSizes.get((int) batch.count() - 1);
+    assertEquals("commit1#path/to/file2.json", result.getKey().toString());
+    List<Row> rows = batch.collectAsList();
+    assertEquals(2, rows.size());
+    assertEquals("[[commit1,[[bucket-1],[path/to/file1.json,100]],100], [commit1,[[bucket-1],[path/to/file2.json,150]],250]]",
+        rows.toString());
+    assertEquals(250L, lastCumulativeSize.get(0));
+
+    // Second run: source limit 550, four rows expected, the last of them from commit2.
+    result = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
+        inputDs, 550L, queryInfo, new CloudObjectIncrCheckpoint("commit1", null));
+    batch = result.getRight();
+    cumulativeSizes = batch.select("cumulativeSize").collectAsList();
+    lastCumulativeSize = cumulativeSizes.get((int) batch.count() - 1);
+    assertEquals("commit2#path/to/file4.json", result.getKey().toString());
+    rows = batch.collectAsList();
+    assertEquals(4, rows.size());
+    assertEquals("[[commit1,[[bucket-1],[path/to/file1.json,100]],100], [commit1,[[bucket-1],[path/to/file2.json,150]],250],"
+            + " [commit1,[[bucket-1],[path/to/file3.json,200]],450], [commit2,[[bucket-1],[path/to/file4.json,50]],500]]",
+        rows.toString());
+    assertEquals(500L, lastCumulativeSize.get(0));
+  }
+
+  /**
+   * Uses a source limit (1500) larger than the summed sizes of the fixture (1050) and expects all
+   * eight rows back, with the checkpoint "commit3#path/to/file8.json".
+   */
+  @Test
+  void testCatchAllObjects() {
+    // Each triple is (object key, object size, commit time).
+    List<Triple<String, Long, String>> objects = new ArrayList<>();
+    objects.add(Triple.of("path/to/file1.json", 100L, "commit1"));
+    objects.add(Triple.of("path/to/file3.json", 200L, "commit1"));
+    objects.add(Triple.of("path/to/file2.json", 150L, "commit1"));
+    objects.add(Triple.of("path/to/file4.json", 50L, "commit2"));
+    objects.add(Triple.of("path/to/file5.json", 150L, "commit2"));
+    objects.add(Triple.of("path/to/file8.json", 100L, "commit3"));
+    objects.add(Triple.of("path/to/file6.json", 250L, "commit3"));
+    objects.add(Triple.of("path/to/file7.json", 50L, "commit3"));
+    Dataset<Row> inputDs = generateDataset(objects);
+
+    QueryInfo queryInfo = new QueryInfo(
+        QUERY_TYPE_INCREMENTAL_OPT_VAL(), "commit1", "commit1",
+        "commit2", "_hoodie_commit_time",
+        "s3.object.key", "s3.object.size");
+    CloudObjectIncrCheckpoint lastCheckpoint = new CloudObjectIncrCheckpoint("commit1", null);
+    Pair<CloudObjectIncrCheckpoint, Dataset<Row>> result =
+        IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(inputDs, 1500L, queryInfo, lastCheckpoint);
+    Dataset<Row> batch = result.getRight();
+
+    // Pick the cumulativeSize value of the final row in the returned batch.
+    List<Row> cumulativeSizes = batch.select("cumulativeSize").collectAsList();
+    Row lastCumulativeSize = cumulativeSizes.get((int) batch.count() - 1);
+
+    assertEquals("commit3#path/to/file8.json", result.getKey().toString());
+    List<Row> rows = batch.collectAsList();
+    assertEquals(8, rows.size());
+    assertEquals(1050L, lastCumulativeSize.get(0));
+  }
+
+  /**
+   * Starts from the checkpoint (commit3, path/to/file8.json) and expects the returned rows to come
+   * from commit4: one row for a source limit of 50, three rows for a source limit of 350.
+   */
+  @Test
+  void testFileOrderingAcrossCommits() {
+    // Each triple is (object key, object size, commit time).
+    List<Triple<String, Long, String>> objects = new ArrayList<>();
+    objects.add(Triple.of("path/to/file8.json", 100L, "commit3"));
+    objects.add(Triple.of("path/to/file6.json", 250L, "commit3"));
+    objects.add(Triple.of("path/to/file7.json", 50L, "commit3"));
+    objects.add(Triple.of("path/to/file0.json", 100L, "commit4"));
+    objects.add(Triple.of("path/to/file1.json", 50L, "commit4"));
+    objects.add(Triple.of("path/to/file2.json", 50L, "commit4"));
+    Dataset<Row> inputDs = generateDataset(objects);
+
+    QueryInfo queryInfo = new QueryInfo(
+        QUERY_TYPE_INCREMENTAL_OPT_VAL(), "commit3", "commit3",
+        "commit4", "_hoodie_commit_time",
+        "s3.object.key", "s3.object.size");
+
+    // First run: source limit 50.
+    Pair<CloudObjectIncrCheckpoint, Dataset<Row>> result =
+        IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
+            inputDs, 50L, queryInfo, new CloudObjectIncrCheckpoint("commit3", "path/to/file8.json"));
+    Dataset<Row> batch = result.getRight();
+    List<Row> cumulativeSizes = batch.select("cumulativeSize").collectAsList();
+    Row lastCumulativeSize = cumulativeSizes.get((int) batch.count() - 1);
+    assertEquals("commit4#path/to/file0.json", result.getKey().toString());
+    List<Row> rows = batch.collectAsList();
+    assertEquals(1, rows.size());
+    assertEquals(100L, lastCumulativeSize.get(0));
+
+    // Second run: source limit 350, same starting checkpoint values.
+    result = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
+        inputDs, 350L, queryInfo, new CloudObjectIncrCheckpoint("commit3", "path/to/file8.json"));
+    batch = result.getRight();
+    cumulativeSizes = batch.select("cumulativeSize").collectAsList();
+    lastCumulativeSize = cumulativeSizes.get((int) batch.count() - 1);
+    assertEquals("commit4#path/to/file2.json", result.getKey().toString());
+    rows = batch.collectAsList();
+    assertEquals(3, rows.size());
+    assertEquals(200L, lastCumulativeSize.get(0));
+  }
+
+  /**
+   * Starts from the checkpoint (commit3, path/to/file8.json) and expects the helper to return that
+   * same checkpoint together with an empty dataset.
+   */
+  @Test
+  void testLastObjectInCommit() {
+    // Each triple is (object key, object size, commit time).
+    List<Triple<String, Long, String>> objects = new ArrayList<>();
+    objects.add(Triple.of("path/to/file1.json", 100L, "commit1"));
+    objects.add(Triple.of("path/to/file3.json", 200L, "commit1"));
+    objects.add(Triple.of("path/to/file2.json", 150L, "commit1"));
+    objects.add(Triple.of("path/to/file4.json", 50L, "commit2"));
+    objects.add(Triple.of("path/to/file5.json", 150L, "commit2"));
+    objects.add(Triple.of("path/to/file8.json", 100L, "commit3"));
+    objects.add(Triple.of("path/to/file6.json", 250L, "commit3"));
+    objects.add(Triple.of("path/to/file7.json", 50L, "commit3"));
+    Dataset<Row> inputDs = generateDataset(objects);
+
+    QueryInfo queryInfo = new QueryInfo(
+        QUERY_TYPE_INCREMENTAL_OPT_VAL(), "commit1", "commit1",
+        "commit3", "_hoodie_commit_time",
+        "s3.object.key", "s3.object.size");
+    CloudObjectIncrCheckpoint lastCheckpoint = new CloudObjectIncrCheckpoint("commit3", "path/to/file8.json");
+
+    Pair<CloudObjectIncrCheckpoint, Dataset<Row>> result =
+        IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(inputDs, 1500L, queryInfo, lastCheckpoint);
+
+    assertEquals("commit3#path/to/file8.json", result.getKey().toString());
+    assertTrue(result.getRight().isEmpty());
+  }
+}
\ No newline at end of file

Reply via email to