This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a23faa26125 [HUDI-941] Update restore/rollback/indexing planning and 
instant generation (#13368)
a23faa26125 is described below

commit a23faa2612559cbaf9c31f89f077dac36f8996eb
Author: Tim Brown <[email protected]>
AuthorDate: Thu May 29 19:10:29 2025 -0500

    [HUDI-941] Update restore/rollback/indexing planning and instant generation 
(#13368)
---
 .../hudi/client/BaseHoodieTableServiceClient.java  | 73 +++++++++++++---------
 .../apache/hudi/client/BaseHoodieWriteClient.java  | 24 ++++---
 .../action/index/ScheduleIndexActionExecutor.java  |  7 ---
 3 files changed, 59 insertions(+), 45 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index f73a47826e1..54898d1123b 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -1088,21 +1088,20 @@ public abstract class BaseHoodieTableServiceClient<I, 
T, O> extends BaseHoodieCl
   @Deprecated
   public boolean rollback(final String commitInstantTime, 
Option<HoodiePendingRollbackInfo> pendingRollbackInfo,
                           boolean skipLocking, boolean skipVersionCheck) 
throws HoodieRollbackException {
-    final String rollbackInstantTime = pendingRollbackInfo.map(entry -> 
entry.getRollbackInstant().requestedTime())
-        .orElseGet(() -> createNewInstantTime(!skipLocking));
-    return rollback(commitInstantTime, pendingRollbackInfo, 
rollbackInstantTime, skipLocking, skipVersionCheck);
+    return rollback(commitInstantTime, pendingRollbackInfo, Option.empty(), 
skipLocking, skipVersionCheck);
   }
 
   /**
    * @param commitInstantTime   Instant time of the commit
    * @param pendingRollbackInfo pending rollback instant and plan if rollback 
failed from previous attempt.
+   * @param suppliedRollbackInstantTime the user provided instant to use for 
the rollback. This is only set for rolling back instants on the metadata table.
    * @param skipLocking         if this is triggered by another parent 
transaction, locking can be skipped.
    * @throws HoodieRollbackException if rollback cannot be performed 
successfully
    * @Deprecated Rollback the inflight record changes with the given commit 
time. This
    * will be removed in future in favor of {@link 
BaseHoodieWriteClient#restoreToInstant(String, boolean)
    */
   @Deprecated
-  public boolean rollback(final String commitInstantTime, 
Option<HoodiePendingRollbackInfo> pendingRollbackInfo, String 
rollbackInstantTime,
+  public boolean rollback(final String commitInstantTime, 
Option<HoodiePendingRollbackInfo> pendingRollbackInfo, Option<String> 
suppliedRollbackInstantTime,
                           boolean skipLocking, boolean skipVersionCheck) 
throws HoodieRollbackException {
     LOG.info("Begin rollback of instant {} for table {}", commitInstantTime, 
config.getBasePath());
     final Timer.Context timerContext = this.metrics.getRollbackCtx();
@@ -1111,36 +1110,48 @@ public abstract class BaseHoodieTableServiceClient<I, 
T, O> extends BaseHoodieCl
       Option<HoodieInstant> commitInstantOpt = 
Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
           .filter(instant -> EQUALS.test(instant.requestedTime(), 
commitInstantTime))
           .findFirst());
-      if (commitInstantOpt.isPresent() || pendingRollbackInfo.isPresent()) {
-        LOG.info("Scheduling Rollback at instant time : {} "
-                + "(exists in active timeline: {}), with rollback plan: {} for 
table {}",
-            rollbackInstantTime, commitInstantOpt.isPresent(), 
pendingRollbackInfo.isPresent(), config.getBasePath());
-        Option<HoodieRollbackPlan> rollbackPlanOption = 
pendingRollbackInfo.map(entry -> Option.of(entry.getRollbackPlan()))
-            .orElseGet(() -> table.scheduleRollback(context, 
rollbackInstantTime, commitInstantOpt.get(), false, 
config.shouldRollbackUsingMarkers(),
-                false));
-        if (rollbackPlanOption.isPresent()) {
-          // There can be a case where the inflight rollback failed after the 
instant files
-          // are deleted for commitInstantTime, so that commitInstantOpt is 
empty as it is
-          // not present in the timeline.  In such a case, the hoodie instant 
instance
-          // is reconstructed to allow the rollback to be reattempted, and the 
deleteInstants
-          // is set to false since they are already deleted.
-          // Execute rollback
-          HoodieRollbackMetadata rollbackMetadata = 
commitInstantOpt.isPresent()
-              ? table.rollback(context, rollbackInstantTime, 
commitInstantOpt.get(), true, skipLocking)
-              : table.rollback(context, rollbackInstantTime, 
table.getMetaClient().createNewInstant(
-                  HoodieInstant.State.INFLIGHT, 
rollbackPlanOption.get().getInstantToRollback().getAction(), commitInstantTime),
-              false, skipLocking);
-          if (timerContext != null) {
-            long durationInMs = metrics.getDurationInMs(timerContext.stop());
-            metrics.updateRollbackMetrics(durationInMs, 
rollbackMetadata.getTotalFilesDeleted());
+      Option<HoodieRollbackPlan> rollbackPlanOption;
+      String rollbackInstantTime;
+      if (pendingRollbackInfo.isPresent()) {
+        rollbackPlanOption = 
Option.of(pendingRollbackInfo.get().getRollbackPlan());
+        rollbackInstantTime = 
pendingRollbackInfo.get().getRollbackInstant().requestedTime();
+      } else {
+        if (commitInstantOpt.isEmpty()) {
+          LOG.warn("Cannot find instant {} in the timeline of table {} for 
rollback", commitInstantTime, config.getBasePath());
+          return false;
+        }
+        if (!skipLocking) {
+          txnManager.beginTransaction(Option.empty(), Option.empty());
+        }
+        try {
+          rollbackInstantTime = suppliedRollbackInstantTime.orElseGet(() -> 
createNewInstantTime(false));
+          rollbackPlanOption = table.scheduleRollback(context, 
rollbackInstantTime, commitInstantOpt.get(), false, 
config.shouldRollbackUsingMarkers(), false);
+        } finally {
+          if (!skipLocking) {
+            txnManager.endTransaction(Option.empty());
           }
-          return true;
-        } else {
-          throw new HoodieRollbackException("Failed to rollback " + 
config.getBasePath() + " commits " + commitInstantTime);
         }
+      }
+
+      if (rollbackPlanOption.isPresent()) {
+        // There can be a case where the inflight rollback failed after the 
instant files
+        // are deleted for commitInstantTime, so that commitInstantOpt is 
empty as it is
+        // not present in the timeline.  In such a case, the hoodie instant 
instance
+        // is reconstructed to allow the rollback to be reattempted, and the 
deleteInstants
+        // is set to false since they are already deleted.
+        // Execute rollback
+        HoodieRollbackMetadata rollbackMetadata = commitInstantOpt.isPresent()
+            ? table.rollback(context, rollbackInstantTime, 
commitInstantOpt.get(), true, skipLocking)
+            : table.rollback(context, rollbackInstantTime, 
table.getMetaClient().createNewInstant(
+                HoodieInstant.State.INFLIGHT, 
rollbackPlanOption.get().getInstantToRollback().getAction(), commitInstantTime),
+            false, skipLocking);
+        if (timerContext != null) {
+          long durationInMs = metrics.getDurationInMs(timerContext.stop());
+          metrics.updateRollbackMetrics(durationInMs, 
rollbackMetadata.getTotalFilesDeleted());
+        }
+        return true;
       } else {
-        LOG.warn("Cannot find instant {} in the timeline of table {} for 
rollback", commitInstantTime, config.getBasePath());
-        return false;
+        throw new HoodieRollbackException("Failed to rollback " + 
config.getBasePath() + " commits " + commitInstantTime);
       }
     } catch (Exception e) {
       throw new HoodieRollbackException("Failed to rollback " + 
config.getBasePath() + " commits " + commitInstantTime, e);
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 007493e029d..80f12c1357e 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -813,7 +813,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
   public boolean rollback(final String commitInstantTime, String 
rollbackInstantTimestamp) throws HoodieRollbackException {
     HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN, 
Option.empty());
     Option<HoodiePendingRollbackInfo> pendingRollbackInfo = 
tableServiceClient.getPendingRollbackInfo(table.getMetaClient(), 
commitInstantTime);
-    return tableServiceClient.rollback(commitInstantTime, pendingRollbackInfo, 
rollbackInstantTimestamp, false, false);
+    return tableServiceClient.rollback(commitInstantTime, pendingRollbackInfo, 
false, false);
   }
 
   /**
@@ -859,8 +859,13 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
     if (failedRestore.isPresent() && 
savepointToRestoreTimestamp.equals(RestoreUtils.getSavepointToRestoreTimestamp(table,
 failedRestore.get()))) {
       return Pair.of(failedRestore.get().requestedTime(), 
Option.of(RestoreUtils.getRestorePlan(table.getMetaClient(), 
failedRestore.get())));
     }
-    final String restoreInstantTimestamp = createNewInstantTime();
-    return Pair.of(restoreInstantTimestamp, table.scheduleRestore(context, 
restoreInstantTimestamp, savepointToRestoreTimestamp));
+    txnManager.beginTransaction(Option.empty(), Option.empty());
+    try {
+      final String restoreInstantTimestamp = createNewInstantTime();
+      return Pair.of(restoreInstantTimestamp, table.scheduleRestore(context, 
restoreInstantTimestamp, savepointToRestoreTimestamp));
+    } finally {
+      txnManager.endTransaction(Option.empty());
+    }
   }
 
   /**
@@ -1019,10 +1024,15 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
    * @return instant time for the requested INDEX action
    */
   public Option<String> scheduleIndexing(List<MetadataPartitionType> 
partitionTypes, List<String> partitionPaths) {
-    String instantTime = createNewInstantTime();
-    Option<HoodieIndexPlan> indexPlan = createTable(config)
-        .scheduleIndexing(context, instantTime, partitionTypes, 
partitionPaths);
-    return indexPlan.isPresent() ? Option.of(instantTime) : Option.empty();
+    txnManager.beginTransaction(Option.empty(), Option.empty());
+    try {
+      String instantTime = createNewInstantTime(false);
+      Option<HoodieIndexPlan> indexPlan = createTable(config)
+          .scheduleIndexing(context, instantTime, partitionTypes, 
partitionPaths);
+      return indexPlan.map(plan -> instantTime);
+    } finally {
+      txnManager.endTransaction(Option.empty());
+    }
   }
 
   /**
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java
index cd93d73912d..48ec417f3b3 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java
@@ -21,7 +21,6 @@ package org.apache.hudi.table.action.index;
 
 import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
 import org.apache.hudi.avro.model.HoodieIndexPlan;
-import org.apache.hudi.client.transaction.TransactionManager;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -72,8 +71,6 @@ public class ScheduleIndexActionExecutor<T, I, K, O> extends 
BaseActionExecutor<
 
   private final List<String> partitionPaths;
 
-  private final TransactionManager txnManager;
-
   public ScheduleIndexActionExecutor(HoodieEngineContext context,
                                      HoodieWriteConfig config,
                                      HoodieTable<T, I, K, O> table,
@@ -83,7 +80,6 @@ public class ScheduleIndexActionExecutor<T, I, K, O> extends 
BaseActionExecutor<
     super(context, config, table, instantTime);
     this.partitionIndexTypes = partitionIndexTypes;
     this.partitionPaths = partitionPaths;
-    this.txnManager = new TransactionManager(config, table.getStorage());
   }
 
   @Override
@@ -107,7 +103,6 @@ public class ScheduleIndexActionExecutor<T, I, K, O> 
extends BaseActionExecutor<
         .filter(p -> 
requestedPartitions.contains(p.getPartitionPath())).collect(Collectors.toList());
     final HoodieInstant indexInstant = 
instantGenerator.getIndexRequestedInstant(instantTime);
     try {
-      this.txnManager.beginTransaction(Option.of(indexInstant), 
Option.empty());
       // get last completed instant
       Option<HoodieInstant> indexUptoInstant = 
table.getActiveTimeline().getContiguousCompletedWriteTimeline().lastInstant();
       if (indexUptoInstant.isPresent()) {
@@ -124,8 +119,6 @@ public class ScheduleIndexActionExecutor<T, I, K, O> 
extends BaseActionExecutor<
       LOG.error("Could not initialize file groups", e);
       // abort gracefully
       abort(indexInstant);
-    } finally {
-      this.txnManager.endTransaction(Option.of(indexInstant));
     }
 
     return Option.empty();

Reply via email to