This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a23faa26125 [HUDI-941] Update restore/rollback/indexing planning and instant generation (#13368)
a23faa26125 is described below
commit a23faa2612559cbaf9c31f89f077dac36f8996eb
Author: Tim Brown <[email protected]>
AuthorDate: Thu May 29 19:10:29 2025 -0500
[HUDI-941] Update restore/rollback/indexing planning and instant generation (#13368)
---
.../hudi/client/BaseHoodieTableServiceClient.java | 73 +++++++++++++---------
.../apache/hudi/client/BaseHoodieWriteClient.java | 24 ++++---
.../action/index/ScheduleIndexActionExecutor.java | 7 ---
3 files changed, 59 insertions(+), 45 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index f73a47826e1..54898d1123b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -1088,21 +1088,20 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> extends BaseHoodieCl
@Deprecated
public boolean rollback(final String commitInstantTime,
Option<HoodiePendingRollbackInfo> pendingRollbackInfo,
boolean skipLocking, boolean skipVersionCheck)
throws HoodieRollbackException {
- final String rollbackInstantTime = pendingRollbackInfo.map(entry ->
entry.getRollbackInstant().requestedTime())
- .orElseGet(() -> createNewInstantTime(!skipLocking));
- return rollback(commitInstantTime, pendingRollbackInfo,
rollbackInstantTime, skipLocking, skipVersionCheck);
+ return rollback(commitInstantTime, pendingRollbackInfo, Option.empty(),
skipLocking, skipVersionCheck);
}
/**
* @param commitInstantTime Instant time of the commit
* @param pendingRollbackInfo pending rollback instant and plan if rollback
failed from previous attempt.
+ * @param suppliedRollbackInstantTime the user provided instant to use for
the rollback. This is only set for rolling back instants on the metadata table.
* @param skipLocking if this is triggered by another parent
transaction, locking can be skipped.
* @throws HoodieRollbackException if rollback cannot be performed
successfully
* @Deprecated Rollback the inflight record changes with the given commit
time. This
* will be removed in future in favor of {@link
BaseHoodieWriteClient#restoreToInstant(String, boolean)
*/
@Deprecated
- public boolean rollback(final String commitInstantTime,
Option<HoodiePendingRollbackInfo> pendingRollbackInfo, String
rollbackInstantTime,
+ public boolean rollback(final String commitInstantTime,
Option<HoodiePendingRollbackInfo> pendingRollbackInfo, Option<String>
suppliedRollbackInstantTime,
boolean skipLocking, boolean skipVersionCheck)
throws HoodieRollbackException {
LOG.info("Begin rollback of instant {} for table {}", commitInstantTime,
config.getBasePath());
final Timer.Context timerContext = this.metrics.getRollbackCtx();
@@ -1111,36 +1110,48 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> extends BaseHoodieCl
Option<HoodieInstant> commitInstantOpt =
Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
.filter(instant -> EQUALS.test(instant.requestedTime(),
commitInstantTime))
.findFirst());
- if (commitInstantOpt.isPresent() || pendingRollbackInfo.isPresent()) {
- LOG.info("Scheduling Rollback at instant time : {} "
- + "(exists in active timeline: {}), with rollback plan: {} for
table {}",
- rollbackInstantTime, commitInstantOpt.isPresent(),
pendingRollbackInfo.isPresent(), config.getBasePath());
- Option<HoodieRollbackPlan> rollbackPlanOption =
pendingRollbackInfo.map(entry -> Option.of(entry.getRollbackPlan()))
- .orElseGet(() -> table.scheduleRollback(context,
rollbackInstantTime, commitInstantOpt.get(), false,
config.shouldRollbackUsingMarkers(),
- false));
- if (rollbackPlanOption.isPresent()) {
- // There can be a case where the inflight rollback failed after the
instant files
- // are deleted for commitInstantTime, so that commitInstantOpt is
empty as it is
- // not present in the timeline. In such a case, the hoodie instant
instance
- // is reconstructed to allow the rollback to be reattempted, and the
deleteInstants
- // is set to false since they are already deleted.
- // Execute rollback
- HoodieRollbackMetadata rollbackMetadata =
commitInstantOpt.isPresent()
- ? table.rollback(context, rollbackInstantTime,
commitInstantOpt.get(), true, skipLocking)
- : table.rollback(context, rollbackInstantTime,
table.getMetaClient().createNewInstant(
- HoodieInstant.State.INFLIGHT,
rollbackPlanOption.get().getInstantToRollback().getAction(), commitInstantTime),
- false, skipLocking);
- if (timerContext != null) {
- long durationInMs = metrics.getDurationInMs(timerContext.stop());
- metrics.updateRollbackMetrics(durationInMs,
rollbackMetadata.getTotalFilesDeleted());
+ Option<HoodieRollbackPlan> rollbackPlanOption;
+ String rollbackInstantTime;
+ if (pendingRollbackInfo.isPresent()) {
+ rollbackPlanOption =
Option.of(pendingRollbackInfo.get().getRollbackPlan());
+ rollbackInstantTime =
pendingRollbackInfo.get().getRollbackInstant().requestedTime();
+ } else {
+ if (commitInstantOpt.isEmpty()) {
+ LOG.warn("Cannot find instant {} in the timeline of table {} for
rollback", commitInstantTime, config.getBasePath());
+ return false;
+ }
+ if (!skipLocking) {
+ txnManager.beginTransaction(Option.empty(), Option.empty());
+ }
+ try {
+ rollbackInstantTime = suppliedRollbackInstantTime.orElseGet(() ->
createNewInstantTime(false));
+ rollbackPlanOption = table.scheduleRollback(context,
rollbackInstantTime, commitInstantOpt.get(), false,
config.shouldRollbackUsingMarkers(), false);
+ } finally {
+ if (!skipLocking) {
+ txnManager.endTransaction(Option.empty());
}
- return true;
- } else {
- throw new HoodieRollbackException("Failed to rollback " +
config.getBasePath() + " commits " + commitInstantTime);
}
+ }
+
+ if (rollbackPlanOption.isPresent()) {
+ // There can be a case where the inflight rollback failed after the
instant files
+ // are deleted for commitInstantTime, so that commitInstantOpt is
empty as it is
+ // not present in the timeline. In such a case, the hoodie instant
instance
+ // is reconstructed to allow the rollback to be reattempted, and the
deleteInstants
+ // is set to false since they are already deleted.
+ // Execute rollback
+ HoodieRollbackMetadata rollbackMetadata = commitInstantOpt.isPresent()
+ ? table.rollback(context, rollbackInstantTime,
commitInstantOpt.get(), true, skipLocking)
+ : table.rollback(context, rollbackInstantTime,
table.getMetaClient().createNewInstant(
+ HoodieInstant.State.INFLIGHT,
rollbackPlanOption.get().getInstantToRollback().getAction(), commitInstantTime),
+ false, skipLocking);
+ if (timerContext != null) {
+ long durationInMs = metrics.getDurationInMs(timerContext.stop());
+ metrics.updateRollbackMetrics(durationInMs,
rollbackMetadata.getTotalFilesDeleted());
+ }
+ return true;
} else {
- LOG.warn("Cannot find instant {} in the timeline of table {} for
rollback", commitInstantTime, config.getBasePath());
- return false;
+ throw new HoodieRollbackException("Failed to rollback " +
config.getBasePath() + " commits " + commitInstantTime);
}
} catch (Exception e) {
throw new HoodieRollbackException("Failed to rollback " +
config.getBasePath() + " commits " + commitInstantTime, e);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 007493e029d..80f12c1357e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -813,7 +813,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
public boolean rollback(final String commitInstantTime, String
rollbackInstantTimestamp) throws HoodieRollbackException {
HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN,
Option.empty());
Option<HoodiePendingRollbackInfo> pendingRollbackInfo =
tableServiceClient.getPendingRollbackInfo(table.getMetaClient(),
commitInstantTime);
- return tableServiceClient.rollback(commitInstantTime, pendingRollbackInfo,
rollbackInstantTimestamp, false, false);
+ return tableServiceClient.rollback(commitInstantTime, pendingRollbackInfo,
false, false);
}
/**
@@ -859,8 +859,13 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
if (failedRestore.isPresent() &&
savepointToRestoreTimestamp.equals(RestoreUtils.getSavepointToRestoreTimestamp(table,
failedRestore.get()))) {
return Pair.of(failedRestore.get().requestedTime(),
Option.of(RestoreUtils.getRestorePlan(table.getMetaClient(),
failedRestore.get())));
}
- final String restoreInstantTimestamp = createNewInstantTime();
- return Pair.of(restoreInstantTimestamp, table.scheduleRestore(context,
restoreInstantTimestamp, savepointToRestoreTimestamp));
+ txnManager.beginTransaction(Option.empty(), Option.empty());
+ try {
+ final String restoreInstantTimestamp = createNewInstantTime();
+ return Pair.of(restoreInstantTimestamp, table.scheduleRestore(context,
restoreInstantTimestamp, savepointToRestoreTimestamp));
+ } finally {
+ txnManager.endTransaction(Option.empty());
+ }
}
/**
@@ -1019,10 +1024,15 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
* @return instant time for the requested INDEX action
*/
public Option<String> scheduleIndexing(List<MetadataPartitionType>
partitionTypes, List<String> partitionPaths) {
- String instantTime = createNewInstantTime();
- Option<HoodieIndexPlan> indexPlan = createTable(config)
- .scheduleIndexing(context, instantTime, partitionTypes,
partitionPaths);
- return indexPlan.isPresent() ? Option.of(instantTime) : Option.empty();
+ txnManager.beginTransaction(Option.empty(), Option.empty());
+ try {
+ String instantTime = createNewInstantTime(false);
+ Option<HoodieIndexPlan> indexPlan = createTable(config)
+ .scheduleIndexing(context, instantTime, partitionTypes,
partitionPaths);
+ return indexPlan.map(plan -> instantTime);
+ } finally {
+ txnManager.endTransaction(Option.empty());
+ }
}
/**
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java
index cd93d73912d..48ec417f3b3 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java
@@ -21,7 +21,6 @@ package org.apache.hudi.table.action.index;
import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
import org.apache.hudi.avro.model.HoodieIndexPlan;
-import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -72,8 +71,6 @@ public class ScheduleIndexActionExecutor<T, I, K, O> extends BaseActionExecutor<
private final List<String> partitionPaths;
- private final TransactionManager txnManager;
-
public ScheduleIndexActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable<T, I, K, O> table,
@@ -83,7 +80,6 @@ public class ScheduleIndexActionExecutor<T, I, K, O> extends BaseActionExecutor<
super(context, config, table, instantTime);
this.partitionIndexTypes = partitionIndexTypes;
this.partitionPaths = partitionPaths;
- this.txnManager = new TransactionManager(config, table.getStorage());
}
@Override
@@ -107,7 +103,6 @@ public class ScheduleIndexActionExecutor<T, I, K, O> extends BaseActionExecutor<
.filter(p ->
requestedPartitions.contains(p.getPartitionPath())).collect(Collectors.toList());
final HoodieInstant indexInstant =
instantGenerator.getIndexRequestedInstant(instantTime);
try {
- this.txnManager.beginTransaction(Option.of(indexInstant),
Option.empty());
// get last completed instant
Option<HoodieInstant> indexUptoInstant =
table.getActiveTimeline().getContiguousCompletedWriteTimeline().lastInstant();
if (indexUptoInstant.isPresent()) {
@@ -124,8 +119,6 @@ public class ScheduleIndexActionExecutor<T, I, K, O> extends BaseActionExecutor<
LOG.error("Could not initialize file groups", e);
// abort gracefully
abort(indexInstant);
- } finally {
- this.txnManager.endTransaction(Option.of(indexInstant));
}
return Option.empty();