This is an automated email from the ASF dual-hosted git repository.

biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new fdce2b80c8 [HUDI-5279] move logic for deleting active instant to 
HoodieActiveTimeline (#7196)
fdce2b80c8 is described below

commit fdce2b80c8aec49be0e3506b7120283861eafa6e
Author: Yann Byron <[email protected]>
AuthorDate: Wed Nov 30 18:11:23 2022 +0800

    [HUDI-5279] move logic for deleting active instant to HoodieActiveTimeline 
(#7196)
---
 .../apache/hudi/client/HoodieTimelineArchiver.java | 40 ++++++++++++----------
 .../table/timeline/HoodieActiveTimeline.java       | 14 +++++---
 2 files changed, 30 insertions(+), 24 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
index bb814f817d..a61a5c9008 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
@@ -537,15 +537,14 @@ public class HoodieTimelineArchiver<T extends 
HoodieAvroPayload, I, K, O> {
   private boolean deleteArchivedInstants(List<HoodieInstant> archivedInstants, 
HoodieEngineContext context) throws IOException {
     LOG.info("Deleting instants " + archivedInstants);
 
-    List<String> pendingInstantFiles = new ArrayList<>();
-    List<String> completedInstantFiles = new ArrayList<>();
+    List<HoodieInstant> pendingInstants = new ArrayList<>();
+    List<HoodieInstant> completedInstants = new ArrayList<>();
 
     for (HoodieInstant instant : archivedInstants) {
-      String filePath = new Path(metaClient.getMetaPath(), 
instant.getFileName()).toString();
       if (instant.isCompleted()) {
-        completedInstantFiles.add(filePath);
+        completedInstants.add(instant);
       } else {
-        pendingInstantFiles.add(filePath);
+        pendingInstants.add(instant);
       }
     }
 
@@ -556,27 +555,30 @@ public class HoodieTimelineArchiver<T extends 
HoodieAvroPayload, I, K, O> {
     // other monitors on the timeline(such as the compaction or clustering 
services) would
     // mistakenly recognize the pending file as a pending operation,
     // then all kinds of weird bugs occur.
-    boolean success = deleteArchivedInstantFiles(context, true, 
pendingInstantFiles);
-    success &= deleteArchivedInstantFiles(context, success, 
completedInstantFiles);
+    HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+    if (!pendingInstants.isEmpty()) {
+      context.foreach(
+          pendingInstants,
+          instant -> activeTimeline.deleteInstantFileIfExists(instant),
+          Math.min(pendingInstants.size(), 
config.getArchiveDeleteParallelism())
+      );
+    }
+    if (!completedInstants.isEmpty()) {
+      context.foreach(
+          completedInstants,
+          instant -> activeTimeline.deleteInstantFileIfExists(instant),
+          Math.min(completedInstants.size(), 
config.getArchiveDeleteParallelism())
+      );
+    }
 
     // Remove older meta-data from auxiliary path too
     Option<HoodieInstant> latestCommitted = 
Option.fromJavaOptional(archivedInstants.stream().filter(i -> i.isCompleted() 
&& (i.getAction().equals(HoodieTimeline.COMMIT_ACTION)
         || 
(i.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)))).max(Comparator.comparing(HoodieInstant::getTimestamp)));
     LOG.info("Latest Committed Instant=" + latestCommitted);
     if (latestCommitted.isPresent()) {
-      success &= 
deleteAllInstantsOlderOrEqualsInAuxMetaFolder(latestCommitted.get());
+      return 
deleteAllInstantsOlderOrEqualsInAuxMetaFolder(latestCommitted.get());
     }
-    return success;
-  }
-
-  private boolean deleteArchivedInstantFiles(HoodieEngineContext context, 
boolean success, List<String> files) {
-    Map<String, Boolean> resultDeleteInstantFiles = 
deleteFilesParallelize(metaClient, files, context, false);
-
-    for (Map.Entry<String, Boolean> result : 
resultDeleteInstantFiles.entrySet()) {
-      LOG.info("Archived and deleted instant file " + result.getKey() + " : " 
+ result.getValue());
-      success &= result.getValue();
-    }
-    return success;
+    return true;
   }
 
   /**
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
index be351ab8e8..0ef46031ec 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
@@ -266,22 +266,26 @@ public class HoodieActiveTimeline extends 
HoodieDefaultTimeline {
     deleteInstantFile(instant);
   }
 
+  /**
+   * Note: This method should only be used in the case that delete 
requested/inflight instant or empty clean instant,
+   * and completed commit instant in an archive operation.
+   */
   public void deleteInstantFileIfExists(HoodieInstant instant) {
     LOG.info("Deleting instant " + instant);
-    Path inFlightCommitFilePath = 
getInstantFileNamePath(instant.getFileName());
+    Path commitFilePath = getInstantFileNamePath(instant.getFileName());
     try {
-      if (metaClient.getFs().exists(inFlightCommitFilePath)) {
-        boolean result = metaClient.getFs().delete(inFlightCommitFilePath, 
false);
+      if (metaClient.getFs().exists(commitFilePath)) {
+        boolean result = metaClient.getFs().delete(commitFilePath, false);
         if (result) {
           LOG.info("Removed instant " + instant);
         } else {
           throw new HoodieIOException("Could not delete instant " + instant);
         }
       } else {
-        LOG.warn("The commit " + inFlightCommitFilePath + " to remove does not 
exist");
+        LOG.warn("The commit " + commitFilePath + " to remove does not exist");
       }
     } catch (IOException e) {
-      throw new HoodieIOException("Could not remove inflight commit " + 
inFlightCommitFilePath, e);
+      throw new HoodieIOException("Could not remove commit " + commitFilePath, 
e);
     }
   }
 

Reply via email to