This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 93cd25fded8 [HUDI-7416] Add interface for StreamProfile to be used in StreamSync for reading and writing data (#10687)
93cd25fded8 is described below

commit 93cd25fded8b0225ddfc54a49cc40fc5e4ad740c
Author: Vinish Reddy <[email protected]>
AuthorDate: Thu Feb 22 21:25:39 2024 +0530

    [HUDI-7416] Add interface for StreamProfile to be used in StreamSync for reading and writing data (#10687)
    
    * Add interface for StreamProfile to be used in StreamSync for reading and writing data
    
    * Tidying up
    
    * Add StreamContext class
    
    * Add java doc
    
    * Address comments
    
    * Fix build errors
    
    * Add annotations for SourceProfileSupplier
---
 .../org/apache/hudi/utilities/UtilHelpers.java     | 19 ++++++++
 .../hudi/utilities/deltastreamer/DeltaSync.java    |  4 +-
 .../hudi/utilities/sources/AvroKafkaSource.java    | 10 ++--
 .../hudi/utilities/sources/JsonKafkaSource.java    | 13 ++++--
 .../apache/hudi/utilities/sources/KafkaSource.java | 39 +++++++++++-----
 .../hudi/utilities/sources/ProtoKafkaSource.java   | 13 ++++--
 .../org/apache/hudi/utilities/sources/Source.java  | 11 ++++-
 .../utilities/sources/helpers/KafkaOffsetGen.java  | 39 ++++++++--------
 .../utilities/streamer/DefaultStreamContext.java   | 48 +++++++++++++++++++
 .../hudi/utilities/streamer/HoodieStreamer.java    | 21 ++++++---
 .../hudi/utilities/streamer/SourceProfile.java     | 54 ++++++++++++++++++++++
 .../utilities/streamer/SourceProfileSupplier.java  | 34 ++++++++++++++
 .../hudi/utilities/streamer/StreamContext.java     | 44 ++++++++++++++++++
 .../apache/hudi/utilities/streamer/StreamSync.java | 10 ++--
 .../utilities/sources/BaseTestKafkaSource.java     | 51 ++++++++++++++++++++
 .../utilities/sources/TestJsonKafkaSource.java     | 17 +++++--
 .../utilities/sources/TestProtoKafkaSource.java    |  3 +-
 17 files changed, 370 insertions(+), 60 deletions(-)
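
For context on the diff below: a deployment that wants to drive ingestion from its
own planner can hand HoodieStreamer an Option<SourceProfileSupplier> through the
new constructor overload added in this patch. A minimal wiring sketch, assuming
cfg, jssc, fs, conf and props are the usual streamer inputs (the profile values
are arbitrary placeholders):

    SourceProfileSupplier supplier = () -> new SourceProfile<Long>() {
      @Override
      public long getMaxSourceBytes() {
        return Long.MAX_VALUE;  // no byte budget in this sketch
      }

      @Override
      public int getSourcePartitions() {
        return 4;               // desired number of source RDD partitions
      }

      @Override
      public Long getSourceSpecificContext() {
        return 500L;            // for Kafka: events to consume this sync round
      }
    };
    HoodieStreamer streamer = new HoodieStreamer(cfg, jssc, fs, conf,
        Option.of(props), Option.of(supplier));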

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index 18e92a8463c..18af52d334e 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -66,6 +66,7 @@ import org.apache.hudi.utilities.sources.InputBatch;
 import org.apache.hudi.utilities.sources.Source;
 import org.apache.hudi.utilities.sources.processor.ChainedJsonKafkaSourcePostProcessor;
 import org.apache.hudi.utilities.sources.processor.JsonKafkaSourcePostProcessor;
+import org.apache.hudi.utilities.streamer.StreamContext;
 import org.apache.hudi.utilities.transform.ChainedTransformer;
 import org.apache.hudi.utilities.transform.ErrorTableAwareChainedTransformer;
 import org.apache.hudi.utilities.transform.Transformer;
@@ -156,6 +157,24 @@ public class UtilHelpers {
     }
   }
 
+  public static Source createSource(String sourceClass, TypedProperties cfg, JavaSparkContext jssc,
+                                    SparkSession sparkSession, HoodieIngestionMetrics metrics, StreamContext streamContext)
+      throws IOException {
+    try {
+      try {
+        return (Source) ReflectionUtils.loadClass(sourceClass,
+            new Class<?>[] {TypedProperties.class, JavaSparkContext.class,
+                SparkSession.class,
+                HoodieIngestionMetrics.class, streamContext.getClass()},
+            cfg, jssc, sparkSession, metrics, streamContext);
+      } catch (HoodieException e) {
+        return createSource(sourceClass, cfg, jssc, sparkSession, streamContext.getSchemaProvider(), metrics);
+      }
+    } catch (Throwable e) {
+      throw new IOException("Could not load source class " + sourceClass, e);
+    }
+  }
+
   public static JsonKafkaSourcePostProcessor createJsonKafkaSourcePostProcessor(String postProcessorClassNames, TypedProperties props) throws IOException {
     if (StringUtils.isNullOrEmpty(postProcessorClassNames)) {
       return null;
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index c794db32510..4002d1579bb 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -22,7 +22,9 @@ package org.apache.hudi.utilities.deltastreamer;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.streamer.DefaultStreamContext;
 import org.apache.hudi.utilities.streamer.HoodieStreamer;
 import org.apache.hudi.utilities.streamer.StreamSync;
 
@@ -49,6 +51,6 @@ public class DeltaSync extends StreamSync {
   public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider,
                    TypedProperties props, HoodieSparkEngineContext hoodieSparkContext, FileSystem fs, Configuration conf,
                    Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException {
-    super(cfg, sparkSession, schemaProvider, props, hoodieSparkContext, fs, conf, onInitializingHoodieWriteClient);
+    super(cfg, sparkSession, props, hoodieSparkContext, fs, conf, onInitializingHoodieWriteClient, new DefaultStreamContext(schemaProvider, Option.empty()));
   }
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
index 2bf92280faf..46095590430 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
@@ -27,6 +27,8 @@ import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
+import org.apache.hudi.utilities.streamer.DefaultStreamContext;
+import org.apache.hudi.utilities.streamer.StreamContext;
 
 import org.apache.avro.generic.GenericRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -69,9 +71,11 @@ public class AvroKafkaSource extends KafkaSource<GenericRecord> {
 
   public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
                          SchemaProvider schemaProvider, HoodieIngestionMetrics metrics) {
-    super(props, sparkContext, sparkSession,
-        UtilHelpers.getSchemaProviderForKafkaSource(schemaProvider, props, sparkContext),
-        SourceType.AVRO, metrics);
+    this(props, sparkContext, sparkSession, metrics, new DefaultStreamContext(UtilHelpers.getSchemaProviderForKafkaSource(schemaProvider, props, sparkContext), Option.empty()));
+  }
+
+  public AvroKafkaSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession, HoodieIngestionMetrics metrics, StreamContext streamContext) {
+    super(properties, sparkContext, sparkSession, SourceType.AVRO, metrics, streamContext);
     this.originalSchemaProvider = schemaProvider;
 
     props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class.getName());
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
index eb67abfee3a..0a609dde720 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.utilities.sources;
 
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.config.JsonKafkaPostProcessorConfig;
@@ -27,6 +28,8 @@ import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
 import org.apache.hudi.utilities.sources.processor.JsonKafkaSourcePostProcessor;
+import org.apache.hudi.utilities.streamer.DefaultStreamContext;
+import org.apache.hudi.utilities.streamer.StreamContext;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -44,10 +47,10 @@ import java.util.LinkedList;
 import java.util.List;
 
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
+import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_KEY_COLUMN;
 import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_OFFSET_COLUMN;
 import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_PARTITION_COLUMN;
 import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_TIMESTAMP_COLUMN;
-import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_KEY_COLUMN;
 
 /**
  * Read json kafka data.
@@ -56,9 +59,11 @@ public class JsonKafkaSource extends KafkaSource<String> {
 
   public JsonKafkaSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession,
                          SchemaProvider schemaProvider, HoodieIngestionMetrics metrics) {
-    super(properties, sparkContext, sparkSession,
-        UtilHelpers.getSchemaProviderForKafkaSource(schemaProvider, properties, sparkContext),
-        SourceType.JSON, metrics);
+    this(properties, sparkContext, sparkSession, metrics, new DefaultStreamContext(UtilHelpers.getSchemaProviderForKafkaSource(schemaProvider, properties, sparkContext), Option.empty()));
+  }
+
+  public JsonKafkaSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession, HoodieIngestionMetrics metrics, StreamContext streamContext) {
+    super(properties, sparkContext, sparkSession, SourceType.JSON, metrics, streamContext);
     properties.put("key.deserializer", StringDeserializer.class.getName());
     properties.put("value.deserializer", StringDeserializer.class.getName());
     this.offsetGen = new KafkaOffsetGen(props);
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
index bb26d579582..52a6a1217cc 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
@@ -26,6 +26,8 @@ import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
 import org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
+import org.apache.hudi.utilities.streamer.SourceProfile;
+import org.apache.hudi.utilities.streamer.StreamContext;
 
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -50,9 +52,9 @@ abstract class KafkaSource<T> extends Source<JavaRDD<T>> {
   protected final boolean shouldAddOffsets;
 
   protected KafkaSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
-                        SchemaProvider schemaProvider, SourceType sourceType, HoodieIngestionMetrics metrics) {
-    super(props, sparkContext, sparkSession, schemaProvider, sourceType);
-    this.schemaProvider = schemaProvider;
+                        SourceType sourceType, HoodieIngestionMetrics metrics, StreamContext streamContext) {
+    super(props, sparkContext, sparkSession, sourceType, streamContext);
+    this.schemaProvider = streamContext.getSchemaProvider();
     this.metrics = metrics;
     this.shouldAddOffsets = KafkaOffsetPostProcessor.Config.shouldAddOffsets(props);
   }
@@ -60,21 +62,34 @@ abstract class KafkaSource<T> extends Source<JavaRDD<T>> {
   @Override
  protected InputBatch<JavaRDD<T>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {
     try {
-      OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, metrics);
-      long totalNewMsgs = KafkaOffsetGen.CheckpointUtils.totalNewMessages(offsetRanges);
-      LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
-      if (totalNewMsgs <= 0) {
-        metrics.updateStreamerSourceNewMessageCount(METRIC_NAME_KAFKA_MESSAGE_IN_COUNT, 0);
-        return new InputBatch<>(Option.empty(), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
+      OffsetRange[] offsetRanges;
+      if (sourceProfileSupplier.isPresent() && sourceProfileSupplier.get().getSourceProfile() != null) {
+        SourceProfile<Long> kafkaSourceProfile = sourceProfileSupplier.get().getSourceProfile();
+        offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, kafkaSourceProfile.getSourceSpecificContext(), kafkaSourceProfile.getSourcePartitions(), metrics);
+        LOG.info("About to read numEvents {} of size {} bytes in {} partitions from Kafka for topic {} with offsetRanges {}",
+            kafkaSourceProfile.getSourceSpecificContext(), kafkaSourceProfile.getMaxSourceBytes(),
+            kafkaSourceProfile.getSourcePartitions(), offsetGen.getTopicName(), offsetRanges);
+      } else {
+        offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, metrics);
       }
-      metrics.updateStreamerSourceNewMessageCount(METRIC_NAME_KAFKA_MESSAGE_IN_COUNT, totalNewMsgs);
-      JavaRDD<T> newDataRDD = toRDD(offsetRanges);
-      return new InputBatch<>(Option.of(newDataRDD), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
+      return toInputBatch(offsetRanges);
     } catch (org.apache.kafka.common.errors.TimeoutException e) {
       throw new HoodieSourceTimeoutException("Kafka Source timed out " + e.getMessage());
     }
   }
 
+  private InputBatch<JavaRDD<T>> toInputBatch(OffsetRange[] offsetRanges) {
+    long totalNewMsgs = KafkaOffsetGen.CheckpointUtils.totalNewMessages(offsetRanges);
+    LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
+    if (totalNewMsgs <= 0) {
+      metrics.updateStreamerSourceNewMessageCount(METRIC_NAME_KAFKA_MESSAGE_IN_COUNT, 0);
+      return new InputBatch<>(Option.empty(), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
+    }
+    metrics.updateStreamerSourceNewMessageCount(METRIC_NAME_KAFKA_MESSAGE_IN_COUNT, totalNewMsgs);
+    JavaRDD<T> newDataRDD = toRDD(offsetRanges);
+    return new InputBatch<>(Option.of(newDataRDD), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
+  }
+
   abstract JavaRDD<T> toRDD(OffsetRange[] offsetRanges);
 
   @Override
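
To make the KafkaSource changes concrete: when a profile is present,
getSourceSpecificContext() is passed as the event count and getSourcePartitions()
as minPartitions into KafkaOffsetGen.getNextOffsetRanges, while getMaxSourceBytes()
is currently only logged. A profile shaped accordingly might look like this
(KafkaSourceProfile is an illustrative name; the tests at the end of this patch
define a similar TestSourceProfile):

    // Sketch of a profile consumed by KafkaSource.fetchNewData above.
    class KafkaSourceProfile implements SourceProfile<Long> {
      @Override
      public long getMaxSourceBytes() {
        return 1024L * 1024 * 1024;  // advisory byte budget; only logged today
      }

      @Override
      public int getSourcePartitions() {
        return 8;                    // used as minPartitions for offset ranges
      }

      @Override
      public Long getSourceSpecificContext() {
        return 100_000L;             // numEvents for this sync round
      }
    }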
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
index 67927480454..208e591c8f1 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
@@ -19,12 +19,15 @@
 package org.apache.hudi.utilities.sources;
 
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig;
 import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
 import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
+import org.apache.hudi.utilities.streamer.DefaultStreamContext;
+import org.apache.hudi.utilities.streamer.StreamContext;
 
 import com.google.protobuf.Message;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -51,9 +54,13 @@ public class ProtoKafkaSource extends KafkaSource<Message> {
 
   private final String className;
 
-  public ProtoKafkaSource(TypedProperties props, JavaSparkContext sparkContext,
-                          SparkSession sparkSession, SchemaProvider schemaProvider, HoodieIngestionMetrics metrics) {
-    super(props, sparkContext, sparkSession, schemaProvider, SourceType.PROTO, metrics);
+  public ProtoKafkaSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
+                          SchemaProvider schemaProvider, HoodieIngestionMetrics metrics) {
+    this(props, sparkContext, sparkSession, metrics, new DefaultStreamContext(schemaProvider, Option.empty()));
+  }
+
+  public ProtoKafkaSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession, HoodieIngestionMetrics metrics, StreamContext streamContext) {
+    super(properties, sparkContext, sparkSession, SourceType.PROTO, metrics, streamContext);
     checkRequiredConfigProperties(props, Collections.singletonList(
         ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME));
     props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class);
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
index cbc0722056b..dfb07c718a0 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
@@ -25,6 +25,9 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.utilities.callback.SourceCommitCallback;
 import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.streamer.DefaultStreamContext;
+import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
+import org.apache.hudi.utilities.streamer.StreamContext;
 
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SparkSession;
@@ -44,6 +47,7 @@ public abstract class Source<T> implements SourceCommitCallback, Serializable {
   protected transient TypedProperties props;
   protected transient JavaSparkContext sparkContext;
   protected transient SparkSession sparkSession;
+  protected transient Option<SourceProfileSupplier> sourceProfileSupplier;
   private transient SchemaProvider overriddenSchemaProvider;
 
   private final SourceType sourceType;
@@ -55,11 +59,16 @@ public abstract class Source<T> implements SourceCommitCallback, Serializable {
 
   protected Source(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
       SchemaProvider schemaProvider, SourceType sourceType) {
+    this(props, sparkContext, sparkSession, sourceType, new DefaultStreamContext(schemaProvider, Option.empty()));
+  }
+
+  protected Source(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, SourceType sourceType, StreamContext streamContext) {
     this.props = props;
     this.sparkContext = sparkContext;
     this.sparkSession = sparkSession;
-    this.overriddenSchemaProvider = schemaProvider;
+    this.overriddenSchemaProvider = streamContext.getSchemaProvider();
     this.sourceType = sourceType;
+    this.sourceProfileSupplier = streamContext.getSourceProfileSupplier();
   }
 
   @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
index d5faec3595e..32df651d556 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
@@ -241,7 +241,24 @@ public class KafkaOffsetGen {
   }
 
  public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long sourceLimit, HoodieIngestionMetrics metrics) {
+    // Come up with final set of OffsetRanges to read (account for new partitions, limit number of events)
+    long maxEventsToReadFromKafka = getLongWithAltKeys(props, KafkaSourceConfig.MAX_EVENTS_FROM_KAFKA_SOURCE);
 
+    long numEvents;
+    if (sourceLimit == Long.MAX_VALUE) {
+      numEvents = maxEventsToReadFromKafka;
+      LOG.info("SourceLimit not configured, set numEvents to default value : " + maxEventsToReadFromKafka);
+    } else {
+      numEvents = sourceLimit;
+    }
+
+    long minPartitions = getLongWithAltKeys(props, KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS);
+    LOG.info("getNextOffsetRanges set config " + KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS.key() + " to " + minPartitions);
+
+    return getNextOffsetRanges(lastCheckpointStr, numEvents, minPartitions, metrics);
+  }
+
+  public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long numEvents, long minPartitions, HoodieIngestionMetrics metrics) {
     // Obtain current metadata for the topic
     Map<TopicPartition, Long> fromOffsets;
     Map<TopicPartition, Long> toOffsets;
@@ -279,29 +296,9 @@ public class KafkaOffsetGen {
       // Obtain the latest offsets.
       toOffsets = consumer.endOffsets(topicPartitions);
     }
-
-    // Come up with final set of OffsetRanges to read (account for new partitions, limit number of events)
-    long maxEventsToReadFromKafka = getLongWithAltKeys(props, KafkaSourceConfig.MAX_EVENTS_FROM_KAFKA_SOURCE);
-
-    long numEvents;
-    if (sourceLimit == Long.MAX_VALUE) {
-      numEvents = maxEventsToReadFromKafka;
-      LOG.info("SourceLimit not configured, set numEvents to default value : " + maxEventsToReadFromKafka);
-    } else {
-      numEvents = sourceLimit;
-    }
-
-    // TODO(HUDI-4625) remove
-    if (numEvents < toOffsets.size()) {
-      throw new HoodieException("sourceLimit should not be less than the number of kafka partitions");
-    }
-
-    long minPartitions = getLongWithAltKeys(props, KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS);
-    LOG.info("getNextOffsetRanges set config " + KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS.key() + " to " + minPartitions);
-
     return CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets, numEvents, minPartitions);
   }
-
+  
   /**
    * Fetch partition infos for given topic.
    *
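
The KafkaOffsetGen refactor splits getNextOffsetRanges in two: the original
two-limit entry point now only derives numEvents and minPartitions from
configuration and delegates to the new four-argument overload, which
profile-driven callers can invoke directly. Roughly, with lastCheckpoint,
sourceLimit, numEvents, minPartitions and metrics assumed in scope:

    // Config-driven path, unchanged for existing callers:
    OffsetRange[] ranges = offsetGen.getNextOffsetRanges(lastCheckpoint, sourceLimit, metrics);

    // Profile-driven path, bypassing the MAX_EVENTS_FROM_KAFKA_SOURCE and
    // KAFKA_SOURCE_MIN_PARTITIONS config lookups:
    OffsetRange[] profiled = offsetGen.getNextOffsetRanges(lastCheckpoint, numEvents, minPartitions, metrics);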
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/DefaultStreamContext.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/DefaultStreamContext.java
new file mode 100644
index 00000000000..f8dabeb89c9
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/DefaultStreamContext.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.streamer;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+
+/**
+ * The default implementation of the StreamContext interface.
+ * It currently composes a SchemaProvider and a SourceProfileSupplier,
+ * and can be extended with additional arguments in the future.
+ */
+public class DefaultStreamContext implements StreamContext {
+
+  private final SchemaProvider schemaProvider;
+  private final Option<SourceProfileSupplier> sourceProfileSupplier;
+
+  public DefaultStreamContext(SchemaProvider schemaProvider, Option<SourceProfileSupplier> sourceProfileSupplier) {
+    this.schemaProvider = schemaProvider;
+    this.sourceProfileSupplier = sourceProfileSupplier;
+  }
+
+  @Override
+  public SchemaProvider getSchemaProvider() {
+    return schemaProvider;
+  }
+
+  @Override
+  public Option<SourceProfileSupplier> getSourceProfileSupplier() {
+    return sourceProfileSupplier;
+  }
+}
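
Every call site updated in this patch builds this class directly; as a standalone
sketch, with an existing schemaProvider and no profile attached:

    // Wrap a schema provider when no source profile is supplied.
    StreamContext ctx = new DefaultStreamContext(schemaProvider, Option.empty());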
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
index 8ecc937c5e7..ef31cc34ab5 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
@@ -143,8 +143,12 @@ public class HoodieStreamer implements Serializable {
     this(cfg, jssc, fs, conf, Option.empty());
   }
 
+  public HoodieStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf, Option<TypedProperties> propsOverride) throws IOException {
+    this(cfg, jssc, fs, conf, propsOverride, Option.empty());
+  }
+
   public HoodieStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf,
-                        Option<TypedProperties> propsOverride) throws IOException {
+                        Option<TypedProperties> propsOverride, Option<SourceProfileSupplier> sourceProfileSupplier) throws IOException {
     this.properties = combineProperties(cfg, propsOverride, jssc.hadoopConfiguration());
     if (cfg.initialCheckpointProvider != null && cfg.checkpoint == null) {
       InitialCheckPointProvider checkPointProvider =
@@ -158,7 +162,7 @@ public class HoodieStreamer implements Serializable {
         cfg.runBootstrap ? new BootstrapExecutor(cfg, jssc, fs, conf, this.properties) : null);
     HoodieSparkEngineContext sparkEngineContext = new HoodieSparkEngineContext(jssc);
     this.ingestionService = Option.ofNullable(
-        cfg.runBootstrap ? null : new StreamSyncService(cfg, sparkEngineContext, fs, conf, Option.ofNullable(this.properties)));
+        cfg.runBootstrap ? null : new StreamSyncService(cfg, sparkEngineContext, fs, conf, Option.ofNullable(this.properties), sourceProfileSupplier));
   }
 
   private static TypedProperties combineProperties(Config cfg, Option<TypedProperties> propsOverride, Configuration hadoopConf) {
@@ -656,7 +660,7 @@ public class HoodieStreamer implements Serializable {
     private final Option<ConfigurationHotUpdateStrategy> configurationHotUpdateStrategyOpt;
 
     public StreamSyncService(Config cfg, HoodieSparkEngineContext hoodieSparkContext, FileSystem fs, Configuration conf,
-                             Option<TypedProperties> properties) throws IOException {
+                             Option<TypedProperties> properties, Option<SourceProfileSupplier> sourceProfileSupplier) throws IOException {
       super(HoodieIngestionConfig.newBuilder()
           .isContinuous(cfg.continuousMode)
           .withMinSyncInternalSeconds(cfg.minSyncIntervalSeconds).build());
@@ -708,13 +712,18 @@ public class HoodieStreamer implements Serializable {
           UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, hoodieSparkContext.jsc()),
           props, hoodieSparkContext.jsc(), cfg.transformerClassNames);
 
-      streamSync = new StreamSync(cfg, sparkSession, schemaProvider, props, hoodieSparkContext, fs, conf, this::onInitializingWriteClient);
+      streamSync = new StreamSync(cfg, sparkSession, props, hoodieSparkContext, fs, conf, this::onInitializingWriteClient, new DefaultStreamContext(schemaProvider, sourceProfileSupplier));
 
     }
 
     public StreamSyncService(HoodieStreamer.Config cfg, HoodieSparkEngineContext hoodieSparkContext, FileSystem fs, Configuration conf)
         throws IOException {
-      this(cfg, hoodieSparkContext, fs, conf, Option.empty());
+      this(cfg, hoodieSparkContext, fs, conf, Option.empty(), Option.empty());
+    }
+
+    public StreamSyncService(HoodieStreamer.Config cfg, HoodieSparkEngineContext hoodieSparkContext, FileSystem fs, Configuration conf, Option<TypedProperties> properties)
+            throws IOException {
+      this(cfg, hoodieSparkContext, fs, conf, properties, Option.empty());
     }
 
     private void initializeTableTypeAndBaseFileFormat() {
@@ -728,7 +737,7 @@ public class HoodieStreamer implements Serializable {
       if (streamSync != null) {
         streamSync.close();
       }
-      streamSync = new StreamSync(cfg, sparkSession, schemaProvider, props, hoodieSparkContext, fs, hiveConf, this::onInitializingWriteClient);
+      streamSync = new StreamSync(cfg, sparkSession, props, hoodieSparkContext, fs, hiveConf, this::onInitializingWriteClient, new DefaultStreamContext(schemaProvider, Option.empty()));
     }
 
     @Override
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceProfile.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceProfile.java
new file mode 100644
index 00000000000..d830cf5dee3
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceProfile.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer;
+
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIClass;
+import org.apache.hudi.PublicAPIMethod;
+
+/**
+ * A profile containing details about how the next input batch in StreamSync should be consumed and written.
+ * For example, a KafkaSourceProfile contains the number of events to consume in this sync round,
+ * an S3SourceProfile contains the list of files to consume in this sync round, and
+ * a HudiIncrementalSourceProfile contains the beginInstant and endInstant commit times to consume in this sync round.
+ *
+ * @param <T> The type of the source-specific context, which varies with the source type as described above.
+ */
+@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
+public interface SourceProfile<T> {
+
+  /**
+   * @return The maximum number of bytes that will be consumed from the source in this sync round.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  long getMaxSourceBytes();
+
+  /**
+   * @return The number of output partitions required in the source RDD.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  int getSourcePartitions();
+
+  /**
+   * @return The source-specific context, based on the source type as described above.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  T getSourceSpecificContext();
+}
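
As a sketch of the non-Kafka shapes named in the javadoc above, a file-listing
source could carry its file set in the type parameter. S3SourceProfile is only
mentioned in the javadoc, not defined in this commit, so this layout is an
assumption:

    // Hypothetical file-based profile; only the SourceProfile interface is real.
    class S3SourceProfile implements SourceProfile<List<String>> {
      private final List<String> files;

      S3SourceProfile(List<String> files) {
        this.files = files;
      }

      @Override
      public long getMaxSourceBytes() {
        return 512L * 1024 * 1024;  // byte budget for this sync round
      }

      @Override
      public int getSourcePartitions() {
        return files.size();        // e.g. one partition per file
      }

      @Override
      public List<String> getSourceSpecificContext() {
        return files;               // the files to consume this round
      }
    }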
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceProfileSupplier.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceProfileSupplier.java
new file mode 100644
index 00000000000..34bfb8dff94
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceProfileSupplier.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer;
+
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIClass;
+import org.apache.hudi.PublicAPIMethod;
+
+/**
+ * Supplier for a {@link SourceProfile}.
+ */
+@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
+public interface SourceProfileSupplier {
+  @SuppressWarnings("rawtypes")
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  SourceProfile getSourceProfile();
+}
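
Because this interface has a single abstract method it can be written as a
lambda, and since KafkaSource (earlier in this patch) checks
getSourceProfile() != null, a supplier may return null to fall back to the
sourceLimit-driven path for a given round. A sketch, where plannerIsReady and
plannerProfile are hypothetical hooks:

    // Opt in per sync round; returning null falls back to sourceLimit behavior.
    SourceProfileSupplier supplier = () -> plannerIsReady() ? plannerProfile() : null;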
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamContext.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamContext.java
new file mode 100644
index 00000000000..bfe337ee3f2
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamContext.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.streamer;
+
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIClass;
+import org.apache.hudi.PublicAPIMethod;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+
+/**
+ * The context required to sync one batch of data to a Hudi table using StreamSync.
+ */
+@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
+public interface StreamContext {
+
+  /**
+   * The schema provider used for reading data from the source and also for writing to the Hudi table.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  SchemaProvider getSchemaProvider();
+
+  /**
+   * An optional source profile supplier describing how the next input batch in StreamSync should be consumed and written.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  Option<SourceProfileSupplier> getSourceProfileSupplier();
+}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index f87bf083854..4b83ff92b7b 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -259,19 +259,19 @@ public class StreamSync implements Serializable, Closeable {
   public StreamSync(HoodieStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider,
                     TypedProperties props, JavaSparkContext jssc, FileSystem fs, Configuration conf,
                     Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException {
-    this(cfg, sparkSession, schemaProvider, props, new HoodieSparkEngineContext(jssc), fs, conf, onInitializingHoodieWriteClient);
+    this(cfg, sparkSession, props, new HoodieSparkEngineContext(jssc), fs, conf, onInitializingHoodieWriteClient, new DefaultStreamContext(schemaProvider, Option.empty()));
   }
 
-  public StreamSync(HoodieStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider,
+  public StreamSync(HoodieStreamer.Config cfg, SparkSession sparkSession,
                     TypedProperties props, HoodieSparkEngineContext hoodieSparkContext, FileSystem fs, Configuration conf,
-                    Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException {
+                    Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient, StreamContext streamContext) throws IOException {
     this.cfg = cfg;
     this.hoodieSparkContext = hoodieSparkContext;
     this.sparkSession = sparkSession;
     this.fs = fs;
     this.onInitializingHoodieWriteClient = onInitializingHoodieWriteClient;
     this.props = props;
-    this.userProvidedSchemaProvider = schemaProvider;
+    this.userProvidedSchemaProvider = streamContext.getSchemaProvider();
     this.processedSchema = new SchemaSet();
     this.autoGenerateRecordKeys = KeyGenUtils.enableAutoGenerateRecordKeys(props);
     this.keyGenClassName = getKeyGeneratorClassName(new TypedProperties(props));
@@ -285,7 +285,7 @@ public class StreamSync implements Serializable, Closeable {
       this.errorWriteFailureStrategy = ErrorTableUtils.getErrorWriteFailureStrategy(props);
     }
     refreshTimeline();
-    Source source = UtilHelpers.createSource(cfg.sourceClassName, props, hoodieSparkContext.jsc(), sparkSession, schemaProvider, metrics);
+    Source source = UtilHelpers.createSource(cfg.sourceClassName, props, hoodieSparkContext.jsc(), sparkSession, metrics, streamContext);
     this.formatAdapter = new SourceFormatAdapter(source, this.errorTableWriter, Option.of(props));
 
     Supplier<Option<Schema>> schemaSupplier = schemaProvider == null ? Option::empty : () -> Option.ofNullable(schemaProvider.getSourceSchema());
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
index b5cbf2738f6..011a1f626b2 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
@@ -28,6 +28,8 @@ import org.apache.hudi.utilities.exception.HoodieStreamerException;
 import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
+import org.apache.hudi.utilities.streamer.SourceProfile;
+import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
 
 import org.apache.avro.generic.GenericRecord;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -52,6 +54,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 /**
 * Generic tests for all {@link KafkaSource} to ensure all implementations properly handle offsets, fetch limits, failure modes, etc.
@@ -60,6 +63,7 @@ abstract class BaseTestKafkaSource extends 
SparkClientFunctionalTestHarness {
   protected static final String TEST_TOPIC_PREFIX = "hoodie_test_";
 
   protected final HoodieIngestionMetrics metrics = 
mock(HoodieIngestionMetrics.class);
+  protected final Option<SourceProfileSupplier> sourceProfile = 
Option.of(mock(SourceProfileSupplier.class));
 
   protected SchemaProvider schemaProvider;
   protected KafkaTestUtils testUtils;
@@ -277,4 +281,51 @@ abstract class BaseTestKafkaSource extends SparkClientFunctionalTestHarness {
             + " either the data was aged out by Kafka or the topic may have 
been deleted before all the data in the topic was processed.",
         t.getMessage());
   }
+
+  @Test
+  public void testKafkaSourceWithOffsetsFromSourceProfile() {
+    // topic setup.
+    final String topic = TEST_TOPIC_PREFIX + "testKafkaSourceWithOffsetRanges";
+    testUtils.createTopic(topic, 2);
+    TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
+
+    when(sourceProfile.get().getSourceProfile()).thenReturn(new TestSourceProfile(Long.MAX_VALUE, 4, 500));
+    SourceFormatAdapter kafkaSource = createSource(props);
+
+    // Test for empty data.
+    assertEquals(Option.empty(), kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());
+
+    // Publish messages and assert source has picked up all messages in offsetRanges supplied by input batch profile.
+    sendMessagesToKafka(topic, 1000, 2);
+    InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900);
+    assertEquals(500, fetch1.getBatch().get().count());
+  }
+
+  static class TestSourceProfile implements SourceProfile<Long> {
+
+    private final long maxSourceBytes;
+    private final int sourcePartitions;
+    private final long numEvents;
+
+    public TestSourceProfile(long maxSourceBytes, int sourcePartitions, long numEvents) {
+      this.maxSourceBytes = maxSourceBytes;
+      this.sourcePartitions = sourcePartitions;
+      this.numEvents = numEvents;
+    }
+
+    @Override
+    public long getMaxSourceBytes() {
+      return maxSourceBytes;
+    }
+
+    @Override
+    public int getSourcePartitions() {
+      return sourcePartitions;
+    }
+
+    @Override
+    public Long getSourceSpecificContext() {
+      return numEvents;
+    }
+  }
 }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
index 14ffd31582a..59a85a06e9c 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
@@ -23,14 +23,16 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.testutils.InProcessTimeGenerator;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.InProcessTimeGenerator;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.config.HoodieStreamerConfig;
 import org.apache.hudi.utilities.config.KafkaSourceConfig;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.streamer.BaseErrorTableWriter;
+import org.apache.hudi.utilities.streamer.DefaultStreamContext;
 import org.apache.hudi.utilities.streamer.ErrorEvent;
 import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
 
@@ -60,10 +62,10 @@ import scala.Tuple2;
 import static org.apache.hudi.config.HoodieErrorTableConfig.ERROR_TABLE_BASE_PATH;
 import static org.apache.hudi.config.HoodieErrorTableConfig.ERROR_TARGET_TABLE;
 import static org.apache.hudi.utilities.config.KafkaSourceConfig.ENABLE_KAFKA_COMMIT_OFFSET;
+import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_KEY_COLUMN;
 import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_OFFSET_COLUMN;
 import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_PARTITION_COLUMN;
 import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_TIMESTAMP_COLUMN;
-import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_KEY_COLUMN;
 import static org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.jsonifyRecords;
 import static org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.jsonifyRecordsByPartitions;
 import static org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.jsonifyRecordsByPartitionsWithNullKafkaKey;
@@ -104,7 +106,7 @@ public class TestJsonKafkaSource extends BaseTestKafkaSource {
 
   @Override
   SourceFormatAdapter createSource(TypedProperties props) {
-    return new SourceFormatAdapter(new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics));
+    return new SourceFormatAdapter(new JsonKafkaSource(props, jsc(), spark(), metrics, new DefaultStreamContext(schemaProvider, sourceProfile)));
   }
 
   // test whether empty messages can be filtered
@@ -356,4 +358,13 @@ public class TestJsonKafkaSource extends BaseTestKafkaSource {
     dfWithOffsetInfo.unpersist();
     dfWithOffsetInfoAndNullKafkaKey.unpersist();
   }
+
+  @Test
+  public void testCreateSource() throws IOException {
+    final String topic = TEST_TOPIC_PREFIX + "testJsonKafkaSourceCreation";
+    testUtils.createTopic(topic, 2);
+    TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
+    Source jsonKafkaSource = UtilHelpers.createSource(JsonKafkaSource.class.getName(), props, jsc(), spark(), metrics, new DefaultStreamContext(schemaProvider, sourceProfile));
+    assertEquals(Source.SourceType.JSON, jsonKafkaSource.getSourceType());
+  }
 }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java
index 52376f89741..b56d87c9263 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java
@@ -25,6 +25,7 @@ import org.apache.hudi.utilities.config.KafkaSourceConfig;
 import org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig;
 import org.apache.hudi.utilities.schema.ProtoClassBasedSchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.streamer.DefaultStreamContext;
 import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
 import org.apache.hudi.utilities.test.proto.Nested;
 import org.apache.hudi.utilities.test.proto.Sample;
@@ -89,7 +90,7 @@ public class TestProtoKafkaSource extends BaseTestKafkaSource {
   @Override
   SourceFormatAdapter createSource(TypedProperties props) {
     this.schemaProvider = new ProtoClassBasedSchemaProvider(props, jsc());
-    Source protoKafkaSource = new ProtoKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
+    Source protoKafkaSource = new ProtoKafkaSource(props, jsc(), spark(), metrics, new DefaultStreamContext(schemaProvider, sourceProfile));
     return new SourceFormatAdapter(protoKafkaSource);
   }
 

