codope commented on code in PR #11092:
URL: https://github.com/apache/hudi/pull/11092#discussion_r1582737132
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java:
##########
@@ -61,22 +65,38 @@ protected KafkaSource(TypedProperties props,
JavaSparkContext sparkContext, Spar
@Override
protected InputBatch<T> fetchNewData(Option<String> lastCheckpointStr, long
sourceLimit) {
try {
- OffsetRange[] offsetRanges;
- if (sourceProfileSupplier.isPresent() &&
sourceProfileSupplier.get().getSourceProfile() != null) {
- SourceProfile<Long> kafkaSourceProfile =
sourceProfileSupplier.get().getSourceProfile();
- offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr,
kafkaSourceProfile.getSourceSpecificContext(),
kafkaSourceProfile.getSourcePartitions(), metrics);
- LOG.info("About to read numEvents {} of size {} bytes in {} partitions
from Kafka for topic {} with offsetRanges {}",
- kafkaSourceProfile.getSourceSpecificContext(),
kafkaSourceProfile.getMaxSourceBytes(),
- kafkaSourceProfile.getSourcePartitions(),
offsetGen.getTopicName(), offsetRanges);
- } else {
- offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr,
sourceLimit, metrics);
- }
- return toInputBatch(offsetRanges);
+ return toInputBatch(getOffsetRanges(props, sourceProfileSupplier,
offsetGen, metrics,
+ lastCheckpointStr, sourceLimit));
} catch (org.apache.kafka.common.errors.TimeoutException e) {
throw new HoodieSourceTimeoutException("Kafka Source timed out " +
e.getMessage());
}
}
+ @SuppressWarnings("unchecked")
+ public static OffsetRange[] getOffsetRanges(TypedProperties props,
+ Option<SourceProfileSupplier>
sourceProfileSupplier,
+ KafkaOffsetGen offsetGen,
+ HoodieIngestionMetrics metrics,
+ Option<String> lastCheckpointStr,
+ long sourceLimit) {
+ OffsetRange[] offsetRanges;
+ if (sourceProfileSupplier.isPresent() &&
sourceProfileSupplier.get().getSourceProfile() != null) {
+ SourceProfile<Long> kafkaSourceProfile =
sourceProfileSupplier.get().getSourceProfile();
+ offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr,
kafkaSourceProfile.getSourceSpecificContext(),
+ kafkaSourceProfile.getSourcePartitions(), metrics);
+ LOG.info("About to read maxEventsInSyncRound {} of size {} bytes in {}
partitions from Kafka for topic {} with offsetRanges {}",
+ kafkaSourceProfile.getSourceSpecificContext(),
kafkaSourceProfile.getMaxSourceBytes(),
+ kafkaSourceProfile.getSourcePartitions(), offsetGen.getTopicName(),
offsetRanges);
+ } else {
+ int minPartitions = (int) getLongWithAltKeys(props,
KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS);
Review Comment:
If `minPartitions` is already being fetched before this point, pass it in as
an argument instead of reading it from `props` again here.
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java:
##########
@@ -61,22 +65,38 @@ protected KafkaSource(TypedProperties props,
JavaSparkContext sparkContext, Spar
@Override
protected InputBatch<T> fetchNewData(Option<String> lastCheckpointStr, long
sourceLimit) {
try {
- OffsetRange[] offsetRanges;
- if (sourceProfileSupplier.isPresent() &&
sourceProfileSupplier.get().getSourceProfile() != null) {
- SourceProfile<Long> kafkaSourceProfile =
sourceProfileSupplier.get().getSourceProfile();
- offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr,
kafkaSourceProfile.getSourceSpecificContext(),
kafkaSourceProfile.getSourcePartitions(), metrics);
- LOG.info("About to read numEvents {} of size {} bytes in {} partitions
from Kafka for topic {} with offsetRanges {}",
- kafkaSourceProfile.getSourceSpecificContext(),
kafkaSourceProfile.getMaxSourceBytes(),
- kafkaSourceProfile.getSourcePartitions(),
offsetGen.getTopicName(), offsetRanges);
- } else {
- offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr,
sourceLimit, metrics);
- }
- return toInputBatch(offsetRanges);
+ return toInputBatch(getOffsetRanges(props, sourceProfileSupplier,
offsetGen, metrics,
+ lastCheckpointStr, sourceLimit));
} catch (org.apache.kafka.common.errors.TimeoutException e) {
throw new HoodieSourceTimeoutException("Kafka Source timed out " +
e.getMessage());
}
}
+ @SuppressWarnings("unchecked")
+ public static OffsetRange[] getOffsetRanges(TypedProperties props,
Review Comment:
Are both the `if` and `else` branches of this method covered by the unit tests?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]