This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 38c87b7ebe1 [HUDI-7004] Add support of snapshotLoadQuerySplitter in s3/gcs sources (#10152)
38c87b7ebe1 is described below

commit 38c87b7ebe148e8870db83be433376ad89b9c048
Author: harshal <harshal.j.pa...@gmail.com>
AuthorDate: Wed Nov 22 20:53:42 2023 +0530

    [HUDI-7004] Add support of snapshotLoadQuerySplitter in s3/gcs sources (#10152)
---
 .../apache/hudi/common/config/TypedProperties.java |  5 ++
 .../sources/GcsEventsHoodieIncrSource.java         |  7 +-
 .../hudi/utilities/sources/HoodieIncrSource.java   |  6 +-
 .../sources/S3EventsHoodieIncrSource.java          |  9 ++-
 .../sources/SnapshotLoadQuerySplitter.java         |  9 +++
 .../utilities/sources/helpers/QueryRunner.java     | 35 +++++----
 .../sources/TestGcsEventsHoodieIncrSource.java     | 85 ++++++++++++++++++++--
 .../sources/TestS3EventsHoodieIncrSource.java      | 78 ++++++++++++++++++--
 8 files changed, 198 insertions(+), 36 deletions(-)
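
Usage note (editorial, not part of the commit): the new SnapshotLoadQuerySplitter.getInstance(props) factory resolves an optional splitter implementation reflectively from the SNAPSHOT_LOAD_QUERY_SPLITTER_CLASS_NAME property, backed by the new TypedProperties.getNonEmptyStringOpt helper. A minimal sketch, assuming a user-supplied implementation class (com.example.MySplitter is a hypothetical placeholder, not shipped with Hudi):

    // Hedged sketch: wiring an optional snapshot-load splitter through TypedProperties.
    TypedProperties props = new TypedProperties();
    // Hypothetical implementation class name, used here only for illustration.
    props.setProperty(SnapshotLoadQuerySplitter.Config.SNAPSHOT_LOAD_QUERY_SPLITTER_CLASS_NAME,
        "com.example.MySplitter");
    // A missing or empty property yields Option.empty(), so sources fall back to a plain snapshot query.
    Option<SnapshotLoadQuerySplitter> splitter = SnapshotLoadQuerySplitter.getInstance(props);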

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/TypedProperties.java b/hudi-common/src/main/java/org/apache/hudi/common/config/TypedProperties.java
index 3db8210cade..86b7f4cc457 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/TypedProperties.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/TypedProperties.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.common.config;
 
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 
 import java.io.Serializable;
@@ -78,6 +79,10 @@ public class TypedProperties extends Properties implements Serializable {
     return containsKey(property) ? getProperty(property) : defaultValue;
   }
 
+  public Option<String> getNonEmptyStringOpt(String property, String defaultValue) {
+    return Option.ofNullable(StringUtils.emptyToNull(getString(property, defaultValue)));
+  }
+
   public List<String> getStringList(String property, String delimiter, List<String> defaultVal) {
     if (!containsKey(property)) {
       return defaultVal;
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
index d09bad71916..a06130d3972 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
@@ -114,6 +114,7 @@ public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
   private final CloudDataFetcher gcsObjectDataFetcher;
   private final QueryRunner queryRunner;
   private final Option<SchemaProvider> schemaProvider;
+  private final Option<SnapshotLoadQuerySplitter> snapshotLoadQuerySplitter;
 
 
   public static final String GCS_OBJECT_KEY = "name";
@@ -145,6 +146,7 @@ public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
     this.gcsObjectDataFetcher = gcsObjectDataFetcher;
     this.queryRunner = queryRunner;
     this.schemaProvider = Option.ofNullable(schemaProvider);
+    this.snapshotLoadQuerySplitter = SnapshotLoadQuerySplitter.getInstance(props);
 
     LOG.info("srcPath: " + srcPath);
     LOG.info("missingCheckpointStrategy: " + missingCheckpointStrategy);
@@ -171,8 +173,9 @@ public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
       return Pair.of(Option.empty(), queryInfo.getStartInstant());
     }
 
-    Dataset<Row> cloudObjectMetadataDF = queryRunner.run(queryInfo);
-    Dataset<Row> filteredSourceData = gcsObjectMetadataFetcher.applyFilter(cloudObjectMetadataDF);
+    Pair<QueryInfo, Dataset<Row>> queryInfoDatasetPair = queryRunner.run(queryInfo, snapshotLoadQuerySplitter);
+    Dataset<Row> filteredSourceData = gcsObjectMetadataFetcher.applyFilter(queryInfoDatasetPair.getRight());
+    queryInfo = queryInfoDatasetPair.getLeft();
     LOG.info("Adjusting end checkpoint:" + queryInfo.getEndInstant() + " based on sourceLimit :" + sourceLimit);
     Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> checkPointAndDataset =
         IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
index 1d302fa106b..f87e5c231bf 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
@@ -25,7 +25,6 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.utilities.config.HoodieIncrSourceConfig;
 import org.apache.hudi.utilities.schema.SchemaProvider;
@@ -56,7 +55,6 @@ import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
 import static org.apache.hudi.common.util.ConfigUtils.getIntWithAltKeys;
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
 import static org.apache.hudi.utilities.UtilHelpers.createRecordMerger;
-import static org.apache.hudi.utilities.sources.SnapshotLoadQuerySplitter.Config.SNAPSHOT_LOAD_QUERY_SPLITTER_CLASS_NAME;
 import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.generateQueryInfo;
 import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getHollowCommitHandleMode;
 
@@ -150,9 +148,7 @@ public class HoodieIncrSource extends RowSource {
       }
     }
 
-    this.snapshotLoadQuerySplitter = Option.ofNullable(props.getString(SNAPSHOT_LOAD_QUERY_SPLITTER_CLASS_NAME, null))
-        .map(className -> (SnapshotLoadQuerySplitter) ReflectionUtils.loadClass(className,
-            new Class<?>[] {TypedProperties.class}, props));
+    this.snapshotLoadQuerySplitter = SnapshotLoadQuerySplitter.getInstance(props);
   }
 
   @Override
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
index 4b9be847c75..325e494e0ab 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
@@ -80,6 +80,8 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
 
   private final Option<SchemaProvider> schemaProvider;
 
+  private final Option<SnapshotLoadQuerySplitter> snapshotLoadQuerySplitter;
+
   public static class Config {
     // control whether we do existence check for files before consuming them
     @Deprecated
@@ -138,6 +140,7 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
     this.queryRunner = queryRunner;
     this.cloudDataFetcher = cloudDataFetcher;
     this.schemaProvider = Option.ofNullable(schemaProvider);
+    this.snapshotLoadQuerySplitter = SnapshotLoadQuerySplitter.getInstance(props);
   }
 
   @Override
@@ -158,9 +161,9 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
       LOG.warn("Already caught up. No new data to process");
       return Pair.of(Option.empty(), queryInfo.getEndInstant());
     }
-
-    Dataset<Row> source = queryRunner.run(queryInfo);
-    Dataset<Row> filteredSourceData = applyFilter(source, fileFormat);
+    Pair<QueryInfo, Dataset<Row>> queryInfoDatasetPair = queryRunner.run(queryInfo, snapshotLoadQuerySplitter);
+    queryInfo = queryInfoDatasetPair.getLeft();
+    Dataset<Row> filteredSourceData = applyFilter(queryInfoDatasetPair.getRight(), fileFormat);
 
     LOG.info("Adjusting end checkpoint:" + queryInfo.getEndInstant() + " based on sourceLimit :" + sourceLimit);
     Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> checkPointAndDataset =
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SnapshotLoadQuerySplitter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SnapshotLoadQuerySplitter.java
index 6a13607b1d5..ca299122ec7 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SnapshotLoadQuerySplitter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SnapshotLoadQuerySplitter.java
@@ -20,10 +20,13 @@ package org.apache.hudi.utilities.sources;
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.utilities.sources.helpers.QueryInfo;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 
+import static org.apache.hudi.utilities.sources.SnapshotLoadQuerySplitter.Config.SNAPSHOT_LOAD_QUERY_SPLITTER_CLASS_NAME;
+
 /**
  * Abstract splitter responsible for managing the snapshot load query operations.
  */
@@ -75,4 +78,10 @@ public abstract class SnapshotLoadQuerySplitter {
         .map(checkpoint -> queryInfo.withUpdatedEndInstant(checkpoint))
         .orElse(queryInfo);
   }
+
+  public static Option<SnapshotLoadQuerySplitter> getInstance(TypedProperties props) {
+    return props.getNonEmptyStringOpt(SNAPSHOT_LOAD_QUERY_SPLITTER_CLASS_NAME, null)
+        .map(className -> (SnapshotLoadQuerySplitter) ReflectionUtils.loadClass(className,
+            new Class<?>[] {TypedProperties.class}, props));
+  }
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java
index ef903d7c647..2f0a8bf488e 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java
@@ -21,9 +21,12 @@ package org.apache.hudi.utilities.sources.helpers;
 import org.apache.hudi.DataSourceReadOptions;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.utilities.config.HoodieIncrSourceConfig;
 
+import org.apache.hudi.utilities.sources.SnapshotLoadQuerySplitter;
 import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -62,16 +65,14 @@ public class QueryRunner {
    * @param queryInfo all meta info about the query to be executed.
    * @return the output of the query as Dataset < Row >.
    */
-  public Dataset<Row> run(QueryInfo queryInfo) {
-    Dataset<Row> dataset = null;
+  public Pair<QueryInfo, Dataset<Row>> run(QueryInfo queryInfo, Option<SnapshotLoadQuerySplitter> snapshotLoadQuerySplitterOption) {
     if (queryInfo.isIncremental()) {
-      dataset = runIncrementalQuery(queryInfo);
+      return runIncrementalQuery(queryInfo);
     } else if (queryInfo.isSnapshot()) {
-      dataset = runSnapshotQuery(queryInfo);
+      return runSnapshotQuery(queryInfo, snapshotLoadQuerySplitterOption);
     } else {
       throw new HoodieException("Unknown query type " + queryInfo.getQueryType());
     }
-    return dataset;
   }
 
   public static Dataset<Row> applyOrdering(Dataset<Row> dataset, List<String> orderByColumns) {
@@ -82,26 +83,34 @@ public class QueryRunner {
     return dataset;
   }
 
-  public Dataset<Row> runIncrementalQuery(QueryInfo queryInfo) {
+  public Pair<QueryInfo, Dataset<Row>> runIncrementalQuery(QueryInfo queryInfo) {
     LOG.info("Running incremental query");
-    return sparkSession.read().format("org.apache.hudi")
+    return Pair.of(queryInfo, sparkSession.read().format("org.apache.hudi")
         .option(DataSourceReadOptions.QUERY_TYPE().key(), queryInfo.getQueryType())
         .option(DataSourceReadOptions.BEGIN_INSTANTTIME().key(), queryInfo.getPreviousInstant())
         .option(DataSourceReadOptions.END_INSTANTTIME().key(), queryInfo.getEndInstant())
         .option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().key(),
             props.getString(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().key(),
                 DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().defaultValue()))
-        .load(sourcePath);
+        .load(sourcePath));
   }
 
-  public Dataset<Row> runSnapshotQuery(QueryInfo queryInfo) {
+  public Pair<QueryInfo, Dataset<Row>> runSnapshotQuery(QueryInfo queryInfo, Option<SnapshotLoadQuerySplitter> snapshotLoadQuerySplitterOption) {
     LOG.info("Running snapshot query");
-    return sparkSession.read().format("org.apache.hudi")
-        .option(DataSourceReadOptions.QUERY_TYPE().key(), queryInfo.getQueryType()).load(sourcePath)
+    Dataset<Row> snapshot = sparkSession.read().format("org.apache.hudi")
+        .option(DataSourceReadOptions.QUERY_TYPE().key(), queryInfo.getQueryType()).load(sourcePath);
+    QueryInfo snapshotQueryInfo = snapshotLoadQuerySplitterOption
+        .map(snapshotLoadQuerySplitter -> snapshotLoadQuerySplitter.getNextCheckpoint(snapshot, queryInfo))
+        .orElse(queryInfo);
+    return Pair.of(snapshotQueryInfo, applySnapshotQueryFilters(snapshot, snapshotQueryInfo));
+  }
+
+  public Dataset<Row> applySnapshotQueryFilters(Dataset<Row> snapshot, QueryInfo snapshotQueryInfo) {
+    return snapshot
         // add filtering so that only interested records are returned.
         .filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
-            queryInfo.getStartInstant()))
+            snapshotQueryInfo.getStartInstant()))
         .filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
-            queryInfo.getEndInstant()));
+            snapshotQueryInfo.getEndInstant()));
   }
 }
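
Usage note (editorial, not part of the commit): QueryRunner.run now returns the possibly adjusted QueryInfo together with the Dataset, so callers must unpack both sides of the Pair. A minimal sketch of the caller pattern used by the GCS/S3 sources above (queryRunner, queryInfo and snapshotLoadQuerySplitter are the fields and locals from those sources):

    // The splitter, when present, may tighten the end instant of a snapshot query.
    Pair<QueryInfo, Dataset<Row>> queryInfoDatasetPair = queryRunner.run(queryInfo, snapshotLoadQuerySplitter);
    // Left side carries the (possibly updated) end instant used for checkpointing.
    queryInfo = queryInfoDatasetPair.getLeft();
    // Right side is the query result, already filtered to the [start, end] commit-time range for snapshot queries.
    Dataset<Row> rows = queryInfoDatasetPair.getRight();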
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
index 5c31f310800..bc2906d251f 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
@@ -40,6 +40,7 @@ import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
 import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
+import org.apache.hudi.utilities.sources.helpers.QueryInfo;
 import org.apache.hudi.utilities.sources.helpers.QueryRunner;
 import org.apache.hudi.utilities.sources.helpers.gcs.GcsObjectMetadataFetcher;
 
@@ -56,6 +57,8 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
@@ -93,6 +96,8 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
 
   @Mock
   QueryRunner queryRunner;
+  @Mock
+  QueryInfo queryInfo;
 
   protected Option<SchemaProvider> schemaProvider;
   private HoodieTableMetaClient metaClient;
@@ -142,7 +147,7 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
     filePathSizeAndCommitTime.add(Triple.of("path/to/file3.json", 200L, "1"));
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
 
-    when(queryRunner.run(Mockito.any())).thenReturn(inputDs);
+    setMockQueryRunner(inputDs);
 
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 100L, "1#path/to/file1.json");
   }
@@ -160,7 +165,8 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
     filePathSizeAndCommitTime.add(Triple.of("path/to/file3.json", 200L, "1"));
 
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
-    when(queryRunner.run(Mockito.any())).thenReturn(inputDs);
+
+    setMockQueryRunner(inputDs);
 
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 250L, "1#path/to/file2.json");
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2.json"), 250L, "1#path/to/file3.json");
@@ -183,7 +189,7 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
 
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
 
-    when(queryRunner.run(Mockito.any())).thenReturn(inputDs);
+    setMockQueryRunner(inputDs);
 
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 250L, "1#path/to/file10006.json");
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file10006.json"), 250L, "1#path/to/file10007.json");
@@ -205,7 +211,7 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
 
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
 
-    when(queryRunner.run(Mockito.any())).thenReturn(inputDs);
+    setMockQueryRunner(inputDs);
 
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 100L, "1#path/to/file1.json");
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file1.json"), 100L, "1#path/to/file2.json");
@@ -213,10 +219,68 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 100L, "1#path/to/file1.json");
   }
 
+  @ParameterizedTest
+  @CsvSource({
+      "1,1#path/to/file2.json,3#path/to/file4.json,1#path/to/file1.json,1",
+      "2,1#path/to/file2.json,3#path/to/file4.json,1#path/to/file1.json,2",
+      "3,3#path/to/file5.json,3,1#path/to/file1.json,3"
+  })
+  public void testSplitSnapshotLoad(String snapshotCheckPoint, String exptected1, String exptected2, String exptected3, String exptected4) throws IOException {
+
+    writeGcsMetadataRecords("1");
+    writeGcsMetadataRecords("2");
+    writeGcsMetadataRecords("3");
+
+    List<Triple<String, Long, String>> filePathSizeAndCommitTime = new ArrayList<>();
+    // Add file paths and sizes to the list
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file1.json", 50L, "1"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file2.json", 50L, "1"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/skip1.json", 50L, "2"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/skip2.json", 50L, "2"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file5.json", 50L, "3"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file4.json", 50L, "3"));
+
+    Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
+
+    setMockQueryRunner(inputDs, Option.of(snapshotCheckPoint));
+    TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
+    typedProperties.setProperty("hoodie.deltastreamer.source.cloud.data.ignore.relpath.prefix", "path/to/skip");
+    //1. snapshot query, read all records
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50000L, exptected1, typedProperties);
+    //2. incremental query, as commit is present in timeline
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(exptected1), 10L, exptected2, typedProperties);
+    //3. snapshot query with source limit less than first commit size
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, exptected3, typedProperties);
+    typedProperties.setProperty("hoodie.deltastreamer.source.cloud.data.ignore.relpath.prefix", "path/to");
+    //4. As snapshotQuery will return 1 -> same would be return as nextCheckpoint (dataset is empty due to ignore prefix).
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, exptected4, typedProperties);
+  }
+
+  private void setMockQueryRunner(Dataset<Row> inputDs) {
+    setMockQueryRunner(inputDs, Option.empty());
+  }
+
+  private void setMockQueryRunner(Dataset<Row> inputDs, Option<String> nextCheckPointOpt) {
+
+    when(queryRunner.run(Mockito.any(QueryInfo.class), Mockito.any())).thenAnswer(invocation -> {
+      QueryInfo queryInfo = invocation.getArgument(0);
+      QueryInfo updatedQueryInfo = nextCheckPointOpt.map(nextCheckPoint ->
+              queryInfo.withUpdatedEndInstant(nextCheckPoint))
+          .orElse(queryInfo);
+      if (updatedQueryInfo.isSnapshot()) {
+        return Pair.of(updatedQueryInfo,
+            inputDs.filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+                    updatedQueryInfo.getStartInstant()))
+                .filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+                    updatedQueryInfo.getEndInstant())));
+      }
+      return Pair.of(updatedQueryInfo, inputDs);
+    });
+  }
+
   private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy,
-                             Option<String> checkpointToPull, long sourceLimit, String expectedCheckpoint) {
-    TypedProperties typedProperties = setProps(missingCheckpointStrategy);
-    typedProperties.put("hoodie.deltastreamer.source.hoodieincr.file.format", "json");
+                             Option<String> checkpointToPull, long sourceLimit, String expectedCheckpoint,
+                             TypedProperties typedProperties) {
 
     GcsEventsHoodieIncrSource incrSource = new GcsEventsHoodieIncrSource(typedProperties, jsc(),
         spark(), schemaProvider.orElse(null), new GcsObjectMetadataFetcher(typedProperties, "json"), gcsObjectDataFetcher, queryRunner);
@@ -230,6 +294,13 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
     Assertions.assertEquals(expectedCheckpoint, nextCheckPoint);
   }
 
+  private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy,
+                             Option<String> checkpointToPull, long sourceLimit, String expectedCheckpoint) {
+    TypedProperties typedProperties = setProps(missingCheckpointStrategy);
+    typedProperties.put("hoodie.deltastreamer.source.hoodieincr.file.format", "json");
+    readAndAssert(missingCheckpointStrategy, checkpointToPull, sourceLimit, expectedCheckpoint, typedProperties);
+  }
+
   private HoodieRecord getGcsMetadataRecord(String commitTime, String filename, String bucketName, String generation) {
     String partitionPath = bucketName;
 
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
index 7d58d21d874..e0af8d73e26 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
@@ -40,6 +40,7 @@ import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
 import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
+import org.apache.hudi.utilities.sources.helpers.QueryInfo;
 import org.apache.hudi.utilities.sources.helpers.QueryRunner;
 import org.apache.hudi.utilities.sources.helpers.TestCloudObjectsSelectorCommon;
 
@@ -56,6 +57,8 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.junit.jupiter.MockitoExtension;
@@ -88,6 +91,8 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
   QueryRunner mockQueryRunner;
   @Mock
   CloudDataFetcher mockCloudDataFetcher;
+  @Mock
+  QueryInfo queryInfo;
   private JavaSparkContext jsc;
   private HoodieTableMetaClient metaClient;
 
@@ -248,7 +253,7 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
 
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
 
-    when(mockQueryRunner.run(Mockito.any())).thenReturn(inputDs);
+    setMockQueryRunner(inputDs);
     when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider)))
         .thenReturn(Option.empty());
 
@@ -273,7 +278,7 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
 
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
 
-    when(mockQueryRunner.run(Mockito.any())).thenReturn(inputDs);
+    setMockQueryRunner(inputDs);
     when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider)))
         .thenReturn(Option.empty());
 
@@ -301,7 +306,7 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
 
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
 
-    when(mockQueryRunner.run(Mockito.any())).thenReturn(inputDs);
+    setMockQueryRunner(inputDs);
     when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider)))
         .thenReturn(Option.empty());
 
@@ -329,7 +334,7 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
 
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
 
-    when(mockQueryRunner.run(Mockito.any())).thenReturn(inputDs);
+    setMockQueryRunner(inputDs);
     TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
     typedProperties.setProperty("hoodie.deltastreamer.source.s3incr.ignore.key.prefix", "path/to/skip");
 
@@ -361,7 +366,7 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
 
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
 
-    when(mockQueryRunner.run(Mockito.any())).thenReturn(inputDs);
+    setMockQueryRunner(inputDs);
     when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider)))
         .thenReturn(Option.empty());
     TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
@@ -393,7 +398,7 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
 
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
 
-    when(mockQueryRunner.run(Mockito.any())).thenReturn(inputDs);
+    setMockQueryRunner(inputDs);
     when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider)))
         .thenReturn(Option.empty());
     TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
@@ -407,6 +412,45 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file3.json"), 50L, "3#path/to/file4.json", typedProperties);
   }
 
+  @ParameterizedTest
+  @CsvSource({
+      "1,1#path/to/file2.json,3#path/to/file4.json,1#path/to/file1.json,1",
+      "2,1#path/to/file2.json,3#path/to/file4.json,1#path/to/file1.json,2",
+      "3,3#path/to/file5.json,3,1#path/to/file1.json,3"
+  })
+  public void testSplitSnapshotLoad(String snapshotCheckPoint, String exptected1, String exptected2, String exptected3, String exptected4) throws IOException {
+
+    writeS3MetadataRecords("1");
+    writeS3MetadataRecords("2");
+    writeS3MetadataRecords("3");
+
+    List<Triple<String, Long, String>> filePathSizeAndCommitTime = new ArrayList<>();
+    // Add file paths and sizes to the list
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file1.json", 50L, "1"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file2.json", 50L, "1"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/skip1.json", 50L, "2"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/skip2.json", 50L, "2"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file5.json", 50L, "3"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file4.json", 50L, "3"));
+
+    Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
+
+    setMockQueryRunner(inputDs, Option.of(snapshotCheckPoint));
+    when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider)))
+        .thenReturn(Option.empty());
+    TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
+    typedProperties.setProperty("hoodie.deltastreamer.source.s3incr.ignore.key.prefix", "path/to/skip");
+    //1. snapshot query, read all records
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50000L, exptected1, typedProperties);
+    //2. incremental query, as commit is present in timeline
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(exptected1), 10L, exptected2, typedProperties);
+    //3. snapshot query with source limit less than first commit size
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, exptected3, typedProperties);
+    typedProperties.setProperty("hoodie.deltastreamer.source.s3incr.ignore.key.prefix", "path/to");
+    //4. As snapshotQuery will return 1 -> same would be return as nextCheckpoint (dataset is empty due to ignore prefix).
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, exptected4, typedProperties);
+  }
+
   private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy,
                              Option<String> checkpointToPull, long sourceLimit, String expectedCheckpoint,
                              TypedProperties typedProperties) {
@@ -422,6 +466,28 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
     Assertions.assertEquals(expectedCheckpoint, nextCheckPoint);
   }
 
+  private void setMockQueryRunner(Dataset<Row> inputDs) {
+    setMockQueryRunner(inputDs, Option.empty());
+  }
+
+  private void setMockQueryRunner(Dataset<Row> inputDs, Option<String> nextCheckPointOpt) {
+
+    when(mockQueryRunner.run(Mockito.any(QueryInfo.class), Mockito.any())).thenAnswer(invocation -> {
+      QueryInfo queryInfo = invocation.getArgument(0);
+      QueryInfo updatedQueryInfo = nextCheckPointOpt.map(nextCheckPoint ->
+              queryInfo.withUpdatedEndInstant(nextCheckPoint))
+          .orElse(queryInfo);
+      if (updatedQueryInfo.isSnapshot()) {
+        return Pair.of(updatedQueryInfo,
+            inputDs.filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+                    updatedQueryInfo.getStartInstant()))
+                .filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+                    updatedQueryInfo.getEndInstant())));
+      }
+      return Pair.of(updatedQueryInfo, inputDs);
+    });
+  }
+
   private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy,
                              Option<String> checkpointToPull, long sourceLimit, String expectedCheckpoint) {
     TypedProperties typedProperties = setProps(missingCheckpointStrategy);
