This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 5582cde47b7af6f18b66eff39546120d7377e3c4
Author: Sampan S Nayak <[email protected]>
AuthorDate: Fri Apr 19 11:55:43 2024 +0530

    [HUDI-7618] Add ability to ignore checkpoints in delta streamer (#11018)
---
 .../hudi/utilities/streamer/HoodieStreamer.java    |  7 +++
 .../apache/hudi/utilities/streamer/StreamSync.java | 13 ++++-
 .../streamer/TestStreamSyncUnitTests.java          | 61 ++++++++++++++++++++++
 3 files changed, 79 insertions(+), 2 deletions(-)

diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
index 643a240638c..b42b3dbeda2 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
@@ -428,6 +428,13 @@ public class HoodieStreamer implements Serializable {
     @Parameter(names = {"--config-hot-update-strategy-class"}, description = 
"Configuration hot update in continuous mode")
     public String configHotUpdateStrategyClass = "";
 
+    @Parameter(names = {"--ignore-checkpoint"}, description = "Set this config 
with a unique value; we recommend using a timestamp value or UUID."
+        + " Setting this config indicates that the subsequent sync should 
ignore the last committed checkpoint for the source. The config value is stored"
+        + " in the commit history, so setting the config with the same value 
would not have any effect. This config can be used in scenarios like a Kafka 
topic change,"
+        + " where we would want to start ingesting from the latest or earliest 
offset after switching the topic (in this case we would want to ignore the 
previously"
+        + " committed checkpoint, and rely on other configs to pick the 
starting offsets).")
+    public String ignoreCheckpoint = null;
+
     public boolean isAsyncCompactionEnabled() {
       return continuousMode && !forceDisableCompaction
           && 
HoodieTableType.MERGE_ON_READ.equals(HoodieTableType.valueOf(tableType));
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index f1184a75abe..3c6c36d2a3e 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -164,6 +164,7 @@ public class StreamSync implements Serializable, Closeable {
   private static final long serialVersionUID = 1L;
   private static final Logger LOG = LoggerFactory.getLogger(StreamSync.class);
   private static final String NULL_PLACEHOLDER = "[null]";
+  public static final String CHECKPOINT_IGNORE_KEY = 
"deltastreamer.checkpoint.ignore_key";
 
   /**
    * Delta Sync Config.
@@ -732,7 +733,8 @@ public class StreamSync implements Serializable, Closeable {
    * @return the checkpoint to resume from if applicable.
    * @throws IOException
    */
-  private Option<String> getCheckpointToResume(Option<HoodieTimeline> 
commitsTimelineOpt) throws IOException {
+  @VisibleForTesting
+  Option<String> getCheckpointToResume(Option<HoodieTimeline> 
commitsTimelineOpt) throws IOException {
     Option<String> resumeCheckpointStr = Option.empty();
     // try get checkpoint from commits(including commit and deltacommit)
     // in COW migrating to MOR case, the first batch of the deltastreamer will 
lost the checkpoint from COW table, cause the dataloss
@@ -749,7 +751,11 @@ public class StreamSync implements Serializable, Closeable 
{
       if (commitMetadataOption.isPresent()) {
         HoodieCommitMetadata commitMetadata = commitMetadataOption.get();
         LOG.debug("Checkpoint reset from metadata: " + 
commitMetadata.getMetadata(CHECKPOINT_RESET_KEY));
-        if (cfg.checkpoint != null && 
(StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))
+        if (cfg.ignoreCheckpoint != null && 
(StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_IGNORE_KEY))
+            || 
!cfg.ignoreCheckpoint.equals(commitMetadata.getMetadata(CHECKPOINT_IGNORE_KEY))))
 {
+          // we ignore any existing checkpoint and start ingesting afresh
+          resumeCheckpointStr = Option.empty();
+        } else if (cfg.checkpoint != null && 
(StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))
             || 
!cfg.checkpoint.equals(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY)))) {
           resumeCheckpointStr = Option.of(cfg.checkpoint);
         } else if 
(!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_KEY))) {
@@ -852,6 +858,9 @@ public class StreamSync implements Serializable, Closeable {
         if (cfg.checkpoint != null) {
           checkpointCommitMetadata.put(CHECKPOINT_RESET_KEY, cfg.checkpoint);
         }
+        if (cfg.ignoreCheckpoint != null) {
+          checkpointCommitMetadata.put(CHECKPOINT_IGNORE_KEY, 
cfg.ignoreCheckpoint);
+        }
       }
 
       if (hasErrors) {
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSyncUnitTests.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSyncUnitTests.java
index c22c948e70b..8ff5b6ee933 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSyncUnitTests.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSyncUnitTests.java
@@ -22,7 +22,10 @@ package org.apache.hudi.utilities.streamer;
 import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieErrorTableConfig;
 import org.apache.hudi.storage.HoodieStorage;
@@ -43,9 +46,13 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
+import java.io.IOException;
 import java.util.stream.Stream;
 
 import static 
org.apache.hudi.config.HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA;
+import static org.apache.hudi.utilities.streamer.HoodieStreamer.CHECKPOINT_KEY;
+import static 
org.apache.hudi.utilities.streamer.HoodieStreamer.CHECKPOINT_RESET_KEY;
+import static 
org.apache.hudi.utilities.streamer.StreamSync.CHECKPOINT_IGNORE_KEY;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
@@ -130,6 +137,60 @@ public class TestStreamSyncUnitTests {
             
HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.defaultValue());
   }
 
+  @ParameterizedTest
+  @MethodSource("getCheckpointToResumeCases")
+  void testGetCheckpointToResume(HoodieStreamer.Config cfg, 
HoodieCommitMetadata commitMetadata, Option<String> expectedResumeCheckpoint) 
throws IOException {
+    HoodieSparkEngineContext hoodieSparkEngineContext = 
mock(HoodieSparkEngineContext.class);
+    FileSystem fs = mock(FileSystem.class);
+    TypedProperties props = new TypedProperties();
+    SparkSession sparkSession = mock(SparkSession.class);
+    Configuration configuration = mock(Configuration.class);
+    HoodieTimeline commitsTimeline = mock(HoodieTimeline.class);
+    HoodieInstant hoodieInstant = mock(HoodieInstant.class);
+
+    when(commitsTimeline.filter(any())).thenReturn(commitsTimeline);
+    when(commitsTimeline.lastInstant()).thenReturn(Option.of(hoodieInstant));
+
+    StreamSync streamSync = new StreamSync(cfg, sparkSession, props, 
hoodieSparkEngineContext,
+        fs, configuration, client -> true, 
null,Option.empty(),null,Option.empty(),true,true);
+    StreamSync spy = spy(streamSync);
+    
doReturn(Option.of(commitMetadata)).when(spy).getLatestCommitMetadataWithValidCheckpointInfo(any());
+
+    Option<String> resumeCheckpoint = 
spy.getCheckpointToResume(Option.of(commitsTimeline));
+    assertEquals(expectedResumeCheckpoint,resumeCheckpoint);
+  }
+
+  private static Stream<Arguments> getCheckpointToResumeCases() {
+    return Stream.of(
+        // Checkpoint has been manually overridden (reset-checkpoint)
+        
Arguments.of(generateDeltaStreamerConfig("new-reset-checkpoint",null),generateCommitMetadata("old-reset-checkpoint",null,null),Option.of("new-reset-checkpoint")),
+        // Checkpoint not reset/ignored, continuing from previous run
+        
Arguments.of(generateDeltaStreamerConfig("old-reset-checkpoint",null),generateCommitMetadata("old-reset-checkpoint",null,"checkpoint-prev-run"),Option.of("checkpoint-prev-run")),
+        // Checkpoint not reset/ignored, continuing from previous run (ignore 
checkpoint has not changed)
+        
Arguments.of(generateDeltaStreamerConfig("old-reset-checkpoint","123445"),generateCommitMetadata("old-reset-checkpoint","123445","checkpoint-prev-run"),Option.of("checkpoint-prev-run")),
+        // Ignore checkpoint set, existing checkpoints will be ignored
+        
Arguments.of(generateDeltaStreamerConfig("old-reset-checkpoint","123445"),generateCommitMetadata("old-reset-checkpoint","123422","checkpoint-prev-run"),Option.empty()),
+        // Ignore checkpoint set, existing checkpoints will be ignored 
(reset-checkpoint ignored)
+        
Arguments.of(generateDeltaStreamerConfig("new-reset-checkpoint","123445"),generateCommitMetadata("old-reset-checkpoint","123422","checkpoint-prev-run"),Option.empty())
+    );
+  }
+
+  private static HoodieStreamer.Config generateDeltaStreamerConfig(String 
checkpoint, String ignoreCheckpoint) {
+    HoodieStreamer.Config cfg = new HoodieStreamer.Config();
+    cfg.checkpoint = checkpoint;
+    cfg.ignoreCheckpoint = ignoreCheckpoint;
+    cfg.tableType = "MERGE_ON_READ";
+    return cfg;
+  }
+
+  private static HoodieCommitMetadata generateCommitMetadata(String 
resetCheckpointValue, String ignoreCheckpointValue, String checkpointValue) {
+    HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+    commitMetadata.addMetadata(CHECKPOINT_RESET_KEY,resetCheckpointValue);
+    commitMetadata.addMetadata(CHECKPOINT_IGNORE_KEY,ignoreCheckpointValue);
+    commitMetadata.addMetadata(CHECKPOINT_KEY,checkpointValue);
+    return commitMetadata;
+  }
+
   private SchemaProvider getSchemaProvider(String name, boolean 
isNullTargetSchema) {
     SchemaProvider schemaProvider = mock(SchemaProvider.class);
     Schema sourceSchema = mock(Schema.class);

Reply via email to