This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b252adcf85 [HUDI-8619] Fix a bug for checkpoint translation (#12418)
9b252adcf85 is described below

commit 9b252adcf8598f16be6952a2600100665e7dafc7
Author: Lin Liu <[email protected]>
AuthorDate: Tue Dec 3 18:36:02 2024 -0800

    [HUDI-8619] Fix a bug for checkpoint translation (#12418)
---
 .../hudi/common/table/checkpoint/CheckpointUtils.java      |  7 +++++++
 .../hudi/common/table/checkpoint/TestCheckpointUtils.java  | 14 ++++++++++++++
 2 files changed, 21 insertions(+)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java
index 67a87f7d74e..8e5c2db3aff 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.exception.HoodieException;
@@ -56,6 +57,9 @@ public class CheckpointUtils {
   // instant or completion time
   public static StreamerCheckpointV2 convertToCheckpointV2ForCommitTime(
       Checkpoint checkpoint, HoodieTableMetaClient metaClient) {
+    if (checkpoint.checkpointKey.equals(HoodieTimeline.INIT_INSTANT_TS)) {
+      return new StreamerCheckpointV2(HoodieTimeline.INIT_INSTANT_TS);
+    }
     if (checkpoint instanceof StreamerCheckpointV2) {
       return (StreamerCheckpointV2) checkpoint;
     }
@@ -81,6 +85,9 @@ public class CheckpointUtils {
 
   public static StreamerCheckpointV1 convertToCheckpointV1ForCommitTime(
       Checkpoint checkpoint, HoodieTableMetaClient metaClient) {
+    if (checkpoint.checkpointKey.equals(HoodieTimeline.INIT_INSTANT_TS)) {
+      return new StreamerCheckpointV1(HoodieTimeline.INIT_INSTANT_TS);
+    }
     if (checkpoint instanceof StreamerCheckpointV1) {
       return (StreamerCheckpointV1) checkpoint;
     }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/checkpoint/TestCheckpointUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/table/checkpoint/TestCheckpointUtils.java
index 61ee28737b9..715d89c5078 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/checkpoint/TestCheckpointUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/checkpoint/TestCheckpointUtils.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.versioning.v1.InstantComparatorV1;
 import org.apache.hudi.common.table.timeline.versioning.v2.InstantComparatorV2;
 import org.apache.hudi.exception.HoodieException;
@@ -157,4 +158,17 @@ public class TestCheckpointUtils {
         () -> CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient));
     assertTrue(exception.getMessage().contains("Unable to find completion time"));
   }
+
+  @Test
+  public void testConvertCheckpointWithInitTimestamp() {
+    String instantTime = HoodieTimeline.INIT_INSTANT_TS;
+
+    Checkpoint checkpoint = new StreamerCheckpointV1(instantTime);
+    Checkpoint translated = CheckpointUtils.convertToCheckpointV1ForCommitTime(checkpoint, metaClient);
+    assertEquals(HoodieTimeline.INIT_INSTANT_TS, translated.getCheckpointKey());
+
+    checkpoint = new StreamerCheckpointV2(instantTime);
+    translated = CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient);
+    assertEquals(HoodieTimeline.INIT_INSTANT_TS, translated.getCheckpointKey());
+  }
 }

Reply via email to