vinothchandar commented on a change in pull request #942: [HUDI-137] Fix state 
transitions for Hudi cleaning action
URL: https://github.com/apache/incubator-hudi/pull/942#discussion_r337003143
 
 

 ##########
 File path: hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java
 ##########
 @@ -1002,19 +1004,98 @@ public void clean() throws HoodieIOException {
    * Clean up any stale/old files/data lying around (either on file storage or 
index storage) based
    * on the configurations and CleaningPolicy used. (typically files that no 
longer can be used by a
    * running query can be cleaned)
+   *
+   * @param startCleanTime  Cleaner Instant Timestamp
+   * @return
+   * @throws HoodieIOException in case of any IOException
    */
-  private void clean(String startCleanTime) throws HoodieIOException {
+  protected HoodieCleanMetadata clean(String startCleanTime) throws 
HoodieIOException {
+    // Create a Hoodie table which encapsulated the commits and files visible
+    final HoodieTable<T> table = 
HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
+
+    // If there are inflight(failed) or previously requested clean operation, 
first perform them
+    
table.getCleanTimeline().filterInflightsAndRequested().getInstants().forEach(hoodieInstant
 -> {
+      logger.info("There were previously unfinished cleaner operations. 
Finishing Instant=" + hoodieInstant);
+      runClean(hoodieInstant.getTimestamp());
+    });
+
+    Option<HoodieCleanerPlan> cleanerPlanOpt = scheduleClean(startCleanTime);
+
+    if (cleanerPlanOpt.isPresent()) {
+      HoodieCleanerPlan cleanerPlan = cleanerPlanOpt.get();
+      if ((cleanerPlan.getFilesToBeDeletedPerPartition() != null)
+          && !cleanerPlan.getFilesToBeDeletedPerPartition().isEmpty()) {
+        return runClean(startCleanTime);
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Creates a Cleaner plan if there are files to be cleaned and stores them 
in instant file
+   * @param startCleanTime   Cleaner Instant Time
+   * @return Cleaner Plan if generated
+   */
+  @VisibleForTesting
+  protected Option<HoodieCleanerPlan> scheduleClean(String startCleanTime) {
+    // Create a Hoodie table which encapsulated the commits and files visible
+    HoodieTable<T> table = HoodieTable.getHoodieTable(
+        createMetaClient(true), config, jsc);
+
+    HoodieCleanerPlan cleanerPlan = table.scheduleClean(jsc);
+
+    if ((cleanerPlan.getFilesToBeDeletedPerPartition() != null)
+        && !cleanerPlan.getFilesToBeDeletedPerPartition().isEmpty()) {
+
+      HoodieInstant cleanInstant = new HoodieInstant(State.REQUESTED, 
HoodieTimeline.CLEAN_ACTION, startCleanTime);
+      // Save to both aux and timeline folder
+      try {
+        table.getActiveTimeline()
+            .saveToCleanRequested(cleanInstant, 
AvroUtils.serializeCleanerPlan(cleanerPlan));
+        logger.info("Requesting Cleaning with instant time " + cleanInstant);
+      } catch (IOException e) {
+        logger.error("Got exception when saving cleaner requested file", e);
+        throw new HoodieIOException(e.getMessage(), e);
+      }
+      return Option.of(cleanerPlan);
+    }
+    return Option.empty();
+  }
+
+  /**
+   * Executes the Cleaner plan stored in the instant metadata
+   * @param cleanInstantTs  Cleaner Instant Timestamp
+   * @return
+   */
+  @VisibleForTesting
+  protected HoodieCleanMetadata runClean(String cleanInstantTs) {
+    // Create a Hoodie table which encapsulated the commits and files visible
+    HoodieTable<T> table = HoodieTable.getHoodieTable(
+        createMetaClient(false), config, jsc);
+
+    HoodieInstant cleanInstant =
+        table.getCleanTimeline().getInstants().filter(x -> 
x.getTimestamp().equals(cleanInstantTs)).findFirst().get();
+
+    Preconditions.checkArgument(cleanInstant.getState().equals(State.REQUESTED)
+        || cleanInstant.getState().equals(State.INFLIGHT));
+
+
+    if (cleanInstant.isInflight()) {
+      cleanInstant = table.getActiveTimeline().revertToRequested(cleanInstant);
 
 Review comment:
   What is your thought on moving forward and skipping the requested -> 
inflight transition if it was already inflight, instead of reverting it back to 
the requested state? Do we do the same thing for compaction?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to