This is an automated email from the ASF dual-hosted git repository. xushiyan pushed a commit to branch release-0.11.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit fcaeaea62f42e0bd1e49970ebcc1830519c80f2a
Author: 吴祥平 <[email protected]>
AuthorDate: Wed Apr 20 19:23:39 2022 +0800

    [HUDI-3912] Fix lose data when rollback in flink async compact (#5357)

    * stop add event when has failed compact event

    Co-authored-by: wxp <[email protected]>
---
 .../apache/hudi/sink/compact/CompactionCommitSink.java | 17 ++++++++++++-----
 1 file changed, 12 insertions(+), 5 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
index ecd66936e8..c9fb7aceb2 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
@@ -101,11 +101,6 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
   @Override
   public void invoke(CompactionCommitEvent event, Context context) throws Exception {
     final String instant = event.getInstant();
-    if (event.isFailed()) {
-      // handle failure case
-      CompactionUtil.rollbackCompaction(table, event.getInstant());
-      return;
-    }
     commitBuffer.computeIfAbsent(instant, k -> new HashMap<>())
         .put(event.getFileId(), event);
     commitIfNecessary(instant, commitBuffer.get(instant).values());
@@ -132,6 +127,18 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
     if (!isReady) {
       return;
     }
+
+    if (events.stream().anyMatch(CompactionCommitEvent::isFailed)) {
+      try {
+        // handle failure case
+        CompactionUtil.rollbackCompaction(table, instant);
+      } finally {
+        // remove commitBuffer to avoid obsolete metadata commit
+        reset(instant);
+        return;
+      }
+    }
+
     try {
       doCommit(instant, events);
     } catch (Throwable throwable) {
