codope commented on a change in pull request #4485:
URL: https://github.com/apache/hudi/pull/4485#discussion_r779317011
##########
File path:
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##########
@@ -471,6 +468,22 @@ public void refreshTimeline() throws IOException {
}).filter(Option::isPresent).findFirst().orElse(Option.empty());
}
+ protected Option<HoodieCommitMetadata>
getLatestCommitMetadataWithValidCheckpointInfo(HoodieTimeline timeline) throws
IOException {
+ return (Option<HoodieCommitMetadata>)
timeline.getReverseOrderedInstants().map(instant -> {
+ try {
+ HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+ .fromBytes(timeline.getInstantDetails(instant).get(),
HoodieCommitMetadata.class);
+ if (commitMetadata.getMetadata(CHECKPOINT_KEY) != null ||
commitMetadata.getMetadata(CHECKPOINT_RESET_KEY) != null) {
Review comment:
Should we check for an empty string as well?
##########
File path:
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##########
@@ -471,6 +468,22 @@ public void refreshTimeline() throws IOException {
}).filter(Option::isPresent).findFirst().orElse(Option.empty());
}
+ protected Option<HoodieCommitMetadata>
getLatestCommitMetadataWithValidCheckpointInfo(HoodieTimeline timeline) throws
IOException {
Review comment:
Why can't we directly return the checkpoint optional instead of
HoodieCommitMetadata?
##########
File path:
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##########
@@ -330,31 +330,28 @@ public void refreshTimeline() throws IOException {
if (commitTimelineOpt.isPresent()) {
Option<HoodieInstant> lastCommit = commitTimelineOpt.get().lastInstant();
if (lastCommit.isPresent()) {
- HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
-
.fromBytes(commitTimelineOpt.get().getInstantDetails(lastCommit.get()).get(),
HoodieCommitMetadata.class);
- if (cfg.checkpoint != null &&
(StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))
- ||
!cfg.checkpoint.equals(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY)))) {
- resumeCheckpointStr = Option.of(cfg.checkpoint);
- } else if
(!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_KEY))) {
- //if previous checkpoint is an empty string, skip resume use
Option.empty()
- resumeCheckpointStr =
Option.of(commitMetadata.getMetadata(CHECKPOINT_KEY));
- } else if
(HoodieTimeline.compareTimestamps(HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS,
- HoodieTimeline.LESSER_THAN, lastCommit.get().getTimestamp())) {
- // if previous commit metadata did not have the checkpoint key, try
traversing previous commits until we find one.
- Option<String> prevCheckpoint =
getPreviousCheckpoint(commitTimelineOpt.get());
- if (prevCheckpoint.isPresent()) {
- resumeCheckpointStr = prevCheckpoint;
- } else {
+ // if previous commit metadata did not have the checkpoint key, try
traversing previous commits until we find one.
+ Option<HoodieCommitMetadata> commitMetadataOption =
getLatestCommitMetadataWithValidCheckpointInfo(commitTimelineOpt.get());
+ if (commitMetadataOption.isPresent()) {
+ HoodieCommitMetadata commitMetadata = commitMetadataOption.get();
+ if (cfg.checkpoint != null &&
(StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))
+ ||
!cfg.checkpoint.equals(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY)))) {
+ resumeCheckpointStr = Option.of(cfg.checkpoint);
+ } else if
(!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_KEY))) {
+ //if previous checkpoint is an empty string, skip resume use
Option.empty()
+ resumeCheckpointStr =
Option.of(commitMetadata.getMetadata(CHECKPOINT_KEY));
+ } else if
(HoodieTimeline.compareTimestamps(HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS,
+ HoodieTimeline.LESSER_THAN, lastCommit.get().getTimestamp())) {
throw new HoodieDeltaStreamerException(
"Unable to find previous checkpoint. Please double check if
this table "
+ "was indeed built via delta streamer. Last Commit :" +
lastCommit + ", Instants :"
+
commitTimelineOpt.get().getInstants().collect(Collectors.toList()) + ",
CommitMetadata="
+ commitMetadata.toJsonString());
}
- }
- // KAFKA_CHECKPOINT_TYPE will be honored only for first batch.
- if
(!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))) {
- props.remove(KafkaOffsetGen.Config.KAFKA_CHECKPOINT_TYPE.key());
+ // KAFKA_CHECKPOINT_TYPE will be honored only for first batch.
+ if
(!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))) {
+ props.remove(KafkaOffsetGen.Config.KAFKA_CHECKPOINT_TYPE.key());
+ }
}
}
Review comment:
Can we extract this whole logic for getting the resume checkpoint into a
single method for better readability?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org