This is an automated email from the ASF dual-hosted git repository. zhangduo pushed a commit to branch branch-2.5 in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.5 by this push: new 8f70d84b809 HBASE-28212 Do not need to maintain rollback step when root procedure does not support rollback (#5538) (#5547) 8f70d84b809 is described below commit 8f70d84b8095fd112bb141e10ba88cffdd54f5d1 Author: Duo Zhang <zhang...@apache.org> AuthorDate: Wed Nov 29 19:40:44 2023 +0800 HBASE-28212 Do not need to maintain rollback step when root procedure does not support rollback (#5538) (#5547) Signed-off-by: GeorryHuang <huangzhuo...@apache.org> (cherry picked from commit 4b015e6a5486394d70bbf5fc0197e469c0987913) --- .../apache/hadoop/hbase/procedure2/Procedure.java | 22 +- .../hadoop/hbase/procedure2/ProcedureExecutor.java | 346 ++++++++++++++------- .../hadoop/hbase/procedure2/ProcedureUtil.java | 5 + .../hbase/procedure2/RootProcedureState.java | 52 +++- .../hbase/procedure2/StateMachineProcedure.java | 5 + .../hbase/procedure2/ProcedureTestingUtility.java | 37 ++- .../hbase/procedure2/TestProcedureRecovery.java | 15 +- .../procedure2/TestStateMachineProcedure.java | 6 + .../hbase/procedure2/TestYieldProcedures.java | 5 + .../store/wal/TestWALProcedureStore.java | 10 +- .../src/main/protobuf/Procedure.proto | 4 + .../hbase/master/assignment/TestRegionBypass.java | 36 +-- .../hbase/master/assignment/TestRollbackSCP.java | 186 +++++++++++ 13 files changed, 576 insertions(+), 153 deletions(-) diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java index a71026e6007..44639af1a9f 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java @@ -131,6 +131,9 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE private RemoteProcedureException exception = null; private int[] stackIndexes = null; private int childrenLatch = 0; + 
// since we do not always maintain stackIndexes if the root procedure does not support rollback, + // we need a separate flag to indicate whether a procedure was executed + private boolean wasExecuted; private volatile int timeout = NO_TIMEOUT; private volatile long lastUpdate; @@ -871,6 +874,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE stackIndexes = Arrays.copyOf(stackIndexes, count + 1); stackIndexes[count] = index; } + wasExecuted = true; } protected synchronized boolean removeStackIndex() { @@ -891,16 +895,32 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE for (int i = 0; i < this.stackIndexes.length; ++i) { this.stackIndexes[i] = stackIndexes.get(i); } + // for backward compatibility with procedures serialized before we added the executed flag, + // the flag will be false so we need to set the wasExecuted flag here + this.wasExecuted = true; + } + + protected synchronized void setExecuted() { + this.wasExecuted = true; } protected synchronized boolean wasExecuted() { - return stackIndexes != null; + return wasExecuted; } protected synchronized int[] getStackIndexes() { return stackIndexes; } + /** + * Return whether the procedure supports rollback. If the procedure does not support rollback, we + * can skip the rollback state management which could increase the performance. See HBASE-28210 + * and HBASE-28212.
+ */ + protected boolean isRollbackSupported() { + return true; + } + // ========================================================================== // Internal methods - called by the ProcedureExecutor // ========================================================================== diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index 46ce065b877..3099c64e00f 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -24,9 +24,11 @@ import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Comparator; import java.util.Deque; import java.util.HashSet; import java.util.List; +import java.util.PriorityQueue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -117,6 +119,9 @@ public class ProcedureExecutor<TEnvironment> { protected volatile boolean killAfterStoreUpdate = false; protected volatile boolean toggleKillAfterStoreUpdate = false; + protected volatile boolean killBeforeStoreUpdateInRollback = false; + protected volatile boolean toggleKillBeforeStoreUpdateInRollback = false; + protected boolean shouldKillBeforeStoreUpdate() { final boolean kill = this.killBeforeStoreUpdate; if (this.toggleKillBeforeStoreUpdate) { @@ -148,6 +153,16 @@ public class ProcedureExecutor<TEnvironment> { protected boolean shouldKillAfterStoreUpdate(final boolean isSuspended) { return (isSuspended && !killIfSuspended) ? 
false : shouldKillAfterStoreUpdate(); } + + protected boolean shouldKillBeforeStoreUpdateInRollback() { + final boolean kill = this.killBeforeStoreUpdateInRollback; + if (this.toggleKillBeforeStoreUpdateInRollback) { + this.killBeforeStoreUpdateInRollback = !kill; + LOG.warn("Toggle KILL before store update in rollback to: " + + this.killBeforeStoreUpdateInRollback); + } + return kill; + } } public interface ProcedureExecutorListener { @@ -394,68 +409,10 @@ public class ProcedureExecutor<TEnvironment> { }); } - private void loadProcedures(ProcedureIterator procIter) throws IOException { - // 1. Build the rollback stack - int runnableCount = 0; - int failedCount = 0; - int waitingCount = 0; - int waitingTimeoutCount = 0; - while (procIter.hasNext()) { - boolean finished = procIter.isNextFinished(); - @SuppressWarnings("unchecked") - Procedure<TEnvironment> proc = procIter.next(); - NonceKey nonceKey = proc.getNonceKey(); - long procId = proc.getProcId(); - - if (finished) { - completed.put(proc.getProcId(), new CompletedProcedureRetainer<>(proc)); - LOG.debug("Completed {}", proc); - } else { - if (!proc.hasParent()) { - assert !proc.isFinished() : "unexpected finished procedure"; - rollbackStack.put(proc.getProcId(), new RootProcedureState<>()); - } - - // add the procedure to the map - proc.beforeReplay(getEnvironment()); - procedures.put(proc.getProcId(), proc); - switch (proc.getState()) { - case RUNNABLE: - runnableCount++; - break; - case FAILED: - failedCount++; - break; - case WAITING: - waitingCount++; - break; - case WAITING_TIMEOUT: - waitingTimeoutCount++; - break; - default: - break; - } - } - - if (nonceKey != null) { - nonceKeysToProcIdsMap.put(nonceKey, procId); // add the nonce to the map - } - } - - // 2. Initialize the stacks: In the old implementation, for procedures in FAILED state, we will - // push it into the ProcedureScheduler directly to execute the rollback. But this does not work - // after we introduce the restore lock stage. 
For now, when we acquire a xlock, we will remove - // the queue from runQueue in scheduler, and then when a procedure which has lock access, for - // example, a sub procedure of the procedure which has the xlock, is pushed into the scheduler, - // we will add the queue back to let the workers poll from it. The assumption here is that, the - // procedure which has the xlock should have been polled out already, so when loading we can not - // add the procedure to scheduler first and then call acquireLock, since the procedure is still - // in the queue, and since we will remove the queue from runQueue, then no one can poll it out, - // then there is a dead lock - List<Procedure<TEnvironment>> runnableList = new ArrayList<>(runnableCount); - List<Procedure<TEnvironment>> failedList = new ArrayList<>(failedCount); - List<Procedure<TEnvironment>> waitingList = new ArrayList<>(waitingCount); - List<Procedure<TEnvironment>> waitingTimeoutList = new ArrayList<>(waitingTimeoutCount); + private void initializeStacks(ProcedureIterator procIter, + List<Procedure<TEnvironment>> runnableList, List<Procedure<TEnvironment>> failedList, + List<Procedure<TEnvironment>> waitingList, List<Procedure<TEnvironment>> waitingTimeoutList) + throws IOException { procIter.reset(); while (procIter.hasNext()) { if (procIter.isNextFinished()) { @@ -504,8 +461,19 @@ public class ProcedureExecutor<TEnvironment> { break; } } + rollbackStack.forEach((rootProcId, procStack) -> { + if (procStack.getSubproceduresStack() != null) { + // if we have already record some stack ids, it means we support rollback + procStack.setRollbackSupported(true); + } else { + // otherwise, test the root procedure to see if we support rollback + procStack.setRollbackSupported(procedures.get(rootProcId).isRollbackSupported()); + } + }); + } - // 3. Check the waiting procedures to see if some of them can be added to runnable. 
+ private void processWaitingProcedures(List<Procedure<TEnvironment>> waitingList, + List<Procedure<TEnvironment>> runnableList) { waitingList.forEach(proc -> { if (!proc.hasChildren()) { // Normally, WAITING procedures should be waken by its children. But, there is a case that, @@ -522,16 +490,17 @@ public class ProcedureExecutor<TEnvironment> { proc.afterReplay(getEnvironment()); } }); - // 4. restore locks - restoreLocks(); + } - // 5. Push the procedures to the timeout executor + private void processWaitingTimeoutProcedures(List<Procedure<TEnvironment>> waitingTimeoutList) { waitingTimeoutList.forEach(proc -> { proc.afterReplay(getEnvironment()); timeoutExecutor.add(proc); }); + } - // 6. Push the procedure to the scheduler + private void pushProceduresAfterLoad(List<Procedure<TEnvironment>> runnableList, + List<Procedure<TEnvironment>> failedList) { failedList.forEach(scheduler::addBack); runnableList.forEach(p -> { p.afterReplay(getEnvironment()); @@ -540,6 +509,84 @@ public class ProcedureExecutor<TEnvironment> { } scheduler.addBack(p); }); + } + + private void loadProcedures(ProcedureIterator procIter) throws IOException { + // 1. 
Build the rollback stack + int runnableCount = 0; + int failedCount = 0; + int waitingCount = 0; + int waitingTimeoutCount = 0; + while (procIter.hasNext()) { + boolean finished = procIter.isNextFinished(); + @SuppressWarnings("unchecked") + Procedure<TEnvironment> proc = procIter.next(); + NonceKey nonceKey = proc.getNonceKey(); + long procId = proc.getProcId(); + + if (finished) { + completed.put(proc.getProcId(), new CompletedProcedureRetainer<>(proc)); + LOG.debug("Completed {}", proc); + } else { + if (!proc.hasParent()) { + assert !proc.isFinished() : "unexpected finished procedure"; + rollbackStack.put(proc.getProcId(), new RootProcedureState<>()); + } + + // add the procedure to the map + proc.beforeReplay(getEnvironment()); + procedures.put(proc.getProcId(), proc); + switch (proc.getState()) { + case RUNNABLE: + runnableCount++; + break; + case FAILED: + failedCount++; + break; + case WAITING: + waitingCount++; + break; + case WAITING_TIMEOUT: + waitingTimeoutCount++; + break; + default: + break; + } + } + + if (nonceKey != null) { + nonceKeysToProcIdsMap.put(nonceKey, procId); // add the nonce to the map + } + } + + // 2. Initialize the stacks: In the old implementation, for procedures in FAILED state, we will + // push it into the ProcedureScheduler directly to execute the rollback. But this does not work + // after we introduce the restore lock stage. For now, when we acquire a xlock, we will remove + // the queue from runQueue in scheduler, and then when a procedure which has lock access, for + // example, a sub procedure of the procedure which has the xlock, is pushed into the scheduler, + // we will add the queue back to let the workers poll from it. 
The assumption here is that, the + // procedure which has the xlock should have been polled out already, so when loading we can not + // add the procedure to scheduler first and then call acquireLock, since the procedure is still + // in the queue, and since we will remove the queue from runQueue, then no one can poll it out, + // then there is a dead lock + List<Procedure<TEnvironment>> runnableList = new ArrayList<>(runnableCount); + List<Procedure<TEnvironment>> failedList = new ArrayList<>(failedCount); + List<Procedure<TEnvironment>> waitingList = new ArrayList<>(waitingCount); + List<Procedure<TEnvironment>> waitingTimeoutList = new ArrayList<>(waitingTimeoutCount); + + initializeStacks(procIter, runnableList, failedList, waitingList, waitingTimeoutList); + + // 3. Check the waiting procedures to see if some of them can be added to runnable. + processWaitingProcedures(waitingList, runnableList); + + // 4. restore locks + restoreLocks(); + + // 5. Push the procedures to the timeout executor + processWaitingTimeoutProcedures(waitingTimeoutList); + + // 6. Push the procedure to the scheduler + pushProceduresAfterLoad(runnableList, failedList); // After all procedures put into the queue, signal the worker threads. // Otherwise, there is a race condition. See HBASE-21364. scheduler.signalAll(); @@ -1080,6 +1127,7 @@ public class ProcedureExecutor<TEnvironment> { // Create the rollback stack for the procedure RootProcedureState<TEnvironment> stack = new RootProcedureState<>(); + stack.setRollbackSupported(proc.isRollbackSupported()); rollbackStack.put(currentProcId, stack); // Submit the new subprocedures @@ -1441,42 +1489,75 @@ public class ProcedureExecutor<TEnvironment> { } } - /** - * Execute the rollback of the full procedure stack. Once the procedure is rolledback, the - * root-procedure will be visible as finished to user, and the result will be the fatal exception. 
- */ - private LockState executeRollback(long rootProcId, RootProcedureState<TEnvironment> procStack) { - Procedure<TEnvironment> rootProc = procedures.get(rootProcId); - RemoteProcedureException exception = rootProc.getException(); - // TODO: This needs doc. The root proc doesn't have an exception. Maybe we are - // rolling back because the subprocedure does. Clarify. - if (exception == null) { - exception = procStack.getException(); - rootProc.setFailure(exception); - store.update(rootProc); + // Returning null means we have already held the execution lock, so you do not need to get the + // lock entry for releasing + private IdLock.Entry getLockEntryForRollback(long procId) { + // Hold the execution lock if it is not held by us. The IdLock is not reentrant so we need + // this check, as the worker will hold the lock before executing a procedure. This is the only + // place where we may hold two procedure execution locks, and there is a fence in the + // RootProcedureState where we can make sure that only one worker can execute the rollback of + // a RootProcedureState, so there is no dead lock problem. And the lock here is necessary to + // prevent race between us and the force update thread. 
+ if (!procExecutionLock.isHeldByCurrentThread(procId)) { + try { + return procExecutionLock.getLockEntry(procId); + } catch (IOException e) { + // can only happen if interrupted, so not a big deal to propagate it + throw new UncheckedIOException(e); + } + } + return null; + } + + private void executeUnexpectedRollback(Procedure<TEnvironment> rootProc, + RootProcedureState<TEnvironment> procStack) { + if (procStack.getSubprocs() != null) { + // comparing proc id in reverse order, so we will delete later procedures first, otherwise we + // may delete parent procedure first and if we fail in the middle of this operation, when + // loading we will find some orphan procedures + PriorityQueue<Procedure<TEnvironment>> pq = + new PriorityQueue<>(procStack.getSubprocs().size(), + Comparator.<Procedure<TEnvironment>> comparingLong(Procedure::getProcId).reversed()); + pq.addAll(procStack.getSubprocs()); + for (;;) { + Procedure<TEnvironment> subproc = pq.poll(); + if (subproc == null) { + break; + } + if (!procedures.containsKey(subproc.getProcId())) { + // this means it has already been rolledback + continue; + } + IdLock.Entry lockEntry = getLockEntryForRollback(subproc.getProcId()); + try { + cleanupAfterRollbackOneStep(subproc); + execCompletionCleanup(subproc); + } finally { + if (lockEntry != null) { + procExecutionLock.releaseLockEntry(lockEntry); + } + } + } + } + IdLock.Entry lockEntry = getLockEntryForRollback(rootProc.getProcId()); + try { + cleanupAfterRollbackOneStep(rootProc); + } finally { + if (lockEntry != null) { + procExecutionLock.releaseLockEntry(lockEntry); + } } + } + private LockState executeNormalRollback(Procedure<TEnvironment> rootProc, + RootProcedureState<TEnvironment> procStack) { List<Procedure<TEnvironment>> subprocStack = procStack.getSubproceduresStack(); assert subprocStack != null : "Called rollback with no steps executed rootProc=" + rootProc; int stackTail = subprocStack.size(); while (stackTail-- > 0) { Procedure<TEnvironment> proc = 
subprocStack.get(stackTail); - IdLock.Entry lockEntry = null; - // Hold the execution lock if it is not held by us. The IdLock is not reentrant so we need - // this check, as the worker will hold the lock before executing a procedure. This is the only - // place where we may hold two procedure execution locks, and there is a fence in the - // RootProcedureState where we can make sure that only one worker can execute the rollback of - // a RootProcedureState, so there is no dead lock problem. And the lock here is necessary to - // prevent race between us and the force update thread. - if (!procExecutionLock.isHeldByCurrentThread(proc.getProcId())) { - try { - lockEntry = procExecutionLock.getLockEntry(proc.getProcId()); - } catch (IOException e) { - // can only happen if interrupted, so not a big deal to propagate it - throw new UncheckedIOException(e); - } - } + IdLock.Entry lockEntry = getLockEntryForRollback(proc.getProcId()); try { // For the sub procedures which are successfully finished, we do not rollback them. // Typically, if we want to rollback a procedure, we first need to rollback it, and then @@ -1526,15 +1607,59 @@ public class ProcedureExecutor<TEnvironment> { } } } + return LockState.LOCK_ACQUIRED; + } + + /** + * Execute the rollback of the full procedure stack. Once the procedure is rolledback, the + * root-procedure will be visible as finished to user, and the result will be the fatal exception. + */ + private LockState executeRollback(long rootProcId, RootProcedureState<TEnvironment> procStack) { + Procedure<TEnvironment> rootProc = procedures.get(rootProcId); + RemoteProcedureException exception = rootProc.getException(); + // TODO: This needs doc. The root proc doesn't have an exception. Maybe we are + // rolling back because the subprocedure does. Clarify. 
+ if (exception == null) { + exception = procStack.getException(); + rootProc.setFailure(exception); + store.update(rootProc); + } + + if (procStack.isRollbackSupported()) { + LockState lockState = executeNormalRollback(rootProc, procStack); + if (lockState != LockState.LOCK_ACQUIRED) { + return lockState; + } + } else { + // the procedure does not support rollback, so typically we should not reach here, this + // usually means there are code bugs, let's just wait all the subprocedures to finish and then + // mark the root procedure as failure. + LOG.error(HBaseMarkers.FATAL, + "Root Procedure {} does not support rollback but the execution failed" + + " and try to rollback, code bug?", + rootProc, exception); + executeUnexpectedRollback(rootProc, procStack); + } + + IdLock.Entry lockEntry = getLockEntryForRollback(rootProc.getProcId()); + try { + // Finalize the procedure state + LOG.info("Rolled back {} exec-time={}", rootProc, + StringUtils.humanTimeDiff(rootProc.elapsedTime())); + procedureFinished(rootProc); + } finally { + if (lockEntry != null) { + procExecutionLock.releaseLockEntry(lockEntry); + } + } - // Finalize the procedure state - LOG.info("Rolled back {} exec-time={}", rootProc, - StringUtils.humanTimeDiff(rootProc.elapsedTime())); - procedureFinished(rootProc); return LockState.LOCK_ACQUIRED; } private void cleanupAfterRollbackOneStep(Procedure<TEnvironment> proc) { + if (testing != null && testing.shouldKillBeforeStoreUpdateInRollback()) { + kill("TESTING: Kill BEFORE store update in rollback: " + proc); + } if (proc.removeStackIndex()) { if (!proc.isSuccess()) { proc.setState(ProcedureState.ROLLEDBACK); @@ -1577,15 +1702,6 @@ public class ProcedureExecutor<TEnvironment> { LOG.error(HBaseMarkers.FATAL, "CODE-BUG: Uncaught runtime exception for " + proc, e); } - // allows to kill the executor before something is stored to the wal. - // useful to test the procedure recovery. 
- if (testing != null && testing.shouldKillBeforeStoreUpdate()) { - String msg = "TESTING: Kill before store update"; - LOG.debug(msg); - stop(); - throw new RuntimeException(msg); - } - cleanupAfterRollbackOneStep(proc); return LockState.LOCK_ACQUIRED; @@ -1714,8 +1830,20 @@ public class ProcedureExecutor<TEnvironment> { if (procedure.needPersistence()) { // Add the procedure to the stack // See HBASE-28210 on why we need synchronized here + boolean needUpdateStoreOutsideLock = false; synchronized (procStack) { - procStack.addRollbackStep(procedure); + if (procStack.addRollbackStep(procedure)) { + updateStoreOnExec(procStack, procedure, subprocs); + } else { + needUpdateStoreOutsideLock = true; + } + } + // this is an optimization if we do not need to maintain rollback step, as all subprocedures + // of the same root procedure share the same root procedure state, if we can only update + // store under the above lock, the sub procedures of the same root procedure can only be + // persisted sequentially, which will have a bad performance. See HBASE-28212 for more + // details.
+ if (needUpdateStoreOutsideLock) { updateStoreOnExec(procStack, procedure, subprocs); } } diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java index 4a225161dbf..04ae16ddc3f 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; */ @InterfaceAudience.Private public final class ProcedureUtil { + private ProcedureUtil() { } @@ -188,6 +189,7 @@ public final class ProcedureUtil { builder.addStackId(stackIds[i]); } } + builder.setExecuted(proc.wasExecuted()); if (proc.hasException()) { RemoteProcedureException exception = proc.getException(); @@ -253,6 +255,9 @@ public final class ProcedureUtil { if (proto.getStackIdCount() > 0) { proc.setStackIndexes(proto.getStackIdList()); } + if (proto.getExecuted()) { + proc.setExecuted(); + } if (proto.hasException()) { assert proc.getState() == ProcedureProtos.ProcedureState.FAILED diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RootProcedureState.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RootProcedureState.java index 9990bdeb430..c9f5bad2a13 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RootProcedureState.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RootProcedureState.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.procedure2; import java.util.ArrayList; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -32,8 +33,13 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu * Internal state of the ProcedureExecutor that describes the state of a "Root Procedure". 
A "Root * Procedure" is a Procedure without parent, each subprocedure will be added to the "Root Procedure" * stack (or rollback-stack). RootProcedureState is used and managed only by the ProcedureExecutor. - * Long rootProcId = getRootProcedureId(proc); rollbackStack.get(rootProcId).acquire(proc) - * rollbackStack.get(rootProcId).release(proc) ... + * + * <pre> + * Long rootProcId = getRootProcedureId(proc); + * rollbackStack.get(rootProcId).acquire(proc) + * rollbackStack.get(rootProcId).release(proc) + * ... + * </pre> */ @InterfaceAudience.Private @InterfaceStability.Evolving @@ -50,8 +56,15 @@ class RootProcedureState<TEnvironment> { private ArrayList<Procedure<TEnvironment>> subprocStack = null; private State state = State.RUNNING; private int running = 0; - - public synchronized boolean isFailed() { + // for some procedures such as SCP and TRSP, there is no way to rollback, so we do not need to + // maintain the rollback steps + // TODO: the rollback logic is a bit complicated, so here we will only test whether the root + // procedure supports rollback at the very beginning, actually, lots of procedure can only + // rollback at the pre check step, after that there is no rollback too, we should try to support + // this too. 
+ private boolean rollbackSupported; + + protected synchronized boolean isFailed() { switch (state) { case ROLLINGBACK: case FAILED: @@ -62,7 +75,7 @@ class RootProcedureState<TEnvironment> { return false; } - public synchronized boolean isRollingback() { + protected synchronized boolean isRollingback() { return state == State.ROLLINGBACK; } @@ -85,6 +98,14 @@ class RootProcedureState<TEnvironment> { state = State.FAILED; } + protected synchronized void setRollbackSupported(boolean rollbackSupported) { + this.rollbackSupported = rollbackSupported; + } + + protected synchronized boolean isRollbackSupported() { + return rollbackSupported; + } + protected synchronized long[] getSubprocedureIds() { if (subprocs == null) { return null; @@ -92,13 +113,17 @@ class RootProcedureState<TEnvironment> { return subprocs.stream().mapToLong(Procedure::getProcId).toArray(); } + protected synchronized Collection<Procedure<TEnvironment>> getSubprocs() { + return subprocs; + } + protected synchronized List<Procedure<TEnvironment>> getSubproceduresStack() { return subprocStack; } protected synchronized RemoteProcedureException getException() { - if (subprocStack != null) { - for (Procedure<TEnvironment> proc : subprocStack) { + if (subprocs != null) { + for (Procedure<TEnvironment> proc : subprocs) { if (proc.hasException()) { return proc.getException(); } @@ -134,18 +159,27 @@ class RootProcedureState<TEnvironment> { /** * Called by the ProcedureExecutor after the procedure step is completed, to add the step to the - * rollback list (or procedure stack) + * rollback list (or procedure stack). + * <p> + * Return whether we successfully added the rollback step. 
If the root procedure has already + crossed the PONR, we do not need to maintain the rollback step. */ - protected synchronized void addRollbackStep(Procedure<TEnvironment> proc) { + protected synchronized boolean addRollbackStep(Procedure<TEnvironment> proc) { if (proc.isFailed()) { state = State.FAILED; } + if (!rollbackSupported) { + // just record executed, skip adding rollback step + proc.setExecuted(); + return false; + } if (subprocStack == null) { subprocStack = new ArrayList<>(); } proc.addStackIndex(subprocStack.size()); LOG.trace("Add procedure {} as the {}th rollback step", proc, subprocStack.size()); subprocStack.add(proc); + return true; } protected synchronized void addSubProcedure(Procedure<TEnvironment> proc) { diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java index d7ab269cb55..b90600b4707 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java @@ -249,6 +249,11 @@ public abstract class StateMachineProcedure<TEnvironment, TState> extends Proced } } + @Override + protected final boolean isRollbackSupported() { + return isRollbackSupported(getCurrentState()); + } + /** * Used by the default implementation of abort() to know if the current state can be aborted and * rollback can be triggered.
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java index 926a46e9c56..32be56e44db 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java @@ -194,6 +194,15 @@ public final class ProcedureTestingUtility { assertSingleExecutorForKillTests(procExecutor); } + public static <TEnv> void setKillBeforeStoreUpdateInRollback(ProcedureExecutor<TEnv> procExecutor, + boolean value) { + createExecutorTesting(procExecutor); + procExecutor.testing.killBeforeStoreUpdateInRollback = value; + LOG.warn("Set Kill before store update in rollback to: " + + procExecutor.testing.killBeforeStoreUpdateInRollback); + assertSingleExecutorForKillTests(procExecutor); + } + public static <TEnv> void setToggleKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor, boolean value) { createExecutorTesting(procExecutor); @@ -201,6 +210,13 @@ public final class ProcedureTestingUtility { assertSingleExecutorForKillTests(procExecutor); } + public static <TEnv> void + setToggleKillBeforeStoreUpdateInRollback(ProcedureExecutor<TEnv> procExecutor, boolean value) { + createExecutorTesting(procExecutor); + procExecutor.testing.toggleKillBeforeStoreUpdateInRollback = value; + assertSingleExecutorForKillTests(procExecutor); + } + public static <TEnv> void toggleKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor) { createExecutorTesting(procExecutor); procExecutor.testing.killBeforeStoreUpdate = !procExecutor.testing.killBeforeStoreUpdate; @@ -208,6 +224,16 @@ public final class ProcedureTestingUtility { assertSingleExecutorForKillTests(procExecutor); } + public static <TEnv> void + toggleKillBeforeStoreUpdateInRollback(ProcedureExecutor<TEnv> procExecutor) { + createExecutorTesting(procExecutor); + 
procExecutor.testing.killBeforeStoreUpdateInRollback = + !procExecutor.testing.killBeforeStoreUpdateInRollback; + LOG.warn("Set Kill before store update to in rollback: " + + procExecutor.testing.killBeforeStoreUpdateInRollback); + assertSingleExecutorForKillTests(procExecutor); + } + public static <TEnv> void toggleKillAfterStoreUpdate(ProcedureExecutor<TEnv> procExecutor) { createExecutorTesting(procExecutor); procExecutor.testing.killAfterStoreUpdate = !procExecutor.testing.killAfterStoreUpdate; @@ -217,8 +243,15 @@ public final class ProcedureTestingUtility { public static <TEnv> void setKillAndToggleBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor, boolean value) { - ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, value); - ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, value); + setKillBeforeStoreUpdate(procExecutor, value); + setToggleKillBeforeStoreUpdate(procExecutor, value); + assertSingleExecutorForKillTests(procExecutor); + } + + public static <TEnv> void setKillAndToggleBeforeStoreUpdateInRollback( + ProcedureExecutor<TEnv> procExecutor, boolean value) { + setKillBeforeStoreUpdateInRollback(procExecutor, value); + setToggleKillBeforeStoreUpdateInRollback(procExecutor, value); assertSingleExecutorForKillTests(procExecutor); } diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java index 72b7f61471f..8fd02e412d8 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java @@ -129,6 +129,7 @@ public class TestProcedureRecovery { env.waitOnLatch(); LOG.debug("execute procedure " + this + " step=" + step); ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor); + 
ProcedureTestingUtility.toggleKillBeforeStoreUpdateInRollback(procExecutor); step++; Threads.sleepWithoutInterrupt(procSleepInterval); if (isAborted()) { @@ -143,6 +144,7 @@ public class TestProcedureRecovery { protected void rollback(TestProcEnv env) { LOG.debug("rollback procedure " + this + " step=" + step); ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor); + ProcedureTestingUtility.toggleKillBeforeStoreUpdateInRollback(procExecutor); step++; } @@ -359,6 +361,11 @@ public class TestProcedureRecovery { return Flow.HAS_MORE_STATE; } + @Override + protected boolean isRollbackSupported(State state) { + return true; + } + @Override protected void rollbackState(TestProcEnv env, final State state) { switch (state) { @@ -425,8 +432,8 @@ public class TestProcedureRecovery { @Test public void testStateMachineRecovery() throws Exception { - ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, true); - ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, true); + ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExecutor, true); + ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdateInRollback(procExecutor, true); // Step 1 - kill Procedure proc = new TestStateMachineProcedure(); @@ -463,8 +470,8 @@ public class TestProcedureRecovery { @Test public void testStateMachineRollbackRecovery() throws Exception { - ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, true); - ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, true); + ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExecutor, true); + ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdateInRollback(procExecutor, true); // Step 1 - kill Procedure proc = new TestStateMachineProcedure(); diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestStateMachineProcedure.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestStateMachineProcedure.java index 
df648413b89..831ff59aac1 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestStateMachineProcedure.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestStateMachineProcedure.java @@ -171,6 +171,7 @@ public class TestStateMachineProcedure { public void testChildOnLastStepWithRollbackDoubleExecution() throws Exception { procExecutor.getEnvironment().triggerChildRollback = true; ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExecutor, true); + ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdateInRollback(procExecutor, true); long procId = procExecutor.submitProcedure(new TestSMProcedure()); ProcedureTestingUtility.testRecoveryAndDoubleExecution(procExecutor, procId, true); assertEquals(6, procExecutor.getEnvironment().execCount.get()); @@ -249,6 +250,11 @@ public class TestStateMachineProcedure { return Flow.HAS_MORE_STATE; } + @Override + protected boolean isRollbackSupported(TestSMProcedureState state) { + return true; + } + @Override protected void rollbackState(TestProcEnv env, TestSMProcedureState state) { LOG.info("ROLLBACK " + state + " " + this); diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java index f84e1f70940..c43139e3e17 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java @@ -238,6 +238,11 @@ public class TestYieldProcedures { return executionInfo; } + @Override + protected boolean isRollbackSupported(State state) { + return true; + } + @Override protected StateMachineProcedure.Flow executeFromState(TestProcEnv env, State state) throws InterruptedException { diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java 
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java index 01fc8666dae..1df22ae7533 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java @@ -30,6 +30,7 @@ import java.util.Arrays; import java.util.Comparator; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -73,7 +74,7 @@ public class TestWALProcedureStore { private WALProcedureStore procStore; - private HBaseCommonTestingUtility htu; + private final HBaseCommonTestingUtility htu = new HBaseCommonTestingUtility(); private FileSystem fs; private Path testDir; private Path logDir; @@ -84,13 +85,13 @@ public class TestWALProcedureStore { @Before public void setUp() throws IOException { - htu = new HBaseCommonTestingUtility(); testDir = htu.getDataTestDir(); htu.getConfiguration().set(HConstants.HBASE_DIR, testDir.toString()); fs = testDir.getFileSystem(htu.getConfiguration()); htu.getConfiguration().set(HConstants.HBASE_DIR, testDir.toString()); assertTrue(testDir.depth() > 1); + TestSequentialProcedure.seqId.set(0); setupConfig(htu.getConfiguration()); logDir = new Path(testDir, "proc-logs"); procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); @@ -835,10 +836,11 @@ public class TestWALProcedureStore { } public static class TestSequentialProcedure extends SequentialProcedure<Void> { - private static long seqid = 0; + + private static final AtomicLong seqId = new AtomicLong(0); public TestSequentialProcedure() { - setProcId(++seqid); + setProcId(seqId.incrementAndGet()); } @Override diff --git a/hbase-protocol-shaded/src/main/protobuf/Procedure.proto 
b/hbase-protocol-shaded/src/main/protobuf/Procedure.proto index b588f3b954b..e93d952ec79 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Procedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Procedure.proto @@ -69,6 +69,10 @@ message Procedure { // whether the procedure need to be bypassed optional bool bypass = 17 [default = false]; + + // whether the procedure has been executed + // since we do not always maintain the stack_id now, we need a separated flag + optional bool executed = 18 [default = false]; } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRegionBypass.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRegionBypass.java index 2ef227acd8f..0a695456d6a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRegionBypass.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRegionBypass.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.master.assignment; import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionStateTransitionState.REGION_STATE_TRANSITION_OPEN; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import java.io.IOException; @@ -93,7 +94,7 @@ public class TestRegionBypass { TEST_UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor().getEnvironment(); List<RegionInfo> regions = admin.getRegions(this.tableName); for (RegionInfo ri : regions) { - admin.unassign(ri.getRegionName(), false); + admin.unassign(ri.getRegionName()); } List<Long> pids = new ArrayList<>(regions.size()); for (RegionInfo ri : regions) { @@ -102,11 +103,8 @@ public class TestRegionBypass { pids.add( TEST_UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor().submitProcedure(p)); } - for (Long pid : pids) { - while (!TEST_UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor().isStarted(pid)) { - Thread.sleep(100); - } - } + 
TEST_UTIL.waitFor(30000, () -> pids.stream().allMatch( + pid -> TEST_UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor().isStarted(pid))); List<Procedure<MasterProcedureEnv>> ps = TEST_UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor().getProcedures(); for (Procedure<MasterProcedureEnv> p : ps) { @@ -120,29 +118,17 @@ public class TestRegionBypass { } // Try and assign WITHOUT override flag. Should fail!. for (RegionInfo ri : regions) { - try { - admin.assign(ri.getRegionName()); - } catch (Throwable dnrioe) { - // Expected - LOG.info("Expected {}", dnrioe); - } - } - while ( - !TEST_UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor().getActiveProcIds() - .isEmpty() - ) { - Thread.sleep(100); + IOException error = assertThrows(IOException.class, () -> admin.assign(ri.getRegionName())); + LOG.info("Expected {}", error); } + TEST_UTIL.waitFor(30000, () -> TEST_UTIL.getHBaseCluster().getMaster() + .getMasterProcedureExecutor().getActiveProcIds().isEmpty()); // Now assign with the override flag. 
for (RegionInfo ri : regions) { TEST_UTIL.getHbck().assigns(Arrays.<String> asList(ri.getEncodedName()), true); } - while ( - !TEST_UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor().getActiveProcIds() - .isEmpty() - ) { - Thread.sleep(100); - } + TEST_UTIL.waitFor(60000, () -> TEST_UTIL.getHBaseCluster().getMaster() + .getMasterProcedureExecutor().getActiveProcIds().isEmpty()); for (RegionInfo ri : regions) { assertTrue(ri.toString(), TEST_UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager() .getRegionStates().isRegionOnline(ri)); @@ -173,6 +159,8 @@ public class TestRegionBypass { @Override protected Flow executeFromState(MasterProcedureEnv env, RegionStateTransitionState state) throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { + // add a sleep so we will not consume all the CPUs and write a bunch of logs + Thread.sleep(100); switch (state) { case REGION_STATE_TRANSITION_GET_ASSIGN_CANDIDATE: LOG.info("LATCH1 {}", this.latch.getCount()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRollbackSCP.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRollbackSCP.java new file mode 100644 index 00000000000..25f2e582068 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRollbackSCP.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.assignment; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.everyItem; +import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.StartMiniClusterOption; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.BalanceRequest; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureTestingUtility; +import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure; +import org.apache.hadoop.hbase.master.region.MasterRegion; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import 
org.apache.hadoop.hbase.util.Bytes; +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; + +/** + * SCP does not support rollback actually, here we just want to simulate that when there is a code + * bug, SCP and its sub procedures will not hang there forever, and it will not mess up the + * procedure store. + */ +@Category({ MasterTests.class, LargeTests.class }) +public class TestRollbackSCP { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRollbackSCP.class); + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static final TableName TABLE_NAME = TableName.valueOf("test"); + + private static final byte[] FAMILY = Bytes.toBytes("family"); + + private static final AtomicBoolean INJECTED = new AtomicBoolean(false); + + private static final class AssignmentManagerForTest extends AssignmentManager { + + public AssignmentManagerForTest(MasterServices master, MasterRegion masterRegion) { + super(master, masterRegion); + } + + @Override + void persistToMeta(RegionStateNode regionNode) throws IOException { + TransitRegionStateProcedure proc = regionNode.getProcedure(); + if (!regionNode.getRegionInfo().isMetaRegion() && proc.hasParent()) { + Procedure<?> p = + getMaster().getMasterProcedureExecutor().getProcedure(proc.getRootProcId()); + // fail the procedure if it is a sub procedure for SCP + if (p instanceof ServerCrashProcedure) { + if (INJECTED.compareAndSet(false, true)) { + ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdateInRollback( + getMaster().getMasterProcedureExecutor(), true); + } + throw new RuntimeException("inject code bug"); + } + } 
+ super.persistToMeta(regionNode); + } + } + + public static final class HMasterForTest extends HMaster { + + public HMasterForTest(Configuration conf) throws IOException { + super(conf); + } + + @Override + protected AssignmentManager createAssignmentManager(MasterServices master, + MasterRegion masterRegion) { + return new AssignmentManagerForTest(master, masterRegion); + } + } + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + UTIL.getConfiguration().setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1); + UTIL.startMiniCluster(StartMiniClusterOption.builder().numDataNodes(3).numRegionServers(3) + .masterClass(HMasterForTest.class).build()); + UTIL.createMultiRegionTable(TABLE_NAME, FAMILY); + UTIL.waitTableAvailable(TABLE_NAME); + UTIL.getAdmin().balance(BalanceRequest.newBuilder().setIgnoreRegionsInTransition(true).build()); + UTIL.waitUntilNoRegionsInTransition(); + UTIL.getAdmin().balancerSwitch(false, true); + } + + @AfterClass + public static void tearDownAfterClass() throws IOException { + UTIL.shutdownMiniCluster(); + } + + @Before + public void setUp() throws IOException { + UTIL.ensureSomeNonStoppedRegionServersAvailable(2); + } + + private ServerCrashProcedure getSCPForServer(ServerName serverName) throws IOException { + return UTIL.getMiniHBaseCluster().getMaster().getProcedures().stream() + .filter(p -> p instanceof ServerCrashProcedure).map(p -> (ServerCrashProcedure) p) + .filter(p -> p.getServerName().equals(serverName)).findFirst().orElse(null); + } + + private Matcher<Procedure<MasterProcedureEnv>> subProcOf(Procedure<MasterProcedureEnv> proc) { + return new BaseMatcher<Procedure<MasterProcedureEnv>>() { + + @Override + public boolean matches(Object item) { + if (!(item instanceof Procedure)) { + return false; + } + Procedure<?> p = (Procedure<?>) item; + return p.hasParent() && p.getRootProcId() == proc.getProcId(); + } + + @Override + public void describeTo(Description description) { + 
description.appendText("sub procedure of(").appendValue(proc).appendText(")"); + } + }; + } + + @Test + public void testFailAndRollback() throws Exception { + HRegionServer rsWithMeta = UTIL.getRSForFirstRegionInTable(TableName.META_TABLE_NAME); + UTIL.getMiniHBaseCluster().killRegionServer(rsWithMeta.getServerName()); + UTIL.waitFor(15000, () -> getSCPForServer(rsWithMeta.getServerName()) != null); + ServerCrashProcedure scp = getSCPForServer(rsWithMeta.getServerName()); + ProcedureExecutor<MasterProcedureEnv> procExec = + UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor(); + // wait for the procedure to stop, as we inject a code bug and also set kill before store update + UTIL.waitFor(30000, () -> !procExec.isRunning()); + // make sure that finally we could successfully rollback the procedure + while (scp.getState() != ProcedureState.FAILED || !procExec.isRunning()) { + MasterProcedureTestingUtility.restartMasterProcedureExecutor(procExec); + ProcedureTestingUtility.waitProcedure(procExec, scp); + } + assertEquals(scp.getState(), ProcedureState.FAILED); + assertThat(scp.getException().getMessage(), containsString("inject code bug")); + // make sure all sub procedures are cleaned up + assertThat(procExec.getProcedures(), everyItem(not(subProcOf(scp)))); + } +}