This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 5582cde47b7af6f18b66eff39546120d7377e3c4 Author: Sampan S Nayak <[email protected]> AuthorDate: Fri Apr 19 11:55:43 2024 +0530 [HUDI-7618] Add ability to ignore checkpoints in delta streamer (#11018) --- .../hudi/utilities/streamer/HoodieStreamer.java | 7 +++ .../apache/hudi/utilities/streamer/StreamSync.java | 13 ++++- .../streamer/TestStreamSyncUnitTests.java | 61 ++++++++++++++++++++++ 3 files changed, 79 insertions(+), 2 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java index 643a240638c..b42b3dbeda2 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java @@ -428,6 +428,13 @@ public class HoodieStreamer implements Serializable { @Parameter(names = {"--config-hot-update-strategy-class"}, description = "Configuration hot update in continuous mode") public String configHotUpdateStrategyClass = ""; + @Parameter(names = {"--ignore-checkpoint"}, description = "Set this config with a unique value, recommend using a timestamp value or UUID." + + " Setting this config indicates that the subsequent sync should ignore the last committed checkpoint for the source. The config value is stored" + + " in the commit history, so setting the config with the same value would not have any effect. 
This config can be used in scenarios like kafka topic change," + + " where we would want to start ingesting from the latest or earliest offset after switching the topic (in this case we would want to ignore the previously" + + " committed checkpoint, and rely on other configs to pick the starting offsets).") + public String ignoreCheckpoint = null; + public boolean isAsyncCompactionEnabled() { return continuousMode && !forceDisableCompaction && HoodieTableType.MERGE_ON_READ.equals(HoodieTableType.valueOf(tableType)); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java index f1184a75abe..3c6c36d2a3e 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java @@ -164,6 +164,7 @@ public class StreamSync implements Serializable, Closeable { private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(StreamSync.class); private static final String NULL_PLACEHOLDER = "[null]"; + public static final String CHECKPOINT_IGNORE_KEY = "deltastreamer.checkpoint.ignore_key"; /** * Delta Sync Config. @@ -732,7 +733,8 @@ public class StreamSync implements Serializable, Closeable { * @return the checkpoint to resume from if applicable. 
* @throws IOException */ - private Option<String> getCheckpointToResume(Option<HoodieTimeline> commitsTimelineOpt) throws IOException { + @VisibleForTesting + Option<String> getCheckpointToResume(Option<HoodieTimeline> commitsTimelineOpt) throws IOException { Option<String> resumeCheckpointStr = Option.empty(); // try get checkpoint from commits(including commit and deltacommit) // in COW migrating to MOR case, the first batch of the deltastreamer will lost the checkpoint from COW table, cause the dataloss @@ -749,7 +751,11 @@ public class StreamSync implements Serializable, Closeable { if (commitMetadataOption.isPresent()) { HoodieCommitMetadata commitMetadata = commitMetadataOption.get(); LOG.debug("Checkpoint reset from metadata: " + commitMetadata.getMetadata(CHECKPOINT_RESET_KEY)); - if (cfg.checkpoint != null && (StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY)) + if (cfg.ignoreCheckpoint != null && (StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_IGNORE_KEY)) + || !cfg.ignoreCheckpoint.equals(commitMetadata.getMetadata(CHECKPOINT_IGNORE_KEY)))) { + // we ignore any existing checkpoint and start ingesting afresh + resumeCheckpointStr = Option.empty(); + } else if (cfg.checkpoint != null && (StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY)) || !cfg.checkpoint.equals(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY)))) { resumeCheckpointStr = Option.of(cfg.checkpoint); } else if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_KEY))) { @@ -852,6 +858,9 @@ public class StreamSync implements Serializable, Closeable { if (cfg.checkpoint != null) { checkpointCommitMetadata.put(CHECKPOINT_RESET_KEY, cfg.checkpoint); } + if (cfg.ignoreCheckpoint != null) { + checkpointCommitMetadata.put(CHECKPOINT_IGNORE_KEY, cfg.ignoreCheckpoint); + } } if (hasErrors) { diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSyncUnitTests.java 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSyncUnitTests.java index c22c948e70b..8ff5b6ee933 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSyncUnitTests.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSyncUnitTests.java @@ -22,7 +22,10 @@ package org.apache.hudi.utilities.streamer; import org.apache.hudi.DataSourceWriteOptions; import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieErrorTableConfig; import org.apache.hudi.storage.HoodieStorage; @@ -43,9 +46,13 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import java.io.IOException; import java.util.stream.Stream; import static org.apache.hudi.config.HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA; +import static org.apache.hudi.utilities.streamer.HoodieStreamer.CHECKPOINT_KEY; +import static org.apache.hudi.utilities.streamer.HoodieStreamer.CHECKPOINT_RESET_KEY; +import static org.apache.hudi.utilities.streamer.StreamSync.CHECKPOINT_IGNORE_KEY; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; @@ -130,6 +137,60 @@ public class TestStreamSyncUnitTests { HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.defaultValue()); } + @ParameterizedTest + @MethodSource("getCheckpointToResumeCases") + void testGetCheckpointToResume(HoodieStreamer.Config cfg, HoodieCommitMetadata commitMetadata, 
Option<String> expectedResumeCheckpoint) throws IOException { + HoodieSparkEngineContext hoodieSparkEngineContext = mock(HoodieSparkEngineContext.class); + FileSystem fs = mock(FileSystem.class); + TypedProperties props = new TypedProperties(); + SparkSession sparkSession = mock(SparkSession.class); + Configuration configuration = mock(Configuration.class); + HoodieTimeline commitsTimeline = mock(HoodieTimeline.class); + HoodieInstant hoodieInstant = mock(HoodieInstant.class); + + when(commitsTimeline.filter(any())).thenReturn(commitsTimeline); + when(commitsTimeline.lastInstant()).thenReturn(Option.of(hoodieInstant)); + + StreamSync streamSync = new StreamSync(cfg, sparkSession, props, hoodieSparkEngineContext, + fs, configuration, client -> true, null,Option.empty(),null,Option.empty(),true,true); + StreamSync spy = spy(streamSync); + doReturn(Option.of(commitMetadata)).when(spy).getLatestCommitMetadataWithValidCheckpointInfo(any()); + + Option<String> resumeCheckpoint = spy.getCheckpointToResume(Option.of(commitsTimeline)); + assertEquals(expectedResumeCheckpoint,resumeCheckpoint); + } + + private static Stream<Arguments> getCheckpointToResumeCases() { + return Stream.of( + // Checkpoint has been manually overridden (reset-checkpoint) + Arguments.of(generateDeltaStreamerConfig("new-reset-checkpoint",null),generateCommitMetadata("old-reset-checkpoint",null,null),Option.of("new-reset-checkpoint")), + // Checkpoint not reset/ Ignored, continuing from previous run + Arguments.of(generateDeltaStreamerConfig("old-reset-checkpoint",null),generateCommitMetadata("old-reset-checkpoint",null,"checkpoint-prev-run"),Option.of("checkpoint-prev-run")), + // Checkpoint not reset/ Ignored, continuing from previous run (ignore checkpoint has not changed) + Arguments.of(generateDeltaStreamerConfig("old-reset-checkpoint","123445"),generateCommitMetadata("old-reset-checkpoint","123445","checkpoint-prev-run"),Option.of("checkpoint-prev-run")), + // Ignore checkpoint set, existing 
checkpoints will be ignored + Arguments.of(generateDeltaStreamerConfig("old-reset-checkpoint","123445"),generateCommitMetadata("old-reset-checkpoint","123422","checkpoint-prev-run"),Option.empty()), + // Ignore checkpoint set, existing checkpoints will be ignored (reset-checkpoint ignored) + Arguments.of(generateDeltaStreamerConfig("new-reset-checkpoint","123445"),generateCommitMetadata("old-reset-checkpoint","123422","checkpoint-prev-run"),Option.empty()) + ); + } + + private static HoodieStreamer.Config generateDeltaStreamerConfig(String checkpoint, String ignoreCheckpoint) { + HoodieStreamer.Config cfg = new HoodieStreamer.Config(); + cfg.checkpoint = checkpoint; + cfg.ignoreCheckpoint = ignoreCheckpoint; + cfg.tableType = "MERGE_ON_READ"; + return cfg; + } + + private static HoodieCommitMetadata generateCommitMetadata(String resetCheckpointValue, String ignoreCheckpointValue, String checkpointValue) { + HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); + commitMetadata.addMetadata(CHECKPOINT_RESET_KEY,resetCheckpointValue); + commitMetadata.addMetadata(CHECKPOINT_IGNORE_KEY,ignoreCheckpointValue); + commitMetadata.addMetadata(CHECKPOINT_KEY,checkpointValue); + return commitMetadata; + } + private SchemaProvider getSchemaProvider(String name, boolean isNullTargetSchema) { SchemaProvider schemaProvider = mock(SchemaProvider.class); Schema sourceSchema = mock(Schema.class);
