[
https://issues.apache.org/jira/browse/HUDI-6596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Sagar Sumit updated HUDI-6596:
------------------------------
Fix Version/s: 1.0.0
> Propose rollback implementation changes to guard against concurrent jobs
> -------------------------------------------------------------------------
>
> Key: HUDI-6596
> URL: https://issues.apache.org/jira/browse/HUDI-6596
> Project: Apache Hudi
> Issue Type: Wish
> Reporter: Krishen Bhan
> Priority: Trivial
> Fix For: 1.0.0
>
>
> h1. Issue
> The existing rollback API in 0.14
> [https://github.com/apache/hudi/blob/master/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java#L877]
> executes a rollback plan, either taking an existing rollback plan provided
> by the caller from a previous rollback attempt, or scheduling a new
> rollback instant if none is provided. Currently it is not safe for two
> concurrent jobs to call this API (when skipLocking=false and the callers
> aren't already holding a lock), as this can lead to multiple requested
> rollback plans being created, or to two jobs executing the same rollback
> instant at the same time.
> h1. Proposed change
> One way to resolve this issue is to refactor this rollback function such that
> if skipLocking=false, the following steps are followed
> # Acquire the table lock
> # Reload the active timeline
> # Look at the active timeline to see if there is an inflight rollback instant
> from a previous rollback attempt. If one exists, assign it as the rollback
> plan to execute. Also check whether a pending rollback plan was passed in by
> the caller, then follow one of the steps below depending on whether the
> caller passed a pending rollback instant plan.
> ## [a] If a pending inflight rollback plan was passed in by caller, then
> check that there is a previous attempted rollback instant on timeline (and
> that the instant times match) and continue to use this rollback plan. If that
> isn't the case, then raise a rollback exception since this means another job
> has concurrently already executed this plan. Note that in a valid HUDI
> dataset there can be at most one rollback instant for a corresponding commit
> instant, which is why if we no longer see a pending rollback in timeline in
> this phase we can safely assume that it had already been executed to
> completion.
> ## [b] If no pending inflight rollback plan was passed in by caller and no
> pending rollback instant was found in timeline earlier, then schedule a new
> rollback plan
> # Now that a rollback plan and requested rollback instant time have been
> assigned, check for an active heartbeat for the rollback instant time. If
> there is one, abort the rollback, as that means a concurrent job is
> executing that rollback. If not, start a heartbeat for that rollback
> instant time.
> # Release the table lock
> # Execute the rollback plan and complete the rollback instant. Regardless of
> whether this succeeds or fails with an exception, close the heartbeat. This
> increases the chance that the next job that tries to call this rollback API
> will follow through with the rollback and not abort due to an active previous
> heartbeat
>
> * These steps will only be enforced for skipLocking=false, since if
> skipLocking=true then that means the caller may already be explicitly holding
> a table lock. In this case, acquiring the lock again in step (1) will fail.
> * Acquiring a lock and reloading the timeline for (1-3) will guard against
> race conditions where another job calls this rollback API at the same time
> and schedules its own rollback plan and instant. This is because, if no
> rollback had been attempted for this instant when the current job started,
> there is a window of time before step (1) where another concurrent rollback
> job could have scheduled a rollback plan, failed execution, and cleaned up
> its heartbeat, all while the current rollback job is running. As a result,
> even if the current job was passed an empty pending rollback plan, it still
> needs to check the active timeline to ensure that no new pending rollback
> instant has been created.
> * Using a heartbeat will signal to other callers in other jobs that there is
> another job already executing this rollback. Checking for expired heartbeat
> and (re)-starting the heartbeat has to be done under a lock, so that multiple
> jobs don't each start it at the same time and assume that they are the only
> ones that are heartbeating.
> * The table lock is no longer needed after (4), which is why it is released
> in (5): once the heartbeat has been started, it can be safely assumed that no
> other job (calling this rollback API) will execute this rollback instant.
> One example implementation to achieve this:
>
> {code:java}
> @Deprecated
> public boolean rollback(final String commitInstantTime,
>     Option<HoodiePendingRollbackInfo> pendingRollbackInfo, boolean skipLocking,
>     Option<String> rollbackInstantTimeOpt) throws HoodieRollbackException {
>   final Timer.Context timerContext = this.metrics.getRollbackCtx();
>   final Option<HoodieInstant> commitInstantOpt;
>   final HoodieTable<T, I, K, O> table;
>   try {
>     table = createTable(config, hadoopConf);
>   } catch (Exception e) {
>     throw new HoodieRollbackException("Failed to initialize table for rollback "
>         + config.getBasePath() + " commits " + commitInstantTime, e);
>   }
>   final String rollbackInstantTime;
>   final boolean deleteInstantsDuringRollback;
>   final HoodieInstant instantToRollback;
>   try {
>     if (!skipLocking) {
>       // Steps 1 and 2: acquire the table lock and reload the active timeline
>       txnManager.beginTransaction();
>       table.getMetaClient().reloadActiveTimeline();
>     }
>     final Option<HoodiePendingRollbackInfo> previousAttemptedRollback;
>     if (skipLocking) {
>       // If skipLocking = true, directly use pendingRollbackInfo without
>       // checking the status of this rollback instant on the active timeline.
>       // The caller is responsible for ensuring there is no concurrent rollback.
>       previousAttemptedRollback = pendingRollbackInfo;
>     } else {
>       // Step 3
>       // If skipLocking = false, check the timeline for the latest pending
>       // rollback, in case a concurrent rollback before step 1 has already
>       // executed pendingRollbackInfo
>       previousAttemptedRollback =
>           getPendingRollbackInfo(table.getMetaClient(), commitInstantTime, false);
>       if (pendingRollbackInfo.isPresent()) {
>         // Step 3a: if a pendingRollbackInfo was passed in, verify that it is
>         // the same as the pending rollback that was just observed. If not,
>         // abort the rollback.
>         previousAttemptedRollback.orElseThrow(
>             () -> new HoodieRollbackException(
>                 String.format("Pending rollback instant %s no longer inflight",
>                     pendingRollbackInfo.get().getRollbackInstant().getTimestamp())
>             )
>         );
>         // This will only fail if the table is in an illegal state, where
>         // there are 2+ rollback plans for one instant. This check shouldn't
>         // be necessary, but it is kept here for now to demonstrate.
>         ValidationUtils.checkArgument(
>             previousAttemptedRollback.get().getRollbackInstant().getTimestamp().equals(
>                 pendingRollbackInfo.get().getRollbackInstant().getTimestamp())
>         );
>       }
>     }
>     rollbackInstantTime = previousAttemptedRollback
>         .map(pendingRollback -> pendingRollback.getRollbackInstant().getTimestamp())
>         .orElse(rollbackInstantTimeOpt.orElseGet(
>             () -> HoodieActiveTimeline.createNewInstantTime()));
>     commitInstantOpt = Option.fromJavaOptional(
>         table.getActiveTimeline().getCommitsTimeline().getInstants()
>             .filter(instant ->
>                 HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), commitInstantTime))
>             .findFirst());
>     LOG.info(String.format("Begin rollback of instant %s at instantTime %s",
>         commitInstantTime, rollbackInstantTime));
>     LOG.info(String.format("Scheduling Rollback at instant time : %s "
>         + "(exists in active timeline: %s), with rollback plan: %s",
>         rollbackInstantTime, commitInstantOpt.isPresent(),
>         previousAttemptedRollback.isPresent()));
>     if (previousAttemptedRollback.isPresent()) {
>       if (commitInstantOpt.isPresent()) {
>         instantToRollback = commitInstantOpt.get();
>         deleteInstantsDuringRollback = true;
>       } else {
>         // A previous pending rollback plan still needs to be executed and
>         // completed even if the instant to rollback is no longer in the
>         // active timeline. This can be safely done by re-creating the instant
>         // to rollback and configuring the rollback execution later on to not
>         // delete the instants during rollback.
>         instantToRollback = new HoodieInstant(
>             true,
>             previousAttemptedRollback.get().getRollbackPlan().getInstantToRollback().getAction(),
>             commitInstantTime);
>         deleteInstantsDuringRollback = false;
>       }
>     } else {
>       // Step 3b
>       // A new rollback can only be scheduled if the commit to rollback is
>       // still in the active timeline
>       if (!commitInstantOpt.isPresent()) {
>         LOG.warn("Cannot find instant " + commitInstantTime
>             + " in the timeline, for rollback");
>         return false;
>       }
>       instantToRollback = commitInstantOpt.get();
>       deleteInstantsDuringRollback = true;
>       Option<HoodieRollbackPlan> newRollbackPlanOption =
>           table.scheduleRollback(context, rollbackInstantTime,
>               commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers());
>       newRollbackPlanOption.orElseThrow(() -> new HoodieRollbackException(
>           String.format("Failed to schedule rollback of %s at instant time %s",
>               commitInstantTime, rollbackInstantTime))
>       );
>     }
>     // Step 4
>     // This heartbeating logic is only triggered if skipLocking = false. If
>     // the rollback instant time has just been newly scheduled, these
>     // heartbeat checks will still correctly show the (non-existent)
>     // heartbeat as expired.
>     if (!skipLocking) {
>       try {
>         if (heartbeatClient.isHeartbeatExpired(rollbackInstantTime)) {
>           heartbeatClient.stop(rollbackInstantTime);
>         } else {
>           throw new HoodieRollbackException(String.format(
>               "Cannot execute rollback instant %s due to active heartbeat",
>               rollbackInstantTime));
>         }
>         heartbeatClient.start(rollbackInstantTime);
>       } catch (IOException e) {
>         throw new HoodieRollbackException(String.format(
>             "Could not access last heartbeat for %s", rollbackInstantTime), e);
>       }
>     }
>   } catch (Exception e) {
>     throw new HoodieRollbackException("Failed to use/create rollback plan for "
>         + config.getBasePath() + " commits " + commitInstantTime, e);
>   } finally {
>     // Step 5: release the table lock
>     if (!skipLocking) {
>       txnManager.endTransaction();
>     }
>   }
>   // Step 6: execute the rollback outside the lock, always closing the heartbeat
>   try {
>     HoodieRollbackMetadata rollbackMetadata = table.rollback(context,
>         rollbackInstantTime, instantToRollback, deleteInstantsDuringRollback,
>         skipLocking);
>     if (timerContext != null) {
>       long durationInMs = metrics.getDurationInMs(timerContext.stop());
>       metrics.updateRollbackMetrics(durationInMs,
>           rollbackMetadata.getTotalFilesDeleted());
>     }
>     return true;
>   } catch (Exception e) {
>     throw new HoodieRollbackException("Failed to execute rollback "
>         + config.getBasePath() + " commits " + commitInstantTime, e);
>   } finally {
>     if (!skipLocking) {
>       heartbeatClient.stop(rollbackInstantTime);
>     }
>   }
> }{code}
>
>
> h2. Why might this change be useful?
> Although these scenarios can be resolved at the application/orchestration
> level rather than HUDI, we are still working on this fix in our internal
> deployment of HUDI since we want to avoid edge cases where 2+ jobs can call
> this rollback API for the same instant at the same time.
>
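The ordering of steps 1-6 in the description can also be illustrated without any Hudi classes. The following is a minimal sketch, assuming an in-memory stand-in for the table: a `ReentrantLock` plays the table lock, an `AtomicReference` plays the single pending rollback instant on the timeline, and a concurrent set plays the heartbeat files. Every name in it (`RollbackGuardSketch`, `claim`, `execute`, `completedRollbacks`) is hypothetical and is not part of the Hudi API. Heartbeat expiry is not modelled; a heartbeat counts as active until it is explicitly closed.

```java
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical in-memory model of the proposed rollback guard; not Hudi code. */
public class RollbackGuardSketch {
    private final ReentrantLock tableLock = new ReentrantLock();                      // stand-in for the table lock
    private final AtomicReference<String> pendingRollback = new AtomicReference<>();  // stand-in for the timeline
    private final Set<String> activeHeartbeats = ConcurrentHashMap.newKeySet();       // stand-in for heartbeats
    private final AtomicInteger completedRollbacks = new AtomicInteger();
    private int nextInstant = 1; // only touched while holding tableLock

    /** Steps 1-5: returns the rollback instant this caller may execute, or throws to abort. */
    public String claim(Optional<String> passedInPlan) {
        tableLock.lock();                                    // step 1
        try {
            String onTimeline = pendingRollback.get();       // steps 2-3: re-read state under the lock
            if (passedInPlan.isPresent() && !passedInPlan.get().equals(onTimeline)) {
                // step 3a: the caller's plan is gone, so another job already completed it
                throw new IllegalStateException(
                    "Pending rollback " + passedInPlan.get() + " is no longer inflight");
            }
            String instant = onTimeline;
            if (instant == null) {                           // step 3b: schedule a new plan
                instant = String.format("%03d", nextInstant++);
                pendingRollback.set(instant);
            }
            if (!activeHeartbeats.add(instant)) {            // step 4: check, then start the heartbeat
                throw new IllegalStateException(
                    "Rollback " + instant + " has an active heartbeat");
            }
            return instant;
        } finally {
            tableLock.unlock();                              // step 5
        }
    }

    /** Step 6: runs outside the lock and always closes the heartbeat. */
    public void execute(String instant, boolean simulateFailure) {
        try {
            if (simulateFailure) {
                throw new RuntimeException("simulated rollback failure");
            }
            // Completing the rollback removes the pending instant from the timeline.
            pendingRollback.updateAndGet(cur -> instant.equals(cur) ? null : cur);
            completedRollbacks.incrementAndGet();
        } finally {
            activeHeartbeats.remove(instant);
        }
    }

    public int completedRollbacks() {
        return completedRollbacks.get();
    }

    public static void main(String[] args) {
        RollbackGuardSketch table = new RollbackGuardSketch();
        String instant = table.claim(Optional.empty());
        try {
            table.claim(Optional.empty());                   // a second caller while the first is running
        } catch (IllegalStateException e) {
            System.out.println("Second caller aborted: " + e.getMessage());
        }
        table.execute(instant, false);
        System.out.println("Completed rollbacks: " + table.completedRollbacks());
    }
}
```

In this model a second caller that arrives while the first still holds the heartbeat aborts in step 4. A caller that retries after a failed execution reuses the same pending instant, and a caller holding a stale plan aborts in step 3a.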