waitingF commented on code in PR #8378:
URL: https://github.com/apache/hudi/pull/8378#discussion_r1163732085
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java:
##########
@@ -650,16 +648,19 @@ private JavaRDD<GenericRecord>
getTransformedRDD(Dataset<Row> rowDataset, boolea
/**
* Process previous commit metadata and checkpoint configs set by user to
determine the checkpoint to resume from.
- * @param commitTimelineOpt commit timeline of interest.
+ *
+ * @param commitsTimelineOpt commits timeline of interest, including .commit
and .deltacommit.
* @return the checkpoint to resume from if applicable.
* @throws IOException
*/
- private Option<String> getCheckpointToResume(Option<HoodieTimeline>
commitTimelineOpt) throws IOException {
+ private Option<String> getCheckpointToResume(Option<HoodieTimeline>
commitsTimelineOpt) throws IOException {
Option<String> resumeCheckpointStr = Option.empty();
- Option<HoodieInstant> lastCommit = commitTimelineOpt.get().lastInstant();
+ // try get checkpoint from commits(including commit and deltacommit)
+ // in COW migrating to MOR case, the first batch of the deltastreamer will
lost the checkpoint from COW table, cause the dataloss
+ Option<HoodieInstant> lastCommit = commitsTimelineOpt.get().lastInstant();
Review Comment:
> If there are no .deltacommit, then this table is either empty or just
being converted from COW to MOR. In this case, pick the latest .commit and read
checkpoint from there.
Yeah, but the case where a deltacommit exists should be treated specially.
If there is a deltacommit, we should pick the latest deltacommit, just as MOR
does, so the code could look like this:
```
// Narrow the timeline to deltacommits only.
HoodieTimeline deltaCommitTimeline = commitsTimelineOpt.get()
    .filter(instant -> instant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION));
// If any deltacommit exists, read the checkpoint from the latest one.
if (!deltaCommitTimeline.empty()) {
  commitsTimelineOpt = Option.of(deltaCommitTimeline);
}
Option<HoodieInstant> lastCommit = commitsTimelineOpt.get().lastInstant();
```
If there is no deltacommit, it is safe to take the latest instant from the
commits timeline, which returns the latest .commit whether this is a new table,
a MOR table just converted from COW, or a COW table. In this case we can just
do this:
```
Option<HoodieInstant> lastCommit = commitsTimelineOpt.get().lastInstant();
```
right?
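To make the rule concrete, here is a minimal, self-contained sketch of the
selection logic. It does not use the real Hudi classes: `Instant`,
`instantToReadCheckpointFrom`, and the list-based timeline are hypothetical
stand-ins for `HoodieInstant` and `HoodieTimeline`, and the list is assumed to
be ordered oldest to newest.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

public class CheckpointInstantSketch {

  /** Hypothetical stand-in for HoodieInstant; only the fields needed here. */
  static final class Instant {
    final String action;
    final String timestamp;

    Instant(String action, String timestamp) {
      this.action = action;
      this.timestamp = timestamp;
    }

    @Override
    public String toString() {
      return action + "@" + timestamp;
    }
  }

  // Local constants for this sketch, not imported from Hudi.
  static final String COMMIT_ACTION = "commit";
  static final String DELTA_COMMIT_ACTION = "deltacommit";

  /**
   * Picks the instant to read the checkpoint from.
   * Assumes commitsTimeline is ordered oldest to newest.
   */
  static Optional<Instant> instantToReadCheckpointFrom(List<Instant> commitsTimeline) {
    // Prefer the latest deltacommit when at least one exists.
    Optional<Instant> lastDeltaCommit = commitsTimeline.stream()
        .filter(instant -> DELTA_COMMIT_ACTION.equals(instant.action))
        .reduce((earlier, later) -> later);
    if (lastDeltaCommit.isPresent()) {
      return lastDeltaCommit;
    }
    // Otherwise fall back to the latest instant of any kind (empty for a new table).
    return commitsTimeline.stream().reduce((earlier, later) -> later);
  }

  public static void main(String[] args) {
    List<Instant> mixed = Arrays.asList(
        new Instant(COMMIT_ACTION, "001"),
        new Instant(DELTA_COMMIT_ACTION, "002"),
        new Instant(COMMIT_ACTION, "003"));
    List<Instant> commitsOnly = Arrays.asList(
        new Instant(COMMIT_ACTION, "001"),
        new Instant(COMMIT_ACTION, "002"));

    // Picks the deltacommit even though a later commit exists.
    System.out.println(instantToReadCheckpointFrom(mixed));
    // No deltacommit, so the latest commit is used.
    System.out.println(instantToReadCheckpointFrom(commitsOnly));
    // New table: nothing to resume from.
    System.out.println(instantToReadCheckpointFrom(Collections.emptyList()));
  }
}
```

The fallback branch is the same `lastInstant()` call as in the second snippet,
so the two proposals only differ when a deltacommit is present.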