This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new fef14db3d9b [HUDI-7667] Created util method to get offset range for 
fetching new data (#11092)
fef14db3d9b is described below

commit fef14db3d9b3ae4d8aac793951c6f9353e493372
Author: Praveen Gajulapalli <[email protected]>
AuthorDate: Mon Apr 29 18:59:48 2024 +0530

    [HUDI-7667] Created util method to get offset range for fetching new data 
(#11092)
    
    Created a util method to get offsetRanges while fetching new data.
    The same util method can be used in any Source to get offsetRanges via 
SourceProfile.
    This will help improve the estimation of offset ranges when reading data 
from Kafka.
---
 .../apache/hudi/utilities/sources/KafkaSource.java | 42 ++++++++++++++++------
 1 file changed, 31 insertions(+), 11 deletions(-)

diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
index 3dc7fe69a0d..99af1ab0086 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
@@ -27,6 +27,7 @@ import 
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
 import org.apache.hudi.utilities.streamer.SourceProfile;
+import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
 import org.apache.hudi.utilities.streamer.StreamContext;
 
 import org.apache.spark.api.java.JavaSparkContext;
@@ -35,7 +36,10 @@ import org.apache.spark.streaming.kafka010.OffsetRange;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
+
 import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
+import static org.apache.hudi.common.util.ConfigUtils.getLongWithAltKeys;
 
 public abstract class KafkaSource<T> extends Source<T> {
   private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
@@ -61,22 +65,38 @@ public abstract class KafkaSource<T> extends Source<T> {
   @Override
   protected InputBatch<T> fetchNewData(Option<String> lastCheckpointStr, long 
sourceLimit) {
     try {
-      OffsetRange[] offsetRanges;
-      if (sourceProfileSupplier.isPresent() && 
sourceProfileSupplier.get().getSourceProfile() != null) {
-        SourceProfile<Long> kafkaSourceProfile = 
sourceProfileSupplier.get().getSourceProfile();
-        offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, 
kafkaSourceProfile.getSourceSpecificContext(), 
kafkaSourceProfile.getSourcePartitions(), metrics);
-        LOG.info("About to read numEvents {} of size {} bytes in {} partitions 
from Kafka for topic {} with offsetRanges {}",
-            kafkaSourceProfile.getSourceSpecificContext(), 
kafkaSourceProfile.getMaxSourceBytes(),
-            kafkaSourceProfile.getSourcePartitions(), 
offsetGen.getTopicName(), offsetRanges);
-      } else {
-        offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, 
sourceLimit, metrics);
-      }
-      return toInputBatch(offsetRanges);
+      return toInputBatch(getOffsetRanges(props, sourceProfileSupplier, 
offsetGen, metrics,
+          lastCheckpointStr, sourceLimit));
     } catch (org.apache.kafka.common.errors.TimeoutException e) {
       throw new HoodieSourceTimeoutException("Kafka Source timed out " + 
e.getMessage());
     }
   }
 
+  @SuppressWarnings("unchecked")
+  public static OffsetRange[] getOffsetRanges(TypedProperties props,
+                                              Option<SourceProfileSupplier> 
sourceProfileSupplier,
+                                              KafkaOffsetGen offsetGen,
+                                              HoodieIngestionMetrics metrics,
+                                              Option<String> lastCheckpointStr,
+                                              long sourceLimit) {
+    OffsetRange[] offsetRanges;
+    if (sourceProfileSupplier.isPresent() && 
sourceProfileSupplier.get().getSourceProfile() != null) {
+      SourceProfile<Long> kafkaSourceProfile = 
sourceProfileSupplier.get().getSourceProfile();
+      offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, 
kafkaSourceProfile.getSourceSpecificContext(),
+          kafkaSourceProfile.getSourcePartitions(), metrics);
+      LOG.info("About to read maxEventsInSyncRound {} of size {} bytes in {} 
partitions from Kafka for topic {} with offsetRanges {}",
+          kafkaSourceProfile.getSourceSpecificContext(), 
kafkaSourceProfile.getMaxSourceBytes(),
+          kafkaSourceProfile.getSourcePartitions(), offsetGen.getTopicName(), 
offsetRanges);
+    } else {
+      long minPartitions = getLongWithAltKeys(props, 
KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS);
+      offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, 
sourceLimit, metrics);
+      LOG.info("About to read sourceLimit {} in {} spark partitions from kafka 
for topic {} with offset ranges {}",
+          sourceLimit, minPartitions, offsetGen.getTopicName(),
+          Arrays.toString(offsetRanges));
+    }
+    return offsetRanges;
+  }
+
   private InputBatch<T> toInputBatch(OffsetRange[] offsetRanges) {
     long totalNewMsgs = 
KafkaOffsetGen.CheckpointUtils.totalNewMessages(offsetRanges);
     LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + 
offsetGen.getTopicName());

Reply via email to