This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6ad7e9b50fb [HUDI-6858] Fix checkpoint reading in Spark structured streaming (#9711)
6ad7e9b50fb is described below
commit 6ad7e9b50fb5ecc8eec1b407a691397aeef2b5cc
Author: Y Ethan Guo <[email protected]>
AuthorDate: Wed Sep 13 22:45:52 2023 -0700
[HUDI-6858] Fix checkpoint reading in Spark structured streaming (#9711)
---
.../org/apache/hudi/common/util/CommitUtils.java | 33 +++---
.../apache/hudi/common/util/TestCommitUtils.java | 118 ++++++++++++++++++++-
2 files changed, 134 insertions(+), 17 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
index ed31f79e518..07901d14b6b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
@@ -164,22 +164,23 @@ public class CommitUtils {
*/
public static Option<String>
getValidCheckpointForCurrentWriter(HoodieTimeline timeline, String
checkpointKey,
String
keyToLookup) {
- return (Option<String>)
timeline.getWriteTimeline().getReverseOrderedInstants().map(instant -> {
- try {
- HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
- .fromBytes(timeline.getInstantDetails(instant).get(),
HoodieCommitMetadata.class);
- // process commits only with checkpoint entries
- String checkpointValue = commitMetadata.getMetadata(checkpointKey);
- if (StringUtils.nonEmpty(checkpointValue)) {
- // return if checkpoint for "keyForLookup" exists.
- return readCheckpointValue(checkpointValue, keyToLookup);
- } else {
- return Option.empty();
- }
- } catch (IOException e) {
- throw new HoodieIOException("Failed to parse HoodieCommitMetadata for
" + instant.toString(), e);
- }
- }).filter(Option::isPresent).findFirst().orElse(Option.empty());
+ return (Option<String>)
timeline.getWriteTimeline().filterCompletedInstants().getReverseOrderedInstants()
+ .map(instant -> {
+ try {
+ HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+ .fromBytes(timeline.getInstantDetails(instant).get(),
HoodieCommitMetadata.class);
+ // process commits only with checkpoint entries
+ String checkpointValue = commitMetadata.getMetadata(checkpointKey);
+ if (StringUtils.nonEmpty(checkpointValue)) {
+ // return if checkpoint for "keyForLookup" exists.
+ return readCheckpointValue(checkpointValue, keyToLookup);
+ } else {
+ return Option.empty();
+ }
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to parse HoodieCommitMetadata
for " + instant.toString(), e);
+ }
+ }).filter(Option::isPresent).findFirst().orElse(Option.empty());
}
public static Option<String> readCheckpointValue(String value, String id) {
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestCommitUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestCommitUtils.java
index 6d0b2738b3c..e524f298129 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestCommitUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestCommitUtils.java
@@ -18,20 +18,37 @@
package org.apache.hudi.common.util;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.avro.model.HoodieCompactionStrategy;
+import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_SCHEMA;
+import static
org.apache.hudi.common.util.CommitUtils.getCheckpointValueAsString;
+import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -40,6 +57,12 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
* Tests {@link CommitUtils}.
*/
public class TestCommitUtils {
+ private static final String SINK_CHECKPOINT_KEY =
"_hudi_streaming_sink_checkpoint";
+ private static final String ID1 = "id1";
+ private static final String ID2 = "id2";
+ private static final String ID3 = "id3";
+ @TempDir
+ public java.nio.file.Path tempDir;
@Test
public void testCommitMetadataCreation() {
@@ -78,7 +101,7 @@ public class TestCommitUtils {
Option.empty(),
WriteOperationType.INSERT,
TRIP_SCHEMA,
- HoodieTimeline.REPLACE_COMMIT_ACTION);
+ REPLACE_COMMIT_ACTION);
assertTrue(commitMetadata instanceof HoodieReplaceCommitMetadata);
HoodieReplaceCommitMetadata replaceCommitMetadata =
(HoodieReplaceCommitMetadata) commitMetadata;
@@ -91,10 +114,103 @@ public class TestCommitUtils {
assertEquals(TRIP_SCHEMA,
commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY));
}
+ @Test
+ public void testGetValidCheckpointForCurrentWriter() throws IOException {
+ java.nio.file.Path basePath = tempDir.resolve("dataset");
+ java.nio.file.Files.createDirectories(basePath);
+ String basePathStr = basePath.toAbsolutePath().toString();
+ HoodieTableMetaClient metaClient =
+ HoodieTestUtils.init(basePathStr, HoodieTableType.MERGE_ON_READ);
+ HoodieActiveTimeline timeline = new HoodieActiveTimeline(metaClient);
+
+ // Deltacommit 1 completed: (id1, 3)
+ addDeltaCommit(timeline, "20230913001000000", ID1, "3", true);
+ // Deltacommit 2 completed: (id2, 4)
+ addDeltaCommit(timeline, "20230913002000000", ID2, "4", true);
+ // Deltacommit 3 completed: (id1, 5)
+ addDeltaCommit(timeline, "20230913003000000", ID1, "5", true);
+ // Request compaction:
+ addRequestedCompaction(timeline, "20230913003800000");
+ // Deltacommit 4 completed: (id2, 6)
+ addDeltaCommit(timeline, "20230913004000000", ID2, "6", true);
+ // Requested replacecommit (clustering):
+ addRequestedReplaceCommit(timeline, "20230913004800000");
+ // Deltacommit 5 inflight: (id2, 7)
+ addDeltaCommit(timeline, "20230913005000000", ID2, "7", false);
+ // Commit 6 completed without checkpoints (e.g., compaction that does not affect checkpointing)
+ addCommit(timeline, "20230913006000000");
+
+ timeline = timeline.reload();
+ assertEquals(Option.of("5"),
CommitUtils.getValidCheckpointForCurrentWriter(timeline, SINK_CHECKPOINT_KEY,
ID1));
+ assertEquals(Option.of("6"),
CommitUtils.getValidCheckpointForCurrentWriter(timeline, SINK_CHECKPOINT_KEY,
ID2));
+ assertEquals(Option.empty(),
CommitUtils.getValidCheckpointForCurrentWriter(timeline, SINK_CHECKPOINT_KEY,
ID3));
+ }
+
private HoodieWriteStat createWriteStat(String partition, String fileId) {
HoodieWriteStat writeStat1 = new HoodieWriteStat();
writeStat1.setPartitionPath(partition);
writeStat1.setFileId(fileId);
return writeStat1;
}
+
+ private void addDeltaCommit(HoodieActiveTimeline timeline,
+ String ts, String id, String batchId,
+ boolean isCompleted) throws IOException {
+ HoodieInstant instant = new HoodieInstant(
+ HoodieInstant.State.REQUESTED, HoodieTimeline.DELTA_COMMIT_ACTION, ts);
+ HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+ commitMetadata.setOperationType(WriteOperationType.UPSERT);
+ commitMetadata.addMetadata(SINK_CHECKPOINT_KEY,
+ getCheckpointValueAsString(id, batchId));
+ timeline.createNewInstant(instant);
+ timeline.transitionRequestedToInflight(
+ instant, Option.of(getUTF8Bytes(commitMetadata.toJsonString())));
+ if (isCompleted) {
+ timeline.saveAsComplete(new HoodieInstant(
+ true, instant.getAction(), instant.getTimestamp()),
+ Option.of(getUTF8Bytes(commitMetadata.toJsonString())));
+ }
+ }
+
+ private void addCommit(HoodieActiveTimeline timeline,
+ String ts) throws IOException {
+ HoodieInstant instant = new HoodieInstant(
+ HoodieInstant.State.REQUESTED, HoodieTimeline.COMMIT_ACTION, ts);
+ HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+ commitMetadata.setOperationType(WriteOperationType.COMPACT);
+ timeline.createNewInstant(instant);
+ timeline.transitionRequestedToInflight(
+ instant, Option.of(getUTF8Bytes(commitMetadata.toJsonString())));
+ timeline.saveAsComplete(new HoodieInstant(
+ true, instant.getAction(), instant.getTimestamp()),
+ Option.of(getUTF8Bytes(commitMetadata.toJsonString())));
+ }
+
+ private void addRequestedCompaction(HoodieActiveTimeline timeline,
+ String ts) throws IOException {
+ HoodieCompactionPlan compactionPlan = HoodieCompactionPlan.newBuilder()
+ .setOperations(Collections.emptyList())
+ .setVersion(CompactionUtils.LATEST_COMPACTION_METADATA_VERSION)
+ .setStrategy(HoodieCompactionStrategy.newBuilder().build())
+ .setPreserveHoodieMetadata(true)
+ .build();
+ timeline.saveToCompactionRequested(
+ new HoodieInstant(HoodieInstant.State.REQUESTED, COMPACTION_ACTION,
ts),
+ TimelineMetadataUtils.serializeCompactionPlan(compactionPlan)
+ );
+ }
+
+ private void addRequestedReplaceCommit(HoodieActiveTimeline timeline,
+ String ts) throws IOException {
+ HoodieRequestedReplaceMetadata requestedReplaceMetadata =
+ HoodieRequestedReplaceMetadata.newBuilder()
+ .setOperationType(WriteOperationType.CLUSTER.name())
+ .setExtraMetadata(Collections.emptyMap())
+ .setClusteringPlan(new HoodieClusteringPlan())
+ .build();
+ timeline.saveToPendingReplaceCommit(
+ new HoodieInstant(HoodieInstant.State.REQUESTED,
REPLACE_COMMIT_ACTION, ts),
+
TimelineMetadataUtils.serializeRequestedReplaceMetadata(requestedReplaceMetadata)
+ );
+ }
}
\ No newline at end of file