This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ad7e9b50fb [HUDI-6858] Fix checkpoint reading in Spark structured 
streaming (#9711)
6ad7e9b50fb is described below

commit 6ad7e9b50fb5ecc8eec1b407a691397aeef2b5cc
Author: Y Ethan Guo <[email protected]>
AuthorDate: Wed Sep 13 22:45:52 2023 -0700

    [HUDI-6858] Fix checkpoint reading in Spark structured streaming (#9711)
---
 .../org/apache/hudi/common/util/CommitUtils.java   |  33 +++---
 .../apache/hudi/common/util/TestCommitUtils.java   | 118 ++++++++++++++++++++-
 2 files changed, 134 insertions(+), 17 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
index ed31f79e518..07901d14b6b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
@@ -164,22 +164,23 @@ public class CommitUtils {
    */
   public static Option<String> 
getValidCheckpointForCurrentWriter(HoodieTimeline timeline, String 
checkpointKey,
                                                                   String 
keyToLookup) {
-    return (Option<String>) 
timeline.getWriteTimeline().getReverseOrderedInstants().map(instant -> {
-      try {
-        HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
-            .fromBytes(timeline.getInstantDetails(instant).get(), 
HoodieCommitMetadata.class);
-        // process commits only with checkpoint entries
-        String checkpointValue = commitMetadata.getMetadata(checkpointKey);
-        if (StringUtils.nonEmpty(checkpointValue)) {
-          // return if checkpoint for "keyToLookup" exists.
-          return readCheckpointValue(checkpointValue, keyToLookup);
-        } else {
-          return Option.empty();
-        }
-      } catch (IOException e) {
-        throw new HoodieIOException("Failed to parse HoodieCommitMetadata for 
" + instant.toString(), e);
-      }
-    }).filter(Option::isPresent).findFirst().orElse(Option.empty());
+    return (Option<String>) 
timeline.getWriteTimeline().filterCompletedInstants().getReverseOrderedInstants()
+        .map(instant -> {
+          try {
+            HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+                .fromBytes(timeline.getInstantDetails(instant).get(), 
HoodieCommitMetadata.class);
+            // process commits only with checkpoint entries
+            String checkpointValue = commitMetadata.getMetadata(checkpointKey);
+            if (StringUtils.nonEmpty(checkpointValue)) {
+              // return if checkpoint for "keyToLookup" exists.
+              return readCheckpointValue(checkpointValue, keyToLookup);
+            } else {
+              return Option.empty();
+            }
+          } catch (IOException e) {
+            throw new HoodieIOException("Failed to parse HoodieCommitMetadata 
for " + instant.toString(), e);
+          }
+        }).filter(Option::isPresent).findFirst().orElse(Option.empty());
   }
 
   public static Option<String> readCheckpointValue(String value, String id) {
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestCommitUtils.java 
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestCommitUtils.java
index 6d0b2738b3c..e524f298129 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestCommitUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestCommitUtils.java
@@ -18,20 +18,37 @@
 
 package org.apache.hudi.common.util;
 
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.avro.model.HoodieCompactionStrategy;
+import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
 import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_SCHEMA;
+import static 
org.apache.hudi.common.util.CommitUtils.getCheckpointValueAsString;
+import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -40,6 +57,12 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
  * Tests {@link CommitUtils}.
  */
 public class TestCommitUtils {
+  private static final String SINK_CHECKPOINT_KEY = 
"_hudi_streaming_sink_checkpoint";
+  private static final String ID1 = "id1";
+  private static final String ID2 = "id2";
+  private static final String ID3 = "id3";
+  @TempDir
+  public java.nio.file.Path tempDir;
 
   @Test
   public void testCommitMetadataCreation() {
@@ -78,7 +101,7 @@ public class TestCommitUtils {
         Option.empty(),
         WriteOperationType.INSERT,
         TRIP_SCHEMA,
-        HoodieTimeline.REPLACE_COMMIT_ACTION);
+        REPLACE_COMMIT_ACTION);
 
     assertTrue(commitMetadata instanceof HoodieReplaceCommitMetadata);
     HoodieReplaceCommitMetadata replaceCommitMetadata = 
(HoodieReplaceCommitMetadata) commitMetadata;
@@ -91,10 +114,103 @@ public class TestCommitUtils {
     assertEquals(TRIP_SCHEMA, 
commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY));
   }
 
+  @Test
+  public void testGetValidCheckpointForCurrentWriter() throws IOException {
+    java.nio.file.Path basePath = tempDir.resolve("dataset");
+    java.nio.file.Files.createDirectories(basePath);
+    String basePathStr = basePath.toAbsolutePath().toString();
+    HoodieTableMetaClient metaClient =
+        HoodieTestUtils.init(basePathStr, HoodieTableType.MERGE_ON_READ);
+    HoodieActiveTimeline timeline = new HoodieActiveTimeline(metaClient);
+
+    // Deltacommit 1 completed: (id1, 3)
+    addDeltaCommit(timeline, "20230913001000000", ID1, "3", true);
+    // Deltacommit 2 completed: (id2, 4)
+    addDeltaCommit(timeline, "20230913002000000", ID2, "4", true);
+    // Deltacommit 3 completed: (id1, 5)
+    addDeltaCommit(timeline, "20230913003000000", ID1, "5", true);
+    // Request compaction:
+    addRequestedCompaction(timeline, "20230913003800000");
+    // Deltacommit 4 completed: (id2, 6)
+    addDeltaCommit(timeline, "20230913004000000", ID2, "6", true);
+    // Requested replacecommit (clustering):
+    addRequestedReplaceCommit(timeline, "20230913004800000");
+    // Deltacommit 5 inflight: (id2, 7)
+    addDeltaCommit(timeline, "20230913005000000", ID2, "7", false);
+    // Commit 6 completed without checkpoints (e.g., compaction that does not 
affect checkpointing)
+    addCommit(timeline, "20230913006000000");
+
+    timeline = timeline.reload();
+    assertEquals(Option.of("5"), 
CommitUtils.getValidCheckpointForCurrentWriter(timeline, SINK_CHECKPOINT_KEY, 
ID1));
+    assertEquals(Option.of("6"), 
CommitUtils.getValidCheckpointForCurrentWriter(timeline, SINK_CHECKPOINT_KEY, 
ID2));
+    assertEquals(Option.empty(), 
CommitUtils.getValidCheckpointForCurrentWriter(timeline, SINK_CHECKPOINT_KEY, 
ID3));
+  }
+
   private HoodieWriteStat createWriteStat(String partition, String fileId) {
     HoodieWriteStat writeStat1 = new HoodieWriteStat();
     writeStat1.setPartitionPath(partition);
     writeStat1.setFileId(fileId);
     return writeStat1;
   }
+
+  private void addDeltaCommit(HoodieActiveTimeline timeline,
+                              String ts, String id, String batchId,
+                              boolean isCompleted) throws IOException {
+    HoodieInstant instant = new HoodieInstant(
+        HoodieInstant.State.REQUESTED, HoodieTimeline.DELTA_COMMIT_ACTION, ts);
+    HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+    commitMetadata.setOperationType(WriteOperationType.UPSERT);
+    commitMetadata.addMetadata(SINK_CHECKPOINT_KEY,
+        getCheckpointValueAsString(id, batchId));
+    timeline.createNewInstant(instant);
+    timeline.transitionRequestedToInflight(
+        instant, Option.of(getUTF8Bytes(commitMetadata.toJsonString())));
+    if (isCompleted) {
+      timeline.saveAsComplete(new HoodieInstant(
+              true, instant.getAction(), instant.getTimestamp()),
+          Option.of(getUTF8Bytes(commitMetadata.toJsonString())));
+    }
+  }
+
+  private void addCommit(HoodieActiveTimeline timeline,
+                         String ts) throws IOException {
+    HoodieInstant instant = new HoodieInstant(
+        HoodieInstant.State.REQUESTED, HoodieTimeline.COMMIT_ACTION, ts);
+    HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+    commitMetadata.setOperationType(WriteOperationType.COMPACT);
+    timeline.createNewInstant(instant);
+    timeline.transitionRequestedToInflight(
+        instant, Option.of(getUTF8Bytes(commitMetadata.toJsonString())));
+    timeline.saveAsComplete(new HoodieInstant(
+            true, instant.getAction(), instant.getTimestamp()),
+        Option.of(getUTF8Bytes(commitMetadata.toJsonString())));
+  }
+
+  private void addRequestedCompaction(HoodieActiveTimeline timeline,
+                                      String ts) throws IOException {
+    HoodieCompactionPlan compactionPlan = HoodieCompactionPlan.newBuilder()
+        .setOperations(Collections.emptyList())
+        .setVersion(CompactionUtils.LATEST_COMPACTION_METADATA_VERSION)
+        .setStrategy(HoodieCompactionStrategy.newBuilder().build())
+        .setPreserveHoodieMetadata(true)
+        .build();
+    timeline.saveToCompactionRequested(
+        new HoodieInstant(HoodieInstant.State.REQUESTED, COMPACTION_ACTION, 
ts),
+        TimelineMetadataUtils.serializeCompactionPlan(compactionPlan)
+    );
+  }
+
+  private void addRequestedReplaceCommit(HoodieActiveTimeline timeline,
+                                         String ts) throws IOException {
+    HoodieRequestedReplaceMetadata requestedReplaceMetadata =
+        HoodieRequestedReplaceMetadata.newBuilder()
+            .setOperationType(WriteOperationType.CLUSTER.name())
+            .setExtraMetadata(Collections.emptyMap())
+            .setClusteringPlan(new HoodieClusteringPlan())
+            .build();
+    timeline.saveToPendingReplaceCommit(
+        new HoodieInstant(HoodieInstant.State.REQUESTED, 
REPLACE_COMMIT_ACTION, ts),
+        
TimelineMetadataUtils.serializeRequestedReplaceMetadata(requestedReplaceMetadata)
+    );
+  }
 }
\ No newline at end of file

Reply via email to