This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 8cd1924bf64 [HUDI-6944] Fix flink boostrap concurrency issue (#9867)
8cd1924bf64 is described below
commit 8cd1924bf64af03c4ccb0be00cc8aa012375b40c
Author: YueZhang <[email protected]>
AuthorDate: Mon Oct 16 19:03:07 2023 +0800
[HUDI-6944] Fix flink boostrap concurrency issue (#9867)
---
.../hudi/sink/StreamWriteOperatorCoordinator.java | 30 ++++++++++------------
1 file changed, 14 insertions(+), 16 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index fcb2b1dc295..34fb5cf3c67 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -409,23 +409,21 @@ public class StreamWriteOperatorCoordinator
*/
private void initInstant(String instant) {
HoodieTimeline completedTimeline = this.metaClient.getActiveTimeline().filterCompletedInstants();
- executor.execute(() -> {
- if (instant.equals(WriteMetadataEvent.BOOTSTRAP_INSTANT) || completedTimeline.containsInstant(instant)) {
- // the last instant committed successfully
- reset();
- } else {
- LOG.info("Recommit instant {}", instant);
- // Recommit should start heartbeat for lazy failed writes clean policy to avoid aborting for heartbeat expired.
- if (writeClient.getConfig().getFailedWritesCleanPolicy().isLazy()) {
- writeClient.getHeartbeatClient().start(instant);
- }
- commitInstant(instant);
+ if (instant.equals(WriteMetadataEvent.BOOTSTRAP_INSTANT) || completedTimeline.containsInstant(instant)) {
+ // the last instant committed successfully
+ reset();
+ } else {
+ LOG.info("Recommit instant {}", instant);
+ // Recommit should start heartbeat for lazy failed writes clean policy to avoid aborting for heartbeat expired.
+ if (writeClient.getConfig().getFailedWritesCleanPolicy().isLazy()) {
+ writeClient.getHeartbeatClient().start(instant);
}
- // starts a new instant
- startInstant();
- // upgrade downgrade
- this.writeClient.upgradeDowngrade(this.instant, this.metaClient);
- }, "initialize instant %s", instant);
+ commitInstant(instant);
+ }
+ // starts a new instant
+ startInstant();
+ // upgrade downgrade
+ this.writeClient.upgradeDowngrade(this.instant, this.metaClient);
}
private void handleBootstrapEvent(WriteMetadataEvent event) {