This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f8d9242 [HUDI-2050] Support rollback inflight compaction instances
for batch flink compactor (#3124)
f8d9242 is described below
commit f8d9242372e9c682ea2573dde8b308d5895357f9
Author: swuferhong <[email protected]>
AuthorDate: Mon Jun 21 20:32:48 2021 +0800
[HUDI-2050] Support rollback inflight compaction instances for batch flink
compactor (#3124)
---
.../hudi/sink/compact/HoodieFlinkCompactor.java | 21 +++++++++++++++++++--
.../java/org/apache/hudi/util/StreamerUtil.java | 7 +++++++
2 files changed, 26 insertions(+), 2 deletions(-)
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
index 0ba2351..f1102e8 100644
---
a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
+++
b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
@@ -21,6 +21,7 @@ package org.apache.hudi.sink.compact;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CompactionUtils;
@@ -45,6 +46,7 @@ public class HoodieFlinkCompactor {
protected static final Logger LOG =
LoggerFactory.getLogger(HoodieFlinkCompactor.class);
+ @SuppressWarnings("unchecked, rawtypes")
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -66,17 +68,32 @@ public class HoodieFlinkCompactor {
// set table schema
CompactionUtil.setAvroSchema(conf, metaClient);
+ HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf,
null);
+ HoodieFlinkTable<?> table = writeClient.getHoodieTable();
+
+ // rolls back inflight compaction first
+ // condition: the scheduled compaction is in INFLIGHT state for max delta
seconds.
+ String curInstantTime = HoodieActiveTimeline.createNewInstantTime();
+ int deltaSeconds = conf.getInteger(FlinkOptions.COMPACTION_DELTA_SECONDS);
+ HoodieTimeline inflightCompactionTimeline = metaClient.getActiveTimeline()
+ .filterPendingCompactionTimeline()
+ .filter(instant ->
+ instant.getState() == HoodieInstant.State.INFLIGHT
+ && StreamerUtil.instantTimeDiff(curInstantTime,
instant.getTimestamp()) >= deltaSeconds);
+ inflightCompactionTimeline.getInstants().forEach(inflightInstant -> {
+ writeClient.rollbackInflightCompaction(inflightInstant, table);
+ table.getMetaClient().reloadActiveTimeline();
+ });
+
// judge whether have operation
// to compute the compaction instant time and do compaction.
String compactionInstantTime =
CompactionUtil.getCompactionInstantTime(metaClient);
- HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf,
null);
boolean scheduled =
writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
if (!scheduled) {
// do nothing.
LOG.info("No compaction plan for this job ");
return;
}
- HoodieFlinkTable<?> table = writeClient.getHoodieTable();
// generate compaction plan
// should support configurable commit metadata
HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index fcbdb21..b49343d 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -312,4 +312,11 @@ public class StreamerUtil {
long median = low + (high - low) / 2;
return String.valueOf(median);
}
+
+ /**
+ * Returns the time interval in seconds between the given instant times.
+ */
+ public static long instantTimeDiff(String newInstantTime, String
oldInstantTime) {
+ return Long.parseLong(newInstantTime) - Long.parseLong(oldInstantTime);
+ }
}