hudi-bot opened a new issue, #16128:
URL: https://github.com/apache/hudi/issues/16128

   h1. Issue
   
The existing rollback API in 0.14 [https://github.com/apache/hudi/blob/master/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java#L877] executes a rollback plan, either taking in an existing rollback plan provided by the caller from a previous rollback attempt, or scheduling a new rollback instant if none is provided. Currently it is not safe for two concurrent jobs to call this API (when skipLocking=false and the callers aren't already holding a lock), as this can lead to a situation where multiple requested rollback plans are created or two jobs execute the same rollback instant at the same time.
   h1. Proposed change
   
   One way to resolve this issue is to refactor this rollback function such that if skipLocking=false, the following steps are followed:
    # Acquire the table lock
    # Reload the active timeline
    # Look at the active timeline to see if there is an inflight rollback instant from a previous rollback attempt; if it exists, assign it as the rollback plan to execute. Also, check whether a pending rollback plan was passed in by the caller. Then execute the following steps depending on whether the caller passed a pending rollback instant plan.
    ##  [a] If a pending inflight rollback plan was passed in by the caller, then check that there is a previous attempted rollback instant on the timeline (and that the instant times match) and continue to use this rollback plan. If that isn't the case, then raise a rollback exception, since this means another job has already executed this plan concurrently. Note that in a valid HUDI dataset there can be at most one rollback instant for a corresponding commit instant, which is why, if we no longer see a pending rollback in the timeline in this phase, we can safely assume that it has already been executed to completion.
    ##  [b] If no pending inflight rollback plan was passed in by the caller and no pending rollback instant was found in the timeline earlier, then schedule a new rollback plan
    # Now that a rollback plan and requested rollback instant time have been assigned, check for an active heartbeat for the rollback instant time. If there is one, then abort the rollback, as that means a concurrent job is executing that rollback. If not, then start a heartbeat for that rollback instant time.
    # Release the table lock
    # Execute the rollback plan and complete the rollback instant. Regardless of whether this succeeds or fails with an exception, close the heartbeat. This increases the chance that the next job that calls this rollback API will follow through with the rollback and not abort due to an active previous heartbeat.
   
    
    * These steps will only be enforced for skipLocking=false, since skipLocking=true means the caller may already be explicitly holding a table lock. In that case, acquiring the lock again in step (1) would fail.
    * Acquiring a lock and reloading the timeline in steps (1-3) guards against data races where another job calls this rollback API at the same time and schedules its own rollback plan and instant. If no rollback has been attempted before for this instant, then before step (1) there is a window of time in which another concurrent rollback job could have scheduled a rollback plan, failed execution, and cleaned up its heartbeat, all while the current rollback job is running. As a result, even if the current job was passed an empty pending rollback plan, it still needs to check the active timeline to ensure that no new pending rollback instant has been created.
    * Using a heartbeat signals to callers in other jobs that a job is already executing this rollback. Checking for an expired heartbeat and (re)starting the heartbeat has to be done under a lock, so that multiple jobs don't each start it at the same time and assume they are the only ones heartbeating.
    * The table lock is no longer needed after (5), since it can now be safely 
assumed that no other job (calling this rollback API) will execute this 
rollback instant. 
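   The heartbeat check in step 4 hinges on an expiry window; as discussed in the comments below, a heartbeat can be treated as expired after roughly `[heartbeat interval * (allowed heartbeat misses + 1)]`. A minimal sketch of that check (hypothetical names, not Hudi's actual heartbeat client API):

```java
// Illustrative sketch only: hypothetical names, not Hudi's actual
// HoodieHeartbeatClient API.
public class HeartbeatExpiryCheck {

  // A heartbeat is treated as expired once more than
  // (allowedMisses + 1) intervals have elapsed since its last update.
  static boolean isExpired(long lastUpdateMs, long nowMs,
                           long intervalMs, int allowedMisses) {
    return (nowMs - lastUpdateMs) > intervalMs * (allowedMisses + 1);
  }

  public static void main(String[] args) {
    // 60s interval with 2 allowed misses => 180s grace window
    System.out.println(isExpired(0L, 120_000L, 60_000L, 2)); // prints false
    System.out.println(isExpired(0L, 180_001L, 60_000L, 2)); // prints true
  }
}
```

   Because the expiry check and the subsequent (re)start both happen under the table lock (steps 1-5), two jobs cannot both observe an expired heartbeat and claim it.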
   
   One example implementation to achieve this:
   
    
   {code:java}
   @Deprecated
   public boolean rollback(final String commitInstantTime, 
Option<HoodiePendingRollbackInfo> pendingRollbackInfo, boolean skipLocking,
       Option<String> rollbackInstantTimeOpt) throws HoodieRollbackException {
     final Timer.Context timerContext = this.metrics.getRollbackCtx();
     final Option<HoodieInstant> commitInstantOpt;
     final HoodieTable<T, I, K, O> table;
     try {
       table = createTable(config, hadoopConf);
     } catch (Exception e) {
      throw new HoodieRollbackException("Failed to initialize table for rollback " + config.getBasePath() + " commits " + commitInstantTime, e);
     }
     final String rollbackInstantTime;
     final boolean deleteInstantsDuringRollback;
     final HoodieInstant instantToRollback;
     try {
       if (!skipLocking) {
         // Do step 1 and 2
         txnManager.beginTransaction();
         table.getMetaClient().reloadActiveTimeline();
       }
       final Option<HoodiePendingRollbackInfo> previousAttemptedRollback;
       if (skipLocking) {
        // If skipLocking = true, then directly use pendingRollbackInfo without checking the status of this rollback instant on the active timeline.
        // This is because the caller is responsible for ensuring there is no concurrent rollback.
        previousAttemptedRollback = pendingRollbackInfo;
       } else {
         // step 3
         // If skipLocking = false, we need to check the timeline for the 
latest pending rollback, in case a concurrent rollback before
         // step 1 has already executed pendingRollbackInfo
         previousAttemptedRollback = 
getPendingRollbackInfo(table.getMetaClient(), commitInstantTime, false);
         if (pendingRollbackInfo.isPresent()) {
           // step 3a If a pendingRollbackInfo was passed in, verify that it is 
the same as the pending rollback that was just observed. If not, then
           // abort the rollback
           previousAttemptedRollback.orElseThrow(
               () -> new HoodieRollbackException(
                   String.format("Pending rollback instant %s no longer 
inflight", pendingRollbackInfo.get().getRollbackInstant().getTimestamp())
               )
           );
          // This will only fail if the table is in an illegal state, where there are 2+ rollback plans for one instant. This
          // check shouldn't be necessary, but it is kept here for now to make the invariant explicit.
           
ValidationUtils.checkArgument(previousAttemptedRollback.get().getRollbackInstant().getTimestamp().equals(
               pendingRollbackInfo.get().getRollbackInstant().getTimestamp())
           );
         }
       }
       rollbackInstantTime = previousAttemptedRollback
           .map(pendingRollback -> 
pendingRollback.getRollbackInstant().getTimestamp())
           .orElse(rollbackInstantTimeOpt.orElseGet(() -> 
HoodieActiveTimeline.createNewInstantTime()));
       commitInstantOpt = 
Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstants()
           .filter(instant -> 
HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), commitInstantTime))
           .findFirst());
       LOG.info(String.format("Begin rollback of instant %s at instantTime %s", 
commitInstantTime, rollbackInstantTime));
       LOG.info(String.format("Scheduling Rollback at instant time : %s "
               + "(exists in active timeline: %s), with rollback plan: %s",
           rollbackInstantTime, commitInstantOpt.isPresent(), 
previousAttemptedRollback.isPresent()));
       if (previousAttemptedRollback.isPresent()) {
         if (commitInstantOpt.isPresent()) {
           instantToRollback = commitInstantOpt.get();
           deleteInstantsDuringRollback = true;
         } else {
           // A previous pending rollback plan still needs to be executed and 
completed even if the instant to rollback
           // is no longer in active timeline. This can be safely done by 
re-creating the instant to rollback and
           // configuring the rollback execution later on to not delete the 
instants during rollback.
           instantToRollback = new HoodieInstant(
               true, 
previousAttemptedRollback.get().getRollbackPlan().getInstantToRollback().getAction(),
 commitInstantTime);
           deleteInstantsDuringRollback = false;
         }
       } else {
         // Step 3b
         // A new rollback can only be scheduled if the commit to rollback is 
still in the active timeline
         if (!commitInstantOpt.isPresent()) {
           LOG.warn("Cannot find instant " + commitInstantTime + " in the 
timeline, for rollback");
           return false;
         }
         instantToRollback = commitInstantOpt.get();
         deleteInstantsDuringRollback = true;
         Option<HoodieRollbackPlan> newRollbackPlanOption =
             table.scheduleRollback(context, rollbackInstantTime, 
commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers());
         newRollbackPlanOption.orElseThrow(() -> new HoodieRollbackException(
             String.format("Failed to schedule rollback of %s at instant time 
%s", commitInstantTime, rollbackInstantTime))
         );
       }
      // Step 4
      // This heartbeating logic will only be triggered if skipLocking = false. If
      // the rollback instant time has just been newly scheduled, these heartbeat checks will still correctly
      // treat the (non-existent) heartbeat as expired.
       if (!skipLocking) {
         try {
           if (heartbeatClient.isHeartbeatExpired(rollbackInstantTime)) {
             heartbeatClient.stop(rollbackInstantTime);
           } else {
            throw new HoodieRollbackException(String.format("Cannot execute rollback instant %s due to active heartbeat", rollbackInstantTime));
           }
           heartbeatClient.start(rollbackInstantTime);
         } catch (IOException e) {
          throw new HoodieRollbackException(String.format("Could not access last heartbeat for %s", rollbackInstantTime), e);
         }
       }
     } catch (Exception e) {
      throw new HoodieRollbackException("Failed to use/create rollback plan for " + config.getBasePath() + " commits " + commitInstantTime, e);
     } finally {
       // Step 5
       if (!skipLocking) {
         txnManager.endTransaction();
       }
     }
    // Step 6
     try {
       HoodieRollbackMetadata rollbackMetadata = table.rollback(context, 
rollbackInstantTime, instantToRollback, deleteInstantsDuringRollback, 
skipLocking);
       if (timerContext != null) {
         long durationInMs = metrics.getDurationInMs(timerContext.stop());
         metrics.updateRollbackMetrics(durationInMs, 
rollbackMetadata.getTotalFilesDeleted());
       }
       return true;
     } catch (Exception e) {
       throw new HoodieRollbackException("Failed to execute rollback " + 
config.getBasePath() + " commits " + commitInstantTime, e);
     } finally {
       if (!skipLocking) {
         heartbeatClient.stop(rollbackInstantTime);
       }
     }
   }{code}
    
   
    
   h2. Why might this change be useful?
   
   Although these scenarios can be resolved at the application/orchestration level rather than in HUDI, we are still working on this fix in our internal deployment of HUDI, since we want to avoid edge cases where 2+ jobs can call this rollback API for the same instant at the same time.
   
    
   
   ## JIRA info
   
   - Link: https://issues.apache.org/jira/browse/HUDI-6596
   - Type: Wish
   - Epic: https://issues.apache.org/jira/browse/HUDI-1456
   - Fix version(s):
     - 1.1.0
   
   
   ---
   
   
   ## Comments
   
   03/Aug/23 04:15;codope;[~krishen] Overall, your proposed approach seems robust and thoughtful. A few considerations:
   
   > Acquire the table lock
   
   The table lock could become a bottleneck, potentially leading to performance 
issues as other operations might be blocked too. It might be useful to consider 
how frequently you expect concurrent rollbacks to occur and whether this might 
create a performance problem.
   
   > check for an active heartbeat for the rollback instant time. If there is 
one, then abort the rollback as that means there is a concurrent job executing 
that rollback.
   
   Worth considering edge cases where heartbeats could become stale or be 
missed (e.g., if a job crashes without properly closing its heartbeat). 
Handling these scenarios gracefully will help ensure that rollbacks can still 
proceed when needed. Can we ensure rollbacks are idempotent in case of repeated 
failures or retries?;;;
   
   ---
   
   03/Aug/23 20:09;krishen;Thanks for the reply!
   > The table lock could become a bottleneck, potentially leading to 
performance issues as other operations might be blocked too. It might be useful 
to consider how frequently you expect concurrent rollbacks to occur and whether 
this might create a performance problem.
   
   Based on our use case at least, I would expect attempted concurrent 
rollbacks to be uncommon / an edge case for now, and more rare as my 
organization internally makes more fixes/changes to our orchestration process. 
   But you bring up a good concern: because this locks around the scheduleRollback call, it will block other HUDI writers from progressing while it generates the rollback plan, which might be relatively time-consuming (in Spark, HUDI 0.10 can end up launching a new Spark stage), at least compared to other places where HUDI holds the table lock. If this becomes a concern for other HUDI users (and we want HUDI OCC locking to avoid holding the table lock while reading/creating an arbitrary/unbounded number of instants/data files in parallel), one solution I can think of would be to refactor the HoodieTable scheduleRollback call such that we first create the rollback plan before acquiring the table lock (in my proposed approach), and then later, if we actually call HoodieTable scheduleRollback, we just pass in this existing rollback plan. This way the actual "work" needed to generate the rollback plan is done before locking, and we only check and update instant file(s) while under lock (and since in my proposed approach we only schedule a rollback if none has ever been scheduled before, I think we shouldn't have to worry about the rollback plan we created becoming invalid/stale). Though I'm not sure how feasible this is, since it may affect public APIs.
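   That "build the plan before locking, publish it under the lock" idea could be sketched generically like this (all names here are hypothetical; the real change would have to thread through HoodieTable scheduleRollback):

```java
import java.util.Optional;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

// Sketch of "create the rollback plan before acquiring the table lock,
// then publish it under the lock". All names are hypothetical.
public class PlanOutsideLock<P> {

  private final ReentrantLock tableLock = new ReentrantLock();

  // planBuilder does the expensive work (listing files, building the plan)
  // with no lock held; only the cheap check-and-publish of instant files
  // happens under the table lock.
  public Optional<P> schedule(Supplier<P> planBuilder,
                              Supplier<Boolean> alreadyScheduled) {
    P plan = planBuilder.get(); // potentially slow; lock not held
    tableLock.lock();
    try {
      if (alreadyScheduled.get()) {
        return Optional.empty(); // another writer scheduled first; discard ours
      }
      return Optional.of(plan); // publish the prebuilt plan
    } finally {
      tableLock.unlock();
    }
  }
}
```

   The prebuilt plan can only go stale if another writer schedules a rollback in the meantime, which is exactly what the `alreadyScheduled` check under the lock detects.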
   
   > Can we ensure rollbacks are idempotent in case of repeated failures or 
retries?
   
   Yes, making sure rollbacks are idempotent (in the sense that a pending rollback can keep on being retried until success) is a must. Both the current/original implementation and the proposed approach should address this. Both approaches/implementations handle the case where a rollback is pending but the instant to roll back is gone (where we need to re-materialize the instant info and tell the rollback execution to not actually delete instants). Also, in the proposed approach here, in step 3 we directly use the pending rollback instant that we observe in the timeline, if one exists. Unfortunately the logic and my phrasing for step 3 is a bit awkward: because the caller can pass in a pendingRollbackInfo that it expects to be executed, I decided that we couldn't just ignore it, but rather we needed to validate that this pendingRollbackInfo is the same as the pending rollback instant we just saw in the timeline, and abort the rollback if not.
   
   > Worth considering edge cases where heartbeats could become stale or be 
missed (e.g., if a job crashes without properly closing its heartbeat). 
Handling these scenarios gracefully will help ensure that rollbacks can still 
proceed when needed. 
   
   Yeah, if a failed rollback job doesn't clean up the heartbeat after failure, any rollback attempt right after (within the interval of `[heartbeat timeout * (allowed heartbeat misses + 1)]`, I think) will fail. The alternative (that I can think of) would be to simply allow for the chance of 2+ rollback jobs working on the same rollback instant. The issue, though, is that even if the HoodieTable executeRollback implementation prevents the dataset from being corrupted and will just produce a retryable failure, I thought it would be noisy/tricky for a user to understand/debug. So I decided to add Step 4, since from my perspective/understanding it was a tradeoff between "always failing with an easy-to-understand retryable exception" versus "rarely failing with a hard-to-understand retryable exception". ;;;
   
   ---
   
   04/Aug/23 19:13;krishen;Sorry, I realized that I might not have fully addressed your concern about idempotency/concurrent retries. Currently, with my proposal, if concurrent jobs try to execute a rollback instant around the same time, even though only one job will execute the rollback at a time, the other jobs will all instantly fail, which may not always be desirable (as that exception might propagate upwards and cause the entire job to fail, creating more noise). In order to minimize this, I think I am going to revise my approach to:
   - Add a new custom write config value "time to wait for rollback"
   - Remove step 4
   - Add the following step in between 5 and 6:
   {code:java}
    Acquire the table lock, check if the rollback heartbeat has expired, and if so start the rollback heartbeat again and release the lock. Let's call this "claim the heartbeat". If the rollback heartbeat is still active, then
   - If "time to wait for rollback" isn't enabled, then raise an exception and release the lock
   - If "time to wait for rollback" is enabled, then release the table lock and keep on trying (every x seconds) to "claim the heartbeat". If we are not able to "claim the heartbeat" within "time to wait for rollback" seconds, then we raise an exception, like the previous case. Otherwise, if we successfully "claim the heartbeat", then this means the recent rollback attempt either finished or failed. In order to determine which, we reload the active timeline again. If the pending rollback instant is still in the timeline, we execute it; otherwise we safely return false. Note that although we are repeatedly holding and releasing the table lock every time we try to "claim the heartbeat", we are only making 1-2 DFS calls (to check the heartbeat file and create it if possible) while the lock is held.{code}
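   A rough sketch of this "claim the heartbeat" retry loop (self-contained: an AtomicLong stands in for the DFS heartbeat file and a ReentrantLock for the table lock; all names hypothetical, not Hudi's API):

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

// Sketch of the "claim the heartbeat" retry loop. An AtomicLong stands in
// for the DFS heartbeat file and a ReentrantLock for the table lock;
// all names are hypothetical.
public class ClaimHeartbeat {

  private final ReentrantLock tableLock = new ReentrantLock();
  private final AtomicLong lastBeatMs = new AtomicLong(0L); // 0 = never beaten
  private final long expiryMs;

  public ClaimHeartbeat(long expiryMs) {
    this.expiryMs = expiryMs;
  }

  // Under the lock: if the heartbeat has expired, restart it as ours.
  // Only 1-2 cheap operations happen while the lock is held.
  private boolean tryClaim(long nowMs) {
    tableLock.lock();
    try {
      if (nowMs - lastBeatMs.get() > expiryMs) {
        lastBeatMs.set(nowMs); // restart the heartbeat as this job's
        return true;
      }
      return false; // still active: another job is executing the rollback
    } finally {
      tableLock.unlock();
    }
  }

  // Retry every pollMs until "time to wait for rollback" (maxWaitMs) elapses.
  public boolean claimWithin(long maxWaitMs, long pollMs)
      throws InterruptedException {
    long deadline = System.currentTimeMillis() + maxWaitMs;
    while (true) {
      long now = System.currentTimeMillis();
      if (tryClaim(now)) {
        return true;
      }
      if (now >= deadline) {
        return false; // caller raises the rollback exception
      }
      Thread.sleep(pollMs);
    }
  }
}
```

   After a successful claim, the caller would reload the active timeline to decide whether the pending rollback still needs to be executed or has already completed.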
   Although this does unfortunately add some code complexity, I plan to add this to my organization's internal HUDI fork, since I think it will be helpful for us to reduce the frequency of failures, and I figured I should share it here (and on the PR I will create) in case it might be useful for other Apache HUDI users.
   
    ;;;
   
   ---
   
   08/Aug/23 02:52;krishen;I was going to create my PR [https://github.com/kbuci/hudi/pull/2] for this change on the hudi repo, but realized there was an issue, since the assumptions made in the rollback implementation (both the existing one and my proposed change) in {{org.apache.hudi.client.BaseHoodieTableServiceClient#rollback(org.apache.hudi.table.HoodieTable, java.lang.String, org.apache.hudi.common.util.Option<org.apache.hudi.common.HoodiePendingRollbackInfo>, java.lang.String)}} are inconsistent with the changes in [https://github.com/apache/hudi/pull/8849]
   Specifically, {{org.apache.hudi.client.BaseHoodieTableServiceClient#rollback(org.apache.hudi.table.HoodieTable, java.lang.String, org.apache.hudi.common.util.Option<org.apache.hudi.common.HoodiePendingRollbackInfo>, java.lang.String)}} seems to have been implemented (based on code and comments) under the assumption that a rollback operation will delete all instant files of {{commit instant to rollback}} before completing the rollback operation itself, which is what I had thought when I was working on my rollback fix(es). But it seems that after [https://github.com/apache/hudi/pull/8849] this is (retroactively) incorrect, as now we are deleting the instant files of {{commit instant to rollback}} after completing the rollback instant, leaving the rollback operation as a special case where it is possible for the rollback instant to be complete even though the actual rollback operation has not fully completed (due to failing after completing the rollback instant but before cleaning up the instant files of {{commit instant to rollback}}). Although [https://github.com/apache/hudi/pull/8849] handles this by delegating the deletion of the instant files of {{commit instant to rollback}} to some clean/rollbackFailedWrites operation, I think the intention/invariants/rules of how rollback operates are a bit ambiguous to me and something that should be reconciled. To further add complexity, it seems that based on [https://github.com/kbuci/hudi/blob/35be9bbbc7ef7ae6ad0a4955da78da4c0463074f/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java#L630] it is also currently legal to remove a requested rollback plan, in other words "rolling back" a pending rollback plan.
   
   Also, after taking another look at the reason for [https://github.com/apache/hudi/pull/8849], I think the fix there can be reverted and handled alternatively, since it seems to me that fixing/finding bugs with getPendingRollbackInfo and preventing concurrent rollback scheduling/execution might remove the underlying issue/reason for that PR in the first place;;;
   
   ---
   
   17/Aug/23 17:19;suryaprasanna;I think we should use a different name for skipLocking; if acquiring the lock is skipped because we already acquired the lock, then we should use a different variable, something like isLockAcquired.
     
   Without complicating the rollback logic, let us look at all the cases where we use rollback.
   1. Rollback failed writes: A lock has to be acquired until the rollback plans for pending instantsToRollback are scheduled; execution need not acquire a lock.
   2. Rollback a specific instant: Only the schedule step needs to be under a lock.
   3. Restore operation: The entire operation needs to be under a lock.
    
   For rollbackFailedWrites method, break it down to two Stages
    
   *Stage 1: Scheduling stage*
    
   Step 1: Acquire lock and reload active timeline
   Step 2: getInstantsToRollback
   Step 3: removeInflightFilesAlreadyRolledBack
   Step 4: getPendingRollbackInfos
   Step 5: Use existing plan or schedule rollback
   Step 6: Release lock
    
   *Stage 2: Execution stage*
    
   Step 7: Check if a heartbeat exists for the pending rollback plan. If yes, abort; else start a heartbeat and proceed with executing it.
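   Structurally, the two stages might look like this (all method names are hypothetical no-op stubs, not Hudi's API; only the scheduling stage holds the table lock):

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Structural sketch of a two-stage rollbackFailedWrites.
// All methods are hypothetical no-op stubs, not Hudi's API.
public class TwoStageRollback {

  private final ReentrantLock tableLock = new ReentrantLock();

  // Stage 1: scheduling, entirely under the table lock (steps 1-6).
  public List<String> schedulingStage() {
    tableLock.lock();
    try {
      reloadActiveTimeline();                               // step 1
      List<String> toRollback = getInstantsToRollback();    // step 2
      removeInflightFilesAlreadyRolledBack(toRollback);     // step 3
      return getOrSchedulePendingRollbackPlans(toRollback); // steps 4-5
    } finally {
      tableLock.unlock();                                   // step 6
    }
  }

  // Stage 2: execution, no table lock; guarded by a heartbeat (step 7).
  public boolean executionStage(String rollbackInstant) {
    if (heartbeatExists(rollbackInstant)) {
      return false; // abort: another job is executing this rollback
    }
    startHeartbeat(rollbackInstant);
    executeRollbackPlan(rollbackInstant);
    return true;
  }

  // --- no-op stubs so the sketch compiles ---
  private void reloadActiveTimeline() {}
  private List<String> getInstantsToRollback() { return Collections.emptyList(); }
  private void removeInflightFilesAlreadyRolledBack(List<String> instants) {}
  private List<String> getOrSchedulePendingRollbackPlans(List<String> instants) { return instants; }
  private boolean heartbeatExists(String instant) { return false; }
  private void startHeartbeat(String instant) {}
  private void executeRollbackPlan(String instant) {}
}
```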
    
   Rollback operations are not that common; we only roll back if something fails, so it is not like the .clean or .commit operations. So, we should be OK with seeing some noise.;;;
   
   ---
   
   17/Aug/23 19:08;krishen;{quote}I think we should use a different name for skipLocking; if acquiring the lock is skipped because we already acquired the lock, then we should use a different variable, something like isLockAcquired.
   {quote}
   I think that was the convention I noticed, but sure I can address that once 
I post the PR for review, thanks!
   {quote}
   Without complicating the rollback logic, let us see all the cases where we 
use rollback.
   1. Rollback failed writes: A lock has to be acquired until the rollback plans for pending instantsToRollback are scheduled; execution need not acquire a lock.
   2. Rollback a specific instant: Only the schedule step needs to be under a lock.
   3. Restore operation: The entire operation needs to be under a lock.
    
   For rollbackFailedWrites method, break it down to two Stages
    
   Stage 1: Scheduling stage
    
   Step 1: Acquire lock and reload active timeline
   Step 2: getInstantsToRollback
   Step 3: removeInflightFilesAlreadyRolledBack
   Step 4: getPendingRollbackInfos
   Step 5: Use existing plan or schedule rollback
   Step 6: Release lock
    
   Stage 2: Execution stage
    
   Step 7: Check if a heartbeat exists for the pending rollback plan. If yes, abort; else start a heartbeat and proceed with executing it.
   {quote}
   For now, in this ticket the intention is to just focus on (2) `Rollback a specific instant`. Depending on how this implementation goes, I think we could follow your approach for (1) `rollbackFailedWrites` when I create a ticket to address that. Sorry, I should rename this JIRA ticket to clarify that.
   
   {quote}
   Rollback operation are not that common. We only do rollback if something 
fails. So, it is not like .clean or .commit operations. So, we should be ok in 
seeing some noise.
   {quote}
   The issue is that although the chance of an individual job transiently failing on an upsert is low, as we add more concurrent writers to our pool of upsert jobs on a dataset, the chance that at least one upsert job will fail increases. In addition, in the case of underlying infrastructure (like Spark/YARN) service degradations (which we've seen internally in our organization), it's possible that all writers fail during an upsert/rollback in the same window of time. This means that we should try to gracefully/resiliently account for the chance that there is a concurrent rollback going on during a job's upsert operation, or even a concurrent rollback that itself has failed. Although locking the table for the whole rollback is out of the question, we can still go with an approach like the one I suggested in https://issues.apache.org/jira/browse/HUDI-6596?focusedCommentId=17751201&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17751201 , to greatly reduce the chance that some sporadic rollback failures will cause all concurrent upsert jobs to fail.
   
   ;;;
   
   ---
   
   21/Aug/23 06:39;suryaprasanna;{color:#FF0000}For now in this ticket the 
intention is to just focus on (2) `Rollback a specific instant:` . Depending on 
how this implementation goes, I think we could follow your approach for (1) 
`rollbackFailedWrites` when I create a ticket to address that. Sorry, I should 
rename the name of this JIRA ticket to clarify that.{color}
   
   {color:#172b4d}> Since the rollback API schedule & execution has an issue, we should fix the issue at the root. If we don't fix the root cause and start providing patches by locking individual read APIs to the timeline, another set of issues will come up. So, I strongly suggest you fix the root cause. We also require this for supporting rollbacks on failed clustering commits within the rollbackFailedWrites API.{color}
   
   {color:#172b4d}Reducing the noise will be secondary, we can tackle/track 
that problem with another ticket.{color};;;


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
