This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 46a2ccf56ee [HUDI-5881] Handle pending clean instants while running savepoint (#8105)
46a2ccf56ee is described below

commit 46a2ccf56eeefabf03bd339640607b996e307987
Author: Sagar Sumit <[email protected]>
AuthorDate: Tue Mar 7 04:11:15 2023 +0530

    [HUDI-5881] Handle pending clean instants while running savepoint (#8105)
---
 .../action/savepoint/SavepointActionExecutor.java  | 31 +++++++++++++++-------
 1 file changed, 21 insertions(+), 10 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
index 4e0ae1da223..0c90311fe99 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.table.action.savepoint;
 
-import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieSavepointMetadata;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
@@ -44,6 +43,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static 
org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
+import static 
org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeCleanerPlan;
+import static 
org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeHoodieCleanMetadata;
+
 public class SavepointActionExecutor<T, I, K, O> extends BaseActionExecutor<T, 
I, K, O, HoodieSavepointMetadata> {
 
   private static final Logger LOG = 
LogManager.getLogger(SavepointActionExecutor.class);
@@ -64,21 +67,29 @@ public class SavepointActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I
 
   @Override
   public HoodieSavepointMetadata execute() {
-    Option<HoodieInstant> cleanInstant = 
table.getCompletedCleanTimeline().lastInstant();
     if (!table.getCompletedCommitsTimeline().containsInstant(instantTime)) {
       throw new HoodieSavepointException("Could not savepoint non-existing 
commit " + instantTime);
     }
 
     try {
       // Check the last commit that was not cleaned and check if savepoint 
time is > that commit
-      String lastCommitRetained;
-      if (cleanInstant.isPresent()) {
-        HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils
-            
.deserializeHoodieCleanMetadata(table.getActiveTimeline().getInstantDetails(cleanInstant.get()).get());
-        lastCommitRetained = cleanMetadata.getEarliestCommitToRetain();
-      } else {
-        lastCommitRetained = 
table.getCompletedCommitsTimeline().firstInstant().get().getTimestamp();
-      }
+      Option<HoodieInstant> cleanInstant = 
table.getCleanTimeline().lastInstant();
+      String lastCommitRetained = cleanInstant.map(instant -> {
+        try {
+          if (instant.isCompleted()) {
+            return deserializeHoodieCleanMetadata(
+                table.getActiveTimeline().getInstantDetails(instant).get())
+                .getEarliestCommitToRetain();
+          } else {
+            // clean is pending or inflight
+            return deserializeCleanerPlan(
+                table.getActiveTimeline().getInstantDetails(new 
HoodieInstant(REQUESTED, instant.getAction(), instant.getTimestamp())).get())
+                .getEarliestInstantToRetain().getTimestamp();
+          }
+        } catch (IOException e) {
+          throw new HoodieSavepointException("Failed to savepoint " + 
instantTime, e);
+        }
+      
}).orElse(table.getCompletedCommitsTimeline().firstInstant().get().getTimestamp());
 
       // Cannot allow savepoint time on a commit that could have been cleaned
       
ValidationUtils.checkArgument(HoodieTimeline.compareTimestamps(instantTime, 
HoodieTimeline.GREATER_THAN_OR_EQUALS, lastCommitRetained),

Reply via email to