This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f8d9242  [HUDI-2050] Support rollback inflight compaction instances 
for batch flink compactor (#3124)
f8d9242 is described below

commit f8d9242372e9c682ea2573dde8b308d5895357f9
Author: swuferhong <[email protected]>
AuthorDate: Mon Jun 21 20:32:48 2021 +0800

    [HUDI-2050] Support rollback inflight compaction instances for batch flink 
compactor (#3124)
---
 .../hudi/sink/compact/HoodieFlinkCompactor.java     | 21 +++++++++++++++++++--
 .../java/org/apache/hudi/util/StreamerUtil.java     |  7 +++++++
 2 files changed, 26 insertions(+), 2 deletions(-)

diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
 
b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
index 0ba2351..f1102e8 100644
--- 
a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
@@ -21,6 +21,7 @@ package org.apache.hudi.sink.compact;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CompactionUtils;
@@ -45,6 +46,7 @@ public class HoodieFlinkCompactor {
 
   protected static final Logger LOG = 
LoggerFactory.getLogger(HoodieFlinkCompactor.class);
 
+  @SuppressWarnings("unchecked, rawtypes")
   public static void main(String[] args) throws Exception {
     StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
@@ -66,17 +68,32 @@ public class HoodieFlinkCompactor {
     // set table schema
     CompactionUtil.setAvroSchema(conf, metaClient);
 
+    HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, 
null);
+    HoodieFlinkTable<?> table = writeClient.getHoodieTable();
+
+    // rolls back inflight compaction first
+    // condition: the scheduled compaction has been in INFLIGHT state for at least max delta 
seconds.
+    String curInstantTime = HoodieActiveTimeline.createNewInstantTime();
+    int deltaSeconds = conf.getInteger(FlinkOptions.COMPACTION_DELTA_SECONDS);
+    HoodieTimeline inflightCompactionTimeline = metaClient.getActiveTimeline()
+        .filterPendingCompactionTimeline()
+        .filter(instant ->
+            instant.getState() == HoodieInstant.State.INFLIGHT
+                && StreamerUtil.instantTimeDiff(curInstantTime, 
instant.getTimestamp()) >= deltaSeconds);
+    inflightCompactionTimeline.getInstants().forEach(inflightInstant -> {
+      writeClient.rollbackInflightCompaction(inflightInstant, table);
+      table.getMetaClient().reloadActiveTimeline();
+    });
+
     // judge whether have operation
     // to compute the compaction instant time and do compaction.
     String compactionInstantTime = 
CompactionUtil.getCompactionInstantTime(metaClient);
-    HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, 
null);
     boolean scheduled = 
writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
     if (!scheduled) {
       // do nothing.
       LOG.info("No compaction plan for this job ");
       return;
     }
-    HoodieFlinkTable<?> table = writeClient.getHoodieTable();
     // generate compaction plan
     // should support configurable commit metadata
     HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java 
b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index fcbdb21..b49343d 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -312,4 +312,11 @@ public class StreamerUtil {
     long median = low + (high - low) / 2;
     return String.valueOf(median);
   }
+
+  /**
+   * Returns the time interval in seconds between the two given instant times.
+   */
+  public static long instantTimeDiff(String newInstantTime, String 
oldInstantTime) {
+    return Long.parseLong(newInstantTime) - Long.parseLong(oldInstantTime);
+  }
 }

Reply via email to