nsivabalan commented on code in PR #12718:
URL: https://github.com/apache/hudi/pull/12718#discussion_r1932681358


##########
hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java:
##########
@@ -61,6 +61,20 @@ public static Checkpoint getCheckpoint(HoodieCommitMetadata 
commitMetadata) {
     throw new HoodieException("Checkpoint is not found in the commit metadata: 
" + commitMetadata.getExtraMetadata());
   }
 
+  public static Checkpoint buildCheckpointFromGeneralSource(
+      String sourceClassName, int writeTableVersion, String 
checkpointToResume) {
+    return CheckpointUtils.shouldTargetCheckpointV2(writeTableVersion, 
sourceClassName)
+        ? new StreamerCheckpointV2(checkpointToResume) : new 
StreamerCheckpointV1(checkpointToResume);
+  }
+
+  // Whenever we create checkpoint from streamer config checkpoint override, 
we should use this function
+  // to build checkpoints.
+  public static Checkpoint buildCheckpointFromConfigOverride(

Review Comment:
   Same here: please be cautious about the access specifier and avoid making this public unless it is needed.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/HoodieIncrSourceCheckpointValUtils.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.checkpoint;
+
+/**
+ * Utility class providing methods to check if a string starts with specific 
resume-related prefixes.
+ */
+public class HoodieIncrSourceCheckpointValUtils {
+  public static final String RESET_CHECKPOINT_V2_SEPARATOR = ":";
+  public static final String REQUEST_TIME_PREFIX = 
"resumeFromInstantRequestTime";
+  public static final String COMPLETION_TIME_PREFIX = 
"resumeFromInstantCompletionTime";
+
+  /**
+   * For hoodie incremental source ingestion, if the target table is version 8 
or higher, the checkpoint
+   * key set by streamer config can be in either of the following format:
+   * - resumeFromInstantRequestTime:[checkpoint value based on request time]
+   * - resumeFromInstantCompletionTime:[checkpoint value based on completion 
time]
+   *
+   * StreamerCheckpointV2FromCfgCkp class itself captured the fact that this 
is version 8 and higher, plus
+   * the checkpoint source is from streamer config override.
+   *
+   * When the checkpoint is consumed by individual data sources, we need to 
convert them to either vanilla
+   * checkpoint v1 (request time based) or checkpoint v2 (completion time 
based).
+   */
+  public static Checkpoint 
resolveToV1V2Checkpoint(StreamerCheckpointFromCfgCkp checkpoint) {

Review Comment:
   Minor: consider renaming this to `resolveToActualCheckpointVersion`.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java:
##########
@@ -128,6 +139,17 @@ protected Option<Checkpoint> 
translateCheckpoint(Option<Checkpoint> lastCheckpoi
     throw new UnsupportedOperationException("Unsupported checkpoint type: " + 
lastCheckpoint.get());
   }
 
+  public void assertCheckpointVersion(Checkpoint checkpoint) {
+    if (checkpoint != null) {
+      if (shouldTargetCheckpointV2(writeTableVersion, getClass().getName()) && 
!(checkpoint instanceof StreamerCheckpointV2)) {
+        throw new IllegalStateException("Data source target checkpoint v2 
(completion time based) should always return checkpoint v2. Setup " + props);

Review Comment:
   `props` might contain 100+ entries.
   Can you print the checkpoint instead?
   We could also print the checkpoint to resume from.



##########
hudi-common/src/test/java/org/apache/hudi/common/table/checkpoint/TestHoodieIncrSourceCheckpointValUtils.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.checkpoint;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestHoodieIncrSourceCheckpointValUtils {
+
+  @Test
+  public void testResolveToV1V2CheckpointWithRequestTime() {
+    StreamerCheckpointFromCfgCkp mockCheckpoint = 
mock(StreamerCheckpointFromCfgCkp.class);
+    
when(mockCheckpoint.getCheckpointKey()).thenReturn("resumeFromInstantRequestTime:20240301");
+
+    Checkpoint result = 
HoodieIncrSourceCheckpointValUtils.resolveToV1V2Checkpoint(mockCheckpoint);
+
+    assertTrue(result instanceof StreamerCheckpointV1);
+    assertEquals("20240301", result.getCheckpointKey());
+  }
+
+  @Test
+  public void testResolveToV1V2CheckpointWithCompletionTime() {
+    StreamerCheckpointFromCfgCkp mockCheckpoint = 
mock(StreamerCheckpointFromCfgCkp.class);
+    
when(mockCheckpoint.getCheckpointKey()).thenReturn("resumeFromInstantCompletionTime:20240302");

Review Comment:
   Can you add a constant for "20240302" and reuse it?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java:
##########
@@ -61,6 +61,20 @@ public static Checkpoint getCheckpoint(HoodieCommitMetadata 
commitMetadata) {
     throw new HoodieException("Checkpoint is not found in the commit metadata: 
" + commitMetadata.getExtraMetadata());
   }
 
+  public static Checkpoint buildCheckpointFromGeneralSource(

Review Comment:
   Let's be cautious about access specifiers.
   Can you add the `@VisibleForTesting` annotation if needed,
   but avoid making this public by default?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/StreamerCheckpointFromCfgCkp.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.checkpoint;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+
+/**
+ * A special checkpoint v2 class that indicates its checkpoint key comes from 
streamer config checkpoint
+ * overrides.
+ *
+ * For hoodie incremental source, based on the content of the checkpoint 
override value, it can indicate
+ * either request time based checkpoint or completion time based. So the class 
serves as an indicator to
+ * data sources of interest that it needs to be further parsed and resolved to 
either checkpoint v1 or v2.
+ *
+ * For all the other data sources, it behaves exactly the same as checkpoint 
v2.
+ *
+ * To keep the checkpoint class design ignorant of which data source it 
serves, the class only indicates where
+ * the checkpoint key comes from.
+ * */
+public class StreamerCheckpointFromCfgCkp extends StreamerCheckpointV2 {

Review Comment:
   Minor:
   we could name this `UnresolvedStreamerCheckpointBasedOnCfg`.



##########
hudi-common/src/test/java/org/apache/hudi/common/table/checkpoint/TestCheckpointUtils.java:
##########
@@ -207,7 +212,49 @@ public void testConvertCheckpointWithUseTransitionTime() {
       "8, org.apache.hudi.utilities.sources.MockS3EventsHoodieIncrSource, 
false",
       "8, org.apache.hudi.utilities.sources.MockGcsEventsHoodieIncrSource, 
false"
   })
-  public void testTargetCheckpointV2(int version, String sourceClassName, 
boolean expected) {
-    assertEquals(expected, CheckpointUtils.shouldTargetCheckpointV2(version, 
sourceClassName));
+  public void testTargetCheckpointV2(int version, String sourceClassName, 
boolean isV2Checkpoint) {
+    assertEquals(isV2Checkpoint, 
CheckpointUtils.buildCheckpointFromGeneralSource(sourceClassName, version, 
"ignored") instanceof StreamerCheckpointV2);
+  }
+
+  @Test
+  public void testBuildCheckpointFromGeneralSource() {
+    // Test V2 checkpoint creation (newer table version + general source)
+    Checkpoint checkpoint1 = CheckpointUtils.buildCheckpointFromGeneralSource(
+        GENERAL_SOURCE,
+        HoodieTableVersion.EIGHT.versionCode(),
+        CHECKPOINT_TO_RESUME
+    );
+    assertInstanceOf(StreamerCheckpointV2.class, checkpoint1);
+    assertEquals(CHECKPOINT_TO_RESUME, checkpoint1.getCheckpointKey());
+
+    // Test V1 checkpoint creation (older table version)
+    Checkpoint checkpoint2 = CheckpointUtils.buildCheckpointFromGeneralSource(
+        GENERAL_SOURCE,
+        HoodieTableVersion.SEVEN.versionCode(),
+        CHECKPOINT_TO_RESUME
+    );
+    assertInstanceOf(StreamerCheckpointV1.class, checkpoint2);
+    assertEquals(CHECKPOINT_TO_RESUME, checkpoint2.getCheckpointKey());
+  }
+
+  @Test
+  public void testBuildCheckpointFromConfigOverride() {
+    // Test checkpoint from config creation (newer table version + general 
source)
+    Checkpoint checkpoint1 = CheckpointUtils.buildCheckpointFromConfigOverride(

Review Comment:
   Where do we have test coverage for `HoodieIncrSource`?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/HoodieIncrSourceCheckpointValUtils.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.checkpoint;
+
+/**
+ * Utility class providing methods to check if a string starts with specific 
resume-related prefixes.
+ */
+public class HoodieIncrSourceCheckpointValUtils {
+  public static final String RESET_CHECKPOINT_V2_SEPARATOR = ":";
+  public static final String REQUEST_TIME_PREFIX = 
"resumeFromInstantRequestTime";
+  public static final String COMPLETION_TIME_PREFIX = 
"resumeFromInstantCompletionTime";
+
+  /**
+   * For hoodie incremental source ingestion, if the target table is version 8 
or higher, the checkpoint
+   * key set by streamer config can be in either of the following format:
+   * - resumeFromInstantRequestTime:[checkpoint value based on request time]
+   * - resumeFromInstantCompletionTime:[checkpoint value based on completion 
time]
+   *
+   * StreamerCheckpointV2FromCfgCkp class itself captured the fact that this 
is version 8 and higher, plus
+   * the checkpoint source is from streamer config override.
+   *
+   * When the checkpoint is consumed by individual data sources, we need to 
convert them to either vanilla
+   * checkpoint v1 (request time based) or checkpoint v2 (completion time 
based).
+   */
+  public static Checkpoint 
resolveToV1V2Checkpoint(StreamerCheckpointFromCfgCkp checkpoint) {
+    String[] parts = extractKeyValues(checkpoint);
+    switch (parts[0]) {
+      case REQUEST_TIME_PREFIX: {
+        return new StreamerCheckpointV1(checkpoint).setCheckpointKey(parts[1]);
+      }
+      case COMPLETION_TIME_PREFIX: {
+        return new StreamerCheckpointV2(checkpoint).setCheckpointKey(parts[1]);
+      }
+      default:
+        throw new IllegalArgumentException("Unknown event ordering mode " + 
parts[0]);
+    }
+  }
+
+  private static String [] extractKeyValues(StreamerCheckpointFromCfgCkp 
checkpoint) {
+    String checkpointKey = checkpoint.getCheckpointKey();
+    String[] parts = checkpointKey.split(RESET_CHECKPOINT_V2_SEPARATOR);
+    if (parts.length != 2
+        || (
+          !parts[0].trim().equals(REQUEST_TIME_PREFIX)
+          && !parts[0].trim().equals(COMPLETION_TIME_PREFIX)
+        )) {
+      throw new IllegalArgumentException(
+          "Illegal checkpoint key override `" + checkpointKey + "`. Valid 
format is either `resumeFromInstantRequestTime:<checkpoint value>` or "
+          + "`resumeFromInstantCompletionTime:<checkpoint value>`.");
+    }
+    parts[0] = parts[0].trim();
+    parts[1] = parts[1].trim();
+    return parts;
+  }
+
+  /**
+   * Immutable class to hold the parts of a resume string.
+   */
+  public static class CheckpointWithMode {

Review Comment:
   Looks like this is never used. Can we remove it?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java:
##########
@@ -44,44 +45,86 @@
 
 import java.io.IOException;
 
+import static 
org.apache.hudi.common.table.checkpoint.CheckpointUtils.buildCheckpointFromConfigOverride;
 import static 
org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2.STREAMER_CHECKPOINT_KEY_V2;
 import static 
org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2.STREAMER_CHECKPOINT_RESET_KEY_V2;
 import static 
org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN;
 import static 
org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps;
 import static org.apache.hudi.common.util.ConfigUtils.removeConfigFromProps;
+import static 
org.apache.hudi.table.upgrade.UpgradeDowngrade.needsUpgradeOrDowngrade;
 
 public class StreamerCheckpointUtils {
   private static final Logger LOG = 
LoggerFactory.getLogger(StreamerCheckpointUtils.class);
 
-  public static Option<Checkpoint> 
getCheckpointToResumeFrom(Option<HoodieTimeline> commitsTimelineOpt,
-                                                             
HoodieStreamer.Config streamerConfig,
-                                                             TypedProperties 
props) throws IOException {
+  /**
+   * The first phase of checkpoint resolution - read the checkpoint configs 
from 2 sources and resolve
+   * conflicts:
+   * <ul>
+   *   <li>commit metadata from the last completed instant, which can contain 
what is the last checkpoint
+   *       from the previous streamer ingestion.</li>
+   *   <li>user checkpoint overrides specified in the writer config {@code 
streamerConfig}. Users might want to
+   *       forcefully set the checkpoint to an arbitrary position or start 
from the very beginning.</li>
+   * </ul>
+   * The 2 sources can have conflicts, and we need to decide which config 
should prevail.
+   * <p>
+   * For the second phase of checkpoint resolution please refer
+   * {@link org.apache.hudi.utilities.sources.Source#translateCheckpoint} and 
child class overrides of this
+   * method.
+   */
+  public static Option<Checkpoint> 
resolveWhatCheckpointToResumeFrom(Option<HoodieTimeline> commitsTimelineOpt,
+                                                                     
HoodieStreamer.Config streamerConfig,
+                                                                     
TypedProperties props,
+                                                                     
HoodieTableMetaClient metaClient) throws IOException {
     Option<Checkpoint> checkpoint = Option.empty();
+    assertNoCheckpointOverrideDuringUpgrade(metaClient, streamerConfig, props);
+    // If we have both streamer config and commits specifying what checkpoint 
to use, go with the
+    // checkpoint resolution logic to resolve conflicting configurations.
     if (commitsTimelineOpt.isPresent()) {
-      checkpoint = getCheckpointToResumeString(commitsTimelineOpt.get(), 
streamerConfig, props);
+      checkpoint = 
resolveCheckpointBetweenConfigAndPrevCommit(commitsTimelineOpt.get(), 
streamerConfig, props);
     }
+    // If there is only streamer config, extract the checkpoint directly.
+    checkpoint = useCkpFromOverrideConfigIfAny(streamerConfig, props, 
checkpoint);
+    return checkpoint;
+  }
 
+  @VisibleForTesting
+  static void assertNoCheckpointOverrideDuringUpgrade(HoodieTableMetaClient 
metaClient, HoodieStreamer.Config streamerConfig, TypedProperties props) {
+    if (!StringUtils.isNullOrEmpty(streamerConfig.checkpoint)
+        || !StringUtils.isNullOrEmpty(streamerConfig.ignoreCheckpoint)) {
+      HoodieTableVersion writeTableVersion = 
HoodieTableVersion.fromVersionCode(ConfigUtils.getIntWithAltKeys(props, 
HoodieWriteConfig.WRITE_TABLE_VERSION));
+      HoodieWriteConfig config = 
HoodieWriteConfig.newBuilder().withPath(streamerConfig.targetBasePath).withProps(props).build();
+      if (config.autoUpgrade() && needsUpgradeOrDowngrade(metaClient, config, 
writeTableVersion)) {

Review Comment:
   Can we confine this check to HoodieIncr sources only?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java:
##########
@@ -44,44 +45,86 @@
 
 import java.io.IOException;
 
+import static 
org.apache.hudi.common.table.checkpoint.CheckpointUtils.buildCheckpointFromConfigOverride;
 import static 
org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2.STREAMER_CHECKPOINT_KEY_V2;
 import static 
org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2.STREAMER_CHECKPOINT_RESET_KEY_V2;
 import static 
org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN;
 import static 
org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps;
 import static org.apache.hudi.common.util.ConfigUtils.removeConfigFromProps;
+import static 
org.apache.hudi.table.upgrade.UpgradeDowngrade.needsUpgradeOrDowngrade;
 
 public class StreamerCheckpointUtils {
   private static final Logger LOG = 
LoggerFactory.getLogger(StreamerCheckpointUtils.class);
 
-  public static Option<Checkpoint> 
getCheckpointToResumeFrom(Option<HoodieTimeline> commitsTimelineOpt,
-                                                             
HoodieStreamer.Config streamerConfig,
-                                                             TypedProperties 
props) throws IOException {
+  /**
+   * The first phase of checkpoint resolution - read the checkpoint configs 
from 2 sources and resolve
+   * conflicts:
+   * <ul>
+   *   <li>commit metadata from the last completed instant, which can contain 
what is the last checkpoint
+   *       from the previous streamer ingestion.</li>
+   *   <li>user checkpoint overrides specified in the writer config {@code 
streamerConfig}. Users might want to
+   *       forcefully set the checkpoint to an arbitrary position or start 
from the very beginning.</li>
+   * </ul>
+   * The 2 sources can have conflicts, and we need to decide which config 
should prevail.
+   * <p>
+   * For the second phase of checkpoint resolution please refer
+   * {@link org.apache.hudi.utilities.sources.Source#translateCheckpoint} and 
child class overrides of this
+   * method.
+   */
+  public static Option<Checkpoint> 
resolveWhatCheckpointToResumeFrom(Option<HoodieTimeline> commitsTimelineOpt,

Review Comment:
   Minor:
   rename to `resolveCheckpointToResumeFrom`.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java:
##########
@@ -44,44 +45,86 @@
 
 import java.io.IOException;
 
+import static 
org.apache.hudi.common.table.checkpoint.CheckpointUtils.buildCheckpointFromConfigOverride;
 import static 
org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2.STREAMER_CHECKPOINT_KEY_V2;
 import static 
org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2.STREAMER_CHECKPOINT_RESET_KEY_V2;
 import static 
org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN;
 import static 
org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps;
 import static org.apache.hudi.common.util.ConfigUtils.removeConfigFromProps;
+import static 
org.apache.hudi.table.upgrade.UpgradeDowngrade.needsUpgradeOrDowngrade;
 
 public class StreamerCheckpointUtils {
   private static final Logger LOG = 
LoggerFactory.getLogger(StreamerCheckpointUtils.class);
 
-  public static Option<Checkpoint> 
getCheckpointToResumeFrom(Option<HoodieTimeline> commitsTimelineOpt,
-                                                             
HoodieStreamer.Config streamerConfig,
-                                                             TypedProperties 
props) throws IOException {
+  /**
+   * The first phase of checkpoint resolution - read the checkpoint configs 
from 2 sources and resolve
+   * conflicts:
+   * <ul>
+   *   <li>commit metadata from the last completed instant, which can contain 
what is the last checkpoint
+   *       from the previous streamer ingestion.</li>
+   *   <li>user checkpoint overrides specified in the writer config {@code 
streamerConfig}. Users might want to
+   *       forcefully set the checkpoint to an arbitrary position or start 
from the very beginning.</li>
+   * </ul>
+   * The 2 sources can have conflicts, and we need to decide which config 
should prevail.
+   * <p>
+   * For the second phase of checkpoint resolution please refer
+   * {@link org.apache.hudi.utilities.sources.Source#translateCheckpoint} and 
child class overrides of this
+   * method.
+   */
+  public static Option<Checkpoint> 
resolveWhatCheckpointToResumeFrom(Option<HoodieTimeline> commitsTimelineOpt,
+                                                                     
HoodieStreamer.Config streamerConfig,
+                                                                     
TypedProperties props,
+                                                                     
HoodieTableMetaClient metaClient) throws IOException {
     Option<Checkpoint> checkpoint = Option.empty();
+    assertNoCheckpointOverrideDuringUpgrade(metaClient, streamerConfig, props);
+    // If we have both streamer config and commits specifying what checkpoint 
to use, go with the
+    // checkpoint resolution logic to resolve conflicting configurations.
     if (commitsTimelineOpt.isPresent()) {
-      checkpoint = getCheckpointToResumeString(commitsTimelineOpt.get(), 
streamerConfig, props);
+      checkpoint = 
resolveCheckpointBetweenConfigAndPrevCommit(commitsTimelineOpt.get(), 
streamerConfig, props);
     }
+    // If there is only streamer config, extract the checkpoint directly.
+    checkpoint = useCkpFromOverrideConfigIfAny(streamerConfig, props, 
checkpoint);
+    return checkpoint;
+  }
 
+  @VisibleForTesting
+  static void assertNoCheckpointOverrideDuringUpgrade(HoodieTableMetaClient 
metaClient, HoodieStreamer.Config streamerConfig, TypedProperties props) {
+    if (!StringUtils.isNullOrEmpty(streamerConfig.checkpoint)
+        || !StringUtils.isNullOrEmpty(streamerConfig.ignoreCheckpoint)) {
+      HoodieTableVersion writeTableVersion = 
HoodieTableVersion.fromVersionCode(ConfigUtils.getIntWithAltKeys(props, 
HoodieWriteConfig.WRITE_TABLE_VERSION));
+      HoodieWriteConfig config = 
HoodieWriteConfig.newBuilder().withPath(streamerConfig.targetBasePath).withProps(props).build();
+      if (config.autoUpgrade() && needsUpgradeOrDowngrade(metaClient, config, 
writeTableVersion)) {
+        throw new HoodieUpgradeDowngradeException(
+            String.format("When upgrade/downgrade is happening, please avoid 
setting --checkpoint option and --ignore-checkpoint for your delta streamers."
+                + " Detected invalid streamer configuration:\n%s", 
streamerConfig));
+      }
+    }
+  }
+
+  private static Option<Checkpoint> useCkpFromOverrideConfigIfAny(

Review Comment:
   To abbreviate "checkpoint", we can use "ckpt" instead of "ckp".



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java:
##########
@@ -126,13 +167,26 @@ static Option<Checkpoint> 
getCheckpointToResumeString(HoodieTimeline commitsTime
         }
       } else if (streamerConfig.checkpoint != null) {
         // 
getLatestCommitMetadataWithValidCheckpointInfo(commitTimelineOpt.get()) will 
never return a commit metadata w/o any checkpoint key set.
-        resumeCheckpoint = 
Option.of(CheckpointUtils.shouldTargetCheckpointV2(writeTableVersion, 
streamerConfig.sourceClassName)
-            ? new StreamerCheckpointV2(streamerConfig.checkpoint) : new 
StreamerCheckpointV1(streamerConfig.checkpoint));
+        resumeCheckpoint = 
Option.of(buildCheckpointFromConfigOverride(streamerConfig.sourceClassName, 
writeTableVersion, streamerConfig.checkpoint));
       }
     }
     return resumeCheckpoint;
   }
 
+  private static boolean shouldUseCkpFromPrevCommit(Checkpoint 
checkpointFromCommit) {
+    return !StringUtils.isNullOrEmpty(checkpointFromCommit.getCheckpointKey());
+  }
+
+  private static boolean 
ckpOverrideCfgPrevailsOverCkpFromPrevCommit(HoodieStreamer.Config 
streamerConfig, Checkpoint checkpointFromCommit) {

Review Comment:
   Same comment as above: use "ckpt" instead of "ckp".



##########
hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/HoodieIncrSourceCheckpointValUtils.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.checkpoint;
+
+/**
+ * Utility class providing methods to check if a string starts with specific 
resume-related prefixes.
+ */
+public class HoodieIncrSourceCheckpointValUtils {
+  public static final String RESET_CHECKPOINT_V2_SEPARATOR = ":";
+  public static final String REQUEST_TIME_PREFIX = 
"resumeFromInstantRequestTime";
+  public static final String COMPLETION_TIME_PREFIX = 
"resumeFromInstantCompletionTime";
+
+  /**
+   * For hoodie incremental source ingestion, if the target table is version 8 
or higher, the checkpoint
+   * key set by streamer config can be in either of the following format:
+   * - resumeFromInstantRequestTime:[checkpoint value based on request time]
+   * - resumeFromInstantCompletionTime:[checkpoint value based on completion 
time]
+   *
+   * StreamerCheckpointV2FromCfgCkp class itself captured the fact that this 
is version 8 and higher, plus
+   * the checkpoint source is from streamer config override.
+   *
+   * When the checkpoint is consumed by individual data sources, we need to 
convert them to either vanilla
+   * checkpoint v1 (request time based) or checkpoint v2 (completion time 
based).
+   */
+  public static Checkpoint 
resolveToV1V2Checkpoint(StreamerCheckpointFromCfgCkp checkpoint) {

Review Comment:
   Do we have unit tests for this?



##########
hudi-common/src/test/java/org/apache/hudi/common/table/checkpoint/TestHoodieIncrSourceCheckpointValUtils.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.checkpoint;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestHoodieIncrSourceCheckpointValUtils {
+
+  @Test
+  public void testResolveToV1V2CheckpointWithRequestTime() {
+    StreamerCheckpointFromCfgCkp mockCheckpoint = 
mock(StreamerCheckpointFromCfgCkp.class);
+    
when(mockCheckpoint.getCheckpointKey()).thenReturn("resumeFromInstantRequestTime:20240301");
+
+    Checkpoint result = 
HoodieIncrSourceCheckpointValUtils.resolveToV1V2Checkpoint(mockCheckpoint);
+
+    assertTrue(result instanceof StreamerCheckpointV1);
+    assertEquals("20240301", result.getCheckpointKey());
+  }
+
+  @Test
+  public void testResolveToV1V2CheckpointWithCompletionTime() {
+    StreamerCheckpointFromCfgCkp mockCheckpoint = 
mock(StreamerCheckpointFromCfgCkp.class);
+    
when(mockCheckpoint.getCheckpointKey()).thenReturn("resumeFromInstantCompletionTime:20240302");
+
+    Checkpoint result = 
HoodieIncrSourceCheckpointValUtils.resolveToV1V2Checkpoint(mockCheckpoint);
+
+    assertTrue(result instanceof StreamerCheckpointV2);
+    assertEquals("20240302", result.getCheckpointKey());
+  }
+
+  @Test
+  public void testResolveToV1V2CheckpointWithInvalidPrefix() {
+    StreamerCheckpointFromCfgCkp mockCheckpoint = 
mock(StreamerCheckpointFromCfgCkp.class);
+    
when(mockCheckpoint.getCheckpointKey()).thenReturn("invalidPrefix:20240303");
+
+    IllegalArgumentException exception = assertThrows(
+        IllegalArgumentException.class,
+        () -> 
HoodieIncrSourceCheckpointValUtils.resolveToV1V2Checkpoint(mockCheckpoint)
+    );
+    assertTrue(exception.getMessage().contains("Illegal checkpoint key 
override"));
+  }
+
+  @Test
+  public void testResolveToV1V2CheckpointWithMalformedInput() {
+    StreamerCheckpointFromCfgCkp mockCheckpoint = 
mock(StreamerCheckpointFromCfgCkp.class);
+    when(mockCheckpoint.getCheckpointKey()).thenReturn("malformedInput");
+
+    IllegalArgumentException exception = assertThrows(
+        IllegalArgumentException.class,
+        () -> 
HoodieIncrSourceCheckpointValUtils.resolveToV1V2Checkpoint(mockCheckpoint)
+    );
+    assertTrue(exception.getMessage().contains("Illegal checkpoint key 
override"));
+  }
+
+  @Test
+  public void testCheckpointWithModeToString() {
+    HoodieIncrSourceCheckpointValUtils.CheckpointWithMode checkpointWithMode =

Review Comment:
   We will probably need to remove this test as well.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java:
##########
@@ -44,44 +45,86 @@
 
 import java.io.IOException;
 
+import static 
org.apache.hudi.common.table.checkpoint.CheckpointUtils.buildCheckpointFromConfigOverride;
 import static 
org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2.STREAMER_CHECKPOINT_KEY_V2;
 import static 
org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2.STREAMER_CHECKPOINT_RESET_KEY_V2;
 import static 
org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN;
 import static 
org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps;
 import static org.apache.hudi.common.util.ConfigUtils.removeConfigFromProps;
+import static 
org.apache.hudi.table.upgrade.UpgradeDowngrade.needsUpgradeOrDowngrade;
 
 public class StreamerCheckpointUtils {
   private static final Logger LOG = 
LoggerFactory.getLogger(StreamerCheckpointUtils.class);
 
-  public static Option<Checkpoint> 
getCheckpointToResumeFrom(Option<HoodieTimeline> commitsTimelineOpt,
-                                                             
HoodieStreamer.Config streamerConfig,
-                                                             TypedProperties 
props) throws IOException {
+  /**
+   * The first phase of checkpoint resolution - read the checkpoint configs 
from 2 sources and resolve
+   * conflicts:
+   * <ul>
+   *   <li>commit metadata from the last completed instant, which can contain 
what is the last checkpoint
+   *       from the previous streamer ingestion.</li>
+   *   <li>user checkpoint overrides specified in the writer config {@code 
streamerConfig}. Users might want to
+   *       forcefully set the checkpoint to an arbitrary position or start 
from the very beginning.</li>
+   * </ul>
+   * The 2 sources can have conflicts, and we need to decide which config 
should prevail.
+   * <p>
+   * For the second phase of checkpoint resolution please refer
+   * {@link org.apache.hudi.utilities.sources.Source#translateCheckpoint} and 
child class overrides of this
+   * method.
+   */
+  public static Option<Checkpoint> 
resolveWhatCheckpointToResumeFrom(Option<HoodieTimeline> commitsTimelineOpt,
+                                                                     
HoodieStreamer.Config streamerConfig,
+                                                                     
TypedProperties props,
+                                                                     
HoodieTableMetaClient metaClient) throws IOException {
     Option<Checkpoint> checkpoint = Option.empty();
+    assertNoCheckpointOverrideDuringUpgrade(metaClient, streamerConfig, props);
+    // If we have both streamer config and commits specifying what checkpoint 
to use, go with the
+    // checkpoint resolution logic to resolve conflicting configurations.
     if (commitsTimelineOpt.isPresent()) {
-      checkpoint = getCheckpointToResumeString(commitsTimelineOpt.get(), 
streamerConfig, props);
+      checkpoint = 
resolveCheckpointBetweenConfigAndPrevCommit(commitsTimelineOpt.get(), 
streamerConfig, props);
     }
+    // If there is only streamer config, extract the checkpoint directly.
+    checkpoint = useCkpFromOverrideConfigIfAny(streamerConfig, props, 
checkpoint);
+    return checkpoint;
+  }
 
+  @VisibleForTesting
+  static void assertNoCheckpointOverrideDuringUpgrade(HoodieTableMetaClient 
metaClient, HoodieStreamer.Config streamerConfig, TypedProperties props) {

Review Comment:
   Please add Javadoc for this method.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java:
##########
@@ -44,44 +45,86 @@
 
 import java.io.IOException;
 
+import static 
org.apache.hudi.common.table.checkpoint.CheckpointUtils.buildCheckpointFromConfigOverride;
 import static 
org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2.STREAMER_CHECKPOINT_KEY_V2;
 import static 
org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2.STREAMER_CHECKPOINT_RESET_KEY_V2;
 import static 
org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN;
 import static 
org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps;
 import static org.apache.hudi.common.util.ConfigUtils.removeConfigFromProps;
+import static 
org.apache.hudi.table.upgrade.UpgradeDowngrade.needsUpgradeOrDowngrade;
 
 public class StreamerCheckpointUtils {
   private static final Logger LOG = 
LoggerFactory.getLogger(StreamerCheckpointUtils.class);
 
-  public static Option<Checkpoint> 
getCheckpointToResumeFrom(Option<HoodieTimeline> commitsTimelineOpt,
-                                                             
HoodieStreamer.Config streamerConfig,
-                                                             TypedProperties 
props) throws IOException {
+  /**
+   * The first phase of checkpoint resolution - read the checkpoint configs 
from 2 sources and resolve
+   * conflicts:
+   * <ul>
+   *   <li>commit metadata from the last completed instant, which can contain 
what is the last checkpoint
+   *       from the previous streamer ingestion.</li>
+   *   <li>user checkpoint overrides specified in the writer config {@code 
streamerConfig}. Users might want to
+   *       forcefully set the checkpoint to an arbitrary position or start 
from the very beginning.</li>
+   * </ul>
+   * The 2 sources can have conflicts, and we need to decide which config 
should prevail.
+   * <p>
+   * For the second phase of checkpoint resolution please refer
+   * {@link org.apache.hudi.utilities.sources.Source#translateCheckpoint} and 
child class overrides of this
+   * method.
+   */
+  public static Option<Checkpoint> 
resolveWhatCheckpointToResumeFrom(Option<HoodieTimeline> commitsTimelineOpt,
+                                                                     
HoodieStreamer.Config streamerConfig,
+                                                                     
TypedProperties props,
+                                                                     
HoodieTableMetaClient metaClient) throws IOException {
     Option<Checkpoint> checkpoint = Option.empty();
+    assertNoCheckpointOverrideDuringUpgrade(metaClient, streamerConfig, props);
+    // If we have both streamer config and commits specifying what checkpoint 
to use, go with the
+    // checkpoint resolution logic to resolve conflicting configurations.
     if (commitsTimelineOpt.isPresent()) {
-      checkpoint = getCheckpointToResumeString(commitsTimelineOpt.get(), 
streamerConfig, props);
+      checkpoint = 
resolveCheckpointBetweenConfigAndPrevCommit(commitsTimelineOpt.get(), 
streamerConfig, props);
     }
+    // If there is only streamer config, extract the checkpoint directly.
+    checkpoint = useCkpFromOverrideConfigIfAny(streamerConfig, props, 
checkpoint);
+    return checkpoint;
+  }
 
+  @VisibleForTesting
+  static void assertNoCheckpointOverrideDuringUpgrade(HoodieTableMetaClient 
metaClient, HoodieStreamer.Config streamerConfig, TypedProperties props) {
+    if (!StringUtils.isNullOrEmpty(streamerConfig.checkpoint)
+        || !StringUtils.isNullOrEmpty(streamerConfig.ignoreCheckpoint)) {
+      HoodieTableVersion writeTableVersion = 
HoodieTableVersion.fromVersionCode(ConfigUtils.getIntWithAltKeys(props, 
HoodieWriteConfig.WRITE_TABLE_VERSION));
+      HoodieWriteConfig config = 
HoodieWriteConfig.newBuilder().withPath(streamerConfig.targetBasePath).withProps(props).build();
+      if (config.autoUpgrade() && needsUpgradeOrDowngrade(metaClient, config, 
writeTableVersion)) {
+        throw new HoodieUpgradeDowngradeException(
+            String.format("When upgrade/downgrade is happening, please avoid 
setting --checkpoint option and --ignore-checkpoint for your delta streamers."
+                + " Detected invalid streamer configuration:\n%s", 
streamerConfig));

Review Comment:
   Do you know whether this logs each individual config entry here, or just the object's hash (the default `toString()`)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to