nsivabalan commented on code in PR #18448:
URL: https://github.com/apache/hudi/pull/18448#discussion_r3095407275
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1279,14 +1311,17 @@ public boolean rollback(final String commitInstantTime,
Option<HoodiePendingRoll
// is set to false since they are already deleted.
// Execute rollback
HoodieRollbackMetadata rollbackMetadata = commitInstantOpt.isPresent()
- ? table.rollback(context, rollbackInstantTime,
commitInstantOpt.get(), true, skipLocking)
- : table.rollback(context, rollbackInstantTime,
table.getMetaClient().createNewInstant(
+ ? table.rollback(context, rollbackInstantTimeOpt.get(),
commitInstantOpt.get(), true, skipLocking)
Review Comment:
For rollback execution, we should do something like the following:
```
boolean isMultiWriter = config.getWriteConcurrencyMode().supportsMultiWriter();
if (isMultiWriter and ) {
  acquire lock
  validate heartbeat, i.e. if the heartbeat is expired, throw an exception.
  reload timeline
  if the rollback is already completed, throw.
  start heartbeat for the rollback instant.
  release lock
}
.
.
execute rollback.
finally, once the rollback is completed, within a finally block:
  if multi-writer, stop the heartbeat.
```
By the way, this change applies irrespective of the isExclusiveRollbackEnabled config.
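The flow above could be sketched roughly as follows. This is only an illustration with in-memory stand-ins: `GuardedRollbackSketch`, `liveHeartbeats`, and `completedRollbacks` are hypothetical names, not Hudi APIs. It also reads the heartbeat validation as rejecting the attempt while another writer's heartbeat for the rollback instant is still live; that reading is an assumption, and the check can be inverted if the comment is meant literally.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the rollback execution flow; none of these names are Hudi APIs.
final class GuardedRollbackSketch {
  private final ReentrantLock tableLock = new ReentrantLock();
  // Rollback instants whose heartbeat is currently live (owned by some writer).
  final Set<String> liveHeartbeats = ConcurrentHashMap.newKeySet();
  // Rollback instants that have already completed.
  final Set<String> completedRollbacks = ConcurrentHashMap.newKeySet();

  /** Runs rollbackAction; throws if another writer owns the rollback or it already completed. */
  void executeRollback(String rollbackInstant, boolean isMultiWriter, Runnable rollbackAction) {
    if (isMultiWriter) {
      tableLock.lock();
      try {
        // Assumed reading of "validate heartbeat": a live heartbeat means another writer is executing.
        if (liveHeartbeats.contains(rollbackInstant)) {
          throw new IllegalStateException("Rollback " + rollbackInstant + " is owned by another writer");
        }
        // A real implementation would reload the timeline here before this check.
        if (completedRollbacks.contains(rollbackInstant)) {
          throw new IllegalStateException("Rollback " + rollbackInstant + " already completed");
        }
        liveHeartbeats.add(rollbackInstant); // start heartbeat for the rollback instant
      } finally {
        tableLock.unlock(); // release the lock before the long-running execution
      }
    }
    try {
      rollbackAction.run();
      completedRollbacks.add(rollbackInstant);
    } finally {
      if (isMultiWriter) {
        liveHeartbeats.remove(rollbackInstant); // stop heartbeat even if execution failed
      }
    }
  }
}
```

The lock is held only for the ownership checks and the heartbeat start, so the rollback itself runs without blocking other writers, and the `finally` block guarantees the heartbeat is stopped on failure as well.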
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -1578,6 +1584,10 @@ public boolean shouldRollbackUsingMarkers() {
return getBoolean(ROLLBACK_USING_MARKERS_ENABLE);
}
+ public boolean isExclusiveRollbackEnabled() {
Review Comment:
Same here. Let's fix the naming throughout.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1245,25 +1245,57 @@ public boolean rollback(final String commitInstantTime,
Option<HoodiePendingRoll
final Timer.Context timerContext = this.metrics.getRollbackCtx();
try {
HoodieTable table = createTable(config, storageConf, skipVersionCheck);
+
Option<HoodieInstant> commitInstantOpt =
Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
.filter(instant -> EQUALS.test(instant.requestedTime(),
commitInstantTime))
.findFirst());
- Option<HoodieRollbackPlan> rollbackPlanOption;
- String rollbackInstantTime;
- if (pendingRollbackInfo.isPresent()) {
+ Option<HoodieRollbackPlan> rollbackPlanOption = Option.empty();
+ Option<String> rollbackInstantTimeOpt;
+ if (!config.isExclusiveRollbackEnabled() &&
pendingRollbackInfo.isPresent()) {
+ // Only case when lock can be skipped is if exclusive rollback is
disabled and
+ // there is a pending rollback info available
rollbackPlanOption =
Option.of(pendingRollbackInfo.get().getRollbackPlan());
- rollbackInstantTime =
pendingRollbackInfo.get().getRollbackInstant().requestedTime();
+ rollbackInstantTimeOpt =
Option.of(pendingRollbackInfo.get().getRollbackInstant().requestedTime());
} else {
- if (commitInstantOpt.isEmpty()) {
- log.error("Cannot find instant {} in the timeline of table {} for
rollback", commitInstantTime, config.getBasePath());
- return false;
- }
if (!skipLocking) {
txnManager.beginStateChange(Option.empty(), Option.empty());
}
try {
- rollbackInstantTime = suppliedRollbackInstantTime.orElseGet(() ->
createNewInstantTime(false));
- rollbackPlanOption = table.scheduleRollback(context,
rollbackInstantTime, commitInstantOpt.get(), false,
config.shouldRollbackUsingMarkers(), false);
+ if (config.isExclusiveRollbackEnabled()) {
+ // Reload meta client within the lock so that the timeline is
latest while executing pending rollback
+ table.getMetaClient().reloadActiveTimeline();
+ Option<HoodiePendingRollbackInfo> pendingRollbackOpt =
getPendingRollbackInfo(table.getMetaClient(), commitInstantTime);
+ rollbackInstantTimeOpt = pendingRollbackOpt.map(info ->
info.getRollbackInstant().requestedTime());
+ if (pendingRollbackOpt.isPresent()) {
+ // If pending rollback and heartbeat is expired, writer should
start heartbeat and execute rollback
+ if
(heartbeatClient.isHeartbeatExpired(rollbackInstantTimeOpt.get())) {
+ LOG.info("Heartbeat expired for rollback instant {}, executing
rollback now", rollbackInstantTimeOpt);
+ HeartbeatUtils.deleteHeartbeatFile(storage, basePath,
rollbackInstantTimeOpt.get(), config);
+ heartbeatClient.start(rollbackInstantTimeOpt.get());
+ rollbackPlanOption =
pendingRollbackOpt.map(HoodiePendingRollbackInfo::getRollbackPlan);
+ } else {
+ // Heartbeat is still active for another writer, ignore
rollback for now
+ // TODO: ABCDEFGHI revisit return value
+ return false;
+ }
+ } else if
(Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
+ .filter(instant -> EQUALS.test(instant.requestedTime(),
commitInstantTime))
+ .findFirst()).isEmpty()) {
+ // Assume rollback is already executed since the commit is no
longer present in the timeline
+ return false;
+ }
+ } else {
+ // Case where no pending rollback is present,
+ if (commitInstantOpt.isEmpty()) {
Review Comment:
Let's move this to L1253 and bail out early.
Then we only need to handle two cases: either the pending rollback info is
already present, or we need to schedule a rollback.
This feedback applies even to code outside of your changes.
The branching is a bit confusing currently.
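A minimal sketch of the suggested shape, using `java.util.Optional` as a stand-in for Hudi's `Option` and a hypothetical `Outcome` enum in place of the real scheduling and execution calls:

```java
import java.util.Optional;

// Hypothetical illustration of the early bail-out; not the actual rollback() method.
final class RollbackBranchingSketch {
  enum Outcome { COMMIT_NOT_FOUND, REUSED_PENDING_PLAN, SCHEDULED_NEW_PLAN }

  static Outcome plan(Optional<String> commitInstant, Optional<String> pendingRollbackInstant) {
    // Bail out first, so the rest of the method never sees a missing commit.
    if (!commitInstant.isPresent()) {
      return Outcome.COMMIT_NOT_FOUND;
    }
    // Case 1: a pending rollback already exists, so reuse its plan.
    if (pendingRollbackInstant.isPresent()) {
      return Outcome.REUSED_PENDING_PLAN;
    }
    // Case 2: nothing pending, so a new rollback has to be scheduled.
    return Outcome.SCHEDULED_NEW_PLAN;
  }
}
```

With the missing-commit check hoisted, the remaining logic is a two-way branch instead of nested conditions.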
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1245,25 +1245,57 @@ public boolean rollback(final String commitInstantTime,
Option<HoodiePendingRoll
final Timer.Context timerContext = this.metrics.getRollbackCtx();
try {
HoodieTable table = createTable(config, storageConf, skipVersionCheck);
+
Option<HoodieInstant> commitInstantOpt =
Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
.filter(instant -> EQUALS.test(instant.requestedTime(),
commitInstantTime))
.findFirst());
- Option<HoodieRollbackPlan> rollbackPlanOption;
- String rollbackInstantTime;
- if (pendingRollbackInfo.isPresent()) {
+ Option<HoodieRollbackPlan> rollbackPlanOption = Option.empty();
+ Option<String> rollbackInstantTimeOpt;
+ if (!config.isExclusiveRollbackEnabled() &&
pendingRollbackInfo.isPresent()) {
+ // Only case when lock can be skipped is if exclusive rollback is
disabled and
+ // there is a pending rollback info available
rollbackPlanOption =
Option.of(pendingRollbackInfo.get().getRollbackPlan());
- rollbackInstantTime =
pendingRollbackInfo.get().getRollbackInstant().requestedTime();
+ rollbackInstantTimeOpt =
Option.of(pendingRollbackInfo.get().getRollbackInstant().requestedTime());
} else {
- if (commitInstantOpt.isEmpty()) {
- log.error("Cannot find instant {} in the timeline of table {} for
rollback", commitInstantTime, config.getBasePath());
- return false;
- }
if (!skipLocking) {
txnManager.beginStateChange(Option.empty(), Option.empty());
}
try {
- rollbackInstantTime = suppliedRollbackInstantTime.orElseGet(() ->
createNewInstantTime(false));
- rollbackPlanOption = table.scheduleRollback(context,
rollbackInstantTime, commitInstantOpt.get(), false,
config.shouldRollbackUsingMarkers(), false);
+ if (config.isExclusiveRollbackEnabled()) {
+ // Reload meta client within the lock so that the timeline is
latest while executing pending rollback
+ table.getMetaClient().reloadActiveTimeline();
+ Option<HoodiePendingRollbackInfo> pendingRollbackOpt =
getPendingRollbackInfo(table.getMetaClient(), commitInstantTime);
+ rollbackInstantTimeOpt = pendingRollbackOpt.map(info ->
info.getRollbackInstant().requestedTime());
+ if (pendingRollbackOpt.isPresent()) {
+ // If pending rollback and heartbeat is expired, writer should
start heartbeat and execute rollback
+ if
(heartbeatClient.isHeartbeatExpired(rollbackInstantTimeOpt.get())) {
+ LOG.info("Heartbeat expired for rollback instant {}, executing
rollback now", rollbackInstantTimeOpt);
+ HeartbeatUtils.deleteHeartbeatFile(storage, basePath,
rollbackInstantTimeOpt.get(), config);
Review Comment:
We should not need to touch the heartbeat here; the heartbeat is only for
execution.
For planning, this is what we wanted to do when isExclusiveRollbackEnabled is set:
```
a. ensure we are within the lock.
b. reload the timeline.
c. if pendingRollback from the call site is empty:
   1. check for a pending rollback again. if we find one, fetch it.
   2. if we do not find one, fall through; eventually we will schedule the rollback.
```
Outside of isExclusiveRollbackEnabled, let's also do this:
```
check again for the commit to roll back (with the reloaded timeline). if it was
already removed from the timeline, return early from the method.
```
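The planning steps might look something like the sketch below. `RollbackPlanningSketch`, `TimelineSnapshot`, and `resolveRollbackInstant` are hypothetical stand-ins rather than Hudi classes, and placing the commit re-check before the pending-rollback lookup is one possible ordering, not something the comment prescribes.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

// Hypothetical stand-ins; none of these types or methods are Hudi APIs.
final class RollbackPlanningSketch {
  /** Minimal view of a freshly reloaded timeline. */
  static final class TimelineSnapshot {
    final Set<String> commits;                  // commit instants still on the timeline
    final Map<String, String> pendingRollbacks; // commit instant -> pending rollback instant

    TimelineSnapshot(Set<String> commits, Map<String, String> pendingRollbacks) {
      this.commits = commits;
      this.pendingRollbacks = pendingRollbacks;
    }
  }

  /** Returns the rollback instant to use, or empty if the commit is already gone. */
  static Optional<String> resolveRollbackInstant(
      ReentrantLock lock,
      Supplier<TimelineSnapshot> reloadTimeline,
      String commitInstant,
      Optional<String> pendingFromCaller,
      boolean exclusiveRollback,
      Supplier<String> newInstantTime) {
    lock.lock(); // (a) everything below runs within the lock
    try {
      TimelineSnapshot timeline = reloadTimeline.get(); // (b) reload the timeline
      // Regardless of the flag: if the commit is gone, return early.
      if (!timeline.commits.contains(commitInstant)) {
        return Optional.empty();
      }
      Optional<String> pending = pendingFromCaller;
      // (c) only when the caller had no pending rollback, look again on the fresh timeline.
      if (exclusiveRollback && !pending.isPresent()) {
        pending = Optional.ofNullable(timeline.pendingRollbacks.get(commitInstant));
      }
      // Fall through to scheduling a new rollback when nothing is pending.
      return pending.isPresent() ? pending : Optional.of(newInstantTime.get());
    } finally {
      lock.unlock();
    }
  }
}
```

Note that no heartbeat appears anywhere in this path, in line with keeping heartbeats to execution only.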
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -299,6 +299,12 @@ public class HoodieWriteConfig extends HoodieConfig {
.withDocumentation("Enables a more efficient mechanism for rollbacks
based on the marker files generated "
+ "during the writes. Turned on by default.");
+ public static final ConfigProperty<String> ENABLE_EXCLUSIVE_ROLLBACK =
ConfigProperty
+ .key("hoodie.rollback.enforce.single.rollback.instant")
Review Comment:
We should always align the variable name with the config key.
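To illustrate the mismatch, here is a small sketch that derives a candidate constant name from a config key. The derivation rule (drop the `hoodie.` prefix, upper-case, replace dots with underscores) and the `ConfigNamingSketch` class are assumptions made for illustration, not a documented Hudi convention; the alignment could equally be achieved by renaming the key instead.

```java
import java.util.Locale;

// Hypothetical helper; the derivation rule is an assumption, not a Hudi convention.
final class ConfigNamingSketch {
  /** Derives a candidate constant name from a dotted config key. */
  static String constantNameFor(String key) {
    String prefix = "hoodie.";
    String trimmed = key.startsWith(prefix) ? key.substring(prefix.length()) : key;
    return trimmed.replace('.', '_').replace('-', '_').toUpperCase(Locale.ROOT);
  }
}
```

Under this rule, `hoodie.rollback.enforce.single.rollback.instant` maps to `ROLLBACK_ENFORCE_SINGLE_ROLLBACK_INSTANT`, which does not match the `ENABLE_EXCLUSIVE_ROLLBACK` name in the diff.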