This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6f9b02decb [HUDI-3870] Add timeout rollback for flink online compaction (#5314)
6f9b02decb is described below
commit 6f9b02decb5bb2b83709b1b6ec04a97e4d102c11
Author: Danny Chan <[email protected]>
AuthorDate: Wed Apr 13 20:05:48 2022 +0800
[HUDI-3870] Add timeout rollback for flink online compaction (#5314)
---
.../apache/hudi/sink/compact/CompactionPlanOperator.java | 13 ++++++++++---
1 file changed, 10 insertions(+), 3 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
index 6df11fe224..48d4f48989 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
@@ -88,8 +88,7 @@ public class CompactionPlanOperator extends
AbstractStreamOperator<CompactionPla
// when the earliest inflight instant has timed out, assumes it has
failed
// already and just rolls it back.
- // comment out: do we really need the timeout rollback ?
- // CompactionUtil.rollbackEarliestCompaction(table, conf);
+ CompactionUtil.rollbackEarliestCompaction(table, conf);
scheduleCompaction(table, checkpointId);
} catch (Throwable throwable) {
// make it fail-safe
@@ -99,7 +98,8 @@ public class CompactionPlanOperator extends
AbstractStreamOperator<CompactionPla
private void scheduleCompaction(HoodieFlinkTable<?> table, long
checkpointId) throws IOException {
// the first instant takes the highest priority.
- Option<HoodieInstant> firstRequested =
table.getActiveTimeline().filterPendingCompactionTimeline()
+ HoodieTimeline pendingCompactionTimeline =
table.getActiveTimeline().filterPendingCompactionTimeline();
+ Option<HoodieInstant> firstRequested = pendingCompactionTimeline
.filter(instant -> instant.getState() ==
HoodieInstant.State.REQUESTED).firstInstant();
if (!firstRequested.isPresent()) {
// do nothing.
@@ -107,6 +107,13 @@ public class CompactionPlanOperator extends
AbstractStreamOperator<CompactionPla
return;
}
+ Option<HoodieInstant> firstInflight = pendingCompactionTimeline
+ .filter(instant -> instant.getState() ==
HoodieInstant.State.INFLIGHT).firstInstant();
+ if (firstInflight.isPresent()) {
+ LOG.warn("Waiting for pending compaction instant : " + firstInflight + "
to complete, skip scheduling new compaction plans");
+ return;
+ }
+
String compactionInstantTime = firstRequested.get().getTimestamp();
// generate compaction plan