Davis-Zhang-Onehouse commented on code in PR #12718:
URL: https://github.com/apache/hudi/pull/12718#discussion_r1932737435


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java:
##########
@@ -44,44 +45,86 @@
 
 import java.io.IOException;
 
+import static 
org.apache.hudi.common.table.checkpoint.CheckpointUtils.buildCheckpointFromConfigOverride;
 import static 
org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2.STREAMER_CHECKPOINT_KEY_V2;
 import static 
org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2.STREAMER_CHECKPOINT_RESET_KEY_V2;
 import static 
org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN;
 import static 
org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps;
 import static org.apache.hudi.common.util.ConfigUtils.removeConfigFromProps;
+import static 
org.apache.hudi.table.upgrade.UpgradeDowngrade.needsUpgradeOrDowngrade;
 
 public class StreamerCheckpointUtils {
   private static final Logger LOG = 
LoggerFactory.getLogger(StreamerCheckpointUtils.class);
 
-  public static Option<Checkpoint> 
getCheckpointToResumeFrom(Option<HoodieTimeline> commitsTimelineOpt,
-                                                             
HoodieStreamer.Config streamerConfig,
-                                                             TypedProperties 
props) throws IOException {
+  /**
+   * The first phase of checkpoint resolution - read the checkpoint configs 
from 2 sources and resolve
+   * conflicts:
+   * <ul>
+   *   <li>commit metadata from the last completed instant, which can contain 
what is the last checkpoint
+   *       from the previous streamer ingestion.</li>
+   *   <li>user checkpoint overrides specified in the writer config {@code 
streamerConfig}. Users might want to
+   *       forcefully set the checkpoint to an arbitrary position or start 
from the very beginning.</li>
+   * </ul>
+   * The 2 sources can have conflicts, and we need to decide which config 
should prevail.
+   * <p>
+   * For the second phase of checkpoint resolution please refer
+   * {@link org.apache.hudi.utilities.sources.Source#translateCheckpoint} and 
child class overrides of this
+   * method.
+   */
+  public static Option<Checkpoint> 
resolveWhatCheckpointToResumeFrom(Option<HoodieTimeline> commitsTimelineOpt,
+                                                                     
HoodieStreamer.Config streamerConfig,
+                                                                     
TypedProperties props,
+                                                                     
HoodieTableMetaClient metaClient) throws IOException {
     Option<Checkpoint> checkpoint = Option.empty();
+    assertNoCheckpointOverrideDuringUpgrade(metaClient, streamerConfig, props);
+    // If we have both streamer config and commits specifying what checkpoint 
to use, go with the
+    // checkpoint resolution logic to resolve conflicting configurations.
     if (commitsTimelineOpt.isPresent()) {
-      checkpoint = getCheckpointToResumeString(commitsTimelineOpt.get(), 
streamerConfig, props);
+      checkpoint = 
resolveCheckpointBetweenConfigAndPrevCommit(commitsTimelineOpt.get(), 
streamerConfig, props);
     }
+    // If there is only streamer config, extract the checkpoint directly.
+    checkpoint = useCkpFromOverrideConfigIfAny(streamerConfig, props, 
checkpoint);
+    return checkpoint;
+  }
 
+  @VisibleForTesting
+  static void assertNoCheckpointOverrideDuringUpgrade(HoodieTableMetaClient 
metaClient, HoodieStreamer.Config streamerConfig, TypedProperties props) {
+    if (!StringUtils.isNullOrEmpty(streamerConfig.checkpoint)
+        || !StringUtils.isNullOrEmpty(streamerConfig.ignoreCheckpoint)) {
+      HoodieTableVersion writeTableVersion = 
HoodieTableVersion.fromVersionCode(ConfigUtils.getIntWithAltKeys(props, 
HoodieWriteConfig.WRITE_TABLE_VERSION));
+      HoodieWriteConfig config = 
HoodieWriteConfig.newBuilder().withPath(streamerConfig.targetBasePath).withProps(props).build();
+      if (config.autoUpgrade() && needsUpgradeOrDowngrade(metaClient, config, 
writeTableVersion)) {
+        throw new HoodieUpgradeDowngradeException(
+            String.format("When upgrade/downgrade is happening, please avoid 
setting --checkpoint option and --ignore-checkpoint for your delta streamers."
+                + " Detected invalid streamer configuration:\n%s", 
streamerConfig));
+      }
+    }
+  }
+
+  private static Option<Checkpoint> useCkpFromOverrideConfigIfAny(

Review Comment:
   I will leave this for a future follow-up.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java:
##########
@@ -126,13 +167,26 @@ static Option<Checkpoint> 
getCheckpointToResumeString(HoodieTimeline commitsTime
         }
       } else if (streamerConfig.checkpoint != null) {
         // 
getLatestCommitMetadataWithValidCheckpointInfo(commitTimelineOpt.get()) will 
never return a commit metadata w/o any checkpoint key set.
-        resumeCheckpoint = 
Option.of(CheckpointUtils.shouldTargetCheckpointV2(writeTableVersion, 
streamerConfig.sourceClassName)
-            ? new StreamerCheckpointV2(streamerConfig.checkpoint) : new 
StreamerCheckpointV1(streamerConfig.checkpoint));
+        resumeCheckpoint = 
Option.of(buildCheckpointFromConfigOverride(streamerConfig.sourceClassName, 
writeTableVersion, streamerConfig.checkpoint));
       }
     }
     return resumeCheckpoint;
   }
 
+  private static boolean shouldUseCkpFromPrevCommit(Checkpoint 
checkpointFromCommit) {
+    return !StringUtils.isNullOrEmpty(checkpointFromCommit.getCheckpointKey());
+  }
+
+  private static boolean 
ckpOverrideCfgPrevailsOverCkpFromPrevCommit(HoodieStreamer.Config 
streamerConfig, Checkpoint checkpointFromCommit) {

Review Comment:
   I will leave this for a future follow-up.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to