HBASE-20846 Restore procedure locks when master restarts
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/833657c4 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/833657c4 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/833657c4 Branch: refs/heads/branch-2.1 Commit: 833657c46dafbb9727d625617446424f62117c68 Parents: 78948d9 Author: zhangduo <[email protected]> Authored: Sun Jul 22 15:10:06 2018 +0800 Committer: zhangduo <[email protected]> Committed: Wed Jul 25 14:37:36 2018 +0800 ---------------------------------------------------------------------- .../procedure2/AbstractProcedureScheduler.java | 2 +- .../hbase/procedure2/DelayedProcedure.java | 5 +- .../hadoop/hbase/procedure2/Procedure.java | 384 +++++++++------- .../hbase/procedure2/ProcedureExecutor.java | 439 ++++++++++--------- .../hadoop/hbase/procedure2/ProcedureUtil.java | 7 + .../hbase/procedure2/RootProcedureState.java | 44 +- .../hbase/procedure2/TimeoutExecutorThread.java | 28 +- .../procedure2/TestProcedureReplayOrder.java | 8 +- .../procedure2/TestProcedureSuspended.java | 6 - .../src/main/protobuf/Procedure.proto | 3 + .../hbase/master/ClusterSchemaServiceImpl.java | 4 +- .../org/apache/hadoop/hbase/master/HMaster.java | 7 +- .../master/assignment/GCRegionProcedure.java | 5 - .../assignment/MergeTableRegionsProcedure.java | 12 +- .../assignment/RegionTransitionProcedure.java | 33 +- .../hbase/master/locking/LockProcedure.java | 9 - .../AbstractStateMachineNamespaceProcedure.java | 6 +- .../AbstractStateMachineRegionProcedure.java | 9 - .../AbstractStateMachineTableProcedure.java | 8 +- .../procedure/CreateNamespaceProcedure.java | 35 +- .../master/procedure/CreateTableProcedure.java | 12 +- .../master/procedure/InitMetaProcedure.java | 7 +- .../procedure/MasterProcedureScheduler.java | 37 +- .../master/procedure/MasterProcedureUtil.java | 2 +- .../hbase/master/procedure/PeerQueue.java | 14 - .../master/procedure/ProcedureSyncWait.java | 4 +- 
.../hadoop/hbase/master/procedure/Queue.java | 13 +- .../replication/AbstractPeerProcedure.java | 14 +- .../hbase-webapps/master/procedures.jsp | 2 +- .../hbase/client/TestGetProcedureResult.java | 2 +- .../assignment/TestAssignmentManager.java | 3 +- .../procedure/TestMasterProcedureEvents.java | 2 +- .../master/procedure/TestProcedureAdmin.java | 2 +- .../hbase/procedure/TestFailedProcCleanup.java | 5 +- .../security/access/TestAccessController.java | 2 +- 35 files changed, 624 insertions(+), 551 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/833657c4/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java index c036163..5645f89 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java @@ -163,8 +163,8 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler { return null; } } - final Procedure pollResult = dequeue(); + pollCalls++; nullPollCalls += (pollResult == null) ? 
1 : 0; return pollResult; http://git-wip-us.apache.org/repos/asf/hbase/blob/833657c4/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/DelayedProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/DelayedProcedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/DelayedProcedure.java index a9f3e7d..3fc9750 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/DelayedProcedure.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/DelayedProcedure.java @@ -24,8 +24,9 @@ import org.apache.yetus.audience.InterfaceAudience; * Vessel that carries a Procedure and a timeout. */ @InterfaceAudience.Private -class DelayedProcedure extends DelayedUtil.DelayedContainerWithTimestamp<Procedure<?>> { - public DelayedProcedure(Procedure<?> procedure) { +class DelayedProcedure<TEnvironment> + extends DelayedUtil.DelayedContainerWithTimestamp<Procedure<TEnvironment>> { + public DelayedProcedure(Procedure<TEnvironment> procedure) { super(procedure, procedure.getTimeoutTimestamp()); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/833657c4/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java index 545bedf..58757bb 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java @@ -22,76 +22,94 @@ import java.io.IOException; import java.util.Arrays; import java.util.List; import java.util.Map; - -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.yetus.audience.InterfaceStability; -import 
org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.exceptions.TimeoutIOException; import org.apache.hadoop.hbase.metrics.Counter; import org.apache.hadoop.hbase.metrics.Histogram; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; import org.apache.hadoop.hbase.procedure2.util.StringUtils; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.NonceKey; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; + /** - * Base Procedure class responsible for Procedure Metadata; - * e.g. state, submittedTime, lastUpdate, stack-indexes, etc. - * - * <p>Procedures are run by a {@link ProcedureExecutor} instance. They are submitted and then - * the ProcedureExecutor keeps calling {@link #execute(Object)} until the Procedure is done. - * Execute may be called multiple times in the case of failure or a restart, so code must be - * idempotent. The return from an execute call is either: null to indicate we are done; - * ourself if there is more to do; or, a set of sub-procedures that need to - * be run to completion before the framework resumes our execution. - * - * <p>The ProcedureExecutor keeps its - * notion of Procedure State in the Procedure itself; e.g. it stamps the Procedure as INITIALIZING, - * RUNNABLE, SUCCESS, etc. Here are some of the States defined in the ProcedureState enum from - * protos: - *<ul> - * <li>{@link #isFailed()} A procedure has executed at least once and has failed. The procedure - * may or may not have rolled back yet. 
Any procedure in FAILED state will be eventually moved - * to ROLLEDBACK state.</li> - * + * Base Procedure class responsible for Procedure Metadata; e.g. state, submittedTime, lastUpdate, + * stack-indexes, etc. + * <p/> + * Procedures are run by a {@link ProcedureExecutor} instance. They are submitted and then the + * ProcedureExecutor keeps calling {@link #execute(Object)} until the Procedure is done. Execute may + * be called multiple times in the case of failure or a restart, so code must be idempotent. The + * return from an execute call is either: null to indicate we are done; ourself if there is more to + * do; or, a set of sub-procedures that need to be run to completion before the framework resumes + * our execution. + * <p/> + * The ProcedureExecutor keeps its notion of Procedure State in the Procedure itself; e.g. it stamps + * the Procedure as INITIALIZING, RUNNABLE, SUCCESS, etc. Here are some of the States defined in the + * ProcedureState enum from protos: + * <ul> + * <li>{@link #isFailed()} A procedure has executed at least once and has failed. The procedure may + * or may not have rolled back yet. Any procedure in FAILED state will be eventually moved to + * ROLLEDBACK state.</li> * <li>{@link #isSuccess()} A procedure is completed successfully without exception.</li> - * * <li>{@link #isFinished()} As a procedure in FAILED state will be tried forever for rollback, only * condition when scheduler/ executor will drop procedure from further processing is when procedure * state is ROLLEDBACK or isSuccess() returns true. This is a terminal state of the procedure.</li> - * * <li>{@link #isWaiting()} - Procedure is in one of the two waiting states * ({@link ProcedureState#WAITING}, {@link ProcedureState#WAITING_TIMEOUT}).</li> - *</ul> - * NOTE: These states are of the ProcedureExecutor. Procedure implementations in turn can keep - * their own state. This can lead to confusion. Try to keep the two distinct. 
- * - * <p>rollback() is called when the procedure or one of the sub-procedures - * has failed. The rollback step is supposed to cleanup the resources created - * during the execute() step. In case of failure and restart, rollback() may be - * called multiple times, so again the code must be idempotent. - * - * <p>Procedure can be made respect a locking regime. It has acquire/release methods as - * well as an {@link #hasLock(Object)}. The lock implementation is up to the implementor. - * If an entity needs to be locked for the life of a procedure -- not just the calls to - * execute -- then implementations should say so with the {@link #holdLock(Object)} - * method. - * - * <p>Procedures can be suspended or put in wait state with a callback that gets executed on + * </ul> + * NOTE: These states are of the ProcedureExecutor. Procedure implementations in turn can keep their + * own state. This can lead to confusion. Try to keep the two distinct. + * <p/> + * rollback() is called when the procedure or one of the sub-procedures has failed. The rollback + * step is supposed to cleanup the resources created during the execute() step. In case of failure + * and restart, rollback() may be called multiple times, so again the code must be idempotent. + * <p/> + * Procedure can be made to respect a locking regime. It has acquire/release methods as well as an + * {@link #hasLock()}. The lock implementation is up to the implementor. If an entity needs to be + * locked for the life of a procedure -- not just the calls to execute -- then implementations + * should say so with the {@link #holdLock(Object)} method. + * <p/> + * And since we need to restore the lock when restarting to keep the logic correct (HBASE-20846), the + * implementation is a bit tricky so we add some comments here about it. + * <ul> + * <li>Make {@link #hasLock()} method final, and add a {@link #locked} field in Procedure to record + * whether we have the lock.
We will set it to {@code true} in + * {@link #doAcquireLock(Object, ProcedureStore)} and to {@code false} in + * {@link #doReleaseLock(Object, ProcedureStore)}. The sub classes do not need to manage it any + * more.</li> + * <li>Also added a locked field in the proto message. When storing, the field will be set according + * to the return value of {@link #hasLock()}. And when loading, there is a new field in Procedure + * called {@link #lockedWhenLoading}. We will set it to {@code true} if the locked field in proto + * message is {@code true}.</li> + * <li>The reason why we can not set the {@link #locked} field directly to {@code true} by calling + * {@link #doAcquireLock(Object, ProcedureStore)} is that, during initialization, most procedures + * need to wait until master is initialized. So the solution here is that, we introduced a new + * method called {@link #waitInitialized(Object)} in Procedure, and move the wait master initialized + * related code from {@link #acquireLock(Object)} to this method. And we added a restoreLock method + * to Procedure, if {@link #lockedWhenLoading} is {@code true}, we will call the + * {@link #acquireLock(Object)} to get the lock, but do not set {@link #locked} to true. And later + * when we call {@link #doAcquireLock(Object, ProcedureStore)} and pass the + * {@link #waitInitialized(Object)} check, we will test {@link #lockedWhenLoading}, if it is + * {@code true}, then we just set the {@link #locked} field to true and return, without actually + * calling the {@link #acquireLock(Object)} method since we have already called it once.</li> + * </ul> + * <p/> + * Procedures can be suspended or put in wait state with a callback that gets executed on * Procedure-specified timeout. See {@link #setTimeout(int)}}, and - * {@link #setTimeoutFailure(Object)}.
See TestProcedureEvents and the - * TestTimeoutEventProcedure class for an example usage.</p> - * - * <p>There are hooks for collecting metrics on submit of the procedure and on finish. - * See {@link #updateMetricsOnSubmit(Object)} and - * {@link #updateMetricsOnFinish(Object, long, boolean)}. + * {@link #setTimeoutFailure(Object)}. See TestProcedureEvents and the TestTimeoutEventProcedure + * class for an example usage. + * </p> + * <p/> + * There are hooks for collecting metrics on submit of the procedure and on finish. See + * {@link #updateMetricsOnSubmit(Object)} and {@link #updateMetricsOnFinish(Object, long, boolean)}. */ @InterfaceAudience.Private [email protected] public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TEnvironment>> { private static final Logger LOG = LoggerFactory.getLogger(Procedure.class); public static final long NO_PROC_ID = -1; @@ -122,6 +140,10 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE private volatile byte[] result = null; + private volatile boolean locked = false; + + private boolean lockedWhenLoading = false; + /** * The main code of the procedure. It must be idempotent since execute() * may be called multiple times in case of machine failure in the middle @@ -170,7 +192,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE * be able to resume on failure. * @param serializer stores the serializable state */ - protected abstract void serializeStateData(final ProcedureStateSerializer serializer) + protected abstract void serializeStateData(ProcedureStateSerializer serializer) throws IOException; /** @@ -178,52 +200,65 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE * state. 
* @param serializer contains the serialized state */ - protected abstract void deserializeStateData(final ProcedureStateSerializer serializer) + protected abstract void deserializeStateData(ProcedureStateSerializer serializer) throws IOException; /** - * The user should override this method if they need a lock on an Entity. - * A lock can be anything, and it is up to the implementor. The Procedure - * Framework will call this method just before it invokes {@link #execute(Object)}. - * It calls {@link #releaseLock(Object)} after the call to execute. - * - * <p>If you need to hold the lock for the life of the Procedure -- i.e. you do not - * want any other Procedure interfering while this Procedure is running, see - * {@link #holdLock(Object)}. - * - * <p>Example: in our Master we can execute request in parallel for different tables. - * We can create t1 and create t2 and these creates can be executed at the same time. - * Anything else on t1/t2 is queued waiting that specific table create to happen. - * - * <p>There are 3 LockState: - * <ul><li>LOCK_ACQUIRED should be returned when the proc has the lock and the proc is - * ready to execute.</li> - * <li>LOCK_YIELD_WAIT should be returned when the proc has not the lock and the framework - * should take care of readding the procedure back to the runnable set for retry</li> - * <li>LOCK_EVENT_WAIT should be returned when the proc has not the lock and someone will - * take care of readding the procedure back to the runnable set when the lock is available. - * </li></ul> + * The {@link #doAcquireLock(Object, ProcedureStore)} method is split into two steps: first, it will + * call us to determine whether we need to wait for initialization; second, it will call + * {@link #acquireLock(Object)} to actually handle the lock for this procedure. + * <p/> + * This is because when the master restarts, we need to restore the lock state for all the + * procedures so as not to break the semantics if {@link #holdLock(Object)} is true. But the
But the + * {@link ProcedureExecutor} will be started before the master finish initialization(as it is part + * of the initialization!), so we need to split the code into two steps, and when restore, we just + * restore the lock part and ignore the waitInitialized part. Otherwise there will be dead lock. + * @return true means we need to wait until the environment has been initialized, otherwise true. + */ + protected boolean waitInitialized(TEnvironment env) { + return false; + } + + /** + * The user should override this method if they need a lock on an Entity. A lock can be anything, + * and it is up to the implementor. The Procedure Framework will call this method just before it + * invokes {@link #execute(Object)}. It calls {@link #releaseLock(Object)} after the call to + * execute. + * <p/> + * If you need to hold the lock for the life of the Procedure -- i.e. you do not want any other + * Procedure interfering while this Procedure is running, see {@link #holdLock(Object)}. + * <p/> + * Example: in our Master we can execute request in parallel for different tables. We can create + * t1 and create t2 and these creates can be executed at the same time. Anything else on t1/t2 is + * queued waiting that specific table create to happen. + * <p/> + * There are 3 LockState: + * <ul> + * <li>LOCK_ACQUIRED should be returned when the proc has the lock and the proc is ready to + * execute.</li> + * <li>LOCK_YIELD_WAIT should be returned when the proc has not the lock and the framework should + * take care of readding the procedure back to the runnable set for retry</li> + * <li>LOCK_EVENT_WAIT should be returned when the proc has not the lock and someone will take + * care of readding the procedure back to the runnable set when the lock is available.</li> + * </ul> * @return the lock state as described above. 
*/ - protected LockState acquireLock(final TEnvironment env) { + protected LockState acquireLock(TEnvironment env) { return LockState.LOCK_ACQUIRED; } /** * The user should override this method, and release lock if necessary. */ - protected void releaseLock(final TEnvironment env) { + protected void releaseLock(TEnvironment env) { // no-op } /** * Used to keep the procedure lock even when the procedure is yielding or suspended. - * Must implement {@link #hasLock(Object)} if you want to hold the lock for life - * of the Procedure. - * @see #hasLock(Object) * @return true if the procedure should hold on the lock until completionCleanup() */ - protected boolean holdLock(final TEnvironment env) { + protected boolean holdLock(TEnvironment env) { return false; } @@ -235,8 +270,8 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE * @see #holdLock(Object) * @return true if the procedure has the lock, false otherwise. */ - protected boolean hasLock(final TEnvironment env) { - return false; + protected final boolean hasLock() { + return locked; } /** @@ -245,7 +280,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE * operation before replay. * e.g. failing the procedure if the state on replay may be unknown. */ - protected void beforeReplay(final TEnvironment env) { + protected void beforeReplay(TEnvironment env) { // no-op } @@ -253,7 +288,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE * Called when the procedure is ready to be added to the queue after * the loading/replay operation. */ - protected void afterReplay(final TEnvironment env) { + protected void afterReplay(TEnvironment env) { // no-op } @@ -263,7 +298,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE * This operation will not be retried on failure. If a procedure took a lock, * it will have been released when this method runs. 
*/ - protected void completionCleanup(final TEnvironment env) { + protected void completionCleanup(TEnvironment env) { // no-op } @@ -275,7 +310,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE * @return Return true if the executor should yield on completion of an execution step. * Defaults to return false. */ - protected boolean isYieldAfterExecutionStep(final TEnvironment env) { + protected boolean isYieldAfterExecutionStep(TEnvironment env) { return false; } @@ -288,7 +323,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE * @return true if the executor should wait the client ack for the result. * Defaults to return true. */ - protected boolean shouldWaitClientAck(final TEnvironment env) { + protected boolean shouldWaitClientAck(TEnvironment env) { return true; } @@ -298,7 +333,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE * @param env The environment passed to the procedure executor * @return Container object for procedure related metric */ - protected ProcedureMetrics getProcedureMetrics(final TEnvironment env) { + protected ProcedureMetrics getProcedureMetrics(TEnvironment env) { return null; } @@ -308,7 +343,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE * updates submitted counter if {@link #getProcedureMetrics(Object)} returns non-null * {@link ProcedureMetrics}. */ - protected void updateMetricsOnSubmit(final TEnvironment env) { + protected void updateMetricsOnSubmit(TEnvironment env) { ProcedureMetrics metrics = getProcedureMetrics(env); if (metrics == null) { return; @@ -322,21 +357,19 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE /** * This function will be called just after procedure execution is finished. Override this method - * to update metrics at the end of the procedure. 
If {@link #getProcedureMetrics(Object)} - * returns non-null {@link ProcedureMetrics}, the default implementation adds runtime of a - * procedure to a time histogram for successfully completed procedures. Increments failed - * counter for failed procedures. - * - * TODO: As any of the sub-procedures on failure rolls back all procedures in the stack, - * including successfully finished siblings, this function may get called twice in certain - * cases for certain procedures. Explore further if this can be called once. - * + * to update metrics at the end of the procedure. If {@link #getProcedureMetrics(Object)} returns + * non-null {@link ProcedureMetrics}, the default implementation adds runtime of a procedure to a + * time histogram for successfully completed procedures. Increments failed counter for failed + * procedures. + * <p/> + * TODO: As any of the sub-procedures on failure rolls back all procedures in the stack, including + * successfully finished siblings, this function may get called twice in certain cases for certain + * procedures. Explore further if this can be called once. * @param env The environment passed to the procedure executor * @param runtime Runtime of the procedure in milliseconds * @param success true if procedure is completed successfully */ - protected void updateMetricsOnFinish(final TEnvironment env, final long runtime, - boolean success) { + protected void updateMetricsOnFinish(TEnvironment env, long runtime, boolean success) { ProcedureMetrics metrics = getProcedureMetrics(env); if (metrics == null) { return; @@ -362,8 +395,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE } /** - * Build the StringBuilder for the simple form of - * procedure string. + * Build the StringBuilder for the simple form of procedure string. 
* @return the StringBuilder */ protected StringBuilder toStringSimpleSB() { @@ -389,6 +421,8 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE sb.append(", state="); // pState for Procedure State as opposed to any other kind. toStringState(sb); + sb.append(", hasLock=").append(locked); + if (hasException()) { sb.append(", exception=" + getException()); } @@ -400,8 +434,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE } /** - * Extend the toString() information with more procedure - * details + * Extend the toString() information with more procedure details */ public String toStringDetails() { final StringBuilder sb = toStringSimpleSB(); @@ -429,8 +462,8 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE } /** - * Called from {@link #toString()} when interpolating {@link Procedure} State. - * Allows decorating generic Procedure State with Procedure particulars. + * Called from {@link #toString()} when interpolating {@link Procedure} State. Allows decorating + * generic Procedure State with Procedure particulars. * @param builder Append current {@link ProcedureState} */ protected void toStringState(StringBuilder builder) { @@ -493,8 +526,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE * Called by the ProcedureExecutor to assign the ID to the newly created procedure. */ @VisibleForTesting - @InterfaceAudience.Private - protected void setProcId(final long procId) { + protected void setProcId(long procId) { this.procId = procId; this.submittedTime = EnvironmentEdgeManager.currentTime(); setState(ProcedureState.RUNNABLE); @@ -503,13 +535,11 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE /** * Called by the ProcedureExecutor to assign the parent to the newly created procedure. 
*/ - @InterfaceAudience.Private - protected void setParentProcId(final long parentProcId) { + protected void setParentProcId(long parentProcId) { this.parentProcId = parentProcId; } - @InterfaceAudience.Private - protected void setRootProcId(final long rootProcId) { + protected void setRootProcId(long rootProcId) { this.rootProcId = rootProcId; } @@ -517,18 +547,16 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE * Called by the ProcedureExecutor to set the value to the newly created procedure. */ @VisibleForTesting - @InterfaceAudience.Private - protected void setNonceKey(final NonceKey nonceKey) { + protected void setNonceKey(NonceKey nonceKey) { this.nonceKey = nonceKey; } @VisibleForTesting - @InterfaceAudience.Private - public void setOwner(final String owner) { + public void setOwner(String owner) { this.owner = StringUtils.isEmpty(owner) ? null : owner; } - public void setOwner(final User owner) { + public void setOwner(User owner) { assert owner != null : "expected owner to be not null"; setOwner(owner.getShortName()); } @@ -537,8 +565,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE * Called on store load to initialize the Procedure internals after * the creation/deserialization. */ - @InterfaceAudience.Private - protected void setSubmittedTime(final long submittedTime) { + protected void setSubmittedTime(long submittedTime) { this.submittedTime = submittedTime; } @@ -548,7 +575,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE /** * @param timeout timeout interval in msec */ - protected void setTimeout(final int timeout) { + protected void setTimeout(int timeout) { this.timeout = timeout; } @@ -567,15 +594,13 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE * Called on store load to initialize the Procedure internals after * the creation/deserialization. 
*/ - @InterfaceAudience.Private - protected void setLastUpdate(final long lastUpdate) { + protected void setLastUpdate(long lastUpdate) { this.lastUpdate = lastUpdate; } /** * Called by ProcedureExecutor after each time a procedure step is executed. */ - @InterfaceAudience.Private protected void updateTimestamp() { this.lastUpdate = EnvironmentEdgeManager.currentTime(); } @@ -590,7 +615,6 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE * the procedure is in the waiting queue. * @return the timestamp of the next timeout. */ - @InterfaceAudience.Private protected long getTimeoutTimestamp() { return getLastUpdate() + getTimeout(); } @@ -616,10 +640,19 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE * The procedure may leave a "result" on completion. * @param result the serialized result that will be passed to the client */ - protected void setResult(final byte[] result) { + protected void setResult(byte[] result) { this.result = result; } + /** + * Will only be called when loading procedures from procedure store, where we need to record + * whether the procedure has already held a lock. Later we will call + * {@link #restoreLock(Object)} and {@link #doAcquireLock(Object, ProcedureStore)} to restore it.
+ */ + final void lockedWhenLoading() { + this.lockedWhenLoading = true; + } + // ============================================================================================== // Runtime state, updated every operation by the ProcedureExecutor // @@ -677,13 +710,11 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE } @VisibleForTesting - @InterfaceAudience.Private protected synchronized void setState(final ProcedureState state) { this.state = state; updateTimestamp(); } - @InterfaceAudience.Private public synchronized ProcedureState getState() { return state; } @@ -705,10 +736,10 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE /** * Called by the ProcedureExecutor when the timeout set by setTimeout() is expired. - * @return true to let the framework handle the timeout as abort, - * false in case the procedure handled the timeout itself. + * @return true to let the framework handle the timeout as abort, false in case the procedure + * handled the timeout itself. 
*/ - protected synchronized boolean setTimeoutFailure(final TEnvironment env) { + protected synchronized boolean setTimeoutFailure(TEnvironment env) { if (state == ProcedureState.WAITING_TIMEOUT) { long timeDiff = EnvironmentEdgeManager.currentTime() - lastUpdate; setFailure("ProcedureExecutor", new TimeoutIOException( @@ -729,8 +760,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE /** * Called by the ProcedureExecutor on procedure-load to restore the latch state */ - @InterfaceAudience.Private - protected synchronized void setChildrenLatch(final int numChildren) { + protected synchronized void setChildrenLatch(int numChildren) { this.childrenLatch = numChildren; if (LOG.isTraceEnabled()) { LOG.trace("CHILD LATCH INCREMENT SET " + @@ -741,7 +771,6 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE /** * Called by the ProcedureExecutor on procedure-load to restore the latch state */ - @InterfaceAudience.Private protected synchronized void incChildrenLatch() { // TODO: can this be inferred from the stack? I think so... this.childrenLatch++; @@ -753,7 +782,6 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE /** * Called by the ProcedureExecutor to notify that one of the sub-procedures has completed. 
*/ - @InterfaceAudience.Private private synchronized boolean childrenCountDown() { assert childrenLatch > 0: this; boolean b = --childrenLatch == 0; @@ -770,17 +798,18 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE */ synchronized boolean tryRunnable() { // Don't use isWaiting in the below; it returns true for WAITING and WAITING_TIMEOUT - boolean b = getState() == ProcedureState.WAITING && childrenCountDown(); - if (b) setState(ProcedureState.RUNNABLE); - return b; + if (getState() == ProcedureState.WAITING && childrenCountDown()) { + setState(ProcedureState.RUNNABLE); + return true; + } else { + return false; + } } - @InterfaceAudience.Private protected synchronized boolean hasChildren() { return childrenLatch > 0; } - @InterfaceAudience.Private protected synchronized int getChildrenLatch() { return childrenLatch; } @@ -789,7 +818,6 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE * Called by the RootProcedureState on procedure execution. * Each procedure store its stack-index positions. */ - @InterfaceAudience.Private protected synchronized void addStackIndex(final int index) { if (stackIndexes == null) { stackIndexes = new int[] { index }; @@ -800,7 +828,6 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE } } - @InterfaceAudience.Private protected synchronized boolean removeStackIndex() { if (stackIndexes != null && stackIndexes.length > 1) { stackIndexes = Arrays.copyOf(stackIndexes, stackIndexes.length - 1); @@ -815,7 +842,6 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE * Called on store load to initialize the Procedure internals after * the creation/deserialization. 
*/ - @InterfaceAudience.Private protected synchronized void setStackIndexes(final List<Integer> stackIndexes) { this.stackIndexes = new int[stackIndexes.size()]; for (int i = 0; i < this.stackIndexes.length; ++i) { @@ -823,12 +849,10 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE } } - @InterfaceAudience.Private protected synchronized boolean wasExecuted() { return stackIndexes != null; } - @InterfaceAudience.Private protected synchronized int[] getStackIndexes() { return stackIndexes; } @@ -840,10 +864,9 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE /** * Internal method called by the ProcedureExecutor that starts the user-level code execute(). * @throws ProcedureSuspendedException This is used when procedure wants to halt processing and - * skip out without changing states or releasing any locks held. + * skip out without changing states or releasing any locks held. */ - @InterfaceAudience.Private - protected Procedure<TEnvironment>[] doExecute(final TEnvironment env) + protected Procedure<TEnvironment>[] doExecute(TEnvironment env) throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { try { updateTimestamp(); @@ -856,8 +879,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE /** * Internal method called by the ProcedureExecutor that starts the user-level code rollback(). 
*/ - @InterfaceAudience.Private - protected void doRollback(final TEnvironment env) + protected void doRollback(TEnvironment env) throws IOException, InterruptedException { try { updateTimestamp(); @@ -867,19 +889,60 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE } } + final void restoreLock(TEnvironment env) { + if (!lockedWhenLoading) { + LOG.debug("{} didn't hold the lock before restarting, skip acquiring lock.", this); + return; + } + + LOG.debug("{} held the lock before restarting, call acquireLock to restore it.", this); + LockState state = acquireLock(env); + assert state == LockState.LOCK_ACQUIRED; + } + /** * Internal method called by the ProcedureExecutor that starts the user-level code acquireLock(). */ - @InterfaceAudience.Private - protected LockState doAcquireLock(final TEnvironment env) { - return acquireLock(env); + final LockState doAcquireLock(TEnvironment env, ProcedureStore store) { + if (waitInitialized(env)) { + return LockState.LOCK_EVENT_WAIT; + } + if (lockedWhenLoading) { + // reset it so we will not consider it anymore + lockedWhenLoading = false; + locked = true; + // Here we return without persist the locked state, as lockedWhenLoading is true means + // that the locked field of the procedure stored in procedure store is true, so we do not need + // to store it again. + return LockState.LOCK_ACQUIRED; + } + LockState state = acquireLock(env); + if (state == LockState.LOCK_ACQUIRED) { + locked = true; + // persist that we have held the lock. This must be done before we actually execute the + // procedure, otherwise when restarting, we may consider the procedure does not have a lock, + // but it may have already done some changes as we have already executed it, and if another + // procedure gets the lock, then the semantic will be broken if the holdLock is true, as we do + // not expect that another procedure can be executed in the middle. 
+ store.update(this); + } + return state; } /** * Internal method called by the ProcedureExecutor that starts the user-level code releaseLock(). */ - @InterfaceAudience.Private - protected void doReleaseLock(final TEnvironment env) { + final void doReleaseLock(TEnvironment env, ProcedureStore store) { + locked = false; + // persist that we have released the lock. This must be done before we actually release the + // lock. Another procedure may take this lock immediately after we release the lock, and if we + // crash before persist the information that we have already released the lock, then when + // restarting there will be two procedures which both have the lock and cause problems. + if (getState() != ProcedureState.ROLLEDBACK) { + // If the state is ROLLEDBACK, it means that we have already deleted the procedure from + // procedure store, so do not need to log the release operation any more. + store.update(this); + } releaseLock(env); } @@ -896,7 +959,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE * Get an hashcode for the specified Procedure ID * @return the hashcode for the specified procId */ - public static long getProcIdHashCode(final long procId) { + public static long getProcIdHashCode(long procId) { long h = procId; h ^= h >> 16; h *= 0x85ebca6b; @@ -906,15 +969,16 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE return h; } - /* + /** * Helper to lookup the root Procedure ID given a specified procedure. 
*/ - @InterfaceAudience.Private - protected static Long getRootProcedureId(final Map<Long, Procedure> procedures, - Procedure<?> proc) { + protected static <T> Long getRootProcedureId(Map<Long, Procedure<T>> procedures, + Procedure<T> proc) { while (proc.hasParent()) { proc = procedures.get(proc.getParentProcId()); - if (proc == null) return null; + if (proc == null) { + return null; + } } return proc.getProcId(); } @@ -924,7 +988,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE * @param b the second procedure to be compared. * @return true if the two procedures have the same parent */ - public static boolean haveSameParent(final Procedure<?> a, final Procedure<?> b) { + public static boolean haveSameParent(Procedure<?> a, Procedure<?> b) { return a.hasParent() && b.hasParent() && (a.getParentProcId() == b.getParentProcId()); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/833657c4/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index db7c118..f1bec72 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -19,8 +19,10 @@ package org.apache.hadoop.hbase.procedure2; import java.io.IOException; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; +import java.util.Deque; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -48,7 +50,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.NonceKey; import org.apache.hadoop.hbase.util.Threads; import 
org.apache.yetus.audience.InterfaceAudience; -import org.apache.yetus.audience.InterfaceStability; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,7 +72,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu * and get the result via getResult(procId) */ @InterfaceAudience.Private [email protected] public class ProcedureExecutor<TEnvironment> { private static final Logger LOG = LoggerFactory.getLogger(ProcedureExecutor.class); @@ -108,16 +108,16 @@ public class ProcedureExecutor<TEnvironment> { void procedureFinished(long procId); } - private static class CompletedProcedureRetainer { - private final Procedure<?> procedure; + private static final class CompletedProcedureRetainer<TEnvironment> { + private final Procedure<TEnvironment> procedure; private long clientAckTime; - public CompletedProcedureRetainer(Procedure<?> procedure) { + public CompletedProcedureRetainer(Procedure<TEnvironment> procedure) { this.procedure = procedure; clientAckTime = -1; } - public Procedure<?> getProcedure() { + public Procedure<TEnvironment> getProcedure() { return procedure; } @@ -172,13 +172,13 @@ public class ProcedureExecutor<TEnvironment> { private static final String BATCH_SIZE_CONF_KEY = "hbase.procedure.cleaner.evict.batch.size"; private static final int DEFAULT_BATCH_SIZE = 32; - private final Map<Long, CompletedProcedureRetainer> completed; + private final Map<Long, CompletedProcedureRetainer<TEnvironment>> completed; private final Map<NonceKey, Long> nonceKeysToProcIdsMap; private final ProcedureStore store; private Configuration conf; - public CompletedProcedureCleaner(final Configuration conf, final ProcedureStore store, - final Map<Long, CompletedProcedureRetainer> completedMap, + public CompletedProcedureCleaner(Configuration conf, final ProcedureStore store, + final Map<Long, CompletedProcedureRetainer<TEnvironment>> completedMap, final Map<NonceKey, Long> nonceKeysToProcIdsMap) { // set the timeout interval that triggers 
the periodic-procedure super(conf.getInt(CLEANER_INTERVAL_CONF_KEY, DEFAULT_CLEANER_INTERVAL)); @@ -205,10 +205,11 @@ public class ProcedureExecutor<TEnvironment> { int batchCount = 0; final long now = EnvironmentEdgeManager.currentTime(); - final Iterator<Map.Entry<Long, CompletedProcedureRetainer>> it = completed.entrySet().iterator(); + final Iterator<Map.Entry<Long, CompletedProcedureRetainer<TEnvironment>>> it = + completed.entrySet().iterator(); while (it.hasNext() && store.isRunning()) { - final Map.Entry<Long, CompletedProcedureRetainer> entry = it.next(); - final CompletedProcedureRetainer retainer = entry.getValue(); + final Map.Entry<Long, CompletedProcedureRetainer<TEnvironment>> entry = it.next(); + final CompletedProcedureRetainer<TEnvironment> retainer = entry.getValue(); final Procedure<?> proc = retainer.getProcedure(); // TODO: Select TTL based on Procedure type @@ -240,28 +241,32 @@ public class ProcedureExecutor<TEnvironment> { * Once a Root-Procedure completes (success or failure), the result will be added to this map. * The user of ProcedureExecutor should call getResult(procId) to get the result. */ - private final ConcurrentHashMap<Long, CompletedProcedureRetainer> completed = new ConcurrentHashMap<>(); + private final ConcurrentHashMap<Long, CompletedProcedureRetainer<TEnvironment>> completed = + new ConcurrentHashMap<>(); /** * Map the the procId returned by submitProcedure(), the Root-ProcID, to the RootProcedureState. * The RootProcedureState contains the execution stack of the Root-Procedure, * It is added to the map by submitProcedure() and removed on procedure completion. */ - private final ConcurrentHashMap<Long, RootProcedureState> rollbackStack = new ConcurrentHashMap<>(); + private final ConcurrentHashMap<Long, RootProcedureState<TEnvironment>> rollbackStack = + new ConcurrentHashMap<>(); /** * Helper map to lookup the live procedures by ID. * This map contains every procedure. root-procedures and subprocedures. 
*/ - private final ConcurrentHashMap<Long, Procedure> procedures = new ConcurrentHashMap<>(); + private final ConcurrentHashMap<Long, Procedure<TEnvironment>> procedures = + new ConcurrentHashMap<>(); /** - * Helper map to lookup whether the procedure already issued from the same client. - * This map contains every root procedure. + * Helper map to lookup whether the procedure already issued from the same client. This map + * contains every root procedure. */ private final ConcurrentHashMap<NonceKey, Long> nonceKeysToProcIdsMap = new ConcurrentHashMap<>(); - private final CopyOnWriteArrayList<ProcedureExecutorListener> listeners = new CopyOnWriteArrayList<>(); + private final CopyOnWriteArrayList<ProcedureExecutorListener> listeners = + new CopyOnWriteArrayList<>(); private Configuration conf; @@ -287,7 +292,7 @@ public class ProcedureExecutor<TEnvironment> { * Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery * (Should be ok). */ - private TimeoutExecutorThread timeoutExecutor; + private TimeoutExecutorThread<TEnvironment> timeoutExecutor; private int corePoolSize; private int maxPoolSize; @@ -357,27 +362,68 @@ public class ProcedureExecutor<TEnvironment> { }); } - private void loadProcedures(final ProcedureIterator procIter, - final boolean abortOnCorruption) throws IOException { - final boolean debugEnabled = LOG.isDebugEnabled(); + private void restoreLock(Procedure<TEnvironment> proc, Set<Long> restored) { + proc.restoreLock(getEnvironment()); + restored.add(proc.getProcId()); + } + + private void restoreLocks(Deque<Procedure<TEnvironment>> stack, Set<Long> restored) { + while (!stack.isEmpty()) { + restoreLock(stack.pop(), restored); + } + } + + // Restore the locks for all the procedures. 
+ // Notice that we need to restore the locks starting from the root proc, otherwise there will be + // problem that a sub procedure may hold the exclusive lock first and then we are stuck when + // calling the acquireLock method for the parent procedure. + // The algorithm is straight-forward: + // 1. Use a set to record the procedures which locks have already been restored. + // 2. Use a stack to store the hierarchy of the procedures + // 3. For all the procedure, we will first try to find its parent and push it into the stack, + // unless + // a. We have no parent, i.e, we are the root procedure + // b. The lock has already been restored(by checking the set introduced in #1) + // then we start to pop the stack and call acquireLock for each procedure. + // Notice that this should be done for all procedures, not only the ones in runnableList. + private void restoreLocks() { + Set<Long> restored = new HashSet<>(); + Deque<Procedure<TEnvironment>> stack = new ArrayDeque<>(); + procedures.values().forEach(proc -> { + for (;;) { + if (restored.contains(proc.getProcId())) { + restoreLocks(stack, restored); + return; + } + if (!proc.hasParent()) { + restoreLock(proc, restored); + restoreLocks(stack, restored); + return; + } + stack.push(proc); + proc = procedures.get(proc.getParentProcId()); + } + }); + } + private void loadProcedures(ProcedureIterator procIter, boolean abortOnCorruption) + throws IOException { // 1. 
Build the rollback stack int runnablesCount = 0; + int failedCount = 0; while (procIter.hasNext()) { boolean finished = procIter.isNextFinished(); - Procedure proc = procIter.next(); + Procedure<TEnvironment> proc = procIter.next(); NonceKey nonceKey = proc.getNonceKey(); long procId = proc.getProcId(); if (finished) { - completed.put(proc.getProcId(), new CompletedProcedureRetainer(proc)); - if (debugEnabled) { - LOG.debug("Completed " + proc); - } + completed.put(proc.getProcId(), new CompletedProcedureRetainer<>(proc)); + LOG.debug("Completed {}", proc); } else { if (!proc.hasParent()) { assert !proc.isFinished() : "unexpected finished procedure"; - rollbackStack.put(proc.getProcId(), new RootProcedureState()); + rollbackStack.put(proc.getProcId(), new RootProcedureState<>()); } // add the procedure to the map @@ -386,6 +432,8 @@ public class ProcedureExecutor<TEnvironment> { if (proc.getState() == ProcedureState.RUNNABLE) { runnablesCount++; + } else if (proc.getState() == ProcedureState.FAILED) { + failedCount++; } } @@ -396,8 +444,19 @@ public class ProcedureExecutor<TEnvironment> { } // 2. Initialize the stacks - final ArrayList<Procedure> runnableList = new ArrayList(runnablesCount); - HashSet<Procedure> waitingSet = null; + // In the old implementation, for procedures in FAILED state, we will push it into the + // ProcedureScheduler directly to execute the rollback. But this does not work after we + // introduce the restore lock stage. + // For now, when we acquire a xlock, we will remove the queue from runQueue in scheduler, and + // then when a procedure which has lock access, for example, a sub procedure of the procedure + // which has the xlock, is pushed into the scheduler, we will add the queue back to let the + // workers poll from it. 
The assumption here is that, the procedure which has the xlock should + // have been polled out already, so when loading we can not add the procedure to scheduler first + // and then call acquireLock, since the procedure is still in the queue, and since we will + // remove the queue from runQueue, then no one can poll it out, then there is a dead lock + List<Procedure<TEnvironment>> runnableList = new ArrayList<>(runnablesCount); + List<Procedure<TEnvironment>> failedList = new ArrayList<>(failedCount); + Set<Procedure<TEnvironment>> waitingSet = null; procIter.reset(); while (procIter.hasNext()) { if (procIter.isNextFinished()) { @@ -405,12 +464,10 @@ public class ProcedureExecutor<TEnvironment> { continue; } - Procedure proc = procIter.next(); + Procedure<TEnvironment> proc = procIter.next(); assert !(proc.isFinished() && !proc.hasParent()) : "unexpected completed proc=" + proc; - if (debugEnabled) { - LOG.debug(String.format("Loading %s", proc)); - } + LOG.debug("Loading {}", proc); Long rootProcId = getRootProcedureId(proc); if (rootProcId == null) { @@ -420,14 +477,14 @@ public class ProcedureExecutor<TEnvironment> { } if (proc.hasParent()) { - Procedure parent = procedures.get(proc.getParentProcId()); + Procedure<TEnvironment> parent = procedures.get(proc.getParentProcId()); // corrupted procedures are handled later at step 3 if (parent != null && !proc.isFinished()) { parent.incChildrenLatch(); } } - RootProcedureState procStack = rollbackStack.get(rootProcId); + RootProcedureState<TEnvironment> procStack = rollbackStack.get(rootProcId); procStack.loadStack(proc); proc.setRootProcId(rootProcId); @@ -447,8 +504,7 @@ public class ProcedureExecutor<TEnvironment> { waitingSet.add(proc); break; case FAILED: - // add the proc to the scheduler to perform the rollback - scheduler.addBack(proc); + failedList.add(proc); break; case ROLLEDBACK: case INITIALIZING: @@ -462,13 +518,14 @@ public class ProcedureExecutor<TEnvironment> { // 3. 
Validate the stacks int corruptedCount = 0; - Iterator<Map.Entry<Long, RootProcedureState>> itStack = rollbackStack.entrySet().iterator(); + Iterator<Map.Entry<Long, RootProcedureState<TEnvironment>>> itStack = + rollbackStack.entrySet().iterator(); while (itStack.hasNext()) { - Map.Entry<Long, RootProcedureState> entry = itStack.next(); - RootProcedureState procStack = entry.getValue(); + Map.Entry<Long, RootProcedureState<TEnvironment>> entry = itStack.next(); + RootProcedureState<TEnvironment> procStack = entry.getValue(); if (procStack.isValid()) continue; - for (Procedure proc: procStack.getSubproceduresStack()) { + for (Procedure<TEnvironment> proc : procStack.getSubproceduresStack()) { LOG.error("Corrupted " + proc); procedures.remove(proc.getProcId()); runnableList.remove(proc); @@ -484,30 +541,22 @@ public class ProcedureExecutor<TEnvironment> { // 4. Push the procedures to the timeout executor if (waitingSet != null && !waitingSet.isEmpty()) { - for (Procedure proc: waitingSet) { + for (Procedure<TEnvironment> proc: waitingSet) { proc.afterReplay(getEnvironment()); timeoutExecutor.add(proc); } } - - // 5. Push the procedure to the scheduler - if (!runnableList.isEmpty()) { - // TODO: See ProcedureWALFormatReader#hasFastStartSupport - // some procedure may be started way before this stuff. - for (int i = runnableList.size() - 1; i >= 0; --i) { - Procedure proc = runnableList.get(i); - proc.afterReplay(getEnvironment()); - if (!proc.hasParent()) { - sendProcedureLoadedNotification(proc.getProcId()); - } - if (proc.wasExecuted()) { - scheduler.addFront(proc); - } else { - // if it was not in execution, it can wait. - scheduler.addBack(proc); - } + // 5. restore locks + restoreLocks(); + // 6. 
Push the procedure to the scheduler + failedList.forEach(scheduler::addBack); + runnableList.forEach(p -> { + p.afterReplay(getEnvironment()); + if (!p.hasParent()) { + sendProcedureLoadedNotification(p.getProcId()); } - } + scheduler.addBack(p); + }); } /** @@ -529,7 +578,7 @@ public class ProcedureExecutor<TEnvironment> { corePoolSize, maxPoolSize); this.threadGroup = new ThreadGroup("PEWorkerGroup"); - this.timeoutExecutor = new TimeoutExecutorThread(this, threadGroup); + this.timeoutExecutor = new TimeoutExecutorThread<>(this, threadGroup); // Create the workers workerId.set(0); @@ -581,7 +630,7 @@ public class ProcedureExecutor<TEnvironment> { timeoutExecutor.add(new WorkerMonitor()); // Add completed cleaner chore - addChore(new CompletedProcedureCleaner(conf, store, completed, nonceKeysToProcIdsMap)); + addChore(new CompletedProcedureCleaner<>(conf, store, completed, nonceKeysToProcIdsMap)); } public void stop() { @@ -686,7 +735,7 @@ public class ProcedureExecutor<TEnvironment> { * Add a chore procedure to the executor * @param chore the chore to add */ - public void addChore(final ProcedureInMemoryChore chore) { + public void addChore(ProcedureInMemoryChore<TEnvironment> chore) { chore.setState(ProcedureState.WAITING_TIMEOUT); timeoutExecutor.add(chore); } @@ -696,7 +745,7 @@ public class ProcedureExecutor<TEnvironment> { * @param chore the chore to remove * @return whether the chore is removed, or it will be removed later */ - public boolean removeChore(final ProcedureInMemoryChore chore) { + public boolean removeChore(ProcedureInMemoryChore<TEnvironment> chore) { chore.setState(ProcedureState.SUCCESS); return timeoutExecutor.remove(chore); } @@ -830,17 +879,21 @@ public class ProcedureExecutor<TEnvironment> { * @param procOwner name of the owner of the procedure, used to inform the user * @param exception the failure to report to the user */ - public void setFailureResultForNonce(final NonceKey nonceKey, final String procName, - final User procOwner, 
final IOException exception) { - if (nonceKey == null) return; + public void setFailureResultForNonce(NonceKey nonceKey, String procName, User procOwner, + IOException exception) { + if (nonceKey == null) { + return; + } - final Long procId = nonceKeysToProcIdsMap.get(nonceKey); - if (procId == null || completed.containsKey(procId)) return; + Long procId = nonceKeysToProcIdsMap.get(nonceKey); + if (procId == null || completed.containsKey(procId)) { + return; + } - Procedure<?> proc = new FailedProcedure(procId.longValue(), - procName, procOwner, nonceKey, exception); + Procedure<TEnvironment> proc = + new FailedProcedure<>(procId.longValue(), procName, procOwner, nonceKey, exception); - completed.putIfAbsent(procId, new CompletedProcedureRetainer(proc)); + completed.putIfAbsent(procId, new CompletedProcedureRetainer<>(proc)); } // ========================================================================== @@ -851,7 +904,7 @@ public class ProcedureExecutor<TEnvironment> { * @param proc the new procedure to execute. 
* @return the procedure id, that can be used to monitor the operation */ - public long submitProcedure(final Procedure proc) { + public long submitProcedure(Procedure<TEnvironment> proc) { return submitProcedure(proc, null); } @@ -863,7 +916,7 @@ public class ProcedureExecutor<TEnvironment> { */ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH", justification = "FindBugs is blind to the check-for-null") - public long submitProcedure(final Procedure proc, final NonceKey nonceKey) { + public long submitProcedure(Procedure<TEnvironment> proc, NonceKey nonceKey) { Preconditions.checkArgument(lastProcId.get() >= 0); prepareProcedure(proc); @@ -883,9 +936,7 @@ public class ProcedureExecutor<TEnvironment> { // Commit the transaction store.insert(proc, null); - if (LOG.isDebugEnabled()) { - LOG.debug("Stored " + proc); - } + LOG.debug("Stored {}", proc); // Add the procedure to the executor return pushProcedure(proc); @@ -896,7 +947,7 @@ public class ProcedureExecutor<TEnvironment> { * @param procs the new procedures to execute. */ // TODO: Do we need to take nonces here? 
- public void submitProcedures(final Procedure[] procs) { + public void submitProcedures(Procedure<TEnvironment>[] procs) { Preconditions.checkArgument(lastProcId.get() >= 0); if (procs == null || procs.length <= 0) { return; @@ -919,7 +970,7 @@ public class ProcedureExecutor<TEnvironment> { } } - private Procedure prepareProcedure(final Procedure proc) { + private Procedure<TEnvironment> prepareProcedure(Procedure<TEnvironment> proc) { Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING); Preconditions.checkArgument(!proc.hasParent(), "unexpected parent", proc); if (this.checkOwnerSet) { @@ -928,14 +979,14 @@ public class ProcedureExecutor<TEnvironment> { return proc; } - private long pushProcedure(final Procedure proc) { + private long pushProcedure(Procedure<TEnvironment> proc) { final long currentProcId = proc.getProcId(); // Update metrics on start of a procedure proc.updateMetricsOnSubmit(getEnvironment()); // Create the rollback stack for the procedure - RootProcedureState stack = new RootProcedureState(); + RootProcedureState<TEnvironment> stack = new RootProcedureState<>(); rollbackStack.put(currentProcId, stack); // Submit the new subprocedures @@ -952,7 +1003,7 @@ public class ProcedureExecutor<TEnvironment> { * @param procId the procedure to abort * @return true if the procedure exists and has received the abort, otherwise false. */ - public boolean abort(final long procId) { + public boolean abort(long procId) { return abort(procId, true); } @@ -963,8 +1014,8 @@ public class ProcedureExecutor<TEnvironment> { * @param mayInterruptIfRunning if the proc completed at least one step, should it be aborted? * @return true if the procedure exists and has received the abort, otherwise false. 
*/ - public boolean abort(final long procId, final boolean mayInterruptIfRunning) { - final Procedure proc = procedures.get(procId); + public boolean abort(long procId, boolean mayInterruptIfRunning) { + Procedure<TEnvironment> proc = procedures.get(procId); if (proc != null) { if (!mayInterruptIfRunning && proc.wasExecuted()) { return false; @@ -977,20 +1028,20 @@ public class ProcedureExecutor<TEnvironment> { // ========================================================================== // Executor query helpers // ========================================================================== - public Procedure getProcedure(final long procId) { + public Procedure<TEnvironment> getProcedure(final long procId) { return procedures.get(procId); } - public <T extends Procedure> T getProcedure(final Class<T> clazz, final long procId) { - final Procedure proc = getProcedure(procId); + public <T extends Procedure<TEnvironment>> T getProcedure(Class<T> clazz, long procId) { + Procedure<TEnvironment> proc = getProcedure(procId); if (clazz.isInstance(proc)) { - return (T)proc; + return clazz.cast(proc); } return null; } - public Procedure getResult(final long procId) { - CompletedProcedureRetainer retainer = completed.get(procId); + public Procedure<TEnvironment> getResult(long procId) { + CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId); if (retainer == null) { return null; } else { @@ -1014,8 +1065,8 @@ public class ProcedureExecutor<TEnvironment> { * @param procId the ID of the procedure to check * @return true if the procedure execution is started, otherwise false. */ - public boolean isStarted(final long procId) { - final Procedure proc = procedures.get(procId); + public boolean isStarted(long procId) { + Procedure<?> proc = procedures.get(procId); if (proc == null) { return completed.get(procId) != null; } @@ -1026,13 +1077,11 @@ public class ProcedureExecutor<TEnvironment> { * Mark the specified completed procedure, as ready to remove. 
* @param procId the ID of the procedure to remove */ - public void removeResult(final long procId) { - CompletedProcedureRetainer retainer = completed.get(procId); + public void removeResult(long procId) { + CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId); if (retainer == null) { assert !procedures.containsKey(procId) : "pid=" + procId + " is still running"; - if (LOG.isDebugEnabled()) { - LOG.debug("pid=" + procId + " already removed by the cleaner."); - } + LOG.debug("pid={} already removed by the cleaner.", procId); return; } @@ -1040,8 +1089,8 @@ public class ProcedureExecutor<TEnvironment> { retainer.setClientAckTime(EnvironmentEdgeManager.currentTime()); } - public Procedure getResultOrProcedure(final long procId) { - CompletedProcedureRetainer retainer = completed.get(procId); + public Procedure<TEnvironment> getResultOrProcedure(long procId) { + CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId); if (retainer == null) { return procedures.get(procId); } else { @@ -1056,15 +1105,16 @@ public class ProcedureExecutor<TEnvironment> { * @return true if the user is the owner of the procedure, * false otherwise or the owner is unknown. */ - public boolean isProcedureOwner(final long procId, final User user) { - if (user == null) return false; - - final Procedure runningProc = procedures.get(procId); + public boolean isProcedureOwner(long procId, User user) { + if (user == null) { + return false; + } + final Procedure<TEnvironment> runningProc = procedures.get(procId); if (runningProc != null) { return runningProc.getOwner().equals(user.getShortName()); } - final CompletedProcedureRetainer retainer = completed.get(procId); + final CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId); if (retainer != null) { return retainer.getProcedure().getOwner().equals(user.getShortName()); } @@ -1078,19 +1128,17 @@ public class ProcedureExecutor<TEnvironment> { * Get procedures. 
* @return the procedures in a list */ - public List<Procedure<?>> getProcedures() { - final List<Procedure<?>> procedureLists = new ArrayList<>(procedures.size() + completed.size()); - for (Procedure<?> procedure : procedures.values()) { - procedureLists.add(procedure); - } + public List<Procedure<TEnvironment>> getProcedures() { + List<Procedure<TEnvironment>> procedureList = + new ArrayList<>(procedures.size() + completed.size()); + procedureList.addAll(procedures.values()); // Note: The procedure could show up twice in the list with different state, as // it could complete after we walk through procedures list and insert into // procedureList - it is ok, as we will use the information in the Procedure // to figure it out; to prevent this would increase the complexity of the logic. - for (CompletedProcedureRetainer retainer: completed.values()) { - procedureLists.add(retainer.getProcedure()); - } - return procedureLists; + completed.values().stream().map(CompletedProcedureRetainer::getProcedure) + .forEach(procedureList::add); + return procedureList; } // ========================================================================== @@ -1169,14 +1217,14 @@ public class ProcedureExecutor<TEnvironment> { return procedures.keySet(); } - Long getRootProcedureId(Procedure proc) { + Long getRootProcedureId(Procedure<TEnvironment> proc) { return Procedure.getRootProcedureId(procedures, proc); } // ========================================================================== // Executions // ========================================================================== - private void executeProcedure(final Procedure proc) { + private void executeProcedure(Procedure<TEnvironment> proc) { final Long rootProcId = getRootProcedureId(proc); if (rootProcId == null) { // The 'proc' was ready to run but the root procedure was rolledback @@ -1185,7 +1233,7 @@ public class ProcedureExecutor<TEnvironment> { return; } - final RootProcedureState procStack = rollbackStack.get(rootProcId); + 
RootProcedureState<TEnvironment> procStack = rollbackStack.get(rootProcId); if (procStack == null) { LOG.warn("RootProcedureState is null for " + proc.getProcId()); return; @@ -1197,7 +1245,7 @@ public class ProcedureExecutor<TEnvironment> { // we have the 'rollback-lock' we can start rollingback switch (executeRollback(rootProcId, procStack)) { case LOCK_ACQUIRED: - break; + break; case LOCK_YIELD_WAIT: procStack.unsetRollback(); scheduler.yield(proc); @@ -1239,7 +1287,6 @@ public class ProcedureExecutor<TEnvironment> { switch (lockState) { case LOCK_ACQUIRED: execProcedure(procStack, proc); - releaseLock(proc, false); break; case LOCK_YIELD_WAIT: LOG.info(lockState + " " + proc); @@ -1254,12 +1301,6 @@ public class ProcedureExecutor<TEnvironment> { } procStack.release(proc); - // allows to kill the executor before something is stored to the wal. - // useful to test the procedure recovery. - if (testing != null && !isRunning()) { - break; - } - if (proc.isSuccess()) { // update metrics on finishing the procedure proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), true); @@ -1275,33 +1316,31 @@ public class ProcedureExecutor<TEnvironment> { } while (procStack.isFailed()); } - private LockState acquireLock(final Procedure proc) { - final TEnvironment env = getEnvironment(); - // hasLock() is used in conjunction with holdLock(). - // This allows us to not rewrite or carry around the hasLock() flag - // for every procedure. the hasLock() have meaning only if holdLock() is true. - if (proc.holdLock(env) && proc.hasLock(env)) { + private LockState acquireLock(Procedure<TEnvironment> proc) { + TEnvironment env = getEnvironment(); + // if holdLock is true, then maybe we already have the lock, so just return LOCK_ACQUIRED if + // hasLock is true. 
+ if (proc.hasLock()) { return LockState.LOCK_ACQUIRED; } - return proc.doAcquireLock(env); + return proc.doAcquireLock(env, store); } - private void releaseLock(final Procedure proc, final boolean force) { - final TEnvironment env = getEnvironment(); + private void releaseLock(Procedure<TEnvironment> proc, boolean force) { + TEnvironment env = getEnvironment(); // For how the framework works, we know that we will always have the lock // when we call releaseLock(), so we can avoid calling proc.hasLock() - if (force || !proc.holdLock(env)) { - proc.doReleaseLock(env); + if (force || !proc.holdLock(env) || proc.isFinished()) { + proc.doReleaseLock(env, store); } } /** - * Execute the rollback of the full procedure stack. - * Once the procedure is rolledback, the root-procedure will be visible as - * finished to user, and the result will be the fatal exception. + * Execute the rollback of the full procedure stack. Once the procedure is rolledback, the + * root-procedure will be visible as finished to user, and the result will be the fatal exception. */ - private LockState executeRollback(final long rootProcId, final RootProcedureState procStack) { - final Procedure rootProc = procedures.get(rootProcId); + private LockState executeRollback(long rootProcId, RootProcedureState<TEnvironment> procStack) { + Procedure<TEnvironment> rootProc = procedures.get(rootProcId); RemoteProcedureException exception = rootProc.getException(); // TODO: This needs doc. The root proc doesn't have an exception. Maybe we are // rolling back because the subprocedure does. Clarify. 
@@ -1311,13 +1350,13 @@ public class ProcedureExecutor<TEnvironment> { store.update(rootProc); } - final List<Procedure> subprocStack = procStack.getSubproceduresStack(); + List<Procedure<TEnvironment>> subprocStack = procStack.getSubproceduresStack(); assert subprocStack != null : "Called rollback with no steps executed rootProc=" + rootProc; int stackTail = subprocStack.size(); boolean reuseLock = false; while (stackTail --> 0) { - final Procedure proc = subprocStack.get(stackTail); + Procedure<TEnvironment> proc = subprocStack.get(stackTail); LockState lockState; if (!reuseLock && (lockState = acquireLock(proc)) != LockState.LOCK_ACQUIRED) { @@ -1334,7 +1373,7 @@ public class ProcedureExecutor<TEnvironment> { // (e.g. StateMachineProcedure reuse the same instance) // we can avoid to lock/unlock each step reuseLock = stackTail > 0 && (subprocStack.get(stackTail - 1) == proc) && !abortRollback; - if (!reuseLock) { + if (!reuseLock && proc.hasLock()) { releaseLock(proc, false); } @@ -1368,13 +1407,11 @@ public class ProcedureExecutor<TEnvironment> { * It updates the store with the new state (stack index) * or will remove completly the procedure in case it is a child. */ - private LockState executeRollback(final Procedure proc) { + private LockState executeRollback(Procedure<TEnvironment> proc) { try { proc.doRollback(getEnvironment()); } catch (IOException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("Roll back attempt failed for " + proc, e); - } + LOG.debug("Roll back attempt failed for {}", proc, e); return LockState.LOCK_YIELD_WAIT; } catch (InterruptedException e) { handleInterruptedException(proc, e); @@ -1387,9 +1424,10 @@ public class ProcedureExecutor<TEnvironment> { // allows to kill the executor before something is stored to the wal. // useful to test the procedure recovery. 
if (testing != null && testing.shouldKillBeforeStoreUpdate()) { - LOG.debug("TESTING: Kill before store update"); + String msg = "TESTING: Kill before store update"; + LOG.debug(msg); stop(); - return LockState.LOCK_YIELD_WAIT; + throw new RuntimeException(msg); } if (proc.removeStackIndex()) { @@ -1416,6 +1454,11 @@ public class ProcedureExecutor<TEnvironment> { return LockState.LOCK_ACQUIRED; } + private void yieldProcedure(Procedure<TEnvironment> proc) { + releaseLock(proc, false); + scheduler.yield(proc); + } + /** * Executes <code>procedure</code> * <ul> @@ -1445,10 +1488,10 @@ public class ProcedureExecutor<TEnvironment> { * </li> * </ul> */ - private void execProcedure(final RootProcedureState procStack, - final Procedure<TEnvironment> procedure) { + private void execProcedure(RootProcedureState<TEnvironment> procStack, + Procedure<TEnvironment> procedure) { Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE, - procedure.toString()); + procedure.toString()); // Procedures can suspend themselves. They skip out by throwing a ProcedureSuspendedException. // The exception is caught below and then we hurry to the exit without disturbing state. 
The @@ -1475,22 +1518,16 @@ public class ProcedureExecutor<TEnvironment> { subprocs = null; } } catch (ProcedureSuspendedException e) { - if (LOG.isTraceEnabled()) { - LOG.trace("Suspend " + procedure); - } + LOG.trace("Suspend {}", procedure); suspended = true; } catch (ProcedureYieldException e) { - if (LOG.isTraceEnabled()) { - LOG.trace("Yield " + procedure + ": " + e.getMessage(), e); - } - scheduler.yield(procedure); + LOG.trace("Yield {}", procedure, e); + yieldProcedure(procedure); return; } catch (InterruptedException e) { - if (LOG.isTraceEnabled()) { - LOG.trace("Yield interrupt " + procedure + ": " + e.getMessage(), e); - } + LOG.trace("Yield interrupt {}", procedure, e); handleInterruptedException(procedure, e); - scheduler.yield(procedure); + yieldProcedure(procedure); return; } catch (Throwable e) { // Catch NullPointerExceptions or similar errors... @@ -1506,9 +1543,7 @@ public class ProcedureExecutor<TEnvironment> { // i.e. we go around this loop again rather than go back out on the scheduler queue. subprocs = null; reExecute = true; - if (LOG.isTraceEnabled()) { - LOG.trace("Short-circuit to next step on pid=" + procedure.getProcId()); - } + LOG.trace("Short-circuit to next step on pid={}", procedure.getProcId()); } else { // Yield the current procedure, and make the subprocedure runnable // subprocs may come back 'null'. @@ -1519,9 +1554,7 @@ public class ProcedureExecutor<TEnvironment> { collect(Collectors.toList()).toString())); } } else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) { - if (LOG.isTraceEnabled()) { - LOG.trace("Added to timeoutExecutor " + procedure); - } + LOG.trace("Added to timeoutExecutor {}", procedure); timeoutExecutor.add(procedure); } else if (!suspended) { // No subtask, so we are done @@ -1535,9 +1568,10 @@ public class ProcedureExecutor<TEnvironment> { // allows to kill the executor before something is stored to the wal. // useful to test the procedure recovery. 
if (testing != null && testing.shouldKillBeforeStoreUpdate(suspended)) { - LOG.debug("TESTING: Kill before store update: " + procedure); + String msg = "TESTING: Kill before store update: " + procedure; + LOG.debug(msg); stop(); - return; + throw new RuntimeException(msg); } // TODO: The code here doesn't check if store is running before persisting to the store as @@ -1551,11 +1585,13 @@ public class ProcedureExecutor<TEnvironment> { updateStoreOnExec(procStack, procedure, subprocs); // if the store is not running we are aborting - if (!store.isRunning()) return; + if (!store.isRunning()) { + return; + } // if the procedure is kind enough to pass the slot to someone else, yield if (procedure.isRunnable() && !suspended && procedure.isYieldAfterExecutionStep(getEnvironment())) { - scheduler.yield(procedure); + yieldProcedure(procedure); return; } @@ -1566,6 +1602,11 @@ public class ProcedureExecutor<TEnvironment> { submitChildrenProcedures(subprocs); } + // we need to log the release lock operation before waking up the parent procedure, as there + // could be race that the parent procedure may call updateStoreOnExec ahead of us and remove all + // the sub procedures from store and cause problems... + releaseLock(procedure, false); + // if the procedure is complete and has a parent, count down the children latch. // If 'suspended', do nothing to change state -- let other threads handle unsuspend event. 
if (!suspended && procedure.isFinished() && procedure.hasParent()) { @@ -1573,12 +1614,12 @@ public class ProcedureExecutor<TEnvironment> { } } - private Procedure[] initializeChildren(final RootProcedureState procStack, - final Procedure procedure, final Procedure[] subprocs) { + private Procedure<TEnvironment>[] initializeChildren(RootProcedureState<TEnvironment> procStack, + Procedure<TEnvironment> procedure, Procedure<TEnvironment>[] subprocs) { assert subprocs != null : "expected subprocedures"; final long rootProcId = getRootProcedureId(procedure); for (int i = 0; i < subprocs.length; ++i) { - final Procedure subproc = subprocs[i]; + Procedure<TEnvironment> subproc = subprocs[i]; if (subproc == null) { String msg = "subproc[" + i + "] is null, aborting the procedure"; procedure.setFailure(new RemoteProcedureException(msg, @@ -1609,9 +1650,9 @@ public class ProcedureExecutor<TEnvironment> { return subprocs; } - private void submitChildrenProcedures(final Procedure[] subprocs) { + private void submitChildrenProcedures(Procedure<TEnvironment>[] subprocs) { for (int i = 0; i < subprocs.length; ++i) { - final Procedure subproc = subprocs[i]; + Procedure<TEnvironment> subproc = subprocs[i]; subproc.updateMetricsOnSubmit(getEnvironment()); assert !procedures.containsKey(subproc.getProcId()); procedures.put(subproc.getProcId(), subproc); @@ -1619,8 +1660,9 @@ public class ProcedureExecutor<TEnvironment> { } } - private void countDownChildren(final RootProcedureState procStack, final Procedure procedure) { - final Procedure parent = procedures.get(procedure.getParentProcId()); + private void countDownChildren(RootProcedureState<TEnvironment> procStack, + Procedure<TEnvironment> procedure) { + Procedure<TEnvironment> parent = procedures.get(procedure.getParentProcId()); if (parent == null) { assert procStack.isRollingback(); return; @@ -1637,17 +1679,15 @@ public class ProcedureExecutor<TEnvironment> { } } - private void updateStoreOnExec(final RootProcedureState 
procStack, - final Procedure procedure, final Procedure[] subprocs) { + private void updateStoreOnExec(RootProcedureState<TEnvironment> procStack, + Procedure<TEnvironment> procedure, Procedure<TEnvironment>[] subprocs) { if (subprocs != null && !procedure.isFailed()) { if (LOG.isTraceEnabled()) { LOG.trace("Stored " + procedure + ", children " + Arrays.toString(subprocs)); } store.insert(procedure, subprocs); } else { - if (LOG.isTraceEnabled()) { - LOG.trace("Store update " + procedure); - } + LOG.trace("Store update {}", procedure); if (procedure.isFinished() && !procedure.hasParent()) { // remove child procedures final long[] childProcIds = procStack.getSubprocedureIds(); @@ -1665,11 +1705,8 @@ public class ProcedureExecutor<TEnvironment> { } } - private void handleInterruptedException(final Procedure proc, final InterruptedException e) { - if (LOG.isTraceEnabled()) { - LOG.trace("Interrupt during " + proc + ". suspend and retry it later.", e); - } - + private void handleInterruptedException(Procedure<TEnvironment> proc, InterruptedException e) { + LOG.trace("Interrupt during {}. suspend and retry it later.", proc, e); // NOTE: We don't call Thread.currentThread().interrupt() // because otherwise all the subsequent calls e.g. Thread.sleep() will throw // the InterruptedException. If the master is going down, we will be notified @@ -1677,9 +1714,13 @@ public class ProcedureExecutor<TEnvironment> { // (The interrupted procedure will be retried on the next run) } - private void execCompletionCleanup(final Procedure proc) { + private void execCompletionCleanup(Procedure<TEnvironment> proc) { final TEnvironment env = getEnvironment(); - if (proc.holdLock(env) && proc.hasLock(env)) { + if (proc.hasLock()) { + LOG.warn("Usually this should not happen, we will release the lock before if the procedure" + + " is finished, even if the holdLock is true, arrive here means we have some holes where" + + " we do not release the lock. 
And the releaseLock below may fail since the procedure may" + + " have already been deleted from the procedure store."); releaseLock(proc, true); } try { @@ -1690,11 +1731,11 @@ public class ProcedureExecutor<TEnvironment> { } } - private void procedureFinished(final Procedure proc) { + private void procedureFinished(Procedure<TEnvironment> proc) { // call the procedure completion cleanup handler execCompletionCleanup(proc); - CompletedProcedureRetainer retainer = new CompletedProcedureRetainer(proc); + CompletedProcedureRetainer<TEnvironment> retainer = new CompletedProcedureRetainer<>(proc); // update the executor internal state maps if (!proc.shouldWaitClientAck(getEnvironment())) { @@ -1710,14 +1751,14 @@ public class ProcedureExecutor<TEnvironment> { scheduler.completionCleanup(proc); } catch (Throwable e) { // Catch NullPointerExceptions or similar errors... - LOG.error("CODE-BUG: uncatched runtime exception for completion cleanup: " + proc, e); + LOG.error("CODE-BUG: uncatched runtime exception for completion cleanup: {}", proc, e); } // Notify the listeners sendProcedureFinishedNotification(proc.getProcId()); } - RootProcedureState getProcStack(long rootProcId) { + RootProcedureState<TEnvironment> getProcStack(long rootProcId) { return rollbackStack.get(rootProcId); } @@ -1726,7 +1767,7 @@ public class ProcedureExecutor<TEnvironment> { // ========================================================================== private class WorkerThread extends StoppableThread { private final AtomicLong executionStartTime = new AtomicLong(Long.MAX_VALUE); - private volatile Procedure<?> activeProcedure; + private volatile Procedure<TEnvironment> activeProcedure; public WorkerThread(ThreadGroup group) { this(group, "PEWorker-"); @@ -1747,7 +1788,7 @@ public class ProcedureExecutor<TEnvironment> { long lastUpdate = EnvironmentEdgeManager.currentTime(); try { while (isRunning() && keepAlive(lastUpdate)) { - Procedure<?> proc = scheduler.poll(keepAliveTime, 
TimeUnit.MILLISECONDS); + Procedure<TEnvironment> proc = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS); if (proc == null) { continue; } http://git-wip-us.apache.org/repos/asf/hbase/blob/833657c4/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java index c42dfc4..1215008 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java @@ -202,6 +202,9 @@ public final class ProcedureUtil { builder.setNonce(proc.getNonceKey().getNonce()); } + if (proc.hasLock()) { + builder.setLocked(true); + } return builder.build(); } @@ -255,6 +258,10 @@ public final class ProcedureUtil { proc.setNonceKey(new NonceKey(proto.getNonceGroup(), proto.getNonce())); } + if (proto.getLocked()) { + proc.lockedWhenLoading(); + } + ProcedureStateSerializer serializer = null; if (proto.getStateMessageCount() > 0) {
