HBASE-17149 Procedure V2 - Fix nonce submission to avoid unnecessary calling coprocessor multiple times (Matteo Bertozzi)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e32f8ac4 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e32f8ac4 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e32f8ac4 Branch: refs/heads/branch-1.1 Commit: e32f8ac425fcde5d98c718062ad00beb26be1dfc Parents: a1d900d Author: Stephen Yuan Jiang <[email protected]> Authored: Thu Dec 29 08:48:28 2016 -0800 Committer: Stephen Yuan Jiang <[email protected]> Committed: Thu Dec 29 08:48:28 2016 -0800 ---------------------------------------------------------------------- .../hadoop/hbase/procedure2/Procedure.java | 68 ++- .../hbase/procedure2/ProcedureExecutor.java | 155 +++++-- .../hbase/procedure2/SequentialProcedure.java | 2 +- .../procedure2/ProcedureTestingUtility.java | 22 +- .../hbase/procedure2/TestProcedureNonce.java | 284 ++++++++++++ .../hbase/procedure2/TestProcedureRecovery.java | 35 -- .../org/apache/hadoop/hbase/master/HMaster.java | 463 ++++++++++++------- .../master/procedure/MasterProcedureUtil.java | 88 ++++ .../procedure/TestAddColumnFamilyProcedure.java | 70 +-- .../procedure/TestCreateTableProcedure.java | 42 +- .../TestDeleteColumnFamilyProcedure.java | 78 +--- .../procedure/TestDeleteTableProcedure.java | 44 +- .../procedure/TestDisableTableProcedure.java | 40 +- .../procedure/TestEnableTableProcedure.java | 46 +- .../TestModifyColumnFamilyProcedure.java | 31 +- .../procedure/TestModifyTableProcedure.java | 17 +- .../master/procedure/TestProcedureAdmin.java | 16 +- .../procedure/TestTruncateTableProcedure.java | 12 +- 18 files changed, 921 insertions(+), 592 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/e32f8ac4/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java ---------------------------------------------------------------------- diff --git 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java index 813bbf5..44e1da1 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java @@ -87,10 +87,13 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { * The main code of the procedure. It must be idempotent since execute() * may be called multiple time in case of machine failure in the middle * of the execution. + * @param env the environment passed to the ProcedureExecutor * @return a set of sub-procedures or null if there is nothing else to execute. + * @throws ProcedureYieldException the procedure will be added back to the queue and retried later + * @throws InterruptedException the procedure will be added back to the queue and retried later */ protected abstract Procedure[] execute(TEnvironment env) - throws ProcedureYieldException; + throws ProcedureYieldException, InterruptedException; /** * The code to undo what done by the execute() code. @@ -99,6 +102,8 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { * the execute() call. The implementation must be idempotent since rollback() * may be called multiple time in case of machine failure in the middle * of the execution. + * + * @param env the environment passed to the ProcedureExecutor * @throws IOException temporary failure, the rollback will retry later */ protected abstract void rollback(TEnvironment env) @@ -115,6 +120,8 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { * NOTE: abort() is not like Thread.interrupt() it is just a notification * that allows the procedure implementor where to abort to avoid leak and * have a better control on what was executed and what not. 
+ * + * @param env the environment passed to the ProcedureExecutor */ protected abstract boolean abort(TEnvironment env); @@ -122,6 +129,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { * The user-level code of the procedure may have some state to * persist (e.g. input arguments) to be able to resume on failure. * @param stream the stream that will contain the user serialized data + * @throws IOException failure to stream data */ protected abstract void serializeStateData(final OutputStream stream) throws IOException; @@ -141,6 +149,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { * create t1 and create t2 can be executed at the same time. * anything else on t1/t2 is queued waiting that specific table create to happen. * + * @param env the environment passed to the ProcedureExecutor * @return true if the lock was acquired and false otherwise */ protected boolean acquireLock(final TEnvironment env) { @@ -149,6 +158,8 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { /** * The user should override this method, and release lock if necessary. + * + * @param env the environment passed to the ProcedureExecutor */ protected void releaseLock(final TEnvironment env) { // no-op @@ -159,6 +170,8 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { * The procedure implementor may use this method to perform some quick * operation before replay. * e.g. failing the procedure if the state on replay may be unknown. + * + * @param env the environment passed to the ProcedureExecutor */ protected void beforeReplay(final TEnvironment env) { // no-op @@ -168,6 +181,8 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { * Called when the procedure is marked as completed (success or rollback). * The procedure implementor may use this method to cleanup in-memory states. * This operation will not be retried on failure. 
+ * + * @param env the environment passed to the ProcedureExecutor */ protected void completionCleanup(final TEnvironment env) { // no-op @@ -368,6 +383,9 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { return Math.max(0, timeout - (EnvironmentEdgeManager.currentTime() - startTime)); } + /** + * @param owner the owner passed in + */ @VisibleForTesting @InterfaceAudience.Private public void setOwner(final String owner) { @@ -382,6 +400,9 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { return owner != null; } + /** + * @param state current procedure state + */ @VisibleForTesting @InterfaceAudience.Private protected synchronized void setState(final ProcedureState state) { @@ -394,10 +415,17 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { return state; } + /** + * @param source exception string + * @param cause the cause of failure + */ protected void setFailure(final String source, final Throwable cause) { setFailure(new RemoteProcedureException(source, cause)); } + /** + * @param exception exception thrown + */ protected synchronized void setFailure(final RemoteProcedureException exception) { this.exception = exception; if (!isFinished()) { @@ -405,6 +433,10 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { } } + /** + * @param source exception string + * @param msg message to pass on + */ protected void setAbortFailure(final String source, final String msg) { setFailure(source, new ProcedureAbortedException(msg)); } @@ -433,6 +465,8 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { /** * Called by the ProcedureExecutor to assign the parent to the newly created procedure. 
+ * + * @param parentProcId parent procedure Id */ @InterfaceAudience.Private protected void setParentProcId(final long parentProcId) { @@ -441,6 +475,8 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { /** * Called by the ProcedureExecutor to set the value to the newly created procedure. + * + * @param nonceKey the key to detect duplicate call */ @VisibleForTesting @InterfaceAudience.Private @@ -451,10 +487,14 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { /** * Internal method called by the ProcedureExecutor that starts the * user-level code execute(). + * @param env the environment passed to the ProcedureExecutor + * @return a set of sub-procedures or null if there is nothing else to execute. + * @throws ProcedureYieldException the procedure will be added back to the queue and retried later + * @throws InterruptedException */ @InterfaceAudience.Private protected Procedure[] doExecute(final TEnvironment env) - throws ProcedureYieldException { + throws ProcedureYieldException, InterruptedException { try { updateTimestamp(); return execute(env); @@ -466,6 +506,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { /** * Internal method called by the ProcedureExecutor that starts the * user-level code rollback(). + * @param env the environment passed to the ProcedureExecutor */ @InterfaceAudience.Private protected void doRollback(final TEnvironment env) throws IOException { @@ -480,6 +521,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { /** * Called on store load to initialize the Procedure internals after * the creation/deserialization. 
+ * @param startTime procedure start time */ @InterfaceAudience.Private protected void setStartTime(final long startTime) { @@ -489,6 +531,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { /** * Called on store load to initialize the Procedure internals after * the creation/deserialization. + * @param lastUpdate last time to update procedure */ private synchronized void setLastUpdate(final long lastUpdate) { this.lastUpdate = lastUpdate; @@ -500,6 +543,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { /** * Called by the ProcedureExecutor on procedure-load to restore the latch state + * @param numChildren children count */ @InterfaceAudience.Private protected synchronized void setChildrenLatch(final int numChildren) { @@ -528,6 +572,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { /** * Called by the RootProcedureState on procedure execution. * Each procedure store its stack-index positions. + * @param index the place where procedure is in */ @InterfaceAudience.Private protected synchronized void addStackIndex(final int index) { @@ -554,6 +599,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { /** * Called on store load to initialize the Procedure internals after * the creation/deserialization. + * @param stackIndexes the list of positions of procedures */ @InterfaceAudience.Private protected synchronized void setStackIndexes(final List<Integer> stackIndexes) { @@ -573,6 +619,9 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { return stackIndexes; } + /** + * @param other the procedure to compare to + */ @Override public int compareTo(final Procedure other) { long diff = getProcId() - other.getProcId(); @@ -581,6 +630,8 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { /* * Helper to lookup the root Procedure ID given a specified procedure. 
+ * @param procedures list of procedure + * @param proc the procedure to look for */ @InterfaceAudience.Private protected static Long getRootProcedureId(final Map<Long, Procedure> procedures, Procedure proc) { @@ -591,6 +642,10 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { return proc.getProcId(); } + /* + * @param className procedure class name + * @throws IOException failure + */ protected static Procedure newInstance(final String className) throws IOException { try { Class<?> clazz = Class.forName(className); @@ -610,6 +665,10 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { } } + /* + * @param proc procedure + * @throws IOException failure + */ protected static void validateClass(final Procedure proc) throws IOException { try { Class<?> clazz = proc.getClass(); @@ -630,6 +689,8 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { /** * Helper to create the ProcedureInfo from Procedure. + * @param proc procedure + * @param nonceKey the key to detect duplicate call */ @InterfaceAudience.Private public static ProcedureInfo createProcedureInfo(final Procedure proc, final NonceKey nonceKey) { @@ -651,6 +712,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { /** * Helper to convert the procedure to protobuf. * Used by ProcedureStore implementations. + * @param proc procedure */ @InterfaceAudience.Private public static ProcedureProtos.Procedure convert(final Procedure proc) @@ -717,6 +779,8 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { * (e.g. className, procId, parentId, ...). * We can split in 'data' and 'state', and the store * may take advantage of it by storing the data only on insert(). 
+ * + * @param proto procedure protobuf */ @InterfaceAudience.Private public static Procedure convert(final ProcedureProtos.Procedure proto) http://git-wip-us.apache.org/repos/asf/hbase/blob/e32f8ac4/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index 5277fa2..7dfea4d 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -52,8 +52,10 @@ import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue.TimeoutRetri import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.ForeignExceptionUtil; import org.apache.hadoop.hbase.util.NonceKey; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.Threads; import com.google.common.base.Preconditions; @@ -556,59 +558,142 @@ public class ProcedureExecutor<TEnvironment> { return procedureLists; } + // ========================================================================== + // Nonce Procedure helpers + // ========================================================================== + /** + * Create a NonceKey from the specified nonceGroup and nonce. + * @param nonceGroup + * @param nonce + * @return the generated NonceKey + */ + public NonceKey createNonceKey(final long nonceGroup, final long nonce) { + return (nonce == HConstants.NO_NONCE) ? null : new NonceKey(nonceGroup, nonce); + } + + /** + * Register a nonce for a procedure that is going to be submitted. 
+ * A procId will be reserved and on submitProcedure(), + * the procedure with the specified nonce will take the reserved ProcId. + * If someone already reserved the nonce, this method will return the procId reserved, + * otherwise an invalid procId will be returned and the caller should proceed + * and submit the procedure. + * + * @param nonceKey A unique identifier for this operation from the client or process. + * @return the procId associated with the nonce, if any, otherwise an invalid procId. + */ + public long registerNonce(final NonceKey nonceKey) { + if (nonceKey == null) return -1; + + // check if we have already a Reserved ID for the nonce + Long oldProcId = nonceKeysToProcIdsMap.get(nonceKey); + if (oldProcId == null) { + // reserve a new Procedure ID, this will be associated with the nonce + // and the procedure submitted with the specified nonce will use this ID. + final long newProcId = nextProcId(); + oldProcId = nonceKeysToProcIdsMap.putIfAbsent(nonceKey, newProcId); + if (oldProcId == null) return -1; + } + + // we found a registered nonce, but the procedure may not have been submitted yet. + // since the client expects the procedure to be submitted, spin here until it is. + final boolean isTraceEnabled = LOG.isTraceEnabled(); + while (isRunning() && + !(procedures.containsKey(oldProcId) || completed.containsKey(oldProcId)) && + nonceKeysToProcIdsMap.containsKey(nonceKey)) { + if (isTraceEnabled) { + LOG.trace("waiting for procId=" + oldProcId.longValue() + " to be submitted"); + } + Threads.sleep(100); + } + return oldProcId.longValue(); + } + + /** + * Remove the NonceKey if the procedure was not submitted to the executor. + * @param nonceKey A unique identifier for this operation from the client or process. 
+ */ + public void unregisterNonceIfProcedureWasNotSubmitted(final NonceKey nonceKey) { + if (nonceKey == null) return; + + final Long procId = nonceKeysToProcIdsMap.get(nonceKey); + if (procId == null) return; + + // if the procedure was not submitted, remove the nonce + if (!(procedures.containsKey(procId) || completed.containsKey(procId))) { + nonceKeysToProcIdsMap.remove(nonceKey); + } + } + + /** + * If the procedure failed before it was submitted, we may want to give back the + * same error to the requests with the same nonceKey. + * + * @param nonceKey A unique identifier for this operation from the client or process + * @param procName name of the procedure, used to inform the user + * @param procOwner name of the owner of the procedure, used to inform the user + * @param exception the failure to report to the user + */ + public void setFailureResultForNonce(final NonceKey nonceKey, final String procName, + final User procOwner, final IOException exception) { + if (nonceKey == null) return; + + final Long procId = nonceKeysToProcIdsMap.get(nonceKey); + if (procId == null || completed.containsKey(procId)) return; + + final long currentTime = EnvironmentEdgeManager.currentTime(); + final ProcedureInfo result = new ProcedureInfo( + procId.longValue(), + procName, + procOwner != null ? procOwner.getShortName() : null, + ProcedureState.ROLLEDBACK, + -1, + nonceKey, + ForeignExceptionUtil.toProtoForeignException("ProcedureExecutor", exception), + currentTime, + currentTime, + null); + completed.putIfAbsent(procId, result); + } + + // ========================================================================== + // Submit/Abort Procedure + // ========================================================================== /** * Add a new root-procedure to the executor. * @param proc the new procedure to execute. 
* @return the procedure id, that can be used to monitor the operation */ public long submitProcedure(final Procedure proc) { - return submitProcedure(proc, HConstants.NO_NONCE, HConstants.NO_NONCE); + return submitProcedure(proc, null); } /** * Add a new root-procedure to the executor. * @param proc the new procedure to execute. - * @param nonceGroup - * @param nonce + * @param nonceKey the registered unique identifier for this operation from the client or process. * @return the procedure id, that can be used to monitor the operation */ - public long submitProcedure( - final Procedure proc, - final long nonceGroup, - final long nonce) { + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH", + justification = "FindBugs is blind to the check-for-null") + public long submitProcedure(final Procedure proc, final NonceKey nonceKey) { Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING); - Preconditions.checkArgument(isRunning()); + Preconditions.checkArgument(isRunning(), "executor not running"); Preconditions.checkArgument(lastProcId.get() >= 0); - Preconditions.checkArgument(!proc.hasParent()); - - Long currentProcId; - - // The following part of the code has to be synchronized to prevent multiple request - // with the same nonce to execute at the same time. - synchronized (this) { - // Check whether the proc exists. If exist, just return the proc id. - // This is to prevent the same proc to submit multiple times (it could happen - // when client could not talk to server and resubmit the same request). 
- NonceKey noncekey = null; - if (nonce != HConstants.NO_NONCE) { - noncekey = new NonceKey(nonceGroup, nonce); - currentProcId = nonceKeysToProcIdsMap.get(noncekey); - if (currentProcId != null) { - // Found the proc - return currentProcId; - } - } + Preconditions.checkArgument(!proc.hasParent(), "unexpected parent", proc); - // Initialize the Procedure ID + final Long currentProcId; + if (nonceKey != null) { + currentProcId = nonceKeysToProcIdsMap.get(nonceKey); + Preconditions.checkArgument(currentProcId != null, + "expected nonceKey=" + nonceKey + " to be reserved, use registerNonce()"); + } else { currentProcId = nextProcId(); - proc.setProcId(currentProcId); + } - // This is new procedure. Set the noncekey and insert into the map. - if (noncekey != null) { - proc.setNonceKey(noncekey); - nonceKeysToProcIdsMap.put(noncekey, currentProcId); - } - } // end of synchronized (this) + // Initialize the procedure + proc.setNonceKey(nonceKey); + proc.setProcId(currentProcId.longValue()); // Commit the transaction store.insert(proc, null); http://git-wip-us.apache.org/repos/asf/hbase/blob/e32f8ac4/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java index 8ddb36e..61b2911 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java @@ -42,7 +42,7 @@ public abstract class SequentialProcedure<TEnvironment> extends Procedure<TEnvir @Override protected Procedure[] doExecute(final TEnvironment env) - throws ProcedureYieldException { + throws ProcedureYieldException, InterruptedException { updateTimestamp(); try { Procedure[] children = 
!executed ? execute(env) : null; http://git-wip-us.apache.org/repos/asf/hbase/blob/e32f8ac4/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java index bffc53f..bdffae8 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java @@ -35,7 +35,8 @@ import org.apache.hadoop.hbase.exceptions.TimeoutIOException; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; import org.apache.hadoop.hbase.protobuf.generated.ErrorHandlingProtos.ForeignExceptionMessage; - +import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos; +import org.apache.hadoop.hbase.util.NonceKey; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -133,13 +134,20 @@ public class ProcedureTestingUtility { } public static <TEnv> long submitAndWait(ProcedureExecutor<TEnv> procExecutor, Procedure proc, - final long nonceGroup, - final long nonce) { - long procId = procExecutor.submitProcedure(proc, nonceGroup, nonce); + final long nonceGroup, final long nonce) { + long procId = submitProcedure(procExecutor, proc, nonceGroup, nonce); waitProcedure(procExecutor, procId); return procId; } + public static <TEnv> long submitProcedure(ProcedureExecutor<TEnv> procExecutor, Procedure proc, + final long nonceGroup, final long nonce) { + final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce); + long procId = procExecutor.registerNonce(nonceKey); + assertFalse(procId >= 0); + 
return procExecutor.submitProcedure(proc, nonceKey); + } + public static <TEnv> void waitProcedure(ProcedureExecutor<TEnv> procExecutor, long procId) { while (!procExecutor.isFinished(procId) && procExecutor.isRunning()) { Threads.sleepWithoutInterrupt(250); @@ -178,6 +186,12 @@ public class ProcedureTestingUtility { assertFalse(msg, result.isFailed()); } + public static Throwable assertProcFailed(final ProcedureInfo result) { + assertEquals(true, result.isFailed()); + LOG.info("procId=" + result.getProcId() + " exception: " + result.getException().getMessage()); + return getExceptionCause(result); + } + public static void assertIsAbortException(final ProcedureInfo result) { assertEquals(true, result.isFailed()); LOG.info(result.getExceptionFullMessage()); http://git-wip-us.apache.org/repos/asf/hbase/blob/e32f8ac4/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureNonce.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureNonce.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureNonce.java new file mode 100644 index 0000000..312ca5b --- /dev/null +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureNonce.java @@ -0,0 +1,284 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.procedure2; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.CountDownLatch; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseCommonTestingUtility; +import org.apache.hadoop.hbase.ProcedureInfo; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.NonceKey; +import org.apache.hadoop.hbase.util.Threads; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +@Category({MasterTests.class, SmallTests.class}) +public class TestProcedureNonce { + private static final Log LOG = LogFactory.getLog(TestProcedureNonce.class); + + private static final int PROCEDURE_EXECUTOR_SLOTS = 2; + + private static TestProcEnv procEnv; + private static ProcedureExecutor<TestProcEnv> procExecutor; + private static ProcedureStore procStore; + + private HBaseCommonTestingUtility htu; + private FileSystem fs; + private Path logDir; + + @Before + public 
void setUp() throws IOException { + htu = new HBaseCommonTestingUtility(); + Path testDir = htu.getDataTestDir(); + fs = testDir.getFileSystem(htu.getConfiguration()); + assertTrue(testDir.depth() > 1); + + logDir = new Path(testDir, "proc-logs"); + procEnv = new TestProcEnv(); + procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), fs, logDir); + procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore); + procExecutor.testing = new ProcedureExecutor.Testing(); + procStore.start(PROCEDURE_EXECUTOR_SLOTS); + procExecutor.start(PROCEDURE_EXECUTOR_SLOTS); + } + + @After + public void tearDown() throws IOException { + procExecutor.stop(); + procStore.stop(false); + fs.delete(logDir, true); + } + + @Test(timeout=30000) + public void testCompletedProcWithSameNonce() throws Exception { + final long nonceGroup = 123; + final long nonce = 2222; + + // register the nonce + final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce); + assertFalse(procExecutor.registerNonce(nonceKey) >= 0); + + // Submit a proc and wait for its completion + Procedure proc = new TestSingleStepProcedure(); + long procId = procExecutor.submitProcedure(proc, nonceKey); + ProcedureTestingUtility.waitProcedure(procExecutor, procId); + + // Restart + ProcedureTestingUtility.restart(procExecutor); + ProcedureTestingUtility.waitProcedure(procExecutor, procId); + + // try to register a procedure with the same nonce + // we should get back the old procId + assertEquals(procId, procExecutor.registerNonce(nonceKey)); + + ProcedureInfo result = procExecutor.getResult(procId); + ProcedureTestingUtility.assertProcNotFailed(result); + } + + @Test(timeout=30000) + public void testRunningProcWithSameNonce() throws Exception { + final long nonceGroup = 456; + final long nonce = 33333; + + // register the nonce + final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce); + assertFalse(procExecutor.registerNonce(nonceKey) >= 0); + + // Submit a 
proc and use a latch to prevent the step execution until we submitted proc2 + CountDownLatch latch = new CountDownLatch(1); + TestSingleStepProcedure proc = new TestSingleStepProcedure(); + procEnv.setWaitLatch(latch); + long procId = procExecutor.submitProcedure(proc, nonceKey); + while (proc.step != 1) Threads.sleep(25); + + // try to register a procedure with the same nonce + // we should get back the old procId + assertEquals(procId, procExecutor.registerNonce(nonceKey)); + + // complete the procedure + latch.countDown(); + + // Restart, the procedure is not completed yet + ProcedureTestingUtility.restart(procExecutor); + ProcedureTestingUtility.waitProcedure(procExecutor, procId); + + // try to register a procedure with the same nonce + // we should get back the old procId + assertEquals(procId, procExecutor.registerNonce(nonceKey)); + + ProcedureInfo result = procExecutor.getResult(procId); + ProcedureTestingUtility.assertProcNotFailed(result); + } + + @Test + public void testSetFailureResultForNonce() throws IOException { + final long nonceGroup = 234; + final long nonce = 55555; + + // check and register the request nonce + final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce); + assertFalse(procExecutor.registerNonce(nonceKey) >= 0); + + procExecutor.setFailureResultForNonce(nonceKey, "testProc", User.getCurrent(), + new IOException("test failure")); + + final long procId = procExecutor.registerNonce(nonceKey); + ProcedureInfo result = procExecutor.getResult(procId); + ProcedureTestingUtility.assertProcFailed(result); + } + + @Test(timeout=30000) + public void testConcurrentNonceRegistration() throws IOException { + testConcurrentNonceRegistration(true, 567, 44444); + } + + @Test(timeout=30000) + public void testConcurrentNonceRegistrationWithRollback() throws IOException { + testConcurrentNonceRegistration(false, 890, 55555); + } + + private void testConcurrentNonceRegistration(final boolean submitProcedure, + final long nonceGroup, 
final long nonce) throws IOException { + // create the nonce key + final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce); + + final AtomicReference<Throwable> t1Exception = new AtomicReference(); + final AtomicReference<Throwable> t2Exception = new AtomicReference(); + + final CountDownLatch t1NonceRegisteredLatch = new CountDownLatch(1); + final CountDownLatch t2BeforeNonceRegisteredLatch = new CountDownLatch(1); + final Thread[] threads = new Thread[2]; + threads[0] = new Thread() { + @Override + public void run() { + try { + // register the nonce and wake t2 + assertFalse("unexpected already registered nonce", + procExecutor.registerNonce(nonceKey) >= 0); + t1NonceRegisteredLatch.countDown(); + + // hold the submission until t2 is registering the nonce + t2BeforeNonceRegisteredLatch.await(); + Threads.sleep(1000); + + if (submitProcedure) { + CountDownLatch latch = new CountDownLatch(1); + TestSingleStepProcedure proc = new TestSingleStepProcedure(); + procEnv.setWaitLatch(latch); + + procExecutor.submitProcedure(proc, nonceKey); + Threads.sleep(100); + + // complete the procedure + latch.countDown(); + } else { + procExecutor.unregisterNonceIfProcedureWasNotSubmitted(nonceKey); + } + } catch (Throwable e) { + t1Exception.set(e); + } finally { + t1NonceRegisteredLatch.countDown(); + t2BeforeNonceRegisteredLatch.countDown(); + } + } + }; + + threads[1] = new Thread() { + @Override + public void run() { + try { + // wait until t1 has registered the nonce + t1NonceRegisteredLatch.await(); + + // register the nonce + t2BeforeNonceRegisteredLatch.countDown(); + assertFalse("unexpected non registered nonce", + procExecutor.registerNonce(nonceKey) < 0); + } catch (Throwable e) { + t2Exception.set(e); + } finally { + t1NonceRegisteredLatch.countDown(); + t2BeforeNonceRegisteredLatch.countDown(); + } + } + }; + + for (int i = 0; i < threads.length; ++i) threads[i].start(); + for (int i = 0; i < threads.length; ++i) Threads.shutdown(threads[i]); +
ProcedureTestingUtility.waitNoProcedureRunning(procExecutor); + assertEquals(null, t1Exception.get()); + assertEquals(null, t2Exception.get()); + } + + public static class TestSingleStepProcedure extends SequentialProcedure<TestProcEnv> { + private int step = 0; + + public TestSingleStepProcedure() { } + + @Override + protected Procedure[] execute(TestProcEnv env) throws InterruptedException { + step++; + env.waitOnLatch(); + LOG.debug("execute procedure " + this + " step=" + step); + step++; + setResult(Bytes.toBytes(step)); + return null; + } + + @Override + protected void rollback(TestProcEnv env) { } + + @Override + protected boolean abort(TestProcEnv env) { return true; } + } + + private static class TestProcEnv { + private CountDownLatch latch = null; + + /** + * set/unset a latch. every procedure execute() step will wait on the latch if any. + */ + public void setWaitLatch(CountDownLatch latch) { + this.latch = latch; + } + + public void waitOnLatch() throws InterruptedException { + if (latch != null) { + latch.await(); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/e32f8ac4/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java index 1039952..e4881e8 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java @@ -285,41 +285,6 @@ public class TestProcedureRecovery { ProcedureTestingUtility.assertIsAbortException(result); } - @Test(timeout=30000) - public void testCompletedProcWithSameNonce() throws Exception { - final long nonceGroup = 123; - final long nonce = 2222; - 
Procedure proc = new TestSingleStepProcedure(); - // Submit a proc and wait for its completion - long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc, nonceGroup, nonce); - - // Restart - restart(); - Procedure proc2 = new TestSingleStepProcedure(); - // Submit a procedure with the same nonce and expect the same procedure would return. - long procId2 = ProcedureTestingUtility.submitAndWait(procExecutor, proc2, nonceGroup, nonce); - assertTrue(procId == procId2); - - ProcedureInfo result = procExecutor.getResult(procId2); - ProcedureTestingUtility.assertProcNotFailed(result); - } - - @Test(timeout=30000) - public void testRunningProcWithSameNonce() throws Exception { - final long nonceGroup = 456; - final long nonce = 33333; - Procedure proc = new TestMultiStepProcedure(); - long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc, nonceGroup, nonce); - - // Restart - restart(); - Procedure proc2 = new TestMultiStepProcedure(); - // Submit a procedure with the same nonce and expect the same procedure would return. - long procId2 = ProcedureTestingUtility.submitAndWait(procExecutor, proc2, nonceGroup, nonce); - // The original proc is not completed and the new submission should have the same proc Id. 
- assertTrue(procId == procId2); - } - public static class TestStateMachineProcedure extends StateMachineProcedure<Void, TestStateMachineProcedure.State> { enum State { STATE_1, STATE_2, STATE_3, DONE } http://git-wip-us.apache.org/repos/asf/hbase/blob/e32f8ac4/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 9ed115f..b786522 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -111,6 +111,7 @@ import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure; import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.ModifyColumnFamilyProcedure; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil; import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure; import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait; @@ -1458,42 +1459,54 @@ public class HMaster extends HRegionServer implements MasterServices, Server { String namespace = tableName.getNamespaceAsString(); ensureNamespaceExists(namespace); - HRegionInfo[] newRegions = ModifyRegionUtils.createHRegionInfos(hTableDescriptor, splitKeys); + final HRegionInfo[] newRegions = + ModifyRegionUtils.createHRegionInfos(hTableDescriptor, splitKeys); checkInitialized(); sanityCheckTableDescriptor(hTableDescriptor); - if (cpHost != null) { - cpHost.preCreateTable(hTableDescriptor, newRegions); - } - LOG.info(getClientIdAuditPrefix() + " create " + hTableDescriptor); - long procId = -1; if 
(isMasterProcedureExecutorEnabled()) { - // TODO: We can handle/merge duplicate requests, and differentiate the case of - // TableExistsException by saying if the schema is the same or not. - ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch(); - procId = this.procedureExecutor.submitProcedure( - new CreateTableProcedure( - procedureExecutor.getEnvironment(), hTableDescriptor, newRegions, latch), - nonceGroup, - nonce); - latch.await(); + return MasterProcedureUtil.submitProcedure( + new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) { + @Override + protected void run() throws IOException { + getMaster().getMasterCoprocessorHost().preCreateTable(hTableDescriptor, newRegions); + + LOG.info(getClientIdAuditPrefix() + " create " + hTableDescriptor); + + // TODO: We can handle/merge duplicate requests, and differentiate the case of + // TableExistsException by saying if the schema is the same or not. + ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch(); + submitProcedure(new CreateTableProcedure( + procedureExecutor.getEnvironment(), hTableDescriptor, newRegions, latch)); + latch.await(); + + getMaster().getMasterCoprocessorHost().postCreateTable(hTableDescriptor, newRegions); + } + + @Override + protected String getDescription() { + return "CreateTableProcedure"; + } + }); } else { try { this.quotaManager.checkNamespaceTableAndRegionQuota(tableName, newRegions.length); + if (cpHost != null) { + cpHost.preCreateTable(hTableDescriptor, newRegions); + } + LOG.info(getClientIdAuditPrefix() + " create " + hTableDescriptor); this.service.submit(new CreateTableHandler(this, this.fileSystemManager, hTableDescriptor, conf, newRegions, this).prepare()); + if (cpHost != null) { + cpHost.postCreateTable(hTableDescriptor, newRegions); + } } catch (IOException e) { this.quotaManager.removeTableFromNamespaceQuota(tableName); LOG.error("Exception occurred while creating the table " + tableName.getNameAsString(), e); throw e; } + 
return -1; } - - if (cpHost != null) { - cpHost.postCreateTable(hTableDescriptor, newRegions); - } - - return procId; } /** @@ -1728,29 +1741,41 @@ public class HMaster extends HRegionServer implements MasterServices, Server { final long nonceGroup, final long nonce) throws IOException { checkInitialized(); - if (cpHost != null) { - cpHost.preDeleteTable(tableName); - } - LOG.info(getClientIdAuditPrefix() + " delete " + tableName); - long procId = -1; if (isMasterProcedureExecutorEnabled()) { - // TODO: We can handle/merge duplicate request - ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch(); - procId = this.procedureExecutor.submitProcedure( - new DeleteTableProcedure(procedureExecutor.getEnvironment(), tableName, latch), - nonceGroup, - nonce); - latch.await(); + return MasterProcedureUtil.submitProcedure( + new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) { + @Override + protected void run() throws IOException { + getMaster().getMasterCoprocessorHost().preDeleteTable(tableName); + + LOG.info(getClientIdAuditPrefix() + " delete " + tableName); + + // TODO: We can handle/merge duplicate request + ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch(); + submitProcedure(new DeleteTableProcedure(procedureExecutor.getEnvironment(), + tableName, latch)); + latch.await(); + + getMaster().getMasterCoprocessorHost().postDeleteTable(tableName); + } + + @Override + protected String getDescription() { + return "DeleteTableProcedure"; + } + }); } else { + if (cpHost != null) { + cpHost.preDeleteTable(tableName); + } + LOG.info(getClientIdAuditPrefix() + " delete " + tableName); this.service.submit(new DeleteTableHandler(tableName, this, this).prepare()); + if (cpHost != null) { + cpHost.postDeleteTable(tableName); + } + return -1; } - - if (cpHost != null) { - cpHost.postDeleteTable(tableName); - } - - return procId; } @Override @@ -1760,26 +1785,40 @@ public class HMaster extends HRegionServer implements MasterServices, 
Server { final long nonceGroup, final long nonce) throws IOException { checkInitialized(); - if (cpHost != null) { - cpHost.preTruncateTable(tableName); - } - LOG.info(getClientIdAuditPrefix() + " truncate " + tableName); if (isMasterProcedureExecutorEnabled()) { - long procId = this.procedureExecutor.submitProcedure( - new TruncateTableProcedure(procedureExecutor.getEnvironment(), tableName, preserveSplits), - nonceGroup, - nonce); - ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId); + MasterProcedureUtil.submitProcedure( + new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) { + @Override + protected void run() throws IOException { + getMaster().getMasterCoprocessorHost().preTruncateTable(tableName); + + LOG.info(getClientIdAuditPrefix() + " truncate " + tableName); + + long procId = submitProcedure(new TruncateTableProcedure(procedureExecutor.getEnvironment(), + tableName, preserveSplits)); + ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId); + + getMaster().getMasterCoprocessorHost().postTruncateTable(tableName); + } + + @Override + protected String getDescription() { + return "TruncateTableProcedure"; + } + }); } else { + if (cpHost != null) { + cpHost.preTruncateTable(tableName); + } + LOG.info(getClientIdAuditPrefix() + " truncate " + tableName); TruncateTableHandler handler = new TruncateTableHandler(tableName, this, this, preserveSplits); handler.prepare(); handler.process(); - } - - if (cpHost != null) { - cpHost.postTruncateTable(tableName); + if (cpHost != null) { + cpHost.postTruncateTable(tableName); + } } } @@ -1793,27 +1832,41 @@ public class HMaster extends HRegionServer implements MasterServices, Server { checkInitialized(); checkCompression(columnDescriptor); checkEncryption(conf, columnDescriptor); - if (cpHost != null) { - if (cpHost.preAddColumn(tableName, columnDescriptor)) { - return; - } - } - - LOG.info(getClientIdAuditPrefix() + " add " + columnDescriptor); if 
(isMasterProcedureExecutorEnabled()) { - // Execute the operation synchronously - wait for the operation to complete before continuing. - long procId = this.procedureExecutor.submitProcedure( - new AddColumnFamilyProcedure( - procedureExecutor.getEnvironment(), tableName, columnDescriptor), - nonceGroup, - nonce); - ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId); + MasterProcedureUtil.submitProcedure( + new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) { + @Override + protected void run() throws IOException { + if (getMaster().getMasterCoprocessorHost().preAddColumn(tableName, columnDescriptor)) { + return; + } + + LOG.info(getClientIdAuditPrefix() + " add " + columnDescriptor); + // Execute the operation synchronously, wait for the operation to complete before + // continuing + long procId = submitProcedure(new AddColumnFamilyProcedure( + procedureExecutor.getEnvironment(), tableName, columnDescriptor)); + ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId); + + getMaster().getMasterCoprocessorHost().postAddColumn(tableName, columnDescriptor); + } + @Override + protected String getDescription() { + return "AddColumnFamilyProcedure"; + } + }); } else { + if (cpHost != null) { + if (cpHost.preAddColumn(tableName, columnDescriptor)) { + return; + } + } + LOG.info(getClientIdAuditPrefix() + " add " + columnDescriptor); new TableAddFamilyHandler(tableName, columnDescriptor, this, this).prepare().process(); - } - if (cpHost != null) { - cpHost.postAddColumn(tableName, columnDescriptor); + if (cpHost != null) { + cpHost.postAddColumn(tableName, columnDescriptor); + } } } @@ -1827,26 +1880,43 @@ public class HMaster extends HRegionServer implements MasterServices, Server { checkInitialized(); checkCompression(descriptor); checkEncryption(conf, descriptor); - if (cpHost != null) { - if (cpHost.preModifyColumn(tableName, descriptor)) { - return; - } - } - LOG.info(getClientIdAuditPrefix() + " modify " + 
descriptor); if (isMasterProcedureExecutorEnabled()) { - // Execute the operation synchronously - wait for the operation to complete before continuing. - long procId = this.procedureExecutor.submitProcedure( - new ModifyColumnFamilyProcedure(procedureExecutor.getEnvironment(), tableName, descriptor), - nonceGroup, - nonce); - ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId); + MasterProcedureUtil.submitProcedure( + new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) { + @Override + protected void run() throws IOException { + if (getMaster().getMasterCoprocessorHost().preModifyColumn(tableName, descriptor)) { + return; + } + + LOG.info(getClientIdAuditPrefix() + " modify " + descriptor); + + // Execute the operation synchronously - wait for the operation to complete before + // continuing. + long procId = submitProcedure(new ModifyColumnFamilyProcedure( + procedureExecutor.getEnvironment(), tableName, descriptor)); + ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId); + + getMaster().getMasterCoprocessorHost().postModifyColumn(tableName, descriptor); + } + + @Override + protected String getDescription() { + return "ModifyColumnFamilyProcedure"; + } + }); } else { + if (cpHost != null) { + if (cpHost.preModifyColumn(tableName, descriptor)) { + return; + } + } + LOG.info(getClientIdAuditPrefix() + " modify " + descriptor); new TableModifyFamilyHandler(tableName, descriptor, this, this).prepare().process(); - } - - if (cpHost != null) { - cpHost.postModifyColumn(tableName, descriptor); + if (cpHost != null) { + cpHost.postModifyColumn(tableName, descriptor); + } } } @@ -1858,103 +1928,140 @@ public class HMaster extends HRegionServer implements MasterServices, Server { final long nonce) throws IOException { checkInitialized(); - if (cpHost != null) { - if (cpHost.preDeleteColumn(tableName, columnName)) { - return; - } - } - LOG.info(getClientIdAuditPrefix() + " delete " + Bytes.toString(columnName)); if 
(isMasterProcedureExecutorEnabled()) { - // Execute the operation synchronously - wait for the operation to complete before continuing. - long procId = this.procedureExecutor.submitProcedure( - new DeleteColumnFamilyProcedure(procedureExecutor.getEnvironment(), tableName, columnName), - nonceGroup, - nonce); - ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId); + MasterProcedureUtil.submitProcedure( + new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) { + @Override + protected void run() throws IOException { + if (getMaster().getMasterCoprocessorHost().preDeleteColumn(tableName, columnName)) { + return; + } + + LOG.info(getClientIdAuditPrefix() + " delete " + Bytes.toString(columnName)); + + // Execute the operation synchronously - wait for the operation to complete before + // continuing. + long procId = submitProcedure(new DeleteColumnFamilyProcedure( + procedureExecutor.getEnvironment(), tableName, columnName)); + ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId); + + getMaster().getMasterCoprocessorHost().postDeleteColumn(tableName, columnName); + } + + @Override + protected String getDescription() { + return "DeleteColumnFamilyProcedure"; + } + }); } else { + if (cpHost != null) { + if (cpHost.preDeleteColumn(tableName, columnName)) { + return; + } + } + LOG.info(getClientIdAuditPrefix() + " delete " + Bytes.toString(columnName)); new TableDeleteFamilyHandler(tableName, columnName, this, this).prepare().process(); - } - - if (cpHost != null) { - cpHost.postDeleteColumn(tableName, columnName); + if (cpHost != null) { + cpHost.postDeleteColumn(tableName, columnName); + } } } @Override - public long enableTable( - final TableName tableName, - final long nonceGroup, - final long nonce) throws IOException { + public long enableTable(final TableName tableName, final long nonceGroup, final long nonce) + throws IOException { checkInitialized(); - if (cpHost != null) { - cpHost.preEnableTable(tableName); - 
} - LOG.info(getClientIdAuditPrefix() + " enable " + tableName); - long procId = -1; if (isMasterProcedureExecutorEnabled()) { - // Execute the operation asynchronously - client will check the progress of the operation - final ProcedurePrepareLatch prepareLatch = ProcedurePrepareLatch.createLatch(); - procId = this.procedureExecutor.submitProcedure( - new EnableTableProcedure( - procedureExecutor.getEnvironment(), tableName, false, prepareLatch), - nonceGroup, - nonce); - this.procedureExecutor.submitProcedure(new EnableTableProcedure(procedureExecutor - .getEnvironment(), tableName, false, prepareLatch)); - // Before returning to client, we want to make sure that the table is prepared to be - // enabled (the table is locked and the table state is set). - // - // Note: if the procedure throws exception, we will catch it and rethrow. - prepareLatch.await(); + return MasterProcedureUtil.submitProcedure( + new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) { + @Override + protected void run() throws IOException { + getMaster().getMasterCoprocessorHost().preEnableTable(tableName); + + LOG.info(getClientIdAuditPrefix() + " enable " + tableName); + + // Execute the operation asynchronously - client will check the progress of the operation + // In case the request is from a <1.1 client before returning, + // we want to make sure that the table is prepared to be + // enabled (the table is locked and the table state is set). + // Note: if the procedure throws exception, we will catch it and rethrow. + final ProcedurePrepareLatch prepareLatch = ProcedurePrepareLatch.createLatch(); + submitProcedure(new EnableTableProcedure( + procedureExecutor.getEnvironment(), tableName, false, prepareLatch)); + // Before returning to client, we want to make sure that the table is prepared to be + // enabled (the table is locked and the table state is set). + // + // Note: if the procedure throws exception, we will catch it and rethrow. 
+ prepareLatch.await(); + + getMaster().getMasterCoprocessorHost().postEnableTable(tableName); + } + + @Override + protected String getDescription() { + return "EnableTableProcedure"; + } + }); } else { + if (cpHost != null) { + cpHost.preEnableTable(tableName); + } + LOG.info(getClientIdAuditPrefix() + " enable " + tableName); this.service.submit(new EnableTableHandler(this, tableName, assignmentManager, tableLockManager, false).prepare()); + if (cpHost != null) { + cpHost.postEnableTable(tableName); + } + return -1; } - - if (cpHost != null) { - cpHost.postEnableTable(tableName); - } - - return procId; } @Override - public long disableTable( - final TableName tableName, - final long nonceGroup, - final long nonce) throws IOException { + public long disableTable(final TableName tableName, final long nonceGroup, final long nonce) + throws IOException { checkInitialized(); - if (cpHost != null) { - cpHost.preDisableTable(tableName); - } - LOG.info(getClientIdAuditPrefix() + " disable " + tableName); - long procId = -1; if (isMasterProcedureExecutorEnabled()) { - // Execute the operation asynchronously - client will check the progress of the operation - final ProcedurePrepareLatch prepareLatch = ProcedurePrepareLatch.createLatch(); - procId = this.procedureExecutor.submitProcedure( - new DisableTableProcedure( - procedureExecutor.getEnvironment(), tableName, false, prepareLatch), - nonceGroup, - nonce); - // Before returning to client, we want to make sure that the table is prepared to be - // enabled (the table is locked and the table state is set). - // - // Note: if the procedure throws exception, we will catch it and rethrow. 
- prepareLatch.await(); + return MasterProcedureUtil.submitProcedure( + new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) { + @Override + protected void run() throws IOException { + getMaster().getMasterCoprocessorHost().preDisableTable(tableName); + + LOG.info(getClientIdAuditPrefix() + " disable " + tableName); + + // Execute the operation asynchronously - client will check the progress of the operation + // In case the request is from a <1.1 client before returning, + // we want to make sure that the table is prepared to be + // disabled (the table is locked and the table state is set). + // Note: if the procedure throws exception, we will catch it and rethrow. + final ProcedurePrepareLatch prepareLatch = ProcedurePrepareLatch.createLatch(); + submitProcedure(new DisableTableProcedure(procedureExecutor.getEnvironment(), + tableName, false, prepareLatch)); + prepareLatch.await(); + + getMaster().getMasterCoprocessorHost().postDisableTable(tableName); + } + + @Override + protected String getDescription() { + return "DisableTableProcedure"; + } + }); } else { + if (cpHost != null) { + cpHost.preDisableTable(tableName); + } + LOG.info(getClientIdAuditPrefix() + " disable " + tableName); this.service.submit(new DisableTableHandler(this, tableName, assignmentManager, tableLockManager, false).prepare()); + if (cpHost != null) { + cpHost.postDisableTable(tableName); + } + return -1; } - - if (cpHost != null) { - cpHost.postDisableTable(tableName); - } - - return procId; } /** @@ -2002,25 +2109,37 @@ public class HMaster extends HRegionServer implements MasterServices, Server { throws IOException { checkInitialized(); sanityCheckTableDescriptor(descriptor); - if (cpHost != null) { - cpHost.preModifyTable(tableName, descriptor); - } - - LOG.info(getClientIdAuditPrefix() + " modify " + tableName); if (isMasterProcedureExecutorEnabled()) { - // Execute the operation synchronously - wait for the operation completes before continuing.
- long procId = this.procedureExecutor.submitProcedure( - new ModifyTableProcedure(procedureExecutor.getEnvironment(), descriptor), - nonceGroup, - nonce); - ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId); + MasterProcedureUtil.submitProcedure( + new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) { + @Override + protected void run() throws IOException { + getMaster().getMasterCoprocessorHost().preModifyTable(tableName, descriptor); + LOG.info(getClientIdAuditPrefix() + " modify " + tableName); + + // Execute the operation synchronously - wait for the operation completes before continuing. + long procId = submitProcedure(new ModifyTableProcedure( + procedureExecutor.getEnvironment(), descriptor)); + ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId); + + getMaster().getMasterCoprocessorHost().postModifyTable(tableName, descriptor); + } + + @Override + protected String getDescription() { + return "ModifyTableProcedure"; + } + }); } else { + if (cpHost != null) { + cpHost.preModifyTable(tableName, descriptor); + } + LOG.info(getClientIdAuditPrefix() + " modify " + tableName); new ModifyTableHandler(tableName, descriptor, this, this).prepare().process(); - } - - if (cpHost != null) { - cpHost.postModifyTable(tableName, descriptor); + if (cpHost != null) { + cpHost.postModifyTable(tableName, descriptor); + } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/e32f8ac4/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java index d7c0b92..4759e7d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java @@ -18,11 +18,18 @@ package org.apache.hadoop.hbase.master.procedure; +import java.io.IOException; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.util.NonceKey; import org.apache.hadoop.security.UserGroupInformation; @InterfaceAudience.Private @@ -53,4 +60,85 @@ public final class MasterProcedureUtil { } return null; } + + /** + * Helper Runnable used in conjunction with submitProcedure() to deal with + * submitting procs with nonce. + * See submitProcedure() for an example. + */ + public static abstract class NonceProcedureRunnable { + private final MasterServices master; + private final NonceKey nonceKey; + private Long procId; + + public NonceProcedureRunnable(final MasterServices master, + final long nonceGroup, final long nonce) { + this.master = master; + this.nonceKey = getProcedureExecutor().createNonceKey(nonceGroup, nonce); + } + + protected NonceKey getNonceKey() { + return nonceKey; + } + + protected MasterServices getMaster() { + return master; + } + + protected ProcedureExecutor<MasterProcedureEnv> getProcedureExecutor() { + return master.getMasterProcedureExecutor(); + } + + protected long getProcId() { + return procId != null ? 
procId.longValue() : -1; + } + + protected long setProcId(final long procId) { + this.procId = procId; + return procId; + } + + protected abstract void run() throws IOException; + protected abstract String getDescription(); + + protected long submitProcedure(final Procedure proc) { + assert procId == null : "submitProcedure() was already called, running procId=" + procId; + procId = getProcedureExecutor().submitProcedure(proc, nonceKey); + return procId; + } + } + + /** + * Helper used to deal with submitting procs with nonce. + * Internally the NonceProcedureRunnable.run() will be called only if no one else + * registered the nonce. Any IOException thrown by the run() method will be + * recorded as the failure result for the nonce and rethrown. + * <code> + * long procId = MasterProcedureUtil.submitProcedure( + * new NonceProcedureRunnable(master, nonceGroup, nonce) { + * {@literal @}Override + * protected void run() throws IOException { + * cpHost.preOperation(); + * submitProcedure(new MyProc()); + * cpHost.postOperation(); + * } + * }); + * </code> + */ + public static long submitProcedure(final NonceProcedureRunnable runnable) throws IOException { + final ProcedureExecutor<MasterProcedureEnv> procExec = runnable.getProcedureExecutor(); + final long procId = procExec.registerNonce(runnable.getNonceKey()); + if (procId >= 0) return procId; // someone already registered the nonce + try { + runnable.run(); + } catch (IOException e) { + procExec.setFailureResultForNonce(runnable.getNonceKey(), + runnable.getDescription(), + procExec.getEnvironment().getRequestUser(), e); + throw e; + } finally { + procExec.unregisterNonceIfProcedureWasNotSubmitted(runnable.getNonceKey()); + } + return runnable.getProcId(); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/e32f8ac4/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestAddColumnFamilyProcedure.java ---------------------------------------------------------------------- diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestAddColumnFamilyProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestAddColumnFamilyProcedure.java index 97a287e..b3fa10a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestAddColumnFamilyProcedure.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestAddColumnFamilyProcedure.java @@ -25,7 +25,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.InvalidFamilyOperationException; import org.apache.hadoop.hbase.ProcedureInfo; @@ -47,9 +46,6 @@ public class TestAddColumnFamilyProcedure { protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); - private static long nonceGroup = HConstants.NO_NONCE; - private static long nonce = HConstants.NO_NONCE; - private static void setupConf(Configuration conf) { conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1); } @@ -72,9 +68,6 @@ public class TestAddColumnFamilyProcedure { @Before public void setup() throws Exception { ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(getMasterProcedureExecutor(), false); - nonceGroup = - MasterProcedureTestingUtility.generateNonceGroup(UTIL.getHBaseCluster().getMaster()); - nonce = MasterProcedureTestingUtility.generateNonce(UTIL.getHBaseCluster().getMaster()); } @After @@ -99,9 +92,7 @@ public class TestAddColumnFamilyProcedure { // Test 1: Add a column family online long procId1 = procExec.submitProcedure( - new AddColumnFamilyProcedure(procExec.getEnvironment(), tableName, columnDescriptor1), - nonceGroup, - nonce); + new AddColumnFamilyProcedure(procExec.getEnvironment(), tableName, columnDescriptor1)); // Wait 
the completion ProcedureTestingUtility.waitProcedure(procExec, procId1); ProcedureTestingUtility.assertProcNotFailed(procExec, procId1); @@ -112,9 +103,7 @@ public class TestAddColumnFamilyProcedure { // Test 2: Add a column family offline UTIL.getHBaseAdmin().disableTable(tableName); long procId2 = procExec.submitProcedure( - new AddColumnFamilyProcedure(procExec.getEnvironment(), tableName, columnDescriptor2), - nonceGroup + 1, - nonce + 1); + new AddColumnFamilyProcedure(procExec.getEnvironment(), tableName, columnDescriptor2)); // Wait the completion ProcedureTestingUtility.waitProcedure(procExec, procId2); ProcedureTestingUtility.assertProcNotFailed(procExec, procId2); @@ -134,9 +123,7 @@ public class TestAddColumnFamilyProcedure { // add the column family long procId1 = procExec.submitProcedure( - new AddColumnFamilyProcedure(procExec.getEnvironment(), tableName, columnDescriptor), - nonceGroup, - nonce); + new AddColumnFamilyProcedure(procExec.getEnvironment(), tableName, columnDescriptor)); // Wait the completion ProcedureTestingUtility.waitProcedure(procExec, procId1); ProcedureTestingUtility.assertProcNotFailed(procExec, procId1); @@ -145,9 +132,7 @@ public class TestAddColumnFamilyProcedure { // add the column family that exists long procId2 = procExec.submitProcedure( - new AddColumnFamilyProcedure(procExec.getEnvironment(), tableName, columnDescriptor), - nonceGroup + 1, - nonce + 1); + new AddColumnFamilyProcedure(procExec.getEnvironment(), tableName, columnDescriptor)); // Wait the completion ProcedureTestingUtility.waitProcedure(procExec, procId2); @@ -161,9 +146,7 @@ public class TestAddColumnFamilyProcedure { // Do the same add the existing column family - this time offline UTIL.getHBaseAdmin().disableTable(tableName); long procId3 = procExec.submitProcedure( - new AddColumnFamilyProcedure(procExec.getEnvironment(), tableName, columnDescriptor), - nonceGroup + 2, - nonce + 2); + new AddColumnFamilyProcedure(procExec.getEnvironment(), tableName, 
columnDescriptor)); // Wait the completion ProcedureTestingUtility.waitProcedure(procExec, procId3); @@ -175,37 +158,6 @@ public class TestAddColumnFamilyProcedure { ProcedureTestingUtility.getExceptionCause(result) instanceof InvalidFamilyOperationException); } - @Test(timeout=60000) - public void testAddSameColumnFamilyTwiceWithSameNonce() throws Exception { - final TableName tableName = TableName.valueOf("testAddSameColumnFamilyTwiceWithSameNonce"); - final String cf2 = "cf2"; - final HColumnDescriptor columnDescriptor = new HColumnDescriptor(cf2); - - final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor(); - - MasterProcedureTestingUtility.createTable(procExec, tableName, null, "f1"); - - // add the column family - long procId1 = procExec.submitProcedure( - new AddColumnFamilyProcedure(procExec.getEnvironment(), tableName, columnDescriptor), - nonceGroup, - nonce); - long procId2 = procExec.submitProcedure( - new AddColumnFamilyProcedure(procExec.getEnvironment(), tableName, columnDescriptor), - nonceGroup, - nonce); - // Wait the completion - ProcedureTestingUtility.waitProcedure(procExec, procId1); - ProcedureTestingUtility.assertProcNotFailed(procExec, procId1); - MasterProcedureTestingUtility.validateColumnFamilyAddition(UTIL.getHBaseCluster().getMaster(), - tableName, cf2); - - // Wait the completion and expect not fail - because it is the same proc - ProcedureTestingUtility.waitProcedure(procExec, procId2); - ProcedureTestingUtility.assertProcNotFailed(procExec, procId2); - assertTrue(procId1 == procId2); - } - @Test(timeout = 60000) public void testRecoveryAndDoubleExecutionOffline() throws Exception { final TableName tableName = TableName.valueOf("testRecoveryAndDoubleExecutionOffline"); @@ -221,9 +173,7 @@ public class TestAddColumnFamilyProcedure { // Start the AddColumnFamily procedure && kill the executor long procId = procExec.submitProcedure( - new AddColumnFamilyProcedure(procExec.getEnvironment(), tableName, 
columnDescriptor), - nonceGroup, - nonce); + new AddColumnFamilyProcedure(procExec.getEnvironment(), tableName, columnDescriptor)); // Restart the executor and execute the step twice int numberOfSteps = AddColumnFamilyState.values().length; @@ -248,9 +198,7 @@ public class TestAddColumnFamilyProcedure { // Start the AddColumnFamily procedure && kill the executor long procId = procExec.submitProcedure( - new AddColumnFamilyProcedure(procExec.getEnvironment(), tableName, columnDescriptor), - nonceGroup, - nonce); + new AddColumnFamilyProcedure(procExec.getEnvironment(), tableName, columnDescriptor)); // Restart the executor and execute the step twice int numberOfSteps = AddColumnFamilyState.values().length; @@ -275,9 +223,7 @@ public class TestAddColumnFamilyProcedure { // Start the AddColumnFamily procedure && kill the executor long procId = procExec.submitProcedure( - new AddColumnFamilyProcedure(procExec.getEnvironment(), tableName, columnDescriptor), - nonceGroup, - nonce); + new AddColumnFamilyProcedure(procExec.getEnvironment(), tableName, columnDescriptor)); int numberOfSteps = AddColumnFamilyState.values().length - 2; // failing in the middle of proc MasterProcedureTestingUtility.testRollbackAndDoubleExecution(procExec, procId, numberOfSteps, http://git-wip-us.apache.org/repos/asf/hbase/blob/e32f8ac4/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestCreateTableProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestCreateTableProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestCreateTableProcedure.java index 73843e0..955a4e0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestCreateTableProcedure.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestCreateTableProcedure.java @@ -24,7 +24,6 @@ import 
org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableExistsException; @@ -50,9 +49,6 @@ public class TestCreateTableProcedure { protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); - private static long nonceGroup = HConstants.NO_NONCE; - private static long nonce = HConstants.NO_NONCE; - private static void setupConf(Configuration conf) { conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1); } @@ -75,9 +71,6 @@ public class TestCreateTableProcedure { @Before public void setup() throws Exception { resetProcExecutorTestingKillFlag(); - nonceGroup = - MasterProcedureTestingUtility.generateNonceGroup(UTIL.getHBaseCluster().getMaster()); - nonce = MasterProcedureTestingUtility.generateNonce(UTIL.getHBaseCluster().getMaster()); } @After @@ -127,14 +120,12 @@ public class TestCreateTableProcedure { // create the table long procId1 = procExec.submitProcedure( - new CreateTableProcedure(procExec.getEnvironment(), htd, regions), nonceGroup, nonce); + new CreateTableProcedure(procExec.getEnvironment(), htd, regions)); // create another with the same name ProcedurePrepareLatch latch2 = new ProcedurePrepareLatch.CompatibilityLatch(); long procId2 = procExec.submitProcedure( - new CreateTableProcedure(procExec.getEnvironment(), htd, regions, latch2), - nonceGroup + 1, - nonce + 1); + new CreateTableProcedure(procExec.getEnvironment(), htd, regions, latch2)); ProcedureTestingUtility.waitProcedure(procExec, procId1); ProcedureTestingUtility.assertProcNotFailed(procExec.getResult(procId1)); @@ -144,29 +135,6 @@ public class TestCreateTableProcedure { } @Test(timeout=60000) - public void testCreateTwiceWithSameNonce() throws Exception { - final TableName 
tableName = TableName.valueOf("testCreateTwiceWithSameNonce"); - final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor(); - final HTableDescriptor htd = MasterProcedureTestingUtility.createHTD(tableName, "f"); - final HRegionInfo[] regions = ModifyRegionUtils.createHRegionInfos(htd, null); - - // create the table - long procId1 = procExec.submitProcedure( - new CreateTableProcedure(procExec.getEnvironment(), htd, regions), nonceGroup, nonce); - - // create another with the same name - long procId2 = procExec.submitProcedure( - new CreateTableProcedure(procExec.getEnvironment(), htd, regions), nonceGroup, nonce); - - ProcedureTestingUtility.waitProcedure(procExec, procId1); - ProcedureTestingUtility.assertProcNotFailed(procExec.getResult(procId1)); - - ProcedureTestingUtility.waitProcedure(procExec, procId2); - ProcedureTestingUtility.assertProcNotFailed(procExec.getResult(procId2)); - assertTrue(procId1 == procId2); - } - - @Test(timeout=60000) public void testRecoveryAndDoubleExecution() throws Exception { final TableName tableName = TableName.valueOf("testRecoveryAndDoubleExecution"); @@ -179,7 +147,7 @@ public class TestCreateTableProcedure { HTableDescriptor htd = MasterProcedureTestingUtility.createHTD(tableName, "f1", "f2"); HRegionInfo[] regions = ModifyRegionUtils.createHRegionInfos(htd, splitKeys); long procId = procExec.submitProcedure( - new CreateTableProcedure(procExec.getEnvironment(), htd, regions), nonceGroup, nonce); + new CreateTableProcedure(procExec.getEnvironment(), htd, regions)); // Restart the executor and execute the step twice // NOTE: the 6 (number of CreateTableState steps) is hardcoded, @@ -207,7 +175,7 @@ public class TestCreateTableProcedure { htd.setRegionReplication(3); HRegionInfo[] regions = ModifyRegionUtils.createHRegionInfos(htd, splitKeys); long procId = procExec.submitProcedure( - new CreateTableProcedure(procExec.getEnvironment(), htd, regions), nonceGroup, nonce); + new 
CreateTableProcedure(procExec.getEnvironment(), htd, regions)); // NOTE: the 4 (number of CreateTableState steps) is hardcoded, // so you have to look at this test at least once when you add a new step. @@ -237,7 +205,7 @@ public class TestCreateTableProcedure { HTableDescriptor htd = MasterProcedureTestingUtility.createHTD(tableName, "f1", "f2"); HRegionInfo[] regions = ModifyRegionUtils.createHRegionInfos(htd, splitKeys); long procId = procExec.submitProcedure( - new FaultyCreateTableProcedure(procExec.getEnvironment(), htd, regions), nonceGroup, nonce); + new CreateTableProcedure(procExec.getEnvironment(), htd, regions)); // NOTE: the 4 (number of CreateTableState steps) is hardcoded, // so you have to look at this test at least once when you add a new step.
