pkgajulapalli commented on code in PR #11092:
URL: https://github.com/apache/hudi/pull/11092#discussion_r1582751312
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java:
##########
@@ -61,22 +65,38 @@ protected KafkaSource(TypedProperties props,
JavaSparkContext sparkContext, Spar
@Override
protected InputBatch<T> fetchNewData(Option<String> lastCheckpointStr, long
sourceLimit) {
try {
- OffsetRange[] offsetRanges;
- if (sourceProfileSupplier.isPresent() &&
sourceProfileSupplier.get().getSourceProfile() != null) {
- SourceProfile<Long> kafkaSourceProfile =
sourceProfileSupplier.get().getSourceProfile();
- offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr,
kafkaSourceProfile.getSourceSpecificContext(),
kafkaSourceProfile.getSourcePartitions(), metrics);
- LOG.info("About to read numEvents {} of size {} bytes in {} partitions
from Kafka for topic {} with offsetRanges {}",
- kafkaSourceProfile.getSourceSpecificContext(),
kafkaSourceProfile.getMaxSourceBytes(),
- kafkaSourceProfile.getSourcePartitions(),
offsetGen.getTopicName(), offsetRanges);
- } else {
- offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr,
sourceLimit, metrics);
- }
- return toInputBatch(offsetRanges);
+ return toInputBatch(getOffsetRanges(props, sourceProfileSupplier,
offsetGen, metrics,
+ lastCheckpointStr, sourceLimit));
} catch (org.apache.kafka.common.errors.TimeoutException e) {
throw new HoodieSourceTimeoutException("Kafka Source timed out " +
e.getMessage());
}
}
+ @SuppressWarnings("unchecked")
+ public static OffsetRange[] getOffsetRanges(TypedProperties props,
Review Comment:
Yes, I just extracted it as a static method.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]