This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new fef14db3d9b [HUDI-7667] Created util method to get offset range for
fetching new data (#11092)
fef14db3d9b is described below
commit fef14db3d9b3ae4d8aac793951c6f9353e493372
Author: Praveen Gajulapalli <[email protected]>
AuthorDate: Mon Apr 29 18:59:48 2024 +0530
[HUDI-7667] Created util method to get offset range for fetching new data
(#11092)
Created a util method to get offsetRanges while fetching new data.
The same util method can be used in any Source to get offsetRanges via
SourceProfile.
This will help improve the estimation of offset ranges when reading data
from Kafka.
---
.../apache/hudi/utilities/sources/KafkaSource.java | 42 ++++++++++++++++------
1 file changed, 31 insertions(+), 11 deletions(-)
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
index 3dc7fe69a0d..99af1ab0086 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
@@ -27,6 +27,7 @@ import
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.hudi.utilities.streamer.SourceProfile;
+import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
import org.apache.hudi.utilities.streamer.StreamContext;
import org.apache.spark.api.java.JavaSparkContext;
@@ -35,7 +36,10 @@ import org.apache.spark.streaming.kafka010.OffsetRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Arrays;
+
import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
+import static org.apache.hudi.common.util.ConfigUtils.getLongWithAltKeys;
public abstract class KafkaSource<T> extends Source<T> {
private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
@@ -61,22 +65,38 @@ public abstract class KafkaSource<T> extends Source<T> {
@Override
protected InputBatch<T> fetchNewData(Option<String> lastCheckpointStr, long
sourceLimit) {
try {
- OffsetRange[] offsetRanges;
- if (sourceProfileSupplier.isPresent() &&
sourceProfileSupplier.get().getSourceProfile() != null) {
- SourceProfile<Long> kafkaSourceProfile =
sourceProfileSupplier.get().getSourceProfile();
- offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr,
kafkaSourceProfile.getSourceSpecificContext(),
kafkaSourceProfile.getSourcePartitions(), metrics);
- LOG.info("About to read numEvents {} of size {} bytes in {} partitions
from Kafka for topic {} with offsetRanges {}",
- kafkaSourceProfile.getSourceSpecificContext(),
kafkaSourceProfile.getMaxSourceBytes(),
- kafkaSourceProfile.getSourcePartitions(),
offsetGen.getTopicName(), offsetRanges);
- } else {
- offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr,
sourceLimit, metrics);
- }
- return toInputBatch(offsetRanges);
+ return toInputBatch(getOffsetRanges(props, sourceProfileSupplier,
offsetGen, metrics,
+ lastCheckpointStr, sourceLimit));
} catch (org.apache.kafka.common.errors.TimeoutException e) {
throw new HoodieSourceTimeoutException("Kafka Source timed out " +
e.getMessage());
}
}
+ @SuppressWarnings("unchecked")
+ public static OffsetRange[] getOffsetRanges(TypedProperties props,
+ Option<SourceProfileSupplier>
sourceProfileSupplier,
+ KafkaOffsetGen offsetGen,
+ HoodieIngestionMetrics metrics,
+ Option<String> lastCheckpointStr,
+ long sourceLimit) {
+ OffsetRange[] offsetRanges;
+ if (sourceProfileSupplier.isPresent() &&
sourceProfileSupplier.get().getSourceProfile() != null) {
+ SourceProfile<Long> kafkaSourceProfile =
sourceProfileSupplier.get().getSourceProfile();
+ offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr,
kafkaSourceProfile.getSourceSpecificContext(),
+ kafkaSourceProfile.getSourcePartitions(), metrics);
+ LOG.info("About to read maxEventsInSyncRound {} of size {} bytes in {}
partitions from Kafka for topic {} with offsetRanges {}",
+ kafkaSourceProfile.getSourceSpecificContext(),
kafkaSourceProfile.getMaxSourceBytes(),
+ kafkaSourceProfile.getSourcePartitions(), offsetGen.getTopicName(),
offsetRanges);
+ } else {
+ long minPartitions = getLongWithAltKeys(props,
KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS);
+ offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr,
sourceLimit, metrics);
+ LOG.info("About to read sourceLimit {} in {} spark partitions from kafka
for topic {} with offset ranges {}",
+ sourceLimit, minPartitions, offsetGen.getTopicName(),
+ Arrays.toString(offsetRanges));
+ }
+ return offsetRanges;
+ }
+
private InputBatch<T> toInputBatch(OffsetRange[] offsetRanges) {
long totalNewMsgs =
KafkaOffsetGen.CheckpointUtils.totalNewMessages(offsetRanges);
LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" +
offsetGen.getTopicName());