This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch release-0.12.3 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit ff52597a078c7b6302bc6fb4a2b6b11f4b3bea61 Author: Danny Chan <[email protected]> AuthorDate: Wed Apr 5 09:32:46 2023 +0800 [HUDI-6030] Cleans the ckp meta while the JM restarts (#8374) We received several bug reports after #7620 (for example, #8060). This patch reverts the changes to CkpMetadata and always reports the write metadata events from the write tasks; the coordinator then decides whether to re-commit these metadata stats. --- .../sink/common/AbstractStreamWriteFunction.java | 7 ++++-- .../org/apache/hudi/sink/meta/CkpMetadata.java | 13 +++++++---- .../org/apache/hudi/sink/meta/TestCkpMetadata.java | 27 ++++++++++++++++++++++ 3 files changed, 41 insertions(+), 6 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java index fa4c3db86ea..04cab0b1eff 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java @@ -21,6 +21,7 @@ package org.apache.hudi.sink.common; import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.sink.StreamWriteOperatorCoordinator; @@ -46,7 +47,6 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; -import java.util.Objects; /** * Base infrastructures for streaming writer function. 
@@ -194,8 +194,11 @@ public abstract class AbstractStreamWriteFunction<I> private void restoreWriteMetadata() throws Exception { boolean eventSent = false; + HoodieTimeline pendingTimeline = this.metaClient.getActiveTimeline().filterPendingExcludingCompaction(); for (WriteMetadataEvent event : this.writeMetadataState.get()) { - if (Objects.equals(this.currentInstant, event.getInstantTime())) { + // Must filter out the completed instants in case it is a partial failover, + // the write status should not be accumulated in such case. + if (pendingTimeline.containsInstant(event.getInstantTime())) { // Reset taskID for event event.setTaskID(taskID); // The checkpoint succeed but the meta does not commit, diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java index 155f0e6905e..200577428fc 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java @@ -21,6 +21,7 @@ package org.apache.hudi.sink.meta; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.common.util.VisibleForTesting; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.HadoopConfigurations; import org.apache.hudi.exception.HoodieException; @@ -90,14 +91,13 @@ public class CkpMetadata implements Serializable { // ------------------------------------------------------------------------- /** - * Initialize the message bus, would keep all the messages. + * Initialize the message bus, would clean all the messages * * <p>This expects to be called by the driver. 
*/ public void bootstrap() throws IOException { - if (!fs.exists(path)) { - fs.mkdirs(path); - } + fs.delete(path, true); + fs.mkdirs(path); } public void startInstant(String instant) { @@ -194,6 +194,11 @@ public class CkpMetadata implements Serializable { return this.messages.stream().anyMatch(ckpMsg -> instant.equals(ckpMsg.getInstant()) && ckpMsg.isAborted()); } + @VisibleForTesting + public List<String> getInstantCache() { + return this.instantCache; + } + // ------------------------------------------------------------------------- // Utilities // ------------------------------------------------------------------------- diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java index fe7ce3f9478..041cd12c97a 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java @@ -30,11 +30,14 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import java.io.File; +import java.util.Arrays; +import java.util.Collections; import java.util.stream.IntStream; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertNull; /** * Test cases for {@link CkpMetadata}. 
@@ -74,4 +77,28 @@ public class TestCkpMetadata { metadata.abortInstant("7"); assertThat(metadata.getMessages().size(), is(5)); } + + @Test + void testBootstrap() throws Exception { + CkpMetadata metadata = getCkpMetadata(""); + // write 4 instants to the ckp_meta + IntStream.range(0, 4).forEach(i -> metadata.startInstant(i + "")); + assertThat("The first instant should be removed from the instant cache", + metadata.getInstantCache(), is(Arrays.asList("1", "2", "3"))); + + // simulate the reboot of coordinator + CkpMetadata metadata1 = getCkpMetadata(""); + metadata1.bootstrap(); + assertNull(metadata1.getInstantCache(), "The instant cache should be recovered from bootstrap"); + + metadata1.startInstant("4"); + assertThat("The first instant should be removed from the instant cache", + metadata1.getInstantCache(), is(Collections.singletonList("4"))); + } + + private CkpMetadata getCkpMetadata(String uniqueId) { + String basePath = tempFile.getAbsolutePath(); + FileSystem fs = FSUtils.getFs(basePath, HadoopConfigurations.getHadoopConf(new Configuration())); + return CkpMetadata.getInstance(fs, basePath); + } }
