HBASE-16744 Procedure V2 - Lock procedures to allow clients to acquire locks on tables/namespaces/regions (Matteo Bertozzi)
Incorporates review comments from https://reviews.apache.org/r/52589/ https://reviews.apache.org/r/54388/ M hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java Fix for eclipse complaint (from Duo Zhang) M hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java M hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java M hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java Log formatting M hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java Added wait procedures utility. A hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/LockServiceProtos.java A hbase-protocol-shaded/src/main/protobuf/LockService.proto b/hbase-protocol-shaded/src/main/protobuf/LockService.proto Implement new locking CP overrides. A hbase-server/src/main/java/org/apache/hadoop/hbase/client/locking/EntityLock.java New hbase entity lock (ns, table, or regions) A hbase-server/src/main/java/org/apache/hadoop/hbase/client/locking/LockServiceClient.java Client that can use the new internal locking service. Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4cb09a49 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4cb09a49 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4cb09a49 Branch: refs/heads/master Commit: 4cb09a494c4148de2b4e8c6cd011bacdf7f33b1a Parents: 9fd5dab1 Author: Michael Stack <st...@apache.org> Authored: Wed Jan 11 14:38:59 2017 -0800 Committer: Michael Stack <st...@apache.org> Committed: Fri Jan 13 21:07:03 2017 -0800 ---------------------------------------------------------------------- .../hadoop/hbase/client/AsyncTableBase.java | 4 +- .../hadoop/hbase/procedure2/Procedure.java | 10 +- .../hbase/procedure2/ProcedureExecutor.java | 98 +- .../procedure2/store/wal/WALProcedureStore.java | 8 +- .../procedure2/ProcedureTestingUtility.java | 12 + .../protobuf/generated/LockServiceProtos.java | 5328 ++++++++++++++++++ .../src/main/protobuf/LockService.proto | 79 + .../hbase/rsgroup/RSGroupAdminEndpoint.java | 54 +- .../hadoop/hbase/client/locking/EntityLock.java | 266 + .../hbase/client/locking/LockServiceClient.java | 111 + .../BaseMasterAndRegionObserver.java | 23 + .../hbase/coprocessor/BaseMasterObserver.java | 25 +- .../hbase/coprocessor/MasterObserver.java | 36 +- .../org/apache/hadoop/hbase/master/HMaster.java | 9 +- .../hbase/master/MasterCoprocessorHost.java | 43 + .../hadoop/hbase/master/MasterRpcServices.java | 68 +- .../hadoop/hbase/master/MasterServices.java | 5 + .../hbase/master/locking/LockManager.java | 271 + .../hbase/master/locking/LockProcedure.java | 462 ++ .../org/apache/hadoop/hbase/util/IdLock.java | 12 +- .../hbase/client/locking/TestEntityLocks.java | 182 + .../hbase/coprocessor/TestMasterObserver.java | 64 +- .../hbase/master/MockNoopMasterServices.java | 8 +- .../hadoop/hbase/master/MockRegionServer.java | 1 + .../hbase/master/locking/TestLockManager.java | 161 + .../hbase/master/locking/TestLockProcedure.java | 456 ++ 26 files changed, 7709 insertions(+), 87 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/4cb09a49/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java index 19a22c0..d80627f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java @@ -386,8 +386,8 @@ public interface AsyncTableBase { * @return A list of {@link CompletableFuture}s that represent the existence for each get. */ default List<CompletableFuture<Boolean>> exists(List<Get> gets) { - return get(toCheckExistenceOnly(gets)).stream().map(f -> f.thenApply(r -> r.getExists())) - .collect(toList()); + return get(toCheckExistenceOnly(gets)).stream(). + <CompletableFuture<Boolean>>map(f -> f.thenApply(r -> r.getExists())).collect(toList()); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/4cb09a49/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java index cb4ee47..3f3cf33 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java @@ -243,24 +243,24 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { final StringBuilder sb = new StringBuilder(); toStringClassDetails(sb); - sb.append(" id="); + sb.append(", procId="); sb.append(getProcId()); if (hasParent()) { - sb.append(" parent="); + sb.append(", parent="); sb.append(getParentProcId()); } if (hasOwner()) { - sb.append(" owner="); + sb.append(", owner="); sb.append(getOwner()); } - sb.append(" state="); + sb.append(", state="); toStringState(sb); if (hasException()) { - sb.append(" failed=" + getException()); + sb.append(", failed=" + getException()); } return sb; http://git-wip-us.apache.org/repos/asf/hbase/blob/4cb09a49/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index c65f3fb..d3b65e8 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -28,6 +28,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -91,7 +92,7 @@ public class ProcedureExecutor<TEnvironment> { final boolean kill = this.killBeforeStoreUpdate; if (this.toggleKillBeforeStoreUpdate) { this.killBeforeStoreUpdate = !kill; - LOG.warn("Toggle Kill before store update to: " + this.killBeforeStoreUpdate); + LOG.warn("Toggle KILL before store update to: " + this.killBeforeStoreUpdate); } return kill; } @@ -172,7 +173,7 @@ public class ProcedureExecutor<TEnvironment> { final long now = EnvironmentEdgeManager.currentTime(); final Iterator<Map.Entry<Long, ProcedureInfo>> it = completed.entrySet().iterator(); - final boolean isDebugEnabled = LOG.isDebugEnabled(); + final boolean debugEnabled = LOG.isDebugEnabled(); while (it.hasNext() && store.isRunning()) { final Map.Entry<Long, ProcedureInfo> entry = it.next(); final ProcedureInfo procInfo = entry.getValue(); @@ -180,8 +181,8 @@ public class ProcedureExecutor<TEnvironment> { // TODO: Select TTL based on Procedure type if ((procInfo.hasClientAckTime() && (now - procInfo.getClientAckTime()) >= evictAckTtl) || (now - procInfo.getLastUpdate()) >= evictTtl) { - if (isDebugEnabled) { - LOG.debug("Evict completed procedure: " + procInfo); + if (debugEnabled) { + LOG.debug("Evict completed " + procInfo); } batchIds[batchCount++] = entry.getKey(); if (batchCount == batchIds.length) { @@ -281,7 +282,7 @@ public class ProcedureExecutor<TEnvironment> { @Override public void setMaxProcId(long maxProcId) { assert lastProcId.get() < 0 : "expected only one call to setMaxProcId()"; - LOG.debug("load procedures maxProcId=" + maxProcId); + LOG.debug("Load maxProcId=" + maxProcId); lastProcId.set(maxProcId); } @@ -295,7 +296,7 @@ public class ProcedureExecutor<TEnvironment> { int corruptedCount = 0; while (procIter.hasNext()) { ProcedureInfo proc = procIter.nextAsProcedureInfo(); - LOG.error("corrupted procedure: " + proc); + LOG.error("Corrupt " + proc); corruptedCount++; } if (abortOnCorruption && corruptedCount > 0) { @@ -307,7 +308,7 @@ public class ProcedureExecutor<TEnvironment> { private void loadProcedures(final ProcedureIterator procIter, final boolean abortOnCorruption) throws IOException { - final boolean isDebugEnabled = LOG.isDebugEnabled(); + final boolean debugEnabled = LOG.isDebugEnabled(); // 1. Build the rollback stack int runnablesCount = 0; @@ -320,8 +321,8 @@ public class ProcedureExecutor<TEnvironment> { nonceKey = proc.getNonceKey(); procId = proc.getProcId(); completed.put(proc.getProcId(), proc); - if (isDebugEnabled) { - LOG.debug("The procedure is completed: " + proc); + if (debugEnabled) { + LOG.debug("Completed " + proc); } } else { Procedure proc = procIter.nextAsProcedure(); @@ -361,8 +362,8 @@ public class ProcedureExecutor<TEnvironment> { Procedure proc = procIter.nextAsProcedure(); assert !(proc.isFinished() && !proc.hasParent()) : "unexpected completed proc=" + proc; - if (isDebugEnabled) { - LOG.debug(String.format("Loading procedure state=%s isFailed=%s: %s", + if (debugEnabled) { + LOG.debug(String.format("Loading state=%s isFailed=%s: %s", proc.getState(), proc.hasException(), proc)); } @@ -425,7 +426,7 @@ public class ProcedureExecutor<TEnvironment> { if (procStack.isValid()) continue; for (Procedure proc: procStack.getSubproceduresStack()) { - LOG.error("corrupted procedure: " + proc); + LOG.error("Corrupted " + proc); procedures.remove(proc.getProcId()); runnableList.remove(proc); if (waitingSet != null) waitingSet.remove(proc); @@ -485,7 +486,7 @@ public class ProcedureExecutor<TEnvironment> { // We have numThreads executor + one timer thread used for timing out // procedures and triggering periodic procedures. this.corePoolSize = numThreads; - LOG.info("Starting procedure executor threads=" + corePoolSize); + LOG.info("Starting executor threads=" + corePoolSize); // Create the Thread Group for the executors threadGroup = new ThreadGroup("ProcedureExecutor"); @@ -506,7 +507,7 @@ public class ProcedureExecutor<TEnvironment> { st = EnvironmentEdgeManager.currentTime(); store.recoverLease(); et = EnvironmentEdgeManager.currentTime(); - LOG.info(String.format("recover procedure store (%s) lease: %s", + LOG.info(String.format("Recover store (%s) lease: %s", store.getClass().getSimpleName(), StringUtils.humanTimeDiff(et - st))); // start the procedure scheduler @@ -520,11 +521,11 @@ public class ProcedureExecutor<TEnvironment> { st = EnvironmentEdgeManager.currentTime(); load(abortOnCorruption); et = EnvironmentEdgeManager.currentTime(); - LOG.info(String.format("load procedure store (%s): %s", + LOG.info(String.format("Load store (%s): %s", store.getClass().getSimpleName(), StringUtils.humanTimeDiff(et - st))); // Start the executors. Here we must have the lastProcId set. - LOG.debug("start workers " + workerThreads.size()); + LOG.debug("Start workers " + workerThreads.size()); timeoutExecutor.start(); for (WorkerThread worker: workerThreads) { worker.start(); @@ -542,7 +543,7 @@ public class ProcedureExecutor<TEnvironment> { return; } - LOG.info("Stopping the procedure executor"); + LOG.info("Stopping"); scheduler.stop(); timeoutExecutor.sendStopSignal(); } @@ -564,7 +565,7 @@ public class ProcedureExecutor<TEnvironment> { try { threadGroup.destroy(); } catch (IllegalThreadStateException e) { - LOG.error("thread group " + threadGroup + " contains running threads"); + LOG.error("Thread group " + threadGroup + " contains running threads"); threadGroup.list(); } finally { threadGroup = null; @@ -693,12 +694,12 @@ public class ProcedureExecutor<TEnvironment> { // we found a registered nonce, but the procedure may not have been submitted yet. // since the client expect the procedure to be submitted, spin here until it is. - final boolean isTraceEnabled = LOG.isTraceEnabled(); + final boolean traceEnabled = LOG.isTraceEnabled(); while (isRunning() && !(procedures.containsKey(oldProcId) || completed.containsKey(oldProcId)) && nonceKeysToProcIdsMap.containsKey(nonceKey)) { - if (isTraceEnabled) { - LOG.trace("waiting for procId=" + oldProcId.longValue() + " to be submitted"); + if (traceEnabled) { + LOG.trace("Waiting for procId=" + oldProcId.longValue() + " to be submitted"); } Threads.sleep(100); } @@ -787,7 +788,7 @@ public class ProcedureExecutor<TEnvironment> { // Commit the transaction store.insert(proc, null); if (LOG.isDebugEnabled()) { - LOG.debug("Procedure " + proc + " added to the store."); + LOG.debug("Stored " + proc); } // Add the procedure to the executor @@ -811,7 +812,7 @@ public class ProcedureExecutor<TEnvironment> { // Commit the transaction store.insert(procs); if (LOG.isDebugEnabled()) { - LOG.debug("Procedures added to the store: " + Arrays.toString(procs)); + LOG.debug("Stored " + Arrays.toString(procs)); } // Add the procedure to the executor @@ -880,6 +881,14 @@ public class ProcedureExecutor<TEnvironment> { return procedures.get(procId); } + public <T extends Procedure> T getProcedure(final Class<T> clazz, final long procId) { + final Procedure proc = getProcedure(procId); + if (clazz.isInstance(proc)) { + return (T)proc; + } + return null; + } + public ProcedureInfo getResult(final long procId) { return completed.get(procId); } @@ -917,7 +926,7 @@ public class ProcedureExecutor<TEnvironment> { if (result == null) { assert !procedures.containsKey(procId) : "procId=" + procId + " is still running"; if (LOG.isDebugEnabled()) { - LOG.debug("Procedure procId=" + procId + " already removed by the cleaner."); + LOG.debug("procId=" + procId + " already removed by the cleaner."); } return; } @@ -999,7 +1008,7 @@ public class ProcedureExecutor<TEnvironment> { try { listener.procedureLoaded(procId); } catch (Throwable e) { - LOG.error("The listener " + listener + " had an error: " + e.getMessage(), e); + LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e); } } } @@ -1011,7 +1020,7 @@ public class ProcedureExecutor<TEnvironment> { try { listener.procedureAdded(procId); } catch (Throwable e) { - LOG.error("The listener " + listener + " had an error: " + e.getMessage(), e); + LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e); } } } @@ -1023,7 +1032,7 @@ public class ProcedureExecutor<TEnvironment> { try { listener.procedureFinished(procId); } catch (Throwable e) { - LOG.error("The listener " + listener + " had an error: " + e.getMessage(), e); + LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e); } } } @@ -1053,6 +1062,11 @@ public class ProcedureExecutor<TEnvironment> { return lastProcId.get(); } + @VisibleForTesting + public Set<Long> getActiveProcIds() { + return procedures.keySet(); + } + private Long getRootProcedureId(Procedure proc) { return Procedure.getRootProcedureId(procedures, proc); } @@ -1111,7 +1125,7 @@ public class ProcedureExecutor<TEnvironment> { if (proc.isSuccess()) { if (LOG.isDebugEnabled()) { - LOG.debug("Procedure completed in " + + LOG.debug("Completed in " + StringUtils.humanTimeDiff(proc.elapsedTime()) + ": " + proc); } // Finalize the procedure state @@ -1203,7 +1217,7 @@ public class ProcedureExecutor<TEnvironment> { } // Finalize the procedure state - LOG.info("Rolledback procedure " + rootProc + + LOG.info("Rolled back " + rootProc + " exec-time=" + StringUtils.humanTimeDiff(rootProc.elapsedTime()) + " exception=" + exception.getMessage()); procedureFinished(rootProc); @@ -1220,7 +1234,7 @@ public class ProcedureExecutor<TEnvironment> { proc.doRollback(getEnvironment()); } catch (IOException e) { if (LOG.isDebugEnabled()) { - LOG.debug("rollback attempt failed for " + proc, e); + LOG.debug("Roll back attempt failed for " + proc, e); } return false; } catch (InterruptedException e) { @@ -1294,7 +1308,7 @@ public class ProcedureExecutor<TEnvironment> { isSuspended = true; } catch (ProcedureYieldException e) { if (LOG.isTraceEnabled()) { - LOG.trace("Yield procedure: " + procedure + ": " + e.getMessage()); + LOG.trace("Yield " + procedure + ": " + e.getMessage()); } scheduler.yield(procedure); return; @@ -1418,8 +1432,8 @@ public class ProcedureExecutor<TEnvironment> { } // If this procedure is the last child awake the parent procedure - final boolean isTraceEnabled = LOG.isTraceEnabled(); - if (isTraceEnabled) { + final boolean traceEnabled = LOG.isTraceEnabled(); + if (traceEnabled) { LOG.trace(parent + " child is done: " + procedure); } @@ -1427,7 +1441,7 @@ public class ProcedureExecutor<TEnvironment> { parent.setState(ProcedureState.RUNNABLE); store.update(parent); scheduler.addFront(parent); - if (isTraceEnabled) { + if (traceEnabled) { LOG.trace(parent + " all the children finished their work, resume."); } return; @@ -1438,7 +1452,7 @@ public class ProcedureExecutor<TEnvironment> { final Procedure procedure, final Procedure[] subprocs) { if (subprocs != null && !procedure.isFailed()) { if (LOG.isTraceEnabled()) { - LOG.trace("Store add " + procedure + " children " + Arrays.toString(subprocs)); + LOG.trace("Stored " + procedure + ", children " + Arrays.toString(subprocs)); } store.insert(procedure, subprocs); } else { @@ -1464,7 +1478,7 @@ public class ProcedureExecutor<TEnvironment> { private void handleInterruptedException(final Procedure proc, final InterruptedException e) { if (LOG.isTraceEnabled()) { - LOG.trace("got an interrupt during " + proc + ". suspend and retry it later.", e); + LOG.trace("Interrupt during " + proc + ". suspend and retry it later.", e); } // NOTE: We don't call Thread.currentThread().interrupt() @@ -1530,7 +1544,7 @@ public class ProcedureExecutor<TEnvironment> { @Override public void run() { - final boolean isTraceEnabled = LOG.isTraceEnabled(); + final boolean traceEnabled = LOG.isTraceEnabled(); long lastUpdate = EnvironmentEdgeManager.currentTime(); while (isRunning() && keepAlive(lastUpdate)) { final Procedure procedure = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS); @@ -1539,7 +1553,7 @@ public class ProcedureExecutor<TEnvironment> { store.setRunningProcedureCount(activeExecutorCount.incrementAndGet()); executionStartTime.set(EnvironmentEdgeManager.currentTime()); try { - if (isTraceEnabled) { + if (traceEnabled) { LOG.trace("Trying to start the execution of " + procedure); } executeProcedure(procedure); @@ -1549,7 +1563,7 @@ public class ProcedureExecutor<TEnvironment> { executionStartTime.set(Long.MAX_VALUE); } } - LOG.debug("worker thread terminated " + this); + LOG.debug("Worker thread terminated " + this); workerThreads.remove(this); } @@ -1691,7 +1705,7 @@ public class ProcedureExecutor<TEnvironment> { sendStopSignal(); join(250); if (i > 0 && (i % 8) == 0) { - LOG.warn("waiting termination of thread " + getName() + ", " + + LOG.warn("Waiting termination of thread " + getName() + ", " + StringUtils.humanTimeDiff(EnvironmentEdgeManager.currentTime() - startTime)); } } @@ -1767,7 +1781,7 @@ public class ProcedureExecutor<TEnvironment> { // WARN the worker is stuck stuckCount++; - LOG.warn("found worker stuck " + worker + + LOG.warn("Worker stuck " + worker + " run time " + StringUtils.humanTimeDiff(worker.getCurrentRunTime())); } return stuckCount; @@ -1785,7 +1799,7 @@ public class ProcedureExecutor<TEnvironment> { final WorkerThread worker = new WorkerThread(threadGroup); workerThreads.add(worker); worker.start(); - LOG.debug("added a new worker thread " + worker); + LOG.debug("Added new worker thread " + worker); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/4cb09a49/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java index 4465993..d4d5773 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java @@ -294,7 +294,7 @@ public class WALProcedureStore extends ProcedureStoreBase { @Override public void setRunningProcedureCount(final int count) { - LOG.debug("set running procedure count=" + count + " slots=" + slots.length); + LOG.debug("Set running procedure count=" + count + ", slots=" + slots.length); this.runningProcCount = count > 0 ? Math.min(count, slots.length) : slots.length; } @@ -326,7 +326,7 @@ public class WALProcedureStore extends ProcedureStoreBase { try { flushLogId = initOldLogs(oldLogs); } catch (FileNotFoundException e) { - LOG.warn("someone else is active and deleted logs. retrying.", e); + LOG.warn("Someone else is active and deleted logs. retrying.", e); oldLogs = getLogFiles(); continue; } @@ -334,7 +334,7 @@ public class WALProcedureStore extends ProcedureStoreBase { // Create new state-log if (!rollWriter(flushLogId + 1)) { // someone else has already created this log - LOG.debug("someone else has already created log " + flushLogId); + LOG.debug("Someone else has already created log " + flushLogId); continue; } @@ -428,7 +428,7 @@ public class WALProcedureStore extends ProcedureStoreBase { try { periodicRoll(); } catch (IOException e) { - LOG.warn("unable to cleanup logs on load: " + e.getMessage(), e); + LOG.warn("Unable to cleanup logs on load: " + e.getMessage(), e); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/4cb09a49/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java index 9edc711..8aa2088 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java @@ -206,6 +206,18 @@ public class ProcedureTestingUtility { } } + public static <TEnv> void waitProcedures(ProcedureExecutor<TEnv> procExecutor, long... procIds) { + for (int i = 0; i < procIds.length; ++i) { + waitProcedure(procExecutor, procIds[i]); + } + } + + public static <TEnv> void waitAllProcedures(ProcedureExecutor<TEnv> procExecutor) { + for (long procId : procExecutor.getActiveProcIds()) { + waitProcedure(procExecutor, procId); + } + } + public static <TEnv> void waitNoProcedureRunning(ProcedureExecutor<TEnv> procExecutor) { int stableRuns = 0; while (stableRuns < 10) {