This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 0b75508a22ea20c00b25a6558e5044a86e468d38
Author: Vinish Reddy <[email protected]>
AuthorDate: Mon May 13 07:23:31 2024 +0530

    [HUDI-7501] Use source profile for S3 and GCS sources (#10861)
    
    Co-authored-by: Y Ethan Guo <[email protected]>
---
 .../org/apache/hudi/utilities/UtilHelpers.java     |  53 ++++-----
 .../sources/GcsEventsHoodieIncrSource.java         |  61 ++++------
 .../hudi/utilities/sources/HoodieIncrSource.java   |   6 +-
 .../apache/hudi/utilities/sources/RowSource.java   |   8 +-
 .../sources/S3EventsHoodieIncrSource.java          |  87 +++-----------
 .../sources/helpers/CloudDataFetcher.java          |  79 ++++++++++++-
 .../helpers/CloudObjectsSelectorCommon.java        |  70 ++++++++----
 .../helpers/gcs/GcsObjectMetadataFetcher.java      |  86 --------------
 .../sources/TestGcsEventsHoodieIncrSource.java     |  83 ++++++++++----
 .../utilities/sources/TestHoodieIncrSource.java    |   3 +-
 .../sources/TestS3EventsHoodieIncrSource.java      | 125 ++++++++++++++++-----
 .../debezium/TestAbstractDebeziumSource.java       |   3 +-
 .../helpers/TestCloudObjectsSelectorCommon.java    |  42 ++++---
 13 files changed, 383 insertions(+), 323 deletions(-)
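
For context on the diff below: both cloud sources now funnel their reads
through CloudDataFetcher, and an optional SourceProfile can override the
configured source limit (via getMaxSourceBytes) and the per-partition byte
budget (via getSourceSpecificContext). A minimal sketch of such a profile,
modeled on the TestSourceProfile referenced in the tests; getSourcePartitions
is assumed to be part of the SourceProfile interface and is not exercised in
this change:

    import org.apache.hudi.utilities.streamer.SourceProfile;

    // Fixed-value profile: returns the same limits on every sync round.
    class FixedSourceProfile implements SourceProfile<Long> {
      private final long maxSourceBytes;     // overrides the configured sourceLimit
      private final long bytesPerPartition;  // read back via getSourceSpecificContext()

      FixedSourceProfile(long maxSourceBytes, long bytesPerPartition) {
        this.maxSourceBytes = maxSourceBytes;
        this.bytesPerPartition = bytesPerPartition;
      }

      @Override
      public long getMaxSourceBytes() {
        return maxSourceBytes;
      }

      @Override
      public int getSourcePartitions() {
        return 1; // assumed accessor; unused by CloudDataFetcher in this change
      }

      @Override
      public Long getSourceSpecificContext() {
        return bytesPerPartition; // CloudDataFetcher casts this to long when > 0
      }
    }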

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index 026bb621677..abf0558e5ff 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -40,6 +40,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieLockConfig;
@@ -140,42 +141,30 @@ public class UtilHelpers {
   }
 
   public static Source createSource(String sourceClass, TypedProperties cfg, JavaSparkContext jssc,
-      SparkSession sparkSession, SchemaProvider schemaProvider,
-      HoodieIngestionMetrics metrics) throws IOException {
-    try {
+                                    SparkSession sparkSession, HoodieIngestionMetrics metrics, StreamContext streamContext) throws IOException {
+    // All possible constructors.
+    Class<?>[] constructorArgsStreamContextMetrics = new Class<?>[] {TypedProperties.class, JavaSparkContext.class, SparkSession.class, HoodieIngestionMetrics.class, StreamContext.class};
+    Class<?>[] constructorArgsStreamContext = new Class<?>[] {TypedProperties.class, JavaSparkContext.class, SparkSession.class, StreamContext.class};
+    Class<?>[] constructorArgsMetrics = new Class<?>[] {TypedProperties.class, JavaSparkContext.class, SparkSession.class, SchemaProvider.class, HoodieIngestionMetrics.class};
+    Class<?>[] constructorArgs = new Class<?>[] {TypedProperties.class, JavaSparkContext.class, SparkSession.class, SchemaProvider.class};
+    // List of constructor and their respective arguments.
+    List<Pair<Class<?>[], Object[]>> sourceConstructorAndArgs = new ArrayList<>();
+    sourceConstructorAndArgs.add(Pair.of(constructorArgsStreamContextMetrics, new Object[] {cfg, jssc, sparkSession, metrics, streamContext}));
+    sourceConstructorAndArgs.add(Pair.of(constructorArgsStreamContext, new Object[] {cfg, jssc, sparkSession, streamContext}));
+    sourceConstructorAndArgs.add(Pair.of(constructorArgsMetrics, new Object[] {cfg, jssc, sparkSession, streamContext.getSchemaProvider(), metrics}));
+    sourceConstructorAndArgs.add(Pair.of(constructorArgs, new Object[] {cfg, jssc, sparkSession, streamContext.getSchemaProvider()}));
+
+    HoodieException sourceClassLoadException = null;
+    for (Pair<Class<?>[], Object[]> constructor : sourceConstructorAndArgs) {
       try {
-        return (Source) ReflectionUtils.loadClass(sourceClass,
-            new Class<?>[] {TypedProperties.class, JavaSparkContext.class,
-                SparkSession.class, SchemaProvider.class,
-                HoodieIngestionMetrics.class},
-            cfg, jssc, sparkSession, schemaProvider, metrics);
+        return (Source) ReflectionUtils.loadClass(sourceClass, constructor.getLeft(), constructor.getRight());
       } catch (HoodieException e) {
-        return (Source) ReflectionUtils.loadClass(sourceClass,
-            new Class<?>[] {TypedProperties.class, JavaSparkContext.class,
-                SparkSession.class, SchemaProvider.class},
-            cfg, jssc, sparkSession, schemaProvider);
+        sourceClassLoadException = e;
+      } catch (Throwable t) {
+        throw new IOException("Could not load source class " + sourceClass, t);
       }
-    } catch (Throwable e) {
-      throw new IOException("Could not load source class " + sourceClass, e);
-    }
-  }
-
-  public static Source createSource(String sourceClass, TypedProperties cfg, JavaSparkContext jssc,
-                                    SparkSession sparkSession, HoodieIngestionMetrics metrics, StreamContext streamContext)
-      throws IOException {
-    try {
-      try {
-        return (Source) ReflectionUtils.loadClass(sourceClass,
-            new Class<?>[] {TypedProperties.class, JavaSparkContext.class,
-                SparkSession.class,
-                HoodieIngestionMetrics.class, StreamContext.class},
-            cfg, jssc, sparkSession, metrics, streamContext);
-      } catch (HoodieException e) {
-        return createSource(sourceClass, cfg, jssc, sparkSession, streamContext.getSchemaProvider(), metrics);
-      }
-    } catch (Throwable e) {
-      throw new IOException("Could not load source class " + sourceClass, e);
     }
+    throw new IOException("Could not load source class " + sourceClass, sourceClassLoadException);
   }
 
   public static JsonKafkaSourcePostProcessor createJsonKafkaSourcePostProcessor(String postProcessorClassNames, TypedProperties props) throws IOException {
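
The two createSource overloads above collapse into a single entry point that
probes the four constructor shapes in order and fails only after every shape
has thrown. A hypothetical call site (variable names assumed for
illustration), mirroring the testCreateSource case added further below:

    Source source = UtilHelpers.createSource(
        S3EventsHoodieIncrSource.class.getName(), props, jssc, sparkSession, metrics,
        new DefaultStreamContext(schemaProvider, Option.of(sourceProfileSupplier)));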
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
index d1d320f99b8..5900ddade24 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
@@ -26,13 +26,12 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
 import org.apache.hudi.utilities.sources.helpers.CloudObjectIncrCheckpoint;
-import org.apache.hudi.utilities.sources.helpers.CloudObjectMetadata;
 import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon;
-import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
 import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy;
 import org.apache.hudi.utilities.sources.helpers.QueryInfo;
 import org.apache.hudi.utilities.sources.helpers.QueryRunner;
-import org.apache.hudi.utilities.sources.helpers.gcs.GcsObjectMetadataFetcher;
+import org.apache.hudi.utilities.streamer.DefaultStreamContext;
+import org.apache.hudi.utilities.streamer.StreamContext;
 
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
@@ -42,7 +41,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
-import java.util.List;
 
 import static org.apache.hudi.common.util.ConfigUtils.checkRequiredConfigProperties;
 import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
@@ -52,6 +50,7 @@ import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
 import static org.apache.hudi.utilities.config.CloudSourceConfig.ENABLE_EXISTS_CHECK;
 import static org.apache.hudi.utilities.config.HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH;
 import static org.apache.hudi.utilities.config.HoodieIncrSourceConfig.NUM_INSTANTS_PER_FETCH;
+import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon.Type.GCS;
 import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.generateQueryInfo;
 import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getHollowCommitHandleMode;
 import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getMissingCheckpointStrategy;
@@ -109,8 +108,7 @@ public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
   private final int numInstantsPerFetch;
 
   private final MissingCheckpointStrategy missingCheckpointStrategy;
-  private final GcsObjectMetadataFetcher gcsObjectMetadataFetcher;
-  private final CloudDataFetcher gcsObjectDataFetcher;
+  private final CloudDataFetcher cloudDataFetcher;
   private final QueryRunner queryRunner;
   private final Option<SchemaProvider> schemaProvider;
   private final Option<SnapshotLoadQuerySplitter> snapshotLoadQuerySplitter;
@@ -120,16 +118,26 @@ public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
   public GcsEventsHoodieIncrSource(TypedProperties props, JavaSparkContext jsc, SparkSession spark,
                                    SchemaProvider schemaProvider) {
 
-    this(props, jsc, spark, schemaProvider,
-        new GcsObjectMetadataFetcher(props),
-        new CloudDataFetcher(props),
-        new QueryRunner(spark, props)
+    this(props, jsc, spark,
+        new CloudDataFetcher(props, jsc, spark),
+        new QueryRunner(spark, props),
+        new DefaultStreamContext(schemaProvider, Option.empty())
+    );
+  }
+
+  public GcsEventsHoodieIncrSource(TypedProperties props, JavaSparkContext jsc, SparkSession spark,
+                                   StreamContext streamContext) {
+
+    this(props, jsc, spark,
+        new CloudDataFetcher(props, jsc, spark),
+        new QueryRunner(spark, props),
+        streamContext
     );
   }
 
   GcsEventsHoodieIncrSource(TypedProperties props, JavaSparkContext jsc, SparkSession spark,
-                            SchemaProvider schemaProvider, GcsObjectMetadataFetcher gcsObjectMetadataFetcher, CloudDataFetcher gcsObjectDataFetcher, QueryRunner queryRunner) {
-    super(props, jsc, spark, schemaProvider);
+                            CloudDataFetcher cloudDataFetcher, QueryRunner queryRunner, StreamContext streamContext) {
+    super(props, jsc, spark, streamContext);
 
     checkRequiredConfigProperties(props, Collections.singletonList(HOODIE_SRC_BASE_PATH));
     srcPath = getStringWithAltKeys(props, HOODIE_SRC_BASE_PATH);
@@ -137,10 +145,9 @@ public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
     numInstantsPerFetch = getIntWithAltKeys(props, NUM_INSTANTS_PER_FETCH);
     checkIfFileExists = getBooleanWithAltKeys(props, ENABLE_EXISTS_CHECK);
 
-    this.gcsObjectMetadataFetcher = gcsObjectMetadataFetcher;
-    this.gcsObjectDataFetcher = gcsObjectDataFetcher;
+    this.cloudDataFetcher = cloudDataFetcher;
     this.queryRunner = queryRunner;
-    this.schemaProvider = Option.ofNullable(schemaProvider);
+    this.schemaProvider = Option.ofNullable(streamContext.getSchemaProvider());
     this.snapshotLoadQuerySplitter = SnapshotLoadQuerySplitter.getInstance(props);
 
     LOG.info("srcPath: " + srcPath);
@@ -168,28 +175,6 @@ public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
           + queryInfo.getStartInstant());
       return Pair.of(Option.empty(), queryInfo.getStartInstant());
     }
-
-    Pair<QueryInfo, Dataset<Row>> queryInfoDatasetPair = queryRunner.run(queryInfo, snapshotLoadQuerySplitter);
-    Dataset<Row> filteredSourceData = gcsObjectMetadataFetcher.applyFilter(queryInfoDatasetPair.getRight());
-    queryInfo = queryInfoDatasetPair.getLeft();
-    LOG.info("Adjusting end checkpoint:" + queryInfo.getEndInstant() + " based on sourceLimit :" + sourceLimit);
-    Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> checkPointAndDataset =
-        IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
-            filteredSourceData, sourceLimit, queryInfo, cloudObjectIncrCheckpoint);
-    if (!checkPointAndDataset.getRight().isPresent()) {
-      LOG.info("Empty source, returning endpoint:" + checkPointAndDataset.getLeft());
-      return Pair.of(Option.empty(), checkPointAndDataset.getLeft().toString());
-    }
-    LOG.info("Adjusted end checkpoint :" + checkPointAndDataset.getLeft());
-
-    Pair<Option<Dataset<Row>>, String> extractedCheckPointAndDataset = extractData(queryInfo, checkPointAndDataset.getRight().get());
-    return Pair.of(extractedCheckPointAndDataset.getLeft(), checkPointAndDataset.getLeft().toString());
-  }
-
-  private Pair<Option<Dataset<Row>>, String> extractData(QueryInfo queryInfo, Dataset<Row> cloudObjectMetadataDF) {
-    List<CloudObjectMetadata> cloudObjectMetadata = gcsObjectMetadataFetcher.getGcsObjectMetadata(sparkContext, cloudObjectMetadataDF, checkIfFileExists);
-    LOG.info("Total number of files to process :" + cloudObjectMetadata.size());
-    Option<Dataset<Row>> fileDataRows = gcsObjectDataFetcher.getCloudObjectDataDF(sparkSession, cloudObjectMetadata, props, schemaProvider);
-    return Pair.of(fileDataRows, queryInfo.getEndInstant());
+    return cloudDataFetcher.fetchPartitionedSource(GCS, cloudObjectIncrCheckpoint, this.sourceProfileSupplier, queryRunner.run(queryInfo, snapshotLoadQuerySplitter), this.schemaProvider, sourceLimit);
   }
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
index 9ea394889c9..eecab298840 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
@@ -25,9 +25,9 @@ import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.utilities.config.HoodieIncrSourceConfig;
-import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
 import org.apache.hudi.utilities.sources.helpers.QueryInfo;
+import org.apache.hudi.utilities.streamer.StreamContext;
 
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
@@ -127,8 +127,8 @@ public class HoodieIncrSource extends RowSource {
   }
 
   public HoodieIncrSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
-                          SchemaProvider schemaProvider) {
-    super(props, sparkContext, sparkSession, schemaProvider);
+                          StreamContext streamContext) {
+    super(props, sparkContext, sparkSession, streamContext);
     this.snapshotLoadQuerySplitter = SnapshotLoadQuerySplitter.getInstance(props);
   }
 
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java
index 1c7e9d99098..f76c285f2bb 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java
@@ -26,8 +26,9 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
 import org.apache.hudi.utilities.schema.SchemaProvider;
-
 import org.apache.hudi.utilities.sources.helpers.SanitizationUtils;
+import org.apache.hudi.utilities.streamer.StreamContext;
+
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -41,6 +42,11 @@ public abstract class RowSource extends Source<Dataset<Row>> {
       SchemaProvider schemaProvider) {
     super(props, sparkContext, sparkSession, schemaProvider, SourceType.ROW);
   }
+  
+  public RowSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
+                   StreamContext streamContext) {
+    super(props, sparkContext, sparkSession, SourceType.ROW, streamContext);
+  }
 
   protected abstract Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit);
 
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
index be9914190e7..579bc5c2021 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
@@ -23,41 +23,32 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
-import org.apache.hudi.storage.StorageConfiguration;
-import org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
 import org.apache.hudi.utilities.sources.helpers.CloudObjectIncrCheckpoint;
-import org.apache.hudi.utilities.sources.helpers.CloudObjectMetadata;
 import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon;
 import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
 import org.apache.hudi.utilities.sources.helpers.QueryInfo;
 import org.apache.hudi.utilities.sources.helpers.QueryRunner;
+import org.apache.hudi.utilities.streamer.DefaultStreamContext;
+import org.apache.hudi.utilities.streamer.StreamContext;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
-import java.util.List;
 
 import static org.apache.hudi.common.util.ConfigUtils.checkRequiredConfigProperties;
-import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
 import static org.apache.hudi.common.util.ConfigUtils.getIntWithAltKeys;
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
 import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
-import static org.apache.hudi.utilities.config.CloudSourceConfig.ENABLE_EXISTS_CHECK;
 import static org.apache.hudi.utilities.config.HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH;
 import static org.apache.hudi.utilities.config.HoodieIncrSourceConfig.NUM_INSTANTS_PER_FETCH;
-import static org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig.S3_FS_PREFIX;
-import static org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig.S3_INCR_ENABLE_EXISTS_CHECK;
-import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon.getCloudObjectMetadataPerPartition;
+import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon.Type.S3;
 import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getHollowCommitHandleMode;
 import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getMissingCheckpointStrategy;
 
@@ -69,7 +60,6 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
   private static final Logger LOG = LoggerFactory.getLogger(S3EventsHoodieIncrSource.class);
   private final String srcPath;
   private final int numInstantsPerFetch;
-  private final boolean checkIfFileExists;
   private final IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy;
   private final QueryRunner queryRunner;
   private final CloudDataFetcher cloudDataFetcher;
@@ -78,50 +68,39 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
 
   private final Option<SnapshotLoadQuerySplitter> snapshotLoadQuerySplitter;
 
-  public static class Config {
-    // control whether we do existence check for files before consuming them
-    @Deprecated
-    static final String ENABLE_EXISTS_CHECK = S3_INCR_ENABLE_EXISTS_CHECK.key();
-    @Deprecated
-    static final Boolean DEFAULT_ENABLE_EXISTS_CHECK = S3_INCR_ENABLE_EXISTS_CHECK.defaultValue();
-
-    @Deprecated
-    static final String S3_FS_PREFIX = S3EventsHoodieIncrSourceConfig.S3_FS_PREFIX.key();
-
-    /**
-     * {@link #SPARK_DATASOURCE_OPTIONS} is json string, passed to the reader while loading dataset.
-     * Example Hudi Streamer conf
-     * - --hoodie-conf hoodie.streamer.source.s3incr.spark.datasource.options={"header":"true","encoding":"UTF-8"}
-     */
-    @Deprecated
-    public static final String SPARK_DATASOURCE_OPTIONS = S3EventsHoodieIncrSourceConfig.SPARK_DATASOURCE_OPTIONS.key();
+  public S3EventsHoodieIncrSource(
+      TypedProperties props,
+      JavaSparkContext sparkContext,
+      SparkSession sparkSession,
+      SchemaProvider schemaProvider) {
+    this(props, sparkContext, sparkSession, new QueryRunner(sparkSession, props),
+        new CloudDataFetcher(props, sparkContext, sparkSession), new DefaultStreamContext(schemaProvider, Option.empty()));
   }
 
   public S3EventsHoodieIncrSource(
       TypedProperties props,
       JavaSparkContext sparkContext,
       SparkSession sparkSession,
-      SchemaProvider schemaProvider) {
-    this(props, sparkContext, sparkSession, schemaProvider, new QueryRunner(sparkSession, props),
-        new CloudDataFetcher(props));
+      StreamContext streamContext) {
+    this(props, sparkContext, sparkSession, new QueryRunner(sparkSession, props),
+        new CloudDataFetcher(props, sparkContext, sparkSession), streamContext);
   }
 
   public S3EventsHoodieIncrSource(
       TypedProperties props,
       JavaSparkContext sparkContext,
       SparkSession sparkSession,
-      SchemaProvider schemaProvider,
       QueryRunner queryRunner,
-      CloudDataFetcher cloudDataFetcher) {
-    super(props, sparkContext, sparkSession, schemaProvider);
+      CloudDataFetcher cloudDataFetcher,
+      StreamContext streamContext) {
+    super(props, sparkContext, sparkSession, streamContext);
     checkRequiredConfigProperties(props, Collections.singletonList(HOODIE_SRC_BASE_PATH));
     this.srcPath = getStringWithAltKeys(props, HOODIE_SRC_BASE_PATH);
     this.numInstantsPerFetch = getIntWithAltKeys(props, NUM_INSTANTS_PER_FETCH);
-    this.checkIfFileExists = getBooleanWithAltKeys(props, ENABLE_EXISTS_CHECK);
     this.missingCheckpointStrategy = getMissingCheckpointStrategy(props);
     this.queryRunner = queryRunner;
     this.cloudDataFetcher = cloudDataFetcher;
-    this.schemaProvider = Option.ofNullable(schemaProvider);
+    this.schemaProvider = Option.ofNullable(streamContext.getSchemaProvider());
     this.snapshotLoadQuerySplitter = SnapshotLoadQuerySplitter.getInstance(props);
   }
 
@@ -144,36 +123,6 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
       LOG.warn("Already caught up. No new data to process");
       return Pair.of(Option.empty(), queryInfo.getEndInstant());
     }
-    Pair<QueryInfo, Dataset<Row>> queryInfoDatasetPair = queryRunner.run(queryInfo, snapshotLoadQuerySplitter);
-    queryInfo = queryInfoDatasetPair.getLeft();
-    Dataset<Row> filteredSourceData = queryInfoDatasetPair.getRight().filter(
-        CloudObjectsSelectorCommon.generateFilter(CloudObjectsSelectorCommon.Type.S3, props));
-
-    LOG.info("Adjusting end checkpoint:" + queryInfo.getEndInstant() + " based on sourceLimit :" + sourceLimit);
-    Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> checkPointAndDataset =
-        IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
-            filteredSourceData, sourceLimit, queryInfo, cloudObjectIncrCheckpoint);
-    if (!checkPointAndDataset.getRight().isPresent()) {
-      LOG.info("Empty source, returning endpoint:" + checkPointAndDataset.getLeft());
-      return Pair.of(Option.empty(), checkPointAndDataset.getLeft().toString());
-    }
-    LOG.info("Adjusted end checkpoint :" + checkPointAndDataset.getLeft());
-
-    String s3FS = getStringWithAltKeys(props, S3_FS_PREFIX, true).toLowerCase();
-    String s3Prefix = s3FS + "://";
-
-    // Create S3 paths
-    StorageConfiguration<Configuration> storageConf = HadoopFSUtils.getStorageConfWithCopy(sparkContext.hadoopConfiguration());
-    List<CloudObjectMetadata> cloudObjectMetadata = checkPointAndDataset.getRight().get()
-        .select(CloudObjectsSelectorCommon.S3_BUCKET_NAME,
-                CloudObjectsSelectorCommon.S3_OBJECT_KEY,
-                CloudObjectsSelectorCommon.S3_OBJECT_SIZE)
-        .distinct()
-        .mapPartitions(getCloudObjectMetadataPerPartition(s3Prefix, storageConf, checkIfFileExists), Encoders.kryo(CloudObjectMetadata.class))
-        .collectAsList();
-    LOG.info("Total number of files to process :" + cloudObjectMetadata.size());
-
-    Option<Dataset<Row>> datasetOption = cloudDataFetcher.getCloudObjectDataDF(sparkSession, cloudObjectMetadata, props, schemaProvider);
-    return Pair.of(datasetOption, checkPointAndDataset.getLeft().toString());
+    return cloudDataFetcher.fetchPartitionedSource(S3, cloudObjectIncrCheckpoint, this.sourceProfileSupplier, queryRunner.run(queryInfo, snapshotLoadQuerySplitter), this.schemaProvider, sourceLimit);
   }
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java
index ed1a49e33e7..06fb89da9a4 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java
@@ -21,8 +21,11 @@ package org.apache.hudi.utilities.sources.helpers;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
 
+import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
@@ -32,10 +35,13 @@ import org.slf4j.LoggerFactory;
 import java.io.Serializable;
 import java.util.List;
 
+import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_MAX_FILE_SIZE;
+import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
 import static org.apache.hudi.utilities.config.CloudSourceConfig.DATAFILE_FORMAT;
+import static org.apache.hudi.utilities.config.CloudSourceConfig.ENABLE_EXISTS_CHECK;
+import static org.apache.hudi.utilities.config.CloudSourceConfig.SOURCE_MAX_BYTES_PER_PARTITION;
 import static org.apache.hudi.utilities.config.HoodieIncrSourceConfig.SOURCE_FILE_FORMAT;
-import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon.loadAsDataset;
 
 /**
  * Connects to S3/GCS from Spark and downloads data from a given list of files.
@@ -45,14 +51,24 @@ public class CloudDataFetcher implements Serializable {
 
   private static final String EMPTY_STRING = "";
 
-  private final TypedProperties props;
+  private transient TypedProperties props;
+  private transient JavaSparkContext sparkContext;
+  private transient SparkSession sparkSession;
+  private transient CloudObjectsSelectorCommon cloudObjectsSelectorCommon;
 
   private static final Logger LOG = LoggerFactory.getLogger(CloudDataFetcher.class);
 
   private static final long serialVersionUID = 1L;
 
-  public CloudDataFetcher(TypedProperties props) {
+  public CloudDataFetcher(TypedProperties props, JavaSparkContext jsc, SparkSession sparkSession) {
+    this(props, jsc, sparkSession, new CloudObjectsSelectorCommon(props));
+  }
+
+  public CloudDataFetcher(TypedProperties props, JavaSparkContext jsc, SparkSession sparkSession, CloudObjectsSelectorCommon cloudObjectsSelectorCommon) {
     this.props = props;
+    this.sparkContext = jsc;
+    this.sparkSession = sparkSession;
+    this.cloudObjectsSelectorCommon = cloudObjectsSelectorCommon;
   }
 
   public static String getFileFormat(TypedProperties props) {
@@ -63,8 +79,59 @@ public class CloudDataFetcher implements Serializable {
         : getStringWithAltKeys(props, DATAFILE_FORMAT, EMPTY_STRING);
   }
 
-  public Option<Dataset<Row>> getCloudObjectDataDF(SparkSession spark, List<CloudObjectMetadata> cloudObjectMetadata,
-                                                   TypedProperties props, Option<SchemaProvider> schemaProviderOption) {
-    return loadAsDataset(spark, cloudObjectMetadata, props, getFileFormat(props), schemaProviderOption);
+  public Pair<Option<Dataset<Row>>, String> fetchPartitionedSource(
+      CloudObjectsSelectorCommon.Type cloudType,
+      CloudObjectIncrCheckpoint cloudObjectIncrCheckpoint,
+      Option<SourceProfileSupplier> sourceProfileSupplier,
+      Pair<QueryInfo, Dataset<Row>> queryInfoDatasetPair,
+      Option<SchemaProvider> schemaProvider,
+      long sourceLimit) {
+    boolean isSourceProfileSupplierAvailable = sourceProfileSupplier.isPresent() && sourceProfileSupplier.get().getSourceProfile() != null;
+    if (isSourceProfileSupplierAvailable) {
+      LOG.debug("Using source limit from source profile sourceLimitFromConfig {} sourceLimitFromProfile {}", sourceLimit, sourceProfileSupplier.get().getSourceProfile().getMaxSourceBytes());
+      sourceLimit = sourceProfileSupplier.get().getSourceProfile().getMaxSourceBytes();
+    }
+
+    QueryInfo queryInfo = queryInfoDatasetPair.getLeft();
+    String filter = CloudObjectsSelectorCommon.generateFilter(cloudType, props);
+    LOG.info("Adding filter string to Dataset: " + filter);
+    Dataset<Row> filteredSourceData = queryInfoDatasetPair.getRight().filter(filter);
+
+    LOG.info("Adjusting end checkpoint:" + queryInfo.getEndInstant() + " based on sourceLimit :" + sourceLimit);
+    Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> checkPointAndDataset =
+        IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
+            filteredSourceData, sourceLimit, queryInfo, cloudObjectIncrCheckpoint);
+    if (!checkPointAndDataset.getRight().isPresent()) {
+      LOG.info("Empty source, returning endpoint:" + checkPointAndDataset.getLeft());
+      return Pair.of(Option.empty(), checkPointAndDataset.getLeft().toString());
+    }
+    LOG.info("Adjusted end checkpoint :" + checkPointAndDataset.getLeft());
+
+    boolean checkIfFileExists = getBooleanWithAltKeys(props, ENABLE_EXISTS_CHECK);
+    List<CloudObjectMetadata> cloudObjectMetadata = CloudObjectsSelectorCommon.getObjectMetadata(cloudType, sparkContext, checkPointAndDataset.getRight().get(), checkIfFileExists, props);
+    LOG.info("Total number of files to process :" + cloudObjectMetadata.size());
+
+    long bytesPerPartition = props.containsKey(SOURCE_MAX_BYTES_PER_PARTITION.key()) ? props.getLong(SOURCE_MAX_BYTES_PER_PARTITION.key()) :
+        props.getLong(PARQUET_MAX_FILE_SIZE.key(), Long.parseLong(PARQUET_MAX_FILE_SIZE.defaultValue()));
+    if (isSourceProfileSupplierAvailable) {
+      long bytesPerPartitionFromProfile = (long) sourceProfileSupplier.get().getSourceProfile().getSourceSpecificContext();
+      if (bytesPerPartitionFromProfile > 0) {
+        LOG.debug("Using bytesPerPartition from source profile bytesPerPartitionFromConfig {} bytesPerPartitionFromProfile {}", bytesPerPartition, bytesPerPartitionFromProfile);
+        bytesPerPartition = bytesPerPartitionFromProfile;
+      }
+    }
+    Option<Dataset<Row>> datasetOption = getCloudObjectDataDF(cloudObjectMetadata, schemaProvider, bytesPerPartition);
+    return Pair.of(datasetOption, checkPointAndDataset.getLeft().toString());
+  }
+
+  private Option<Dataset<Row>> getCloudObjectDataDF(List<CloudObjectMetadata> cloudObjectMetadata, Option<SchemaProvider> schemaProviderOption, long bytesPerPartition) {
+    long totalSize = 0;
+    for (CloudObjectMetadata o : cloudObjectMetadata) {
+      totalSize += o.getSize();
+    }
+    // inflate 10% for potential hoodie meta fields
+    double totalSizeWithHoodieMetaFields = totalSize * 1.1;
+    int numPartitions = (int) Math.max(Math.ceil(totalSizeWithHoodieMetaFields / bytesPerPartition), 1);
+    return cloudObjectsSelectorCommon.loadAsDataset(sparkSession, cloudObjectMetadata, getFileFormat(props), schemaProviderOption, numPartitions);
   }
 }
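
The partition sizing in getCloudObjectDataDF above is plain arithmetic; a
worked example with illustrative numbers:

    long totalSize = 95L;          // sum of all CloudObjectMetadata sizes
    long bytesPerPartition = 10L;  // profile value, else SOURCE_MAX_BYTES_PER_PARTITION, else parquet max file size
    // ceil(95 * 1.1 / 10) = ceil(10.45) = 11 partitions, never fewer than 1
    int numPartitions = (int) Math.max(Math.ceil(totalSize * 1.1 / bytesPerPartition), 1);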
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
index 8a442455291..8aee9d92754 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
@@ -37,9 +37,11 @@ import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.MapPartitionsFunction;
 import org.apache.spark.sql.DataFrameReader;
 import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
@@ -53,7 +55,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_MAX_FILE_SIZE;
 import static org.apache.hudi.common.util.CollectionUtils.isNullOrEmpty;
 import static org.apache.hudi.common.util.ConfigUtils.containsConfigProperty;
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
@@ -62,8 +63,8 @@ import static org.apache.hudi.utilities.config.CloudSourceConfig.IGNORE_RELATIVE
 import static org.apache.hudi.utilities.config.CloudSourceConfig.IGNORE_RELATIVE_PATH_SUBSTR;
 import static org.apache.hudi.utilities.config.CloudSourceConfig.PATH_BASED_PARTITION_FIELDS;
 import static org.apache.hudi.utilities.config.CloudSourceConfig.SELECT_RELATIVE_PATH_PREFIX;
-import static org.apache.hudi.utilities.config.CloudSourceConfig.SOURCE_MAX_BYTES_PER_PARTITION;
 import static org.apache.hudi.utilities.config.CloudSourceConfig.SPARK_DATASOURCE_READER_COMMA_SEPARATED_PATH_FORMAT;
+import static org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig.S3_FS_PREFIX;
 import static org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig.S3_IGNORE_KEY_PREFIX;
 import static org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig.S3_IGNORE_KEY_SUBSTRING;
 import static org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig.S3_KEY_PREFIX;
@@ -85,6 +86,13 @@ public class CloudObjectsSelectorCommon {
   public static final String GCS_OBJECT_KEY = "name";
   public static final String GCS_OBJECT_SIZE = "size";
   private static final String SPACE_DELIMTER = " ";
+  private static final String GCS_PREFIX = "gs://";
+
+  private final TypedProperties properties;
+
+  public CloudObjectsSelectorCommon(TypedProperties properties) {
+    this.properties = properties;
+  }
 
   /**
    * Return a function that extracts filepaths from a list of Rows.
@@ -205,8 +213,40 @@ public class CloudObjectsSelectorCommon {
     return filter.toString();
   }
 
-  public static Option<Dataset<Row>> loadAsDataset(SparkSession spark, List<CloudObjectMetadata> cloudObjectMetadata,
-                                                   TypedProperties props, String fileFormat, Option<SchemaProvider> schemaProviderOption) {
+  /**
+   * @param cloudObjectMetadataDF a Dataset that contains metadata of S3/GCS objects. Assumed to be a persisted form
+   *                              of a Cloud Storage SQS/PubSub Notification event.
+   * @param checkIfExists         Check if each file exists, before returning its full path
+   * @return A {@link List} of {@link CloudObjectMetadata} containing file info.
+   */
+  public static List<CloudObjectMetadata> getObjectMetadata(
+      Type type,
+      JavaSparkContext jsc,
+      Dataset<Row> cloudObjectMetadataDF,
+      boolean checkIfExists,
+      TypedProperties props
+  ) {
+    StorageConfiguration<Configuration> storageConf = HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration());
+    if (type == Type.GCS) {
+      return cloudObjectMetadataDF
+          .select("bucket", "name", "size")
+          .distinct()
+          .mapPartitions(getCloudObjectMetadataPerPartition(GCS_PREFIX, storageConf, checkIfExists), Encoders.kryo(CloudObjectMetadata.class))
+          .collectAsList();
+    } else if (type == Type.S3) {
+      String s3FS = getStringWithAltKeys(props, S3_FS_PREFIX, true).toLowerCase();
+      String s3Prefix = s3FS + "://";
+      return cloudObjectMetadataDF
+          .select(CloudObjectsSelectorCommon.S3_BUCKET_NAME, CloudObjectsSelectorCommon.S3_OBJECT_KEY, CloudObjectsSelectorCommon.S3_OBJECT_SIZE)
+          .distinct()
+          .mapPartitions(getCloudObjectMetadataPerPartition(s3Prefix, storageConf, checkIfExists), Encoders.kryo(CloudObjectMetadata.class))
+          .collectAsList();
+    }
+    throw new UnsupportedOperationException("Invalid cloud type " + type);
+  }
+
+  public Option<Dataset<Row>> loadAsDataset(SparkSession spark, List<CloudObjectMetadata> cloudObjectMetadata,
+                                            String fileFormat, Option<SchemaProvider> schemaProviderOption, int numPartitions) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Extracted distinct files " + cloudObjectMetadata.size()
           + " and some samples " + 
cloudObjectMetadata.stream().map(CloudObjectMetadata::getPath).limit(10).collect(Collectors.toList()));
@@ -216,7 +256,7 @@ public class CloudObjectsSelectorCommon {
       return Option.empty();
     }
     DataFrameReader reader = spark.read().format(fileFormat);
-    String datasourceOpts = getStringWithAltKeys(props, CloudSourceConfig.SPARK_DATASOURCE_OPTIONS, true);
+    String datasourceOpts = getStringWithAltKeys(properties, CloudSourceConfig.SPARK_DATASOURCE_OPTIONS, true);
     if (schemaProviderOption.isPresent()) {
       Schema sourceSchema = schemaProviderOption.get().getSourceSchema();
       if (sourceSchema != null && !sourceSchema.equals(InputBatch.NULL_SCHEMA)) {
@@ -225,7 +265,7 @@ public class CloudObjectsSelectorCommon {
     }
     if (StringUtils.isNullOrEmpty(datasourceOpts)) {
       // fall back to legacy config for BWC. TODO consolidate in HUDI-6020
-      datasourceOpts = getStringWithAltKeys(props, S3EventsHoodieIncrSourceConfig.SPARK_DATASOURCE_OPTIONS, true);
+      datasourceOpts = getStringWithAltKeys(properties, S3EventsHoodieIncrSourceConfig.SPARK_DATASOURCE_OPTIONS, true);
     }
     if (StringUtils.nonEmpty(datasourceOpts)) {
       final ObjectMapper mapper = new ObjectMapper();
@@ -239,18 +279,10 @@ public class CloudObjectsSelectorCommon {
       reader = reader.options(sparkOptionsMap);
     }
     List<String> paths = new ArrayList<>();
-    long totalSize = 0;
     for (CloudObjectMetadata o : cloudObjectMetadata) {
       paths.add(o.getPath());
-      totalSize += o.getSize();
     }
-    // inflate 10% for potential hoodie meta fields
-    totalSize *= 1.1;
-    // if source bytes are provided, then give preference to that.
-    long bytesPerPartition = props.containsKey(SOURCE_MAX_BYTES_PER_PARTITION.key()) ? props.getLong(SOURCE_MAX_BYTES_PER_PARTITION.key()) :
-        props.getLong(PARQUET_MAX_FILE_SIZE.key(), Long.parseLong(PARQUET_MAX_FILE_SIZE.defaultValue()));
-    int numPartitions = (int) Math.max(Math.ceil(totalSize / bytesPerPartition), 1);
-    boolean isCommaSeparatedPathFormat = props.getBoolean(SPARK_DATASOURCE_READER_COMMA_SEPARATED_PATH_FORMAT.key(), false);
+    boolean isCommaSeparatedPathFormat = properties.getBoolean(SPARK_DATASOURCE_READER_COMMA_SEPARATED_PATH_FORMAT.key(), false);
 
     Dataset<Row> dataset;
     if (isCommaSeparatedPathFormat) {
@@ -260,8 +292,8 @@ public class CloudObjectsSelectorCommon {
     }
 
     // add partition column from source path if configured
-    if (containsConfigProperty(props, PATH_BASED_PARTITION_FIELDS)) {
-      String[] partitionKeysToAdd = getStringWithAltKeys(props, PATH_BASED_PARTITION_FIELDS).split(",");
+    if (containsConfigProperty(properties, PATH_BASED_PARTITION_FIELDS)) {
+      String[] partitionKeysToAdd = getStringWithAltKeys(properties, PATH_BASED_PARTITION_FIELDS).split(",");
       // Add partition column for all path-based partition keys. If key is not present in path, the value will be null.
       for (String partitionKey : partitionKeysToAdd) {
         String partitionPathPattern = String.format("%s=", partitionKey);
@@ -284,10 +316,6 @@ public class CloudObjectsSelectorCommon {
     return dataset;
   }
 
-  public static Option<Dataset<Row>> loadAsDataset(SparkSession spark, List<CloudObjectMetadata> cloudObjectMetadata, TypedProperties props, String fileFormat) {
-    return loadAsDataset(spark, cloudObjectMetadata, props, fileFormat, Option.empty());
-  }
-
   private static Option<String> getPropVal(TypedProperties props, ConfigProperty<String> configProperty) {
     String value = getStringWithAltKeys(props, configProperty, true);
     if (!StringUtils.isNullOrEmpty(value)) {
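
With the S3 and GCS branches now side by side in getObjectMetadata, callers
only pick the Type. A hypothetical invocation (jsc, eventsDf and props are
assumed to come from the calling source):

    List<CloudObjectMetadata> metadata = CloudObjectsSelectorCommon.getObjectMetadata(
        CloudObjectsSelectorCommon.Type.S3, jsc, eventsDf, /* checkIfExists */ false, props);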
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsObjectMetadataFetcher.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsObjectMetadataFetcher.java
deleted file mode 100644
index 21ca334d05f..00000000000
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsObjectMetadataFetcher.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.utilities.sources.helpers.gcs;
-
-import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
-import org.apache.hudi.storage.StorageConfiguration;
-import org.apache.hudi.utilities.sources.helpers.CloudObjectMetadata;
-import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.Row;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-import java.util.List;
-
-import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon.getCloudObjectMetadataPerPartition;
-
-/**
- * Extracts a list of GCS {@link CloudObjectMetadata} containing metadata of GCS objects from a given Spark Dataset as input.
- * Optionally:
- * i) Match the filename and path against provided input filter strings
- * ii) Check if each file exists on GCS, in which case it assumes SparkContext is already
- * configured with GCS options through GcsEventsHoodieIncrSource.addGcsAccessConfs().
- */
-public class GcsObjectMetadataFetcher implements Serializable {
-
-  private final TypedProperties props;
-
-  private static final String GCS_PREFIX = "gs://";
-  private static final long serialVersionUID = 1L;
-
-  private static final Logger LOG = LoggerFactory.getLogger(GcsObjectMetadataFetcher.class);
-
-  public GcsObjectMetadataFetcher(TypedProperties props) {
-    this.props = props;
-  }
-
-  /**
-   * @param cloudObjectMetadataDF a Dataset that contains metadata of GCS objects. Assumed to be a persisted form
-   *                              of a Cloud Storage Pubsub Notification event.
-   * @param checkIfExists         Check if each file exists, before returning its full path
-   * @return A {@link List} of {@link CloudObjectMetadata} containing GCS info.
-   */
-  public List<CloudObjectMetadata> getGcsObjectMetadata(JavaSparkContext jsc, Dataset<Row> cloudObjectMetadataDF, boolean checkIfExists) {
-    StorageConfiguration<Configuration> storageConf = HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration());
-    return cloudObjectMetadataDF
-        .select("bucket", "name", "size")
-        .distinct()
-        .mapPartitions(getCloudObjectMetadataPerPartition(GCS_PREFIX, storageConf, checkIfExists), Encoders.kryo(CloudObjectMetadata.class))
-        .collectAsList();
-  }
-
-  /**
-   * @param cloudObjectMetadataDF a Dataset that contains metadata of GCS objects. Assumed to be a persisted form
-   *                              of a Cloud Storage Pubsub Notification event.
-   * @return Dataset<Row> after apply the filtering.
-   */
-  public Dataset<Row> applyFilter(Dataset<Row> cloudObjectMetadataDF) {
-    String filter = CloudObjectsSelectorCommon.generateFilter(CloudObjectsSelectorCommon.Type.GCS, props);
-    LOG.info("Adding filter string to Dataset: " + filter);
-
-    return cloudObjectMetadataDF.filter(filter);
-  }
-}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
index 8d529fda073..dda205db8f8 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
@@ -36,14 +36,19 @@ import org.apache.hudi.config.HoodieArchivalConfig;
 import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.config.CloudSourceConfig;
+import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.TestS3EventsHoodieIncrSource.TestSourceProfile;
 import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
+import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon;
 import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
 import org.apache.hudi.utilities.sources.helpers.QueryInfo;
 import org.apache.hudi.utilities.sources.helpers.QueryRunner;
-import org.apache.hudi.utilities.sources.helpers.gcs.GcsObjectMetadataFetcher;
+import org.apache.hudi.utilities.streamer.DefaultStreamContext;
+import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -61,8 +66,8 @@ import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
-import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -78,8 +83,12 @@ import java.util.stream.Collectors;
 
 import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
 import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.times;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -94,13 +103,14 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
   @TempDir
   protected java.nio.file.Path tempDir;
 
-  @Mock
-  CloudDataFetcher gcsObjectDataFetcher;
-
   @Mock
   QueryRunner queryRunner;
   @Mock
   QueryInfo queryInfo;
+  @Mock
+  CloudObjectsSelectorCommon cloudObjectsSelectorCommon;
+  @Mock
+  SourceProfileSupplier sourceProfileSupplier;
 
   protected Option<SchemaProvider> schemaProvider;
   private HoodieTableMetaClient metaClient;
@@ -133,9 +143,6 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
     Pair<String, List<HoodieRecord>> inserts = writeGcsMetadataRecords(commitTimeForWrites);
 
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 100L, inserts.getKey());
-
-    verify(gcsObjectDataFetcher, times(0)).getCloudObjectDataDF(
-        Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider));
   }
 
   @Test
@@ -151,7 +158,7 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
 
     setMockQueryRunner(inputDs);
-
+    when(sourceProfileSupplier.getSourceProfile()).thenReturn(null);
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 100L, "1#path/to/file1.json");
   }
 
@@ -170,7 +177,7 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
 
     setMockQueryRunner(inputDs);
-
+    when(sourceProfileSupplier.getSourceProfile()).thenReturn(null);
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 250L, "1#path/to/file2.json");
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2.json"), 250L, "1#path/to/file3.json");
   }
@@ -193,7 +200,7 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
 
     setMockQueryRunner(inputDs);
-
+    when(sourceProfileSupplier.getSourceProfile()).thenReturn(null);
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 250L, "1#path/to/file10006.json");
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file10006.json"), 250L, "1#path/to/file10007.json");
   }
@@ -227,15 +234,20 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
     filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file5%s", extension), 150L, "2"));
 
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
-
+    List<Long> bytesPerPartition = Arrays.asList(10L, 100L, -1L);
     setMockQueryRunner(inputDs);
 
-    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1"), 100L,
-                  "1#path/to/file1" + extension, typedProperties);
-    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file1" + extension), 100L,
-                  "1#path/to/file2" + extension, typedProperties);
-    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2" + extension), 1000L,
-                  "2#path/to/file5" + extension, typedProperties);
+    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(100L, bytesPerPartition.get(0)));
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1"), 100L, "1#path/to/file1" + extension, typedProperties);
+    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(100L, bytesPerPartition.get(1)));
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file1" + extension), 100L, "1#path/to/file2" + extension, typedProperties);
+    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(1000L, bytesPerPartition.get(2)));
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2" + extension), 1000L, "2#path/to/file5" + extension, typedProperties);
+    // Verify the partitions being passed in getCloudObjectDataDF are correct.
+    List<Integer> numPartitions = Arrays.asList(12, 2, 1);
+    ArgumentCaptor<Integer> argumentCaptor = ArgumentCaptor.forClass(Integer.class);
+    verify(cloudObjectsSelectorCommon, atLeastOnce()).loadAsDataset(any(), any(), any(), eq(schemaProvider), argumentCaptor.capture());
+    Assertions.assertEquals(numPartitions, argumentCaptor.getAllValues());
   }
 
   @ParameterizedTest
@@ -264,15 +276,41 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
     setMockQueryRunner(inputDs, Option.of(snapshotCheckPoint));
     TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
     typedProperties.setProperty("hoodie.streamer.source.cloud.data.ignore.relpath.prefix", "path/to/skip");
+    when(sourceProfileSupplier.getSourceProfile()).thenReturn(null);
+    List<Long> bytesPerPartition = Arrays.asList(10L, 20L, -1L, 1000L * 1000L * 1000L);
+
     //1. snapshot query, read all records
+    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(50000L, bytesPerPartition.get(0)));
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50000L, exptected1, typedProperties);
     //2. incremental query, as commit is present in timeline
+    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(10L, bytesPerPartition.get(1)));
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(exptected1), 10L, exptected2, typedProperties);
     //3. snapshot query with source limit less than first commit size
+    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(50L, bytesPerPartition.get(2)));
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, exptected3, typedProperties);
     typedProperties.setProperty("hoodie.streamer.source.cloud.data.ignore.relpath.prefix", "path/to");
     //4. As snapshotQuery will return 1 -> same would be return as nextCheckpoint (dataset is empty due to ignore prefix).
+    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new TestSourceProfile(50L, bytesPerPartition.get(3)));
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, exptected4, typedProperties);
+    // Verify the partitions being passed in getCloudObjectDataDF are correct.
+    ArgumentCaptor<Integer> argumentCaptor = ArgumentCaptor.forClass(Integer.class);
+    verify(cloudObjectsSelectorCommon, atLeastOnce()).loadAsDataset(any(), any(), any(), eq(schemaProvider), argumentCaptor.capture());
+    if (snapshotCheckPoint.equals("1") || snapshotCheckPoint.equals("2")) {
+      Assertions.assertEquals(Arrays.asList(12, 3, 1), argumentCaptor.getAllValues());
+    } else {
+      Assertions.assertEquals(Arrays.asList(23, 1), argumentCaptor.getAllValues());
+    }
+  }
+
+  @Test
+  public void testCreateSource() throws IOException {
+    TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
+    HoodieIngestionMetrics metrics = mock(HoodieIngestionMetrics.class);
+    Source gcsSource = UtilHelpers.createSource(GcsEventsHoodieIncrSource.class.getName(), typedProperties, jsc(), spark(), metrics,
+        new DefaultStreamContext(schemaProvider.orElse(null), Option.of(sourceProfileSupplier)));
+    assertEquals(Source.SourceType.ROW, gcsSource.getSourceType());
+    assertThrows(IOException.class, () -> UtilHelpers.createSource(GcsEventsHoodieIncrSource.class.getName(), new TypedProperties(), jsc(), spark(), metrics,
+        new DefaultStreamContext(schemaProvider.orElse(null), Option.of(sourceProfileSupplier))));
   }
 
   private void setMockQueryRunner(Dataset<Row> inputDs) {
@@ -281,7 +319,7 @@ public class TestGcsEventsHoodieIncrSource extends 
SparkClientFunctionalTestHarn
 
   private void setMockQueryRunner(Dataset<Row> inputDs, Option<String> 
nextCheckPointOpt) {
 
-    when(queryRunner.run(Mockito.any(QueryInfo.class), 
Mockito.any())).thenAnswer(invocation -> {
+    when(queryRunner.run(any(QueryInfo.class), any())).thenAnswer(invocation 
-> {
       QueryInfo queryInfo = invocation.getArgument(0);
       QueryInfo updatedQueryInfo = nextCheckPointOpt.map(nextCheckPoint ->
               queryInfo.withUpdatedEndInstant(nextCheckPoint))
@@ -302,7 +340,8 @@ public class TestGcsEventsHoodieIncrSource extends 
SparkClientFunctionalTestHarn
                              TypedProperties typedProperties) {
 
     GcsEventsHoodieIncrSource incrSource = new 
GcsEventsHoodieIncrSource(typedProperties, jsc(),
-        spark(), schemaProvider.orElse(null), new 
GcsObjectMetadataFetcher(typedProperties), gcsObjectDataFetcher, queryRunner);
+        spark(), new CloudDataFetcher(typedProperties, jsc(), spark(), 
cloudObjectsSelectorCommon), queryRunner,
+        new DefaultStreamContext(schemaProvider.orElse(null), 
Option.of(sourceProfileSupplier)));
 
     Pair<Option<Dataset<Row>>, String> dataAndCheckpoint = 
incrSource.fetchNextBatch(checkpointToPull, sourceLimit);
 
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
index d01543044b0..c1e7f9dca49 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
@@ -43,6 +43,7 @@ import 
org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
 import org.apache.hudi.utilities.sources.helpers.TestSnapshotQuerySplitterImpl;
+import org.apache.hudi.utilities.streamer.DefaultStreamContext;
 
 import org.apache.avro.Schema;
 import org.apache.spark.api.java.JavaRDD;
@@ -335,7 +336,7 @@ public class TestHoodieIncrSource extends 
SparkClientFunctionalTestHarness {
     snapshotCheckPointImplClassOpt.map(className ->
         
properties.setProperty(SnapshotLoadQuerySplitter.Config.SNAPSHOT_LOAD_QUERY_SPLITTER_CLASS_NAME,
 className));
     TypedProperties typedProperties = new TypedProperties(properties);
-    HoodieIncrSource incrSource = new HoodieIncrSource(typedProperties, jsc(), 
spark(), new DummySchemaProvider(HoodieTestDataGenerator.AVRO_SCHEMA));
+    HoodieIncrSource incrSource = new HoodieIncrSource(typedProperties, jsc(), 
spark(), new DefaultStreamContext(new 
DummySchemaProvider(HoodieTestDataGenerator.AVRO_SCHEMA), Option.empty()));
 
     // read everything until latest
     Pair<Option<Dataset<Row>>, String> batchCheckPoint = 
incrSource.fetchNextBatch(checkpointToPull, 500);
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
index 553078ff3fc..be26dfb1f3b 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
@@ -36,14 +36,20 @@ import org.apache.hudi.config.HoodieArchivalConfig;
 import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.config.CloudSourceConfig;
+import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
+import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon;
 import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
 import org.apache.hudi.utilities.sources.helpers.QueryInfo;
 import org.apache.hudi.utilities.sources.helpers.QueryRunner;
 import 
org.apache.hudi.utilities.sources.helpers.TestCloudObjectsSelectorCommon;
+import org.apache.hudi.utilities.streamer.DefaultStreamContext;
+import org.apache.hudi.utilities.streamer.SourceProfile;
+import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -61,6 +67,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.junit.jupiter.MockitoExtension;
@@ -68,6 +75,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -76,7 +84,10 @@ import java.util.stream.Collectors;
 
 import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
 import static 
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT;
-import static org.mockito.ArgumentMatchers.eq;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 @ExtendWith(MockitoExtension.class)
@@ -93,7 +104,9 @@ public class TestS3EventsHoodieIncrSource extends 
SparkClientFunctionalTestHarne
   @Mock
   QueryRunner mockQueryRunner;
   @Mock
-  CloudDataFetcher mockCloudDataFetcher;
+  CloudObjectsSelectorCommon mockCloudObjectsSelectorCommon;
+  @Mock
+  SourceProfileSupplier sourceProfileSupplier;
   @Mock
   QueryInfo queryInfo;
   private JavaSparkContext jsc;
@@ -257,8 +270,8 @@ public class TestS3EventsHoodieIncrSource extends 
SparkClientFunctionalTestHarne
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
 
     setMockQueryRunner(inputDs);
-    when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), 
Mockito.any(), Mockito.any(), eq(schemaProvider)))
-        .thenReturn(Option.empty());
+    when(mockCloudObjectsSelectorCommon.loadAsDataset(Mockito.any(), 
Mockito.any(), Mockito.any(), Mockito.eq(schemaProvider), 
Mockito.anyInt())).thenReturn(Option.empty());
+    when(sourceProfileSupplier.getSourceProfile()).thenReturn(null);
 
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 
100L, "1#path/to/file1.json");
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file1.json"), 
200L, "1#path/to/file2.json");
@@ -282,8 +295,8 @@ public class TestS3EventsHoodieIncrSource extends 
SparkClientFunctionalTestHarne
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
 
     setMockQueryRunner(inputDs);
-    when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), 
Mockito.any(), Mockito.any(), eq(schemaProvider)))
-        .thenReturn(Option.empty());
+    when(mockCloudObjectsSelectorCommon.loadAsDataset(Mockito.any(), 
Mockito.any(), Mockito.any(), Mockito.eq(schemaProvider), 
Mockito.anyInt())).thenReturn(Option.empty());
+    when(sourceProfileSupplier.getSourceProfile()).thenReturn(null);
 
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 
250L, "1#path/to/file2.json");
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2.json"), 
250L, "1#path/to/file3.json");
@@ -322,15 +335,15 @@ public class TestS3EventsHoodieIncrSource extends 
SparkClientFunctionalTestHarne
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
 
     setMockQueryRunner(inputDs);
-    when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), 
Mockito.any(), Mockito.any(), eq(schemaProvider)))
-        .thenReturn(Option.empty());
+    when(mockCloudObjectsSelectorCommon.loadAsDataset(Mockito.any(), 
Mockito.any(), Mockito.any(), Mockito.eq(schemaProvider), 
Mockito.anyInt())).thenReturn(Option.empty());
+    when(sourceProfileSupplier.getSourceProfile()).thenReturn(null);
 
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1"), 100L,
-                  "1#path/to/file1" + extension, typedProperties);
+        "1#path/to/file1" + extension, typedProperties);
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file1" + 
extension), 100L,
-                  "1#path/to/file2" + extension, typedProperties);
+        "1#path/to/file2" + extension, typedProperties);
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2" + 
extension), 1000L,
-                  "2#path/to/file5" + extension, typedProperties);
+        "2#path/to/file5" + extension, typedProperties);
   }
 
   @Test
@@ -363,8 +376,9 @@ public class TestS3EventsHoodieIncrSource extends 
SparkClientFunctionalTestHarne
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("2"), 1000L, "2", 
typedProperties);
   }
 
-  @Test
-  public void testFilterAnEntireCommit() throws IOException {
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testFilterAnEntireCommit(boolean useSourceProfile) throws 
IOException {
     String commitTimeForWrites1 = "2";
     String commitTimeForReads = "1";
 
@@ -385,16 +399,22 @@ public class TestS3EventsHoodieIncrSource extends 
SparkClientFunctionalTestHarne
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
 
     setMockQueryRunner(inputDs);
-    when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), 
Mockito.any(), Mockito.any(), eq(schemaProvider)))
-        .thenReturn(Option.empty());
+    SourceProfile<Long> sourceProfile = new TestSourceProfile(50L, 10L);
+    when(mockCloudObjectsSelectorCommon.loadAsDataset(Mockito.any(), 
Mockito.any(), Mockito.any(), Mockito.eq(schemaProvider), 
Mockito.anyInt())).thenReturn(Option.empty());
+    if (useSourceProfile) {
+      when(sourceProfileSupplier.getSourceProfile()).thenReturn(sourceProfile);
+    } else {
+      when(sourceProfileSupplier.getSourceProfile()).thenReturn(null);
+    }
     TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
     
typedProperties.setProperty("hoodie.streamer.source.s3incr.ignore.key.prefix", 
"path/to/skip");
 
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1"), 50L, 
"2#path/to/file4.json", typedProperties);
   }
 
-  @Test
-  public void testFilterAnEntireMiddleCommit() throws IOException {
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testFilterAnEntireMiddleCommit(boolean useSourceProfile) throws 
IOException {
     String commitTimeForWrites1 = "2";
     String commitTimeForWrites2 = "3";
     String commitTimeForReads = "1";
@@ -417,16 +437,21 @@ public class TestS3EventsHoodieIncrSource extends 
SparkClientFunctionalTestHarne
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
 
     setMockQueryRunner(inputDs);
-    when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), 
Mockito.any(), Mockito.any(), eq(schemaProvider)))
-        .thenReturn(Option.empty());
+    when(mockCloudObjectsSelectorCommon.loadAsDataset(Mockito.any(), 
Mockito.any(), Mockito.any(), Mockito.eq(schemaProvider), 
Mockito.anyInt())).thenReturn(Option.empty());
+    SourceProfile<Long> sourceProfile = new TestSourceProfile(50L, 10L);
+    if (useSourceProfile) {
+      when(sourceProfileSupplier.getSourceProfile()).thenReturn(sourceProfile);
+    } else {
+      when(sourceProfileSupplier.getSourceProfile()).thenReturn(null);
+    }
+
     TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
     
typedProperties.setProperty("hoodie.streamer.source.s3incr.ignore.key.prefix", 
"path/to/skip");
 
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file3.json"), 
50L, "3#path/to/file4.json", typedProperties);
 
     schemaProvider = Option.empty();
-    when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), 
Mockito.any(), Mockito.any(), eq(schemaProvider)))
-        .thenReturn(Option.empty());
+    when(sourceProfileSupplier.getSourceProfile()).thenReturn(null);
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file3.json"), 
50L, "3#path/to/file4.json", typedProperties);
   }
 
@@ -454,26 +479,50 @@ public class TestS3EventsHoodieIncrSource extends 
SparkClientFunctionalTestHarne
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
 
     setMockQueryRunner(inputDs, Option.of(snapshotCheckPoint));
-    when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), 
Mockito.any(), Mockito.any(), eq(schemaProvider)))
-        .thenReturn(Option.empty());
+    when(mockCloudObjectsSelectorCommon.loadAsDataset(Mockito.any(), 
Mockito.any(), Mockito.any(), Mockito.eq(schemaProvider), 
Mockito.anyInt())).thenReturn(Option.empty());
     TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
     
typedProperties.setProperty("hoodie.streamer.source.s3incr.ignore.key.prefix", 
"path/to/skip");
+    List<Long> bytesPerPartition = Arrays.asList(10L, 20L, -1L, 1000L * 1000L 
* 1000L);
+
     //1. snapshot query, read all records
+    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new 
TestSourceProfile(50000L, bytesPerPartition.get(0)));
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50000L, exptected1, 
typedProperties);
     //2. incremental query, as commit is present in timeline
+    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new 
TestSourceProfile(10L, bytesPerPartition.get(1)));
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(exptected1), 10L, 
exptected2, typedProperties);
     //3. snapshot query with source limit less than first commit size
+    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new 
TestSourceProfile(50L, bytesPerPartition.get(2)));
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, exptected3, 
typedProperties);
     
typedProperties.setProperty("hoodie.streamer.source.s3incr.ignore.key.prefix", 
"path/to");
     //4. As snapshotQuery will return 1, the same would be returned as 
nextCheckpoint (the dataset is empty due to the ignore prefix).
+    when(sourceProfileSupplier.getSourceProfile()).thenReturn(new 
TestSourceProfile(50L, bytesPerPartition.get(3)));
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, exptected4, 
typedProperties);
+    // Verify that the partition counts passed down to loadAsDataset are correct.
+    ArgumentCaptor<Integer> argumentCaptor = 
ArgumentCaptor.forClass(Integer.class);
+    verify(mockCloudObjectsSelectorCommon, 
atLeastOnce()).loadAsDataset(Mockito.any(), Mockito.any(), Mockito.any(), 
Mockito.eq(schemaProvider), argumentCaptor.capture());
+    if (snapshotCheckPoint.equals("1") || snapshotCheckPoint.equals("2")) {
+      Assertions.assertEquals(Arrays.asList(12, 3, 1), 
argumentCaptor.getAllValues());
+    } else {
+      Assertions.assertEquals(Arrays.asList(23, 1), 
argumentCaptor.getAllValues());
+    }
+  }
+
+  @Test
+  public void testCreateSource() throws IOException {
+    TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
+    HoodieIngestionMetrics metrics = mock(HoodieIngestionMetrics.class);
+    Source s3Source = 
UtilHelpers.createSource(S3EventsHoodieIncrSource.class.getName(), 
typedProperties, jsc(), spark(), metrics,
+        new DefaultStreamContext(schemaProvider.orElse(null), 
Option.of(sourceProfileSupplier)));
+    assertEquals(Source.SourceType.ROW, s3Source.getSourceType());
   }
 
   private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy 
missingCheckpointStrategy,
                              Option<String> checkpointToPull, long 
sourceLimit, String expectedCheckpoint,
                              TypedProperties typedProperties) {
     S3EventsHoodieIncrSource incrSource = new 
S3EventsHoodieIncrSource(typedProperties, jsc(),
-        spark(), schemaProvider.orElse(null), mockQueryRunner, 
mockCloudDataFetcher);
+        spark(), mockQueryRunner, new CloudDataFetcher(typedProperties, jsc(), 
spark(), mockCloudObjectsSelectorCommon),
+        new DefaultStreamContext(schemaProvider.orElse(null), 
Option.of(sourceProfileSupplier)));
 
     Pair<Option<Dataset<Row>>, String> dataAndCheckpoint = 
incrSource.fetchNextBatch(checkpointToPull, sourceLimit);
 
@@ -512,4 +561,30 @@ public class TestS3EventsHoodieIncrSource extends 
SparkClientFunctionalTestHarne
 
     readAndAssert(missingCheckpointStrategy, checkpointToPull, sourceLimit, 
expectedCheckpoint, typedProperties);
   }
-}
+
+  static class TestSourceProfile implements SourceProfile<Long> {
+
+    private final long maxSourceBytes;
+    private final long bytesPerPartition;
+
+    public TestSourceProfile(long maxSourceBytes, long bytesPerPartition) {
+      this.maxSourceBytes = maxSourceBytes;
+      this.bytesPerPartition = bytesPerPartition;
+    }
+
+    @Override
+    public long getMaxSourceBytes() {
+      return maxSourceBytes;
+    }
+
+    @Override
+    public int getSourcePartitions() {
+      throw new UnsupportedOperationException("getSourcePartitions is not 
required for S3 source profile");
+    }
+
+    @Override
+    public Long getSourceSpecificContext() {
+      return bytesPerPartition;
+    }
+  }
+}
\ No newline at end of file
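
TestSourceProfile above is effectively a template for real profiles:
getMaxSourceBytes() bounds how much data a single sync ingests, and for these
cloud sources getSourceSpecificContext() carries the target bytes per
partition, which is what drives the partition counts captured by the
ArgumentCaptor checks. A sketch of a fixed-size supplier, assuming
SourceProfileSupplier exposes only getSourceProfile() (the sizes are
illustrative):

    SourceProfileSupplier fixedProfileSupplier = () -> new SourceProfile<Long>() {
      @Override
      public long getMaxSourceBytes() {
        return 1024L * 1024 * 1024;   // ingest at most ~1 GB per sync
      }

      @Override
      public int getSourcePartitions() {
        // Not consulted by the S3/GCS sources, which size partitions from
        // bytes instead (mirroring TestSourceProfile above).
        throw new UnsupportedOperationException();
      }

      @Override
      public Long getSourceSpecificContext() {
        return 128L * 1024 * 1024;    // target ~128 MB per Spark partition
      }
    };
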
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
index a57383c43b2..9e5d3d1f132 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
@@ -26,6 +26,7 @@ import 
org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaRegistryProvider;
 import org.apache.hudi.utilities.sources.InputBatch;
+import org.apache.hudi.utilities.streamer.DefaultStreamContext;
 import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
 import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
 
@@ -118,7 +119,7 @@ public abstract class TestAbstractDebeziumSource extends 
UtilitiesTestBase {
     TypedProperties props = createPropsForJsonSource();
 
     SchemaProvider schemaProvider = new MockSchemaRegistryProvider(props, jsc, 
this);
-    SourceFormatAdapter debeziumSource = new 
SourceFormatAdapter(UtilHelpers.createSource(sourceClass, props, jsc, 
sparkSession, schemaProvider, metrics));
+    SourceFormatAdapter debeziumSource = new 
SourceFormatAdapter(UtilHelpers.createSource(sourceClass, props, jsc, 
sparkSession, metrics, new DefaultStreamContext(schemaProvider, 
Option.empty())));
 
     testUtils.sendMessages(testTopicName, new String[] 
{generateDebeziumEvent(operation).toString()});
 
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
index 79f15975cb5..4b30bb14b57 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
@@ -50,14 +50,16 @@ public class TestCloudObjectsSelectorCommon extends 
HoodieSparkClientTestHarness
 
   @Test
   public void emptyMetadataReturnsEmptyOption() {
-    Option<Dataset<Row>> result = 
CloudObjectsSelectorCommon.loadAsDataset(sparkSession, Collections.emptyList(), 
new TypedProperties(), "json");
+    CloudObjectsSelectorCommon cloudObjectsSelectorCommon = new 
CloudObjectsSelectorCommon(new TypedProperties());
+    Option<Dataset<Row>> result = 
cloudObjectsSelectorCommon.loadAsDataset(sparkSession, Collections.emptyList(), 
"json", Option.empty(), 1);
     Assertions.assertFalse(result.isPresent());
   }
 
   @Test
   public void filesFromMetadataRead() {
+    CloudObjectsSelectorCommon cloudObjectsSelectorCommon = new 
CloudObjectsSelectorCommon(new TypedProperties());
     List<CloudObjectMetadata> input = Collections.singletonList(new 
CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=CA/data.json",
 1));
-    Option<Dataset<Row>> result = 
CloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, new 
TypedProperties(), "json");
+    Option<Dataset<Row>> result = 
cloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, "json", 
Option.empty(), 1);
     Assertions.assertTrue(result.isPresent());
     Assertions.assertEquals(1, result.get().count());
     Row expected = RowFactory.create("some data");
@@ -70,7 +72,8 @@ public class TestCloudObjectsSelectorCommon extends 
HoodieSparkClientTestHarness
 
     TypedProperties properties = new TypedProperties();
     
properties.put("hoodie.streamer.source.cloud.data.partition.fields.from.path", 
"country,state");
-    Option<Dataset<Row>> result = 
CloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, properties, 
"json");
+    CloudObjectsSelectorCommon cloudObjectsSelectorCommon = new 
CloudObjectsSelectorCommon(properties);
+    Option<Dataset<Row>> result = 
cloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, "json", 
Option.empty(), 1);
     Assertions.assertTrue(result.isPresent());
     Assertions.assertEquals(1, result.get().count());
     Row expected = RowFactory.create("some data", "US", "CA");
@@ -85,27 +88,15 @@ public class TestCloudObjectsSelectorCommon extends 
HoodieSparkClientTestHarness
     props.put("hoodie.streamer.schemaprovider.source.schema.file", 
schemaFilePath);
     props.put("hoodie.streamer.schema.provider.class.name", 
FilebasedSchemaProvider.class.getName());
     props.put("hoodie.streamer.source.cloud.data.partition.fields.from.path", 
"country,state");
+    CloudObjectsSelectorCommon cloudObjectsSelectorCommon = new 
CloudObjectsSelectorCommon(props);
     List<CloudObjectMetadata> input = Collections.singletonList(new 
CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=CA/data.json",
 1));
-    Option<Dataset<Row>> result = 
CloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, props, "json", 
Option.of(new FilebasedSchemaProvider(props, jsc)));
+    Option<Dataset<Row>> result = 
cloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, "json", 
Option.of(new FilebasedSchemaProvider(props, jsc)), 1);
     Assertions.assertTrue(result.isPresent());
     Assertions.assertEquals(1, result.get().count());
     Row expected = RowFactory.create("some data", "US", "CA");
     Assertions.assertEquals(Collections.singletonList(expected), 
result.get().collectAsList());
   }
 
-  @Test
-  public void partitionKeyNotPresentInPath() {
-    List<CloudObjectMetadata> input = Collections.singletonList(new 
CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=CA/data.json",
 1));
-    TypedProperties properties = new TypedProperties();
-    
properties.put("hoodie.streamer.source.cloud.data.reader.comma.separated.path.format",
 "false");
-    
properties.put("hoodie.streamer.source.cloud.data.partition.fields.from.path", 
"unknown");
-    Option<Dataset<Row>> result = 
CloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, properties, 
"json");
-    Assertions.assertTrue(result.isPresent());
-    Assertions.assertEquals(1, result.get().count());
-    Row expected = RowFactory.create("some data", null);
-    Assertions.assertEquals(Collections.singletonList(expected), 
result.get().collectAsList());
-  }
-
   @Test
   public void loadDatasetWithSchemaAndRepartition() {
     TypedProperties props = new TypedProperties();
@@ -121,10 +112,25 @@ public class TestCloudObjectsSelectorCommon extends 
HoodieSparkClientTestHarness
         new 
CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=TX/data.json",
 1000),
         new 
CloudObjectMetadata("src/test/resources/data/partitioned/country=IND/state=TS/data.json",
 1000)
     );
-    Option<Dataset<Row>> result = 
CloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, props, "json", 
Option.of(new FilebasedSchemaProvider(props, jsc)));
+    CloudObjectsSelectorCommon cloudObjectsSelectorCommon = new 
CloudObjectsSelectorCommon(props);
+    Option<Dataset<Row>> result = 
cloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, "json", 
Option.of(new FilebasedSchemaProvider(props, jsc)), 30);
     Assertions.assertTrue(result.isPresent());
     List<Row> expected = Arrays.asList(RowFactory.create("some data", "US", 
"CA"), RowFactory.create("some data", "US", "TX"), RowFactory.create("some 
data", "IND", "TS"));
     List<Row> actual = result.get().collectAsList();
     Assertions.assertEquals(new HashSet<>(expected), new HashSet<>(actual));
   }
+
+  @Test
+  public void partitionKeyNotPresentInPath() {
+    List<CloudObjectMetadata> input = Collections.singletonList(new 
CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=CA/data.json",
 1));
+    TypedProperties properties = new TypedProperties();
+    
properties.put("hoodie.deltastreamer.source.cloud.data.reader.comma.separated.path.format",
 "false");
+    
properties.put("hoodie.deltastreamer.source.cloud.data.partition.fields.from.path",
 "unknown");
+    CloudObjectsSelectorCommon cloudObjectsSelectorCommon = new 
CloudObjectsSelectorCommon(properties);
+    Option<Dataset<Row>> result = 
cloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, "json", 
Option.empty(), 1);
+    Assertions.assertTrue(result.isPresent());
+    Assertions.assertEquals(1, result.get().count());
+    Row expected = RowFactory.create("some data", null);
+    Assertions.assertEquals(Collections.singletonList(expected), 
result.get().collectAsList());
+  }
 }
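
Taken together, these selector tests also document the reworked
CloudObjectsSelectorCommon API: configuration moves into the constructor, and
loadAsDataset is now an instance method taking the file format, an optional
schema provider, and an explicit partition count. A short usage sketch,
assuming sparkSession and props are in scope (object paths and sizes are
illustrative):

    CloudObjectsSelectorCommon selector = new CloudObjectsSelectorCommon(props);

    List<CloudObjectMetadata> objects = Arrays.asList(
        new CloudObjectMetadata("s3://bucket/path/to/file1.json", 1024),
        new CloudObjectMetadata("s3://bucket/path/to/file2.json", 2048));

    // Option.empty() leaves schema inference to Spark; the final argument
    // repartitions the result into 2 partitions. An empty object list
    // yields Option.empty(), as the first test above asserts.
    Option<Dataset<Row>> rows =
        selector.loadAsDataset(sparkSession, objects, "json", Option.empty(), 2);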
