This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 2c5af864cd5b4dfca00b21d1d542e634bbff081d
Author: Praveen Gajulapalli <[email protected]>
AuthorDate: Mon Apr 29 18:59:48 2024 +0530

    [HUDI-7667] Created util method to get offset range for fetching new data (#11092)

    Created util method to get offsetRanges while fetching new data. The same
    util method can be used in any Source to get offsetRanges via SourceProfile.
    This will help in improving the estimation of offset ranges to read data
    from Kafka.
---
 .../apache/hudi/utilities/sources/KafkaSource.java | 42 ++++++++++++++++------
 1 file changed, 31 insertions(+), 11 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
index 3dc7fe69a0d..99af1ab0086 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
@@ -27,6 +27,7 @@ import org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
 import org.apache.hudi.utilities.streamer.SourceProfile;
+import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
 import org.apache.hudi.utilities.streamer.StreamContext;
 
 import org.apache.spark.api.java.JavaSparkContext;
@@ -35,7 +36,10 @@ import org.apache.spark.streaming.kafka010.OffsetRange;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
+
 import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
+import static org.apache.hudi.common.util.ConfigUtils.getLongWithAltKeys;
 
 public abstract class KafkaSource<T> extends Source<T> {
   private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
@@ -61,22 +65,38 @@ public abstract class KafkaSource<T> extends Source<T> {
   @Override
   protected InputBatch<T> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {
     try {
-      OffsetRange[] offsetRanges;
-      if (sourceProfileSupplier.isPresent() && sourceProfileSupplier.get().getSourceProfile() != null) {
-        SourceProfile<Long> kafkaSourceProfile = sourceProfileSupplier.get().getSourceProfile();
-        offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, kafkaSourceProfile.getSourceSpecificContext(), kafkaSourceProfile.getSourcePartitions(), metrics);
-        LOG.info("About to read numEvents {} of size {} bytes in {} partitions from Kafka for topic {} with offsetRanges {}",
-            kafkaSourceProfile.getSourceSpecificContext(), kafkaSourceProfile.getMaxSourceBytes(),
-            kafkaSourceProfile.getSourcePartitions(), offsetGen.getTopicName(), offsetRanges);
-      } else {
-        offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, metrics);
-      }
-      return toInputBatch(offsetRanges);
+      return toInputBatch(getOffsetRanges(props, sourceProfileSupplier, offsetGen, metrics,
+          lastCheckpointStr, sourceLimit));
     } catch (org.apache.kafka.common.errors.TimeoutException e) {
       throw new HoodieSourceTimeoutException("Kafka Source timed out " + e.getMessage());
     }
   }
 
+  @SuppressWarnings("unchecked")
+  public static OffsetRange[] getOffsetRanges(TypedProperties props,
+                                              Option<SourceProfileSupplier> sourceProfileSupplier,
+                                              KafkaOffsetGen offsetGen,
+                                              HoodieIngestionMetrics metrics,
+                                              Option<String> lastCheckpointStr,
+                                              long sourceLimit) {
+    OffsetRange[] offsetRanges;
+    if (sourceProfileSupplier.isPresent() && sourceProfileSupplier.get().getSourceProfile() != null) {
+      SourceProfile<Long> kafkaSourceProfile = sourceProfileSupplier.get().getSourceProfile();
+      offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, kafkaSourceProfile.getSourceSpecificContext(),
+          kafkaSourceProfile.getSourcePartitions(), metrics);
+      LOG.info("About to read maxEventsInSyncRound {} of size {} bytes in {} partitions from Kafka for topic {} with offsetRanges {}",
+          kafkaSourceProfile.getSourceSpecificContext(), kafkaSourceProfile.getMaxSourceBytes(),
+          kafkaSourceProfile.getSourcePartitions(), offsetGen.getTopicName(), offsetRanges);
+    } else {
+      long minPartitions = getLongWithAltKeys(props, KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS);
+      offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, metrics);
+      LOG.info("About to read sourceLimit {} in {} spark partitions from kafka for topic {} with offset ranges {}",
+          sourceLimit, minPartitions, offsetGen.getTopicName(),
+          Arrays.toString(offsetRanges));
+    }
+    return offsetRanges;
+  }
+
   private InputBatch<T> toInputBatch(OffsetRange[] offsetRanges) {
     long totalNewMsgs = KafkaOffsetGen.CheckpointUtils.totalNewMessages(offsetRanges);
     LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
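
As the commit message notes, the new static util can be reused by any Source
that holds the same collaborators. A minimal sketch of a hypothetical caller
follows; the variable names mirror KafkaSource's own fields and are
assumptions for illustration, not part of this commit:

    // Hypothetical caller: a Source with the same fields can delegate
    // offset-range planning to the shared util instead of duplicating
    // the SourceProfile-vs-sourceLimit branching logic.
    OffsetRange[] offsetRanges = KafkaSource.getOffsetRanges(
        props,                  // TypedProperties with source configs (e.g. KafkaSourceConfig settings)
        sourceProfileSupplier,  // Option<SourceProfileSupplier>; when empty, sizing falls back to sourceLimit
        offsetGen,              // KafkaOffsetGen bound to the target topic
        metrics,                // HoodieIngestionMetrics for reporting
        lastCheckpointStr,      // Option<String> with the last committed offsets, if any
        sourceLimit);           // upper bound on events for this sync round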
