bvaradar commented on code in PR #8378:
URL: https://github.com/apache/hudi/pull/8378#discussion_r1163506777


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java:
##########
@@ -650,16 +648,19 @@ private JavaRDD<GenericRecord> getTransformedRDD(Dataset<Row> rowDataset, boolea
 
   /**
    * Process previous commit metadata and checkpoint configs set by user to determine the checkpoint to resume from.
-   * @param commitTimelineOpt commit timeline of interest.
+   *
+   * @param commitsTimelineOpt commits timeline of interest, including .commit and .deltacommit.
    * @return the checkpoint to resume from if applicable.
    * @throws IOException
    */
-  private Option<String> getCheckpointToResume(Option<HoodieTimeline> commitTimelineOpt) throws IOException {
+  private Option<String> getCheckpointToResume(Option<HoodieTimeline> commitsTimelineOpt) throws IOException {
     Option<String> resumeCheckpointStr = Option.empty();
-    Option<HoodieInstant> lastCommit = commitTimelineOpt.get().lastInstant();
+    // try get checkpoint from commits(including commit and deltacommit)
+    // in COW migrating to MOR case, the first batch of the deltastreamer will lost the checkpoint from COW table, cause the dataloss
+    Option<HoodieInstant> lastCommit = commitsTimelineOpt.get().lastInstant();

Review Comment:
   For a MOR table, we should read only the .deltacommit files if there is at
least one .deltacommit in the timeline. Otherwise, pick the latest .commit file.
   This is the safe approach.
   If there are no .deltacommit instants, then the table is either empty or is
just being converted from COW to MOR. In that case, pick the latest .commit and
read the checkpoint from there.
   So, the pseudo-code is something like:
   
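   ```
   // Pseudo-code: commitsTimelineOpt is an Option<HoodieTimeline>
   boolean hasNoDeltaCommit = commitsTimelineOpt.get()
       .filter(instant -> instant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION))
       .empty();
   if (isMOR && hasNoDeltaCommit) {
     // No .deltacommit yet: fall back to the plain .commit instants
     commitsTimelineOpt = Option.of(commitsTimelineOpt.get()
         .filter(instant -> !instant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)));
   }
   // Rest of the code
   ```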
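
To make the selection rule concrete, here is a minimal standalone sketch of it. It deliberately does not use the Hudi classes: `CheckpointInstantPicker`, its nested `Instant` type, and `pick` are hypothetical names made up for illustration, and the timeline is assumed to be a list already sorted oldest-first. It only models "for MOR, prefer .deltacommit when at least one exists, otherwise fall back to the latest .commit"; it is not the actual DeltaSync change.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for the rule described above; not Hudi's API.
class CheckpointInstantPicker {
  static final String COMMIT = "commit";
  static final String DELTA_COMMIT = "deltacommit";

  /** Minimal model of a completed instant: a timestamp plus an action. */
  static final class Instant {
    final String timestamp;
    final String action;

    Instant(String timestamp, String action) {
      this.timestamp = timestamp;
      this.action = action;
    }
  }

  /**
   * Returns the instant whose metadata should supply the checkpoint.
   * Assumes the timeline is sorted by timestamp, oldest first.
   */
  static Optional<Instant> pick(List<Instant> timeline, boolean isMor) {
    List<Instant> candidates = timeline;
    if (isMor) {
      // Collect only the .deltacommit instants.
      List<Instant> deltaCommits = new ArrayList<>();
      for (Instant instant : timeline) {
        if (DELTA_COMMIT.equals(instant.action)) {
          deltaCommits.add(instant);
        }
      }
      // Use them only if at least one exists; otherwise the table is empty
      // or was just converted from COW, so keep the full timeline.
      if (!deltaCommits.isEmpty()) {
        candidates = deltaCommits;
      }
    }
    return candidates.isEmpty()
        ? Optional.empty()
        : Optional.of(candidates.get(candidates.size() - 1));
  }
}
```

With a freshly converted table (only .commit instants), `pick(timeline, true)` returns the latest .commit, so the COW checkpoint is not lost. Once a .deltacommit exists, later .commit instants are ignored for MOR.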
   
   Let me know if you have questions. 
   
   


