This is an automated email from the ASF dual-hosted git repository. yuzhaojing pushed a commit to branch release-0.12.1-rc1 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 00d80215dfac458050d242844be1605cfe47ea15 Author: voonhous <[email protected]> AuthorDate: Tue Sep 27 15:52:23 2022 +0800 [HUDI-4907] Prevent single commit multi instant issue (#6766) Co-authored-by: TengHuo <[email protected]> Co-authored-by: yuzhao.cyz <[email protected]> --- .../java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java | 2 +- .../src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java | 6 +++--- .../src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java | 3 ++- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index e3b0d82704..c87d5b2443 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -341,7 +341,7 @@ public class StreamWriteOperatorCoordinator private static CkpMetadata initCkpMetadata(HoodieTableMetaClient metaClient) throws IOException { CkpMetadata ckpMetadata = CkpMetadata.getInstance(metaClient.getFs(), metaClient.getBasePath()); - ckpMetadata.bootstrap(metaClient); + ckpMetadata.bootstrap(); return ckpMetadata; } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java index 4cdebf986f..6895b2a0c6 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java @@ -94,7 +94,7 @@ public class CkpMetadata implements Serializable { * * <p>This expects to be called by the driver. */ - public void bootstrap(HoodieTableMetaClient metaClient) throws IOException { + public void bootstrap() throws IOException { fs.delete(path, true); fs.mkdirs(path); } @@ -173,8 +173,8 @@ public class CkpMetadata implements Serializable { @Nullable public String lastPendingInstant() { load(); - for (int i = this.messages.size() - 1; i >= 0; i--) { - CkpMessage ckpMsg = this.messages.get(i); + if (this.messages.size() > 0) { + CkpMessage ckpMsg = this.messages.get(this.messages.size() - 1); // consider 'aborted' as pending too to reuse the instant if (!ckpMsg.isComplete()) { return ckpMsg.getInstant(); diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java index a6fb493b9b..fe7ce3f947 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java @@ -32,6 +32,7 @@ import org.junit.jupiter.api.io.TempDir; import java.io.File; import java.util.stream.IntStream; +import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; @@ -63,7 +64,7 @@ public class TestCkpMetadata { assertThat(metadata.lastPendingInstant(), is("2")); metadata.commitInstant("2"); - assertThat(metadata.lastPendingInstant(), is("1")); + assertThat(metadata.lastPendingInstant(), equalTo(null)); // test cleaning IntStream.range(3, 6).forEach(i -> metadata.startInstant(i + ""));
