This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 9b252adcf85 [HUDI-8619] Fix a bug for checkpoint translation (#12418)
9b252adcf85 is described below
commit 9b252adcf8598f16be6952a2600100665e7dafc7
Author: Lin Liu <[email protected]>
AuthorDate: Tue Dec 3 18:36:02 2024 -0800
[HUDI-8619] Fix a bug for checkpoint translation (#12418)
---
.../hudi/common/table/checkpoint/CheckpointUtils.java | 7 +++++++
.../hudi/common/table/checkpoint/TestCheckpointUtils.java | 14 ++++++++++++++
2 files changed, 21 insertions(+)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java
index 67a87f7d74e..8e5c2db3aff 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieException;
@@ -56,6 +57,9 @@ public class CheckpointUtils {
// instant or completion time
public static StreamerCheckpointV2 convertToCheckpointV2ForCommitTime(
Checkpoint checkpoint, HoodieTableMetaClient metaClient) {
+ if (checkpoint.checkpointKey.equals(HoodieTimeline.INIT_INSTANT_TS)) {
+ return new StreamerCheckpointV2(HoodieTimeline.INIT_INSTANT_TS);
+ }
if (checkpoint instanceof StreamerCheckpointV2) {
return (StreamerCheckpointV2) checkpoint;
}
@@ -81,6 +85,9 @@ public class CheckpointUtils {
public static StreamerCheckpointV1 convertToCheckpointV1ForCommitTime(
Checkpoint checkpoint, HoodieTableMetaClient metaClient) {
+ if (checkpoint.checkpointKey.equals(HoodieTimeline.INIT_INSTANT_TS)) {
+ return new StreamerCheckpointV1(HoodieTimeline.INIT_INSTANT_TS);
+ }
if (checkpoint instanceof StreamerCheckpointV1) {
return (StreamerCheckpointV1) checkpoint;
}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/checkpoint/TestCheckpointUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/table/checkpoint/TestCheckpointUtils.java
index 61ee28737b9..715d89c5078 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/checkpoint/TestCheckpointUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/checkpoint/TestCheckpointUtils.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.versioning.v1.InstantComparatorV1;
import org.apache.hudi.common.table.timeline.versioning.v2.InstantComparatorV2;
import org.apache.hudi.exception.HoodieException;
@@ -157,4 +158,17 @@ public class TestCheckpointUtils {
() -> CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint,
metaClient));
assertTrue(exception.getMessage().contains("Unable to find completion time"));
}
+
+ @Test
+ public void testConvertCheckpointWithInitTimestamp() {
+ String instantTime = HoodieTimeline.INIT_INSTANT_TS;
+
+ Checkpoint checkpoint = new StreamerCheckpointV1(instantTime);
+ Checkpoint translated = CheckpointUtils.convertToCheckpointV1ForCommitTime(checkpoint, metaClient);
+ assertEquals(HoodieTimeline.INIT_INSTANT_TS, translated.getCheckpointKey());
+
+ checkpoint = new StreamerCheckpointV2(instantTime);
+ translated = CheckpointUtils.convertToCheckpointV2ForCommitTime(checkpoint, metaClient);
+ assertEquals(HoodieTimeline.INIT_INSTANT_TS, translated.getCheckpointKey());
+ }
}