This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 2c5af864cd5b4dfca00b21d1d542e634bbff081d
Author: Praveen Gajulapalli <[email protected]>
AuthorDate: Mon Apr 29 18:59:48 2024 +0530

    [HUDI-7667] Created util method to get offset range for fetching new data 
(#11092)
    
    Created util method to get offsetRanges while fetching new data.
    The same util method can be used in any Source to get offsetRanges via
    SourceProfile.
    This will help improve the estimation of offset ranges when reading data
    from Kafka.
---
 .../apache/hudi/utilities/sources/KafkaSource.java | 42 ++++++++++++++++------
 1 file changed, 31 insertions(+), 11 deletions(-)

diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
index 3dc7fe69a0d..99af1ab0086 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
@@ -27,6 +27,7 @@ import 
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
 import org.apache.hudi.utilities.streamer.SourceProfile;
+import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
 import org.apache.hudi.utilities.streamer.StreamContext;
 
 import org.apache.spark.api.java.JavaSparkContext;
@@ -35,7 +36,10 @@ import org.apache.spark.streaming.kafka010.OffsetRange;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
+
 import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
+import static org.apache.hudi.common.util.ConfigUtils.getLongWithAltKeys;
 
 public abstract class KafkaSource<T> extends Source<T> {
   private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
@@ -61,22 +65,38 @@ public abstract class KafkaSource<T> extends Source<T> {
   @Override
   protected InputBatch<T> fetchNewData(Option<String> lastCheckpointStr, long 
sourceLimit) {
     try {
-      OffsetRange[] offsetRanges;
-      if (sourceProfileSupplier.isPresent() && 
sourceProfileSupplier.get().getSourceProfile() != null) {
-        SourceProfile<Long> kafkaSourceProfile = 
sourceProfileSupplier.get().getSourceProfile();
-        offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, 
kafkaSourceProfile.getSourceSpecificContext(), 
kafkaSourceProfile.getSourcePartitions(), metrics);
-        LOG.info("About to read numEvents {} of size {} bytes in {} partitions 
from Kafka for topic {} with offsetRanges {}",
-            kafkaSourceProfile.getSourceSpecificContext(), 
kafkaSourceProfile.getMaxSourceBytes(),
-            kafkaSourceProfile.getSourcePartitions(), 
offsetGen.getTopicName(), offsetRanges);
-      } else {
-        offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, 
sourceLimit, metrics);
-      }
-      return toInputBatch(offsetRanges);
+      return toInputBatch(getOffsetRanges(props, sourceProfileSupplier, 
offsetGen, metrics,
+          lastCheckpointStr, sourceLimit));
     } catch (org.apache.kafka.common.errors.TimeoutException e) {
       throw new HoodieSourceTimeoutException("Kafka Source timed out " + 
e.getMessage());
     }
   }
 
+  @SuppressWarnings("unchecked")
+  public static OffsetRange[] getOffsetRanges(TypedProperties props,
+                                              Option<SourceProfileSupplier> 
sourceProfileSupplier,
+                                              KafkaOffsetGen offsetGen,
+                                              HoodieIngestionMetrics metrics,
+                                              Option<String> lastCheckpointStr,
+                                              long sourceLimit) {
+    OffsetRange[] offsetRanges;
+    if (sourceProfileSupplier.isPresent() && 
sourceProfileSupplier.get().getSourceProfile() != null) {
+      SourceProfile<Long> kafkaSourceProfile = 
sourceProfileSupplier.get().getSourceProfile();
+      offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, 
kafkaSourceProfile.getSourceSpecificContext(),
+          kafkaSourceProfile.getSourcePartitions(), metrics);
+      LOG.info("About to read maxEventsInSyncRound {} of size {} bytes in {} 
partitions from Kafka for topic {} with offsetRanges {}",
+          kafkaSourceProfile.getSourceSpecificContext(), 
kafkaSourceProfile.getMaxSourceBytes(),
+          kafkaSourceProfile.getSourcePartitions(), offsetGen.getTopicName(), 
offsetRanges);
+    } else {
+      long minPartitions = getLongWithAltKeys(props, 
KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS);
+      offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, 
sourceLimit, metrics);
+      LOG.info("About to read sourceLimit {} in {} spark partitions from kafka 
for topic {} with offset ranges {}",
+          sourceLimit, minPartitions, offsetGen.getTopicName(),
+          Arrays.toString(offsetRanges));
+    }
+    return offsetRanges;
+  }
+
   private InputBatch<T> toInputBatch(OffsetRange[] offsetRanges) {
     long totalNewMsgs = 
KafkaOffsetGen.CheckpointUtils.totalNewMessages(offsetRanges);
     LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + 
offsetGen.getTopicName());

Reply via email to