lanyuanxiaoyao commented on code in PR #5677:
URL: https://github.com/apache/hudi/pull/5677#discussion_r914871240


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java:
##########
@@ -218,74 +228,104 @@ private void compact() throws Exception {
       }
 
       // fetch the instant based on the configured execution sequence
-      HoodieTimeline timeline = 
table.getActiveTimeline().filterPendingCompactionTimeline();
-      Option<HoodieInstant> requested = 
CompactionUtil.isLIFO(cfg.compactionSeq) ? timeline.lastInstant() : 
timeline.firstInstant();
-      if (!requested.isPresent()) {
+      HoodieTimeline timeline = table.getActiveTimeline();
+      List<HoodieInstant> requested = ((CompactionPlanSelectStrategy) 
ReflectionUtils.loadClass(cfg.compactionPlanSelectStrategy))
+          .select(timeline.filterPendingCompactionTimeline(), cfg);
+      if (requested.isEmpty()) {
         // do nothing.
         LOG.info("No compaction plan scheduled, turns on the compaction plan 
schedule with --schedule option");
         return;
       }
 
-      String compactionInstantTime = requested.get().getTimestamp();
-
-      HoodieInstant inflightInstant = 
HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
-      if (timeline.containsInstant(inflightInstant)) {
-        LOG.info("Rollback inflight compaction instant: [" + 
compactionInstantTime + "]");
-        table.rollbackInflightCompaction(inflightInstant);
-        table.getMetaClient().reloadActiveTimeline();
-      }
+      List<String> compactionInstantTimes = 
requested.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+      compactionInstantTimes.forEach(timestamp -> {
+        HoodieInstant inflightInstant = 
HoodieTimeline.getCompactionInflightInstant(timestamp);
+        if (timeline.containsInstant(inflightInstant)) {
+          LOG.info("Rollback inflight compaction instant: [" + timestamp + 
"]");
+          table.rollbackInflightCompaction(inflightInstant);
+          table.getMetaClient().reloadActiveTimeline();
+        }
+      });
 
-      // generate compaction plan
+      // generate timestamp and compaction plan pair
       // should support configurable commit metadata
-      HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
-          table.getMetaClient(), compactionInstantTime);
-
-      if (compactionPlan == null || (compactionPlan.getOperations() == null)
-          || (compactionPlan.getOperations().isEmpty())) {
+      List<Pair<String, HoodieCompactionPlan>> compactionPlans = 
compactionInstantTimes.stream()
+          .map(timestamp -> {
+            try {
+              return Pair.of(timestamp, 
CompactionUtils.getCompactionPlan(table.getMetaClient(), timestamp));
+            } catch (IOException e) {
+              throw new HoodieException(e);
+            }
+          })
+          // reject empty compaction plan

Review Comment:
   Normally, the compaction plan would not be empty, unless the process of 
generating compaction plan meet some error. Maybe i should not filter out empty 
compaction plan, it could make a misunderstanding.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to