nsivabalan commented on a change in pull request #2438:
URL: https://github.com/apache/hudi/pull/2438#discussion_r657945070



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##########
@@ -312,13 +313,13 @@ public void refreshTimeline() throws IOException {
       if (lastCommit.isPresent()) {
         HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
             .fromBytes(commitTimelineOpt.get().getInstantDetails(lastCommit.get()).get(), HoodieCommitMetadata.class);
-        if (cfg.checkpoint != null && !cfg.checkpoint.equals(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))) {
-          resumeCheckpointStr = Option.of(cfg.checkpoint);
-        } else if (commitMetadata.getMetadata(CHECKPOINT_KEY) != null) {
-          //if previous checkpoint is an empty string, skip resume use Option.empty()
-          if (!commitMetadata.getMetadata(CHECKPOINT_KEY).isEmpty()) {
-            resumeCheckpointStr = Option.of(commitMetadata.getMetadata(CHECKPOINT_KEY));
+        if (cfg.checkpoint != null) {

Review comment:
       We could combine both of these into a single if condition:
   ```
   if (cfg.checkpoint != null
       && (StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))
           || !cfg.checkpoint.equals(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY)))) {
     resumeCheckpointStr = Option.of(cfg.checkpoint);
   }
   ```
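
       For illustration only, here is a minimal standalone sketch of the combined
       condition, so its behavior can be checked outside DeltaSync. The class
       `CheckpointResume`, the method `resolveResumeCheckpoint`, and the local
       `isNullOrEmpty` helper are hypothetical stand-ins (the real code uses
       Hudi's `StringUtils` and `Option`); `java.util.Optional` is swapped in here
       only to keep the sketch self-contained.

```java
import java.util.Optional;

class CheckpointResume {
  // Hypothetical stand-in for Hudi's StringUtils.isNullOrEmpty.
  static boolean isNullOrEmpty(String s) {
    return s == null || s.isEmpty();
  }

  // Returns the configured checkpoint when it should take effect: it is set, and
  // either no reset key was recorded in the last commit or the recorded one differs.
  static Optional<String> resolveResumeCheckpoint(String cfgCheckpoint, String lastResetKey) {
    if (cfgCheckpoint != null
        && (isNullOrEmpty(lastResetKey) || !cfgCheckpoint.equals(lastResetKey))) {
      return Optional.of(cfgCheckpoint);
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    // Illustrative checkpoint strings; the values are made up for this sketch.
    System.out.println(resolveResumeCheckpoint("cp-2", null));   // no reset key recorded
    System.out.println(resolveResumeCheckpoint("cp-2", "cp-1")); // reset key differs
    System.out.println(resolveResumeCheckpoint("cp-2", "cp-2")); // already applied
    System.out.println(resolveResumeCheckpoint(null, "cp-1"));   // nothing configured
  }
}
```

       The sketch covers only the branch where cfg.checkpoint is consulted; falling
       back to CHECKPOINT_KEY from the last commit is left out.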

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##########
@@ -330,6 +331,9 @@ public void refreshTimeline() throws IOException {
                   + commitTimelineOpt.get().getInstants().collect(Collectors.toList()) + ", CommitMetadata="
                   + commitMetadata.toJsonString());
         }
+        if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))) {
+          props.put("hoodie.deltastreamer.source.kafka.checkpoint.type", "string");

Review comment:
       Actually, the better thing to do here is to remove the entry from props. WDYT?
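
       A rough sketch of what removing the entry could look like, assuming props
       behaves like a `java.util.Properties`. The class `CheckpointTypeCleanup`,
       the method `clearCheckpointType`, and the constant name
       `KAFKA_CHECKPOINT_TYPE_KEY` are hypothetical names used only for this
       illustration; they are not taken from the PR.

```java
import java.util.Properties;

class CheckpointTypeCleanup {
  // Hypothetical constant name; the key string itself is the one from the diff.
  static final String KAFKA_CHECKPOINT_TYPE_KEY =
      "hoodie.deltastreamer.source.kafka.checkpoint.type";

  // Drops the checkpoint-type entry once a reset key has been recorded in the
  // last commit, instead of overwriting the entry with "string".
  static void clearCheckpointType(Properties props, String lastResetKey) {
    if (lastResetKey != null && !lastResetKey.isEmpty()) {
      props.remove(KAFKA_CHECKPOINT_TYPE_KEY);
    }
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    // "timestamp" is an illustrative value for this sketch.
    props.put(KAFKA_CHECKPOINT_TYPE_KEY, "timestamp");
    clearCheckpointType(props, "cp-1");
    System.out.println(props.containsKey(KAFKA_CHECKPOINT_TYPE_KEY));
  }
}
```

       With the entry removed, whatever default the source applies for a missing
       checkpoint type would take over, which is presumably the intent here.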

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##########
@@ -330,6 +331,9 @@ public void refreshTimeline() throws IOException {
                   + commitTimelineOpt.get().getInstants().collect(Collectors.toList()) + ", CommitMetadata="
                   + commitMetadata.toJsonString());
         }
+        if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))) {
+          props.put("hoodie.deltastreamer.source.kafka.checkpoint.type", "string");

Review comment:
       Rather than hardcoding the config key here, can we use a constant, please?



