HBASE-17149 Procedure v2 - Fix nonce submission Signed-off-by: Michael Stack <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/da356069 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/da356069 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/da356069 Branch: refs/heads/master Commit: da356069f244d4a3d35ccf34badda39bc15cf8d7 Parents: a5ee36d Author: Michael Stack <[email protected]> Authored: Fri Dec 16 16:55:41 2016 -0800 Committer: Michael Stack <[email protected]> Committed: Fri Dec 16 21:41:18 2016 -0800 ---------------------------------------------------------------------- .../hbase/procedure2/ProcedureExecutor.java | 138 ++++- .../procedure2/ProcedureTestingUtility.java | 14 +- .../hbase/procedure2/TestProcedureNonce.java | 285 +++++++++ .../hbase/procedure2/TestProcedureRecovery.java | 43 -- .../hadoop/hbase/master/ClusterSchema.java | 19 +- .../hbase/master/ClusterSchemaServiceImpl.java | 28 +- .../org/apache/hadoop/hbase/master/HMaster.java | 583 +++++++++++-------- .../hadoop/hbase/master/MasterRpcServices.java | 14 +- .../hbase/master/TableNamespaceManager.java | 5 +- .../master/procedure/MasterProcedureUtil.java | 87 +++ .../hbase/master/snapshot/SnapshotManager.java | 76 +-- .../procedure/TestAddColumnFamilyProcedure.java | 63 +- .../procedure/TestCloneSnapshotProcedure.java | 27 +- .../procedure/TestCreateNamespaceProcedure.java | 64 +- .../procedure/TestCreateTableProcedure.java | 33 +- .../TestDeleteColumnFamilyProcedure.java | 67 +-- .../procedure/TestDeleteNamespaceProcedure.java | 62 +- .../procedure/TestDeleteTableProcedure.java | 36 +- .../procedure/TestDisableTableProcedure.java | 33 +- .../TestDispatchMergingRegionsProcedure.java | 33 +- .../procedure/TestEnableTableProcedure.java | 39 +- .../procedure/TestMasterProcedureEvents.java | 5 +- .../TestMergeTableRegionsProcedure.java | 26 - .../TestModifyColumnFamilyProcedure.java | 24 +- .../procedure/TestModifyNamespaceProcedure.java | 34 +- .../procedure/TestModifyTableProcedure.java | 8 
+- .../master/procedure/TestProcedureAdmin.java | 15 +- .../procedure/TestRestoreSnapshotProcedure.java | 28 +- .../TestSplitTableRegionProcedure.java | 74 +-- .../procedure/TestTruncateTableProcedure.java | 4 +- .../TestCoprocessorWhitelistMasterObserver.java | 41 +- 31 files changed, 989 insertions(+), 1019 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/da356069/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index fe5982c..80c3804 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -54,6 +54,7 @@ import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.NonceKey; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.Threads; /** * Thread Pool that executes the submitted procedures. @@ -654,46 +655,135 @@ public class ProcedureExecutor<TEnvironment> { } // ========================================================================== - // Submit/Abort Procedure + // Nonce Procedure helpers // ========================================================================== + /** + * Create a NoneKey from the specified nonceGroup and nonce. + * @param nonceGroup + * @param nonce + * @return the generated NonceKey + */ + public NonceKey createNonceKey(final long nonceGroup, final long nonce) { + return (nonce == HConstants.NO_NONCE) ? 
null : new NonceKey(nonceGroup, nonce); + } + + /** + * Register a nonce for a procedure that is going to be submitted. + * A procId will be reserved and on submitProcedure(), + * the procedure with the specified nonce will take the reserved ProcId. + * If someone already reserved the nonce, this method will return the procId reserved, + * otherwise an invalid procId (-1) will be returned, and the caller should proceed + * and submit the procedure. + * + * @param nonceKey A unique identifier for this operation from the client or process. + * @return the procId associated with the nonce, if any; otherwise an invalid procId (-1). + */ + public long registerNonce(final NonceKey nonceKey) { + if (nonceKey == null) return -1; + + // check if we already have a reserved ID for the nonce + Long oldProcId = nonceKeysToProcIdsMap.get(nonceKey); + if (oldProcId == null) { + // reserve a new procedure ID; this will be associated with the nonce + // and the procedure submitted with the specified nonce will use this ID. + final long newProcId = nextProcId(); + oldProcId = nonceKeysToProcIdsMap.putIfAbsent(nonceKey, newProcId); + if (oldProcId == null) return -1; + } + + // we found a registered nonce, but the procedure may not have been submitted yet. + // since the client expects the procedure to be submitted, spin here until it is. + final boolean isTraceEnabled = LOG.isTraceEnabled(); + while (isRunning() && + !(procedures.containsKey(oldProcId) || completed.containsKey(oldProcId)) && + nonceKeysToProcIdsMap.containsKey(nonceKey)) { + if (isTraceEnabled) { + LOG.trace("waiting for procId=" + oldProcId.longValue() + " to be submitted"); + } + Threads.sleep(100); + } + return oldProcId.longValue(); + } + + /** + * Remove the NonceKey if the procedure was not submitted to the executor. + * @param nonceKey A unique identifier for this operation from the client or process.
+ */ + public void unregisterNonceIfProcedureWasNotSubmitted(final NonceKey nonceKey) { + if (nonceKey == null) return; + + final Long procId = nonceKeysToProcIdsMap.get(nonceKey); + if (procId == null) return; + + // if the procedure was not submitted, remove the nonce + if (!(procedures.containsKey(procId) || completed.containsKey(procId))) { + nonceKeysToProcIdsMap.remove(nonceKey); + } + } /** + * If the failure failed before submitting it, we may want to give back the + * same error to the requests with the same nonceKey. + * + * @param nonceKey A unique identifier for this operation from the client or process + * @param procName name of the procedure, used to inform the user + * @param procOwner name of the owner of the procedure, used to inform the user + * @param exception the failure to report to the user + */ + public void setFailureResultForNonce(final NonceKey nonceKey, final String procName, + final User procOwner, final IOException exception) { + if (nonceKey == null) return; + + final Long procId = nonceKeysToProcIdsMap.get(nonceKey); + if (procId == null || completed.containsKey(procId)) return; + + final long currentTime = EnvironmentEdgeManager.currentTime(); + final ProcedureInfo result = new ProcedureInfo(procId.longValue(), + procName, procOwner != null ? procOwner.getShortName() : null, + ProcedureUtil.convertToProcedureState(ProcedureState.ROLLEDBACK), + -1, nonceKey, exception, currentTime, currentTime, null); + completed.putIfAbsent(procId, result); + } + + // ========================================================================== + // Submit/Abort Procedure + // ========================================================================== + /** * Add a new root-procedure to the executor. * @param proc the new procedure to execute. 
* @return the procedure id, that can be used to monitor the operation */ public long submitProcedure(final Procedure proc) { - return submitProcedure(proc, HConstants.NO_NONCE, HConstants.NO_NONCE); + return submitProcedure(proc, null); } /** * Add a new root-procedure to the executor. * @param proc the new procedure to execute. - * @param nonceGroup - * @param nonce + * @param nonceKey the registered unique identifier for this operation from the client or process. * @return the procedure id, that can be used to monitor the operation */ - public long submitProcedure(final Procedure proc, final long nonceGroup, final long nonce) { + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH", + justification = "FindBugs is blind to the check-for-null") + public long submitProcedure(final Procedure proc, final NonceKey nonceKey) { Preconditions.checkArgument(lastProcId.get() >= 0); Preconditions.checkArgument(isRunning(), "executor not running"); - // Prepare procedure prepareProcedure(proc); - // Check whether the proc exists. If exist, just return the proc id. - // This is to prevent the same proc to submit multiple times (it could happen - // when client could not talk to server and resubmit the same request). 
- if (nonce != HConstants.NO_NONCE) { - final NonceKey noncekey = new NonceKey(nonceGroup, nonce); - proc.setNonceKey(noncekey); - - Long oldProcId = nonceKeysToProcIdsMap.putIfAbsent(noncekey, proc.getProcId()); - if (oldProcId != null) { - // Found the proc - return oldProcId.longValue(); - } + final Long currentProcId; + if (nonceKey != null) { + currentProcId = nonceKeysToProcIdsMap.get(nonceKey); + Preconditions.checkArgument(currentProcId != null, + "expected nonceKey=" + nonceKey + " to be reserved, use registerNonce()"); + } else { + currentProcId = nextProcId(); } + // Initialize the procedure + proc.setNonceKey(nonceKey); + proc.setProcId(currentProcId.longValue()); + // Commit the transaction store.insert(proc, null); if (LOG.isDebugEnabled()) { @@ -708,13 +798,14 @@ public class ProcedureExecutor<TEnvironment> { * Add a set of new root-procedure to the executor. * @param procs the new procedures to execute. */ + // TODO: Do we need to take nonces here? public void submitProcedures(final Procedure[] procs) { Preconditions.checkArgument(lastProcId.get() >= 0); Preconditions.checkArgument(isRunning(), "executor not running"); // Prepare procedure for (int i = 0; i < procs.length; ++i) { - prepareProcedure(procs[i]); + prepareProcedure(procs[i]).setProcId(nextProcId()); } // Commit the transaction @@ -729,17 +820,14 @@ public class ProcedureExecutor<TEnvironment> { } } - private void prepareProcedure(final Procedure proc) { + private Procedure prepareProcedure(final Procedure proc) { Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING); Preconditions.checkArgument(isRunning(), "executor not running"); Preconditions.checkArgument(!proc.hasParent(), "unexpected parent", proc); if (this.checkOwnerSet) { Preconditions.checkArgument(proc.hasOwner(), "missing owner"); } - - // Initialize the Procedure ID - final long currentProcId = nextProcId(); - proc.setProcId(currentProcId); + return proc; } private long pushProcedure(final Procedure 
proc) { @@ -754,7 +842,7 @@ public class ProcedureExecutor<TEnvironment> { procedures.put(currentProcId, proc); sendProcedureAddedNotification(currentProcId); scheduler.addBack(proc); - return currentProcId; + return proc.getProcId(); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/da356069/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java index 93f3460..9edc711 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore; import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; +import org.apache.hadoop.hbase.util.NonceKey; import org.apache.hadoop.hbase.util.Threads; import static org.junit.Assert.assertEquals; @@ -178,13 +179,20 @@ public class ProcedureTestingUtility { } public static <TEnv> long submitAndWait(ProcedureExecutor<TEnv> procExecutor, Procedure proc, - final long nonceGroup, - final long nonce) { - long procId = procExecutor.submitProcedure(proc, nonceGroup, nonce); + final long nonceGroup, final long nonce) { + long procId = submitProcedure(procExecutor, proc, nonceGroup, nonce); waitProcedure(procExecutor, procId); return procId; } + public static <TEnv> long submitProcedure(ProcedureExecutor<TEnv> procExecutor, Procedure proc, + final long nonceGroup, final long nonce) { + final NonceKey nonceKey = 
procExecutor.createNonceKey(nonceGroup, nonce); + long procId = procExecutor.registerNonce(nonceKey); + assertFalse(procId >= 0); + return procExecutor.submitProcedure(proc, nonceKey); + } + public static <TEnv> void waitProcedure(ProcedureExecutor<TEnv> procExecutor, Procedure proc) { while (proc.getState() == ProcedureState.INITIALIZING) { Threads.sleepWithoutInterrupt(250); http://git-wip-us.apache.org/repos/asf/hbase/blob/da356069/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureNonce.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureNonce.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureNonce.java new file mode 100644 index 0000000..f275426 --- /dev/null +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureNonce.java @@ -0,0 +1,285 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.procedure2; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.CountDownLatch; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseCommonTestingUtility; +import org.apache.hadoop.hbase.ProcedureInfo; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.NonceKey; +import org.apache.hadoop.hbase.util.Threads; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +@Category({MasterTests.class, SmallTests.class}) +public class TestProcedureNonce { + private static final Log LOG = LogFactory.getLog(TestProcedureNonce.class); + + private static final int PROCEDURE_EXECUTOR_SLOTS = 2; + + private static TestProcEnv procEnv; + private static ProcedureExecutor<TestProcEnv> procExecutor; + private static ProcedureStore procStore; + + private HBaseCommonTestingUtility htu; + private FileSystem fs; + private Path logDir; + + @Before + public void setUp() throws IOException { + htu = new HBaseCommonTestingUtility(); + Path testDir = htu.getDataTestDir(); + fs = testDir.getFileSystem(htu.getConfiguration()); + assertTrue(testDir.depth() > 1); + + logDir = new Path(testDir, "proc-logs"); + procEnv = new TestProcEnv(); + procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), fs, logDir); + procExecutor = new 
ProcedureExecutor(htu.getConfiguration(), procEnv, procStore); + procExecutor.testing = new ProcedureExecutor.Testing(); + procStore.start(PROCEDURE_EXECUTOR_SLOTS); + procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true); + } + + @After + public void tearDown() throws IOException { + procExecutor.stop(); + procStore.stop(false); + fs.delete(logDir, true); + } + + @Test(timeout=30000) + public void testCompletedProcWithSameNonce() throws Exception { + final long nonceGroup = 123; + final long nonce = 2222; + + // register the nonce + final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce); + assertFalse(procExecutor.registerNonce(nonceKey) >= 0); + + // Submit a proc and wait for its completion + Procedure proc = new TestSingleStepProcedure(); + long procId = procExecutor.submitProcedure(proc, nonceKey); + ProcedureTestingUtility.waitProcedure(procExecutor, procId); + + // Restart + ProcedureTestingUtility.restart(procExecutor); + ProcedureTestingUtility.waitProcedure(procExecutor, procId); + + // try to register a procedure with the same nonce + // we should get back the old procId + assertEquals(procId, procExecutor.registerNonce(nonceKey)); + + ProcedureInfo result = procExecutor.getResult(procId); + ProcedureTestingUtility.assertProcNotFailed(result); + } + + @Test(timeout=30000) + public void testRunningProcWithSameNonce() throws Exception { + final long nonceGroup = 456; + final long nonce = 33333; + + // register the nonce + final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce); + assertFalse(procExecutor.registerNonce(nonceKey) >= 0); + + // Submit a proc and use a latch to prevent the step execution until we re-registered the same nonce + CountDownLatch latch = new CountDownLatch(1); + TestSingleStepProcedure proc = new TestSingleStepProcedure(); + procEnv.setWaitLatch(latch); + long procId = procExecutor.submitProcedure(proc, nonceKey); + while (proc.step != 1) Threads.sleep(25); + + // try to register a procedure with the same nonce +
// we should get back the old procId + assertEquals(procId, procExecutor.registerNonce(nonceKey)); + + // complete the procedure + latch.countDown(); + + // Restart, the procedure is not completed yet + ProcedureTestingUtility.restart(procExecutor); + ProcedureTestingUtility.waitProcedure(procExecutor, procId); + + // try to register a procedure with the same nonce + // we should get back the old procId + assertEquals(procId, procExecutor.registerNonce(nonceKey)); + + ProcedureInfo result = procExecutor.getResult(procId); + ProcedureTestingUtility.assertProcNotFailed(result); + } + + @Test + public void testSetFailureResultForNonce() throws IOException { + final long nonceGroup = 234; + final long nonce = 55555; + + // check and register the request nonce + final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce); + assertFalse(procExecutor.registerNonce(nonceKey) >= 0); + + procExecutor.setFailureResultForNonce(nonceKey, "testProc", User.getCurrent(), + new IOException("test failure")); + + final long procId = procExecutor.registerNonce(nonceKey); + ProcedureInfo result = procExecutor.getResult(procId); + ProcedureTestingUtility.assertProcFailed(result); + } + + @Test(timeout=30000) + public void testConcurrentNonceRegistration() throws IOException { + testConcurrentNonceRegistration(true, 567, 44444); + } + + @Test(timeout=30000) + public void testConcurrentNonceRegistrationWithRollback() throws IOException { + testConcurrentNonceRegistration(false, 890, 55555); + } + + private void testConcurrentNonceRegistration(final boolean submitProcedure, + final long nonceGroup, final long nonce) throws IOException { + // register the nonce + final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce); + + final AtomicReference<Throwable> t1Exception = new AtomicReference(); + final AtomicReference<Throwable> t2Exception = new AtomicReference(); + + final CountDownLatch t1NonceRegisteredLatch = new CountDownLatch(1); + final CountDownLatch 
t2BeforeNonceRegisteredLatch = new CountDownLatch(1); + final Thread[] threads = new Thread[2]; + threads[0] = new Thread() { + @Override + public void run() { + try { + // register the nonce and wake t2 + assertFalse("unexpected already registered nonce", + procExecutor.registerNonce(nonceKey) >= 0); + t1NonceRegisteredLatch.countDown(); + + // hold the submission until t2 is registering the nonce + t2BeforeNonceRegisteredLatch.await(); + Threads.sleep(1000); + + if (submitProcedure) { + CountDownLatch latch = new CountDownLatch(1); + TestSingleStepProcedure proc = new TestSingleStepProcedure(); + procEnv.setWaitLatch(latch); + + procExecutor.submitProcedure(proc, nonceKey); + Threads.sleep(100); + + // complete the procedure + latch.countDown(); + } else { + procExecutor.unregisterNonceIfProcedureWasNotSubmitted(nonceKey); + } + } catch (Throwable e) { + t1Exception.set(e); + } finally { + t1NonceRegisteredLatch.countDown(); + t2BeforeNonceRegisteredLatch.countDown(); + } + } + }; + + threads[1] = new Thread() { + @Override + public void run() { + try { + // wait until t1 has registered the nonce + t1NonceRegisteredLatch.await(); + + // register the same nonce again (spins until t1 submits or unregisters it) + t2BeforeNonceRegisteredLatch.countDown(); + assertFalse("unexpected non registered nonce", + procExecutor.registerNonce(nonceKey) < 0); + } catch (Throwable e) { + t2Exception.set(e); + } finally { + t1NonceRegisteredLatch.countDown(); + t2BeforeNonceRegisteredLatch.countDown(); + } + } + }; + + for (int i = 0; i < threads.length; ++i) threads[i].start(); + for (int i = 0; i < threads.length; ++i) Threads.shutdown(threads[i]); + ProcedureTestingUtility.waitNoProcedureRunning(procExecutor); + assertEquals(null, t1Exception.get()); + assertEquals(null, t2Exception.get()); + } + + public static class TestSingleStepProcedure extends SequentialProcedure<TestProcEnv> { + private int step = 0; + + public TestSingleStepProcedure() { } + + @Override + protected Procedure[] execute(TestProcEnv env) throws
InterruptedException { + step++; + env.waitOnLatch(); + LOG.debug("execute procedure " + this + " step=" + step); + step++; + setResult(Bytes.toBytes(step)); + return null; + } + + @Override + protected void rollback(TestProcEnv env) { } + + @Override + protected boolean abort(TestProcEnv env) { return true; } + } + + private static class TestProcEnv { + private CountDownLatch latch = null; + + /** + * set/unset a latch. every procedure execute() step will wait on the latch if any. + */ + public void setWaitLatch(CountDownLatch latch) { + this.latch = latch; + } + + public void waitOnLatch() throws InterruptedException { + if (latch != null) { + latch.await(); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/da356069/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java index e3cacd2..00920ee 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java @@ -288,49 +288,6 @@ public class TestProcedureRecovery { ProcedureTestingUtility.assertIsAbortException(result); } - @Test(timeout=30000) - public void testCompletedProcWithSameNonce() throws Exception { - final long nonceGroup = 123; - final long nonce = 2222; - Procedure proc = new TestSingleStepProcedure(); - // Submit a proc and wait for its completion - long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc, nonceGroup, nonce); - - // Restart - restart(); - waitProcedure(procId); - - Procedure proc2 = new TestSingleStepProcedure(); - // Submit a procedure with the same nonce and expect the same procedure would 
return. - long procId2 = ProcedureTestingUtility.submitAndWait(procExecutor, proc2, nonceGroup, nonce); - assertTrue(procId == procId2); - - ProcedureInfo result = procExecutor.getResult(procId2); - ProcedureTestingUtility.assertProcNotFailed(result); - } - - @Test(timeout=30000) - public void testRunningProcWithSameNonce() throws Exception { - final long nonceGroup = 456; - final long nonce = 33333; - Procedure proc = new TestSingleStepProcedure(); - long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc, nonceGroup, nonce); - - // Restart (use a latch to prevent the step execution until we submitted proc2) - CountDownLatch latch = new CountDownLatch(1); - procEnv.setWaitLatch(latch); - restart(); - // Submit a procedure with the same nonce and expect the same procedure would return. - Procedure proc2 = new TestSingleStepProcedure(); - long procId2 = procExecutor.submitProcedure(proc2, nonceGroup, nonce); - latch.countDown(); - procEnv.setWaitLatch(null); - - // The original proc is not completed and the new submission should have the same proc Id. 
- assertTrue(procId == procId2); - } - - public static class TestStateMachineProcedure extends StateMachineProcedure<TestProcEnv, TestStateMachineProcedure.State> { enum State { STATE_1, STATE_2, STATE_3, DONE } http://git-wip-us.apache.org/repos/asf/hbase/blob/da356069/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterSchema.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterSchema.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterSchema.java index cb3b684..666a6cb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterSchema.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterSchema.java @@ -23,6 +23,7 @@ import java.util.List; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.NonceKey; /** * View and edit the current cluster schema. Use this API making any modification to @@ -77,39 +78,33 @@ public interface ClusterSchema { /** * Create a new Namespace. * @param namespaceDescriptor descriptor for new Namespace - * @param nonceGroup Identifier for the source of the request, a client or process. - * @param nonce A unique identifier for this operation from the client or process identified by - * <code>nonceGroup</code> (the source must ensure each operation gets a unique id). + * @param nonceKey A unique identifier for this operation from the client or process. * @return procedure id * @throws IOException Throws {@link ClusterSchemaException} and {@link InterruptedIOException} * as well as {@link IOException} */ - long createNamespace(NamespaceDescriptor namespaceDescriptor, long nonceGroup, long nonce) + long createNamespace(NamespaceDescriptor namespaceDescriptor, NonceKey nonceKey) throws IOException; /** * Modify an existing Namespace. 
- * @param nonceGroup Identifier for the source of the request, a client or process. - * @param nonce A unique identifier for this operation from the client or process identified by - * <code>nonceGroup</code> (the source must ensure each operation gets a unique id). + * @param nonceKey A unique identifier for this operation from the client or process. * @return procedure id * @throws IOException Throws {@link ClusterSchemaException} and {@link InterruptedIOException} * as well as {@link IOException} */ - long modifyNamespace(NamespaceDescriptor descriptor, long nonceGroup, long nonce) + long modifyNamespace(NamespaceDescriptor descriptor, NonceKey nonceKey) throws IOException; /** * Delete an existing Namespace. * Only empty Namespaces (no tables) can be removed. - * @param nonceGroup Identifier for the source of the request, a client or process. - * @param nonce A unique identifier for this operation from the client or process identified by - * <code>nonceGroup</code> (the source must ensure each operation gets a unique id). + * @param nonceKey A unique identifier for this operation from the client or process. 
* @return procedure id * @throws IOException Throws {@link ClusterSchemaException} and {@link InterruptedIOException} * as well as {@link IOException} */ - long deleteNamespace(String name, long nonceGroup, long nonce) + long deleteNamespace(String name, NonceKey nonceKey) throws IOException; /** http://git-wip-us.apache.org/repos/asf/hbase/blob/da356069/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterSchemaServiceImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterSchemaServiceImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterSchemaServiceImpl.java index 0250f36..52af89e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterSchemaServiceImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterSchemaServiceImpl.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.ModifyNamespaceProcedure; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.util.NonceKey; @InterfaceAudience.Private class ClusterSchemaServiceImpl implements ClusterSchemaService { @@ -78,38 +79,35 @@ class ClusterSchemaServiceImpl implements ClusterSchemaService { return this.tableNamespaceManager; } - private long submitProcedure(final Procedure<?> procedure, long nonceGroup, - long nonce) - throws ServiceNotRunningException { + private long submitProcedure(final Procedure<?> procedure, final NonceKey nonceKey) + throws ServiceNotRunningException { checkIsRunning(); ProcedureExecutor<MasterProcedureEnv> pe = this.masterServices.getMasterProcedureExecutor(); - return pe.submitProcedure(procedure, nonceGroup, nonce); + return pe.submitProcedure(procedure, nonceKey); } @Override - public long createNamespace(NamespaceDescriptor 
namespaceDescriptor, - long nonceGroup, long nonce) - throws IOException { + public long createNamespace(NamespaceDescriptor namespaceDescriptor, final NonceKey nonceKey) + throws IOException { return submitProcedure(new CreateNamespaceProcedure( this.masterServices.getMasterProcedureExecutor().getEnvironment(), namespaceDescriptor), - nonceGroup, nonce); + nonceKey); } @Override - public long modifyNamespace(NamespaceDescriptor namespaceDescriptor, - long nonceGroup, long nonce) - throws IOException { + public long modifyNamespace(NamespaceDescriptor namespaceDescriptor, final NonceKey nonceKey) + throws IOException { return submitProcedure(new ModifyNamespaceProcedure( this.masterServices.getMasterProcedureExecutor().getEnvironment(), namespaceDescriptor), - nonceGroup, nonce); + nonceKey); } @Override - public long deleteNamespace(String name, long nonceGroup, long nonce) - throws IOException { + public long deleteNamespace(String name, final NonceKey nonceKey) + throws IOException { return submitProcedure(new DeleteNamespaceProcedure( this.masterServices.getMasterProcedureExecutor().getEnvironment(), name), - nonceGroup, nonce); + nonceKey); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/da356069/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 167a029..c5c246b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -114,6 +114,7 @@ import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.MergeTableRegionsProcedure; import 
org.apache.hadoop.hbase.master.procedure.ModifyColumnFamilyProcedure; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil; import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure; import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; import org.apache.hadoop.hbase.master.procedure.SplitTableRegionProcedure; @@ -145,16 +146,17 @@ import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.SnapshotDescription; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; import org.apache.hadoop.hbase.util.Addressing; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CompressionTest; import org.apache.hadoop.hbase.util.EncryptionTest; -import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.HFileArchiveUtil; import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.hbase.util.IdLock; import org.apache.hadoop.hbase.util.ModifyRegionUtils; +import org.apache.hadoop.hbase.util.NonceKey; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.VersionInfo; @@ -1433,23 +1435,26 @@ public class HMaster extends HRegionServer implements MasterServices { regionsToMerge [0] = regionInfoA; regionsToMerge [1] = regionInfoB; - if (cpHost != null) { - cpHost.preDispatchMerge(regionInfoA, regionInfoB); - } + return MasterProcedureUtil.submitProcedure( + new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) { + @Override + protected void run() throws IOException { + getMaster().getMasterCoprocessorHost().preDispatchMerge(regionInfoA, regionInfoB); - 
LOG.info(getClientIdAuditPrefix() + " Merge regions " - + regionInfoA.getEncodedName() + " and " + regionInfoB.getEncodedName()); + LOG.info(getClientIdAuditPrefix() + " Merge regions " + + regionInfoA.getEncodedName() + " and " + regionInfoB.getEncodedName()); - long procId = this.procedureExecutor.submitProcedure( - new DispatchMergingRegionsProcedure( - procedureExecutor.getEnvironment(), tableName, regionsToMerge, forcible), - nonceGroup, - nonce); + submitProcedure(new DispatchMergingRegionsProcedure(procedureExecutor.getEnvironment(), + tableName, regionsToMerge, forcible)); - if (cpHost != null) { - cpHost.postDispatchMerge(regionInfoA, regionInfoB); - } - return procId; + getMaster().getMasterCoprocessorHost().postDispatchMerge(regionInfoA, regionInfoB); + } + + @Override + protected String getDescription() { + return "DispatchMergingRegionsProcedure"; + } + }); } @Override @@ -1478,22 +1483,26 @@ public class HMaster extends HRegionServer implements MasterServices { "Cannot merge a region to itself " + regionsToMerge[0] + ", " + regionsToMerge[1]); } - if (cpHost != null) { - cpHost.preMergeRegions(regionsToMerge); - } + return MasterProcedureUtil.submitProcedure( + new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) { + @Override + protected void run() throws IOException { + getMaster().getMasterCoprocessorHost().preMergeRegions(regionsToMerge); - LOG.info(getClientIdAuditPrefix() + " Merge regions " - + regionsToMerge[0].getEncodedName() + " and " + regionsToMerge[1].getEncodedName()); + LOG.info(getClientIdAuditPrefix() + " Merge regions " + + regionsToMerge[0].getEncodedName() + " and " + regionsToMerge[1].getEncodedName()); - long procId = this.procedureExecutor.submitProcedure( - new MergeTableRegionsProcedure(procedureExecutor.getEnvironment(), regionsToMerge, forcible), - nonceGroup, - nonce); + submitProcedure(new MergeTableRegionsProcedure(procedureExecutor.getEnvironment(), + regionsToMerge, forcible)); - if (cpHost != null) { -
cpHost.postMergeRegions(regionsToMerge); - } - return procId; + getMaster().getMasterCoprocessorHost().postMergeRegions(regionsToMerge); + } + + @Override + protected String getDescription() { + return "MergeTableRegionsProcedure"; + } + }); } @Override @@ -1504,18 +1513,24 @@ public class HMaster extends HRegionServer implements MasterServices { final long nonce) throws IOException { checkInitialized(); - if (cpHost != null) { - cpHost.preSplitRegion(regionInfo.getTable(), splitRow); - } + return MasterProcedureUtil.submitProcedure( + new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) { + @Override + protected void run() throws IOException { + getMaster().getMasterCoprocessorHost().preSplitRegion(regionInfo.getTable(), splitRow); - LOG.info(getClientIdAuditPrefix() + " Split region " + regionInfo); + LOG.info(getClientIdAuditPrefix() + " Split region " + regionInfo); - // Execute the operation asynchronously - long procId = this.procedureExecutor.submitProcedure( - new SplitTableRegionProcedure(procedureExecutor.getEnvironment(), regionInfo, splitRow), - nonceGroup, nonce); + // Execute the operation asynchronously + submitProcedure(new SplitTableRegionProcedure(procedureExecutor.getEnvironment(), + regionInfo, splitRow)); + } - return procId; + @Override + protected String getDescription() { + return "SplitTableRegionProcedure"; + } + }); } void move(final byte[] encodedRegionName, @@ -1600,36 +1615,37 @@ public class HMaster extends HRegionServer implements MasterServices { final byte [][] splitKeys, final long nonceGroup, final long nonce) throws IOException { - if (isStopped()) { - throw new MasterNotRunningException(); - } checkInitialized(); + String namespace = hTableDescriptor.getTableName().getNamespaceAsString(); this.clusterSchemaService.getNamespace(namespace); HRegionInfo[] newRegions = ModifyRegionUtils.createHRegionInfos(hTableDescriptor, splitKeys); - checkInitialized(); sanityCheckTableDescriptor(hTableDescriptor); - if (cpHost !=
null) { - cpHost.preCreateTable(hTableDescriptor, newRegions); - } - LOG.info(getClientIdAuditPrefix() + " create " + hTableDescriptor); - // TODO: We can handle/merge duplicate requests, and differentiate the case of - // TableExistsException by saying if the schema is the same or not. - ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch(); - long procId = this.procedureExecutor.submitProcedure( - new CreateTableProcedure( - procedureExecutor.getEnvironment(), hTableDescriptor, newRegions, latch), - nonceGroup, - nonce); - latch.await(); + return MasterProcedureUtil.submitProcedure( + new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) { + @Override + protected void run() throws IOException { + getMaster().getMasterCoprocessorHost().preCreateTable(hTableDescriptor, newRegions); - if (cpHost != null) { - cpHost.postCreateTable(hTableDescriptor, newRegions); - } + LOG.info(getClientIdAuditPrefix() + " create " + hTableDescriptor); - return procId; + // TODO: We can handle/merge duplicate requests, and differentiate the case of + // TableExistsException by saying if the schema is the same or not. 
+ ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch(); + submitProcedure(new CreateTableProcedure( + procedureExecutor.getEnvironment(), hTableDescriptor, newRegions, latch)); + latch.await(); + + getMaster().getMasterCoprocessorHost().postCreateTable(hTableDescriptor, newRegions); + } + + @Override + protected String getDescription() { + return "CreateTableProcedure"; + } + }); } @Override @@ -1968,24 +1984,29 @@ public class HMaster extends HRegionServer implements MasterServices { final long nonceGroup, final long nonce) throws IOException { checkInitialized(); - if (cpHost != null) { - cpHost.preDeleteTable(tableName); - } - LOG.info(getClientIdAuditPrefix() + " delete " + tableName); - // TODO: We can handle/merge duplicate request - ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch(); - long procId = this.procedureExecutor.submitProcedure( - new DeleteTableProcedure(procedureExecutor.getEnvironment(), tableName, latch), - nonceGroup, - nonce); - latch.await(); + return MasterProcedureUtil.submitProcedure( + new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) { + @Override + protected void run() throws IOException { + getMaster().getMasterCoprocessorHost().preDeleteTable(tableName); - if (cpHost != null) { - cpHost.postDeleteTable(tableName); - } + LOG.info(getClientIdAuditPrefix() + " delete " + tableName); - return procId; + // TODO: We can handle/merge duplicate request + ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch(); + submitProcedure(new DeleteTableProcedure(procedureExecutor.getEnvironment(), + tableName, latch)); + latch.await(); + + getMaster().getMasterCoprocessorHost().postDeleteTable(tableName); + } + + @Override + protected String getDescription() { + return "DeleteTableProcedure"; + } + }); } @Override @@ -1995,23 +2016,27 @@ public class HMaster extends HRegionServer implements MasterServices { final long nonceGroup, final long nonce) throws IOException { checkInitialized(); - if 
(cpHost != null) { - cpHost.preTruncateTable(tableName); - } - LOG.info(getClientIdAuditPrefix() + " truncate " + tableName); - ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch(2, 0); - long procId = this.procedureExecutor.submitProcedure( - new TruncateTableProcedure(procedureExecutor.getEnvironment(), tableName, - preserveSplits, latch), - nonceGroup, - nonce); - latch.await(); + return MasterProcedureUtil.submitProcedure( + new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) { + @Override + protected void run() throws IOException { + getMaster().getMasterCoprocessorHost().preTruncateTable(tableName); - if (cpHost != null) { - cpHost.postTruncateTable(tableName); - } - return procId; + LOG.info(getClientIdAuditPrefix() + " truncate " + tableName); + ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch(2, 0); + submitProcedure(new TruncateTableProcedure(procedureExecutor.getEnvironment(), + tableName, preserveSplits, latch)); + latch.await(); + + getMaster().getMasterCoprocessorHost().postTruncateTable(tableName); + } + + @Override + protected String getDescription() { + return "TruncateTableProcedure"; + } + }); } @Override @@ -2025,24 +2050,29 @@ public class HMaster extends HRegionServer implements MasterServices { checkCompression(columnDescriptor); checkEncryption(conf, columnDescriptor); checkReplicationScope(columnDescriptor); - if (cpHost != null) { - if (cpHost.preAddColumn(tableName, columnDescriptor)) { - return -1; + + return MasterProcedureUtil.submitProcedure( + new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) { + @Override + protected void run() throws IOException { + if (getMaster().getMasterCoprocessorHost().preAddColumn(tableName, columnDescriptor)) { + return; + } + + // Execute the operation synchronously, wait for the operation to complete before continuing + ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch(2, 0); + submitProcedure(new 
AddColumnFamilyProcedure(procedureExecutor.getEnvironment(), + tableName, columnDescriptor, latch)); + latch.await(); + + getMaster().getMasterCoprocessorHost().postAddColumn(tableName, columnDescriptor); } - } - // Execute the operation synchronously - wait for the operation to complete before continuing. - ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch(2, 0); - long procId = this.procedureExecutor.submitProcedure( - new AddColumnFamilyProcedure(procedureExecutor.getEnvironment(), tableName, - columnDescriptor, latch), - nonceGroup, - nonce); - latch.await(); - if (cpHost != null) { - cpHost.postAddColumn(tableName, columnDescriptor); - } - return procId; + @Override + protected String getDescription() { + return "AddColumnFamilyProcedure"; + } + }); } @Override @@ -2056,26 +2086,31 @@ public class HMaster extends HRegionServer implements MasterServices { checkCompression(descriptor); checkEncryption(conf, descriptor); checkReplicationScope(descriptor); - if (cpHost != null) { - if (cpHost.preModifyColumn(tableName, descriptor)) { - return -1; - } - } - LOG.info(getClientIdAuditPrefix() + " modify " + descriptor); - // Execute the operation synchronously - wait for the operation to complete before continuing. 
- ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch(2, 0); - long procId = this.procedureExecutor.submitProcedure( - new ModifyColumnFamilyProcedure(procedureExecutor.getEnvironment(), tableName, - descriptor, latch), - nonceGroup, - nonce); - latch.await(); + return MasterProcedureUtil.submitProcedure( + new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) { + @Override + protected void run() throws IOException { + if (getMaster().getMasterCoprocessorHost().preModifyColumn(tableName, descriptor)) { + return; + } - if (cpHost != null) { - cpHost.postModifyColumn(tableName, descriptor); - } - return procId; + LOG.info(getClientIdAuditPrefix() + " modify " + descriptor); + + // Execute the operation synchronously - wait for the operation to complete before continuing. + ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch(2, 0); + submitProcedure(new ModifyColumnFamilyProcedure(procedureExecutor.getEnvironment(), + tableName, descriptor, latch)); + latch.await(); + + getMaster().getMasterCoprocessorHost().postModifyColumn(tableName, descriptor); + } + + @Override + protected String getDescription() { + return "ModifyColumnFamilyProcedure"; + } + }); } @Override @@ -2086,87 +2121,97 @@ public class HMaster extends HRegionServer implements MasterServices { final long nonce) throws IOException { checkInitialized(); - if (cpHost != null) { - if (cpHost.preDeleteColumn(tableName, columnName)) { - return -1; - } - } - LOG.info(getClientIdAuditPrefix() + " delete " + Bytes.toString(columnName)); - // Execute the operation synchronously - wait for the operation to complete before continuing. 
- ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch(2, 0); - long procId = this.procedureExecutor.submitProcedure( - new DeleteColumnFamilyProcedure(procedureExecutor.getEnvironment(), tableName, - columnName, latch), - nonceGroup, - nonce); - latch.await(); + return MasterProcedureUtil.submitProcedure( + new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) { + @Override + protected void run() throws IOException { + if (getMaster().getMasterCoprocessorHost().preDeleteColumn(tableName, columnName)) { + return; + } - if (cpHost != null) { - cpHost.postDeleteColumn(tableName, columnName); - } - return procId; + LOG.info(getClientIdAuditPrefix() + " delete " + Bytes.toString(columnName)); + + // Execute the operation synchronously - wait for the operation to complete before continuing. + ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch(2, 0); + submitProcedure(new DeleteColumnFamilyProcedure(procedureExecutor.getEnvironment(), + tableName, columnName, latch)); + latch.await(); + + getMaster().getMasterCoprocessorHost().postDeleteColumn(tableName, columnName); + } + + @Override + protected String getDescription() { + return "DeleteColumnFamilyProcedure"; + } + }); } @Override - public long enableTable( - final TableName tableName, - final long nonceGroup, - final long nonce) throws IOException { + public long enableTable(final TableName tableName, final long nonceGroup, final long nonce) + throws IOException { checkInitialized(); - if (cpHost != null) { - cpHost.preEnableTable(tableName); - } - LOG.info(getClientIdAuditPrefix() + " enable " + tableName); - - // Execute the operation asynchronously - client will check the progress of the operation - final ProcedurePrepareLatch prepareLatch = ProcedurePrepareLatch.createLatch(); - long procId = this.procedureExecutor.submitProcedure( - new EnableTableProcedure(procedureExecutor.getEnvironment(), tableName, false, prepareLatch), - nonceGroup, - nonce); - // Before returning 
to client, we want to make sure that the table is prepared to be - // enabled (the table is locked and the table state is set). - // - // Note: if the procedure throws exception, we will catch it and rethrow. - prepareLatch.await(); - if (cpHost != null) { - cpHost.postEnableTable(tableName); - } + return MasterProcedureUtil.submitProcedure( + new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) { + @Override + protected void run() throws IOException { + getMaster().getMasterCoprocessorHost().preEnableTable(tableName); + + LOG.info(getClientIdAuditPrefix() + " enable " + tableName); + + // Execute the operation asynchronously - client will check the progress of the operation + // In case the request is from a <1.1 client before returning, + // we want to make sure that the table is prepared to be + // enabled (the table is locked and the table state is set). + // Note: if the procedure throws exception, we will catch it and rethrow. + final ProcedurePrepareLatch prepareLatch = ProcedurePrepareLatch.createLatch(); + submitProcedure(new EnableTableProcedure(procedureExecutor.getEnvironment(), + tableName, false, prepareLatch)); + prepareLatch.await(); + + getMaster().getMasterCoprocessorHost().postEnableTable(tableName); + } - return procId; + @Override + protected String getDescription() { + return "EnableTableProcedure"; + } + }); } @Override - public long disableTable( - final TableName tableName, - final long nonceGroup, - final long nonce) throws IOException { + public long disableTable(final TableName tableName, final long nonceGroup, final long nonce) + throws IOException { checkInitialized(); - if (cpHost != null) { - cpHost.preDisableTable(tableName); - } - LOG.info(getClientIdAuditPrefix() + " disable " + tableName); - // Execute the operation asynchronously - client will check the progress of the operation - final ProcedurePrepareLatch prepareLatch = ProcedurePrepareLatch.createLatch(); - // Execute the operation asynchronously - client 
will check the progress of the operation - long procId = this.procedureExecutor.submitProcedure( - new DisableTableProcedure(procedureExecutor.getEnvironment(), tableName, false, prepareLatch), - nonceGroup, - nonce); - // Before returning to client, we want to make sure that the table is prepared to be - // enabled (the table is locked and the table state is set). - // - // Note: if the procedure throws exception, we will catch it and rethrow. - prepareLatch.await(); - - if (cpHost != null) { - cpHost.postDisableTable(tableName); - } + return MasterProcedureUtil.submitProcedure( + new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) { + @Override + protected void run() throws IOException { + getMaster().getMasterCoprocessorHost().preDisableTable(tableName); + + LOG.info(getClientIdAuditPrefix() + " disable " + tableName); + + // Execute the operation asynchronously - client will check the progress of the operation + // In case the request is from a <1.1 client before returning, + // we want to make sure that the table is prepared to be + // disabled (the table is locked and the table state is set). + // Note: if the procedure throws exception, we will catch it and rethrow.
+ final ProcedurePrepareLatch prepareLatch = ProcedurePrepareLatch.createLatch(); + submitProcedure(new DisableTableProcedure(procedureExecutor.getEnvironment(), + tableName, false, prepareLatch)); + prepareLatch.await(); + + getMaster().getMasterCoprocessorHost().postDisableTable(tableName); + } - return procId; + @Override + protected String getDescription() { + return "DisableTableProcedure"; + } + }); } /** @@ -2207,33 +2252,56 @@ public class HMaster extends HRegionServer implements MasterServices { } @Override - public long modifyTable( - final TableName tableName, - final HTableDescriptor descriptor, - final long nonceGroup, - final long nonce) - throws IOException { + public long modifyTable(final TableName tableName, final HTableDescriptor descriptor, + final long nonceGroup, final long nonce) throws IOException { checkInitialized(); sanityCheckTableDescriptor(descriptor); - if (cpHost != null) { - cpHost.preModifyTable(tableName, descriptor); - } - LOG.info(getClientIdAuditPrefix() + " modify " + tableName); + return MasterProcedureUtil.submitProcedure( + new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) { + @Override + protected void run() throws IOException { + getMaster().getMasterCoprocessorHost().preModifyTable(tableName, descriptor); - // Execute the operation synchronously - wait for the operation completes before continuing. - ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch(2, 0); - long procId = this.procedureExecutor.submitProcedure( - new ModifyTableProcedure(procedureExecutor.getEnvironment(), descriptor, latch), - nonceGroup, - nonce); - latch.await(); + LOG.info(getClientIdAuditPrefix() + " modify " + tableName); - if (cpHost != null) { - cpHost.postModifyTable(tableName, descriptor); - } + // Execute the operation synchronously - wait for the operation completes before continuing. 
+ ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch(2, 0); + submitProcedure(new ModifyTableProcedure(procedureExecutor.getEnvironment(), + descriptor, latch)); + latch.await(); - return procId; + getMaster().getMasterCoprocessorHost().postModifyTable(tableName, descriptor); + } + + @Override + protected String getDescription() { + return "ModifyTableProcedure"; + } + }); + } + + public long restoreSnapshot(final SnapshotDescription snapshotDesc, + final long nonceGroup, final long nonce) throws IOException { + checkInitialized(); + getSnapshotManager().checkSnapshotSupport(); + + // Ensure namespace exists. Will throw exception if non-known NS. + final TableName dstTable = TableName.valueOf(snapshotDesc.getTable()); + getClusterSchema().getNamespace(dstTable.getNamespaceAsString()); + + return MasterProcedureUtil.submitProcedure( + new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) { + @Override + protected void run() throws IOException { + setProcId(getSnapshotManager().restoreOrCloneSnapshot(snapshotDesc, getNonceKey())); + } + + @Override + protected String getDescription() { + return "RestoreSnapshotProcedure"; + } + }); } @Override @@ -2460,9 +2528,11 @@ public class HMaster extends HRegionServer implements MasterServices { } } - void checkInitialized() throws PleaseHoldException, ServerNotRunningYetException { + void checkInitialized() + throws PleaseHoldException, ServerNotRunningYetException, MasterNotRunningException { checkServiceStarted(); if (!isInitialized()) throw new PleaseHoldException("Master is initializing"); + if (isStopped()) throw new MasterNotRunningException(); } /** @@ -2656,18 +2726,29 @@ public class HMaster extends HRegionServer implements MasterServices { * @return procedure id */ long createNamespace(final NamespaceDescriptor namespaceDescriptor, final long nonceGroup, - final long nonce) - throws IOException { + final long nonce) throws IOException { checkInitialized(); + 
TableName.isLegalNamespaceName(Bytes.toBytes(namespaceDescriptor.getName())); - if (this.cpHost != null && this.cpHost.preCreateNamespace(namespaceDescriptor)) { - throw new BypassCoprocessorException(); - } - LOG.info(getClientIdAuditPrefix() + " creating " + namespaceDescriptor); - // Execute the operation synchronously - wait for the operation to complete before continuing. - long procId = getClusterSchema().createNamespace(namespaceDescriptor, nonceGroup, nonce); - if (this.cpHost != null) this.cpHost.postCreateNamespace(namespaceDescriptor); - return procId; + + return MasterProcedureUtil.submitProcedure( + new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) { + @Override + protected void run() throws IOException { + if (getMaster().getMasterCoprocessorHost().preCreateNamespace(namespaceDescriptor)) { + throw new BypassCoprocessorException(); + } + LOG.info(getClientIdAuditPrefix() + " creating " + namespaceDescriptor); + // Execute the operation synchronously - wait for the operation to complete before continuing. 
+ setProcId(getClusterSchema().createNamespace(namespaceDescriptor, getNonceKey())); + getMaster().getMasterCoprocessorHost().postCreateNamespace(namespaceDescriptor); + } + + @Override + protected String getDescription() { + return "CreateNamespaceProcedure"; + } + }); } /** @@ -2678,18 +2759,29 @@ public class HMaster extends HRegionServer implements MasterServices { * @return procedure id */ long modifyNamespace(final NamespaceDescriptor namespaceDescriptor, final long nonceGroup, - final long nonce) - throws IOException { + final long nonce) throws IOException { checkInitialized(); + TableName.isLegalNamespaceName(Bytes.toBytes(namespaceDescriptor.getName())); - if (this.cpHost != null && this.cpHost.preModifyNamespace(namespaceDescriptor)) { - throw new BypassCoprocessorException(); - } - LOG.info(getClientIdAuditPrefix() + " modify " + namespaceDescriptor); - // Execute the operation synchronously - wait for the operation to complete before continuing. - long procId = getClusterSchema().modifyNamespace(namespaceDescriptor, nonceGroup, nonce); - if (this.cpHost != null) this.cpHost.postModifyNamespace(namespaceDescriptor); - return procId; + + return MasterProcedureUtil.submitProcedure( + new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) { + @Override + protected void run() throws IOException { + if (getMaster().getMasterCoprocessorHost().preModifyNamespace(namespaceDescriptor)) { + throw new BypassCoprocessorException(); + } + LOG.info(getClientIdAuditPrefix() + " modify " + namespaceDescriptor); + // Execute the operation synchronously - wait for the operation to complete before continuing.
+ setProcId(getClusterSchema().modifyNamespace(namespaceDescriptor, getNonceKey())); + getMaster().getMasterCoprocessorHost().postModifyNamespace(namespaceDescriptor); + } + + @Override + protected String getDescription() { + return "ModifyNamespaceProcedure"; + } + }); } /** @@ -2700,16 +2792,27 @@ public class HMaster extends HRegionServer implements MasterServices { * @return procedure id */ long deleteNamespace(final String name, final long nonceGroup, final long nonce) - throws IOException { + throws IOException { checkInitialized(); - if (this.cpHost != null && this.cpHost.preDeleteNamespace(name)) { - throw new BypassCoprocessorException(); - } - LOG.info(getClientIdAuditPrefix() + " delete " + name); - // Execute the operation synchronously - wait for the operation to complete before continuing. - long procId = getClusterSchema().deleteNamespace(name, nonceGroup, nonce); - if (this.cpHost != null) this.cpHost.postDeleteNamespace(name); - return procId; + + return MasterProcedureUtil.submitProcedure( + new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) { + @Override + protected void run() throws IOException { + if (getMaster().getMasterCoprocessorHost().preDeleteNamespace(name)) { + throw new BypassCoprocessorException(); + } + LOG.info(getClientIdAuditPrefix() + " delete " + name); + // Execute the operation synchronously - wait for the operation to complete before continuing.
+ setProcId(getClusterSchema().deleteNamespace(name, getNonceKey())); + getMaster().getMasterCoprocessorHost().postDeleteNamespace(name); + } + + @Override + protected String getDescription() { + return "DeleteNamespaceProcedure"; + } + }); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/da356069/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index 709b3f2..2990076 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -1199,16 +1199,8 @@ public class MasterRpcServices extends RSRpcServices public RestoreSnapshotResponse restoreSnapshot(RpcController controller, RestoreSnapshotRequest request) throws ServiceException { try { - master.checkInitialized(); - master.snapshotManager.checkSnapshotSupport(); - - // Ensure namespace exists. Will throw exception if non-known NS. 
- TableName dstTable = TableName.valueOf(request.getSnapshot().getTable()); - master.getClusterSchema().getNamespace(dstTable.getNamespaceAsString()); - - SnapshotDescription reqSnapshot = request.getSnapshot(); - long procId = master.snapshotManager.restoreOrCloneSnapshot( - reqSnapshot, request.getNonceGroup(), request.getNonce()); + long procId = master.restoreSnapshot(request.getSnapshot(), + request.getNonceGroup(), request.getNonce()); return RestoreSnapshotResponse.newBuilder().setProcId(procId).build(); } catch (ForeignException e) { throw new ServiceException(e.getCause()); @@ -1356,7 +1348,7 @@ public class MasterRpcServices extends RSRpcServices master.checkServiceStarted(); RegionStateTransition rt = req.getTransition(0); RegionStates regionStates = master.getAssignmentManager().getRegionStates(); - for (RegionInfo ri : rt.getRegionInfoList()) { + for (RegionInfo ri : rt.getRegionInfoList()) { TableName tableName = ProtobufUtil.toTableName(ri.getTableName()); if (!(TableName.META_TABLE_NAME.equals(tableName) && regionStates.getRegionState(HRegionInfo.FIRST_META_REGIONINFO) != null) http://git-wip-us.apache.org/repos/asf/hbase/blob/da356069/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableNamespaceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableNamespaceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableNamespaceManager.java index 2f55988..2f06972 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableNamespaceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableNamespaceManager.java @@ -216,10 +216,9 @@ public class TableNamespaceManager { * Note, by-passes notifying coprocessors and name checks. Use for system namespaces only. 
*/ private void blockingCreateNamespace(final NamespaceDescriptor namespaceDescriptor) - throws IOException { + throws IOException { ClusterSchema clusterSchema = this.masterServices.getClusterSchema(); - long procId = - clusterSchema.createNamespace(namespaceDescriptor, HConstants.NO_NONCE, HConstants.NO_NONCE); + long procId = clusterSchema.createNamespace(namespaceDescriptor, null); block(this.masterServices, procId); } http://git-wip-us.apache.org/repos/asf/hbase/blob/da356069/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java index 646e337..9706107 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java @@ -18,12 +18,18 @@ package org.apache.hadoop.hbase.master.procedure; +import java.io.IOException; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation; import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.util.NonceKey; import org.apache.hadoop.security.UserGroupInformation; @InterfaceAudience.Private @@ -54,4 +60,85 @@ public final class MasterProcedureUtil { } return null; } + + /** + * Helper Runnable used in conjunction with submitProcedure() to deal with + * 
submitting procs with nonce. + * See submitProcedure() for an example. + */ + public static abstract class NonceProcedureRunnable { + private final MasterServices master; + private final NonceKey nonceKey; + private Long procId; + + public NonceProcedureRunnable(final MasterServices master, + final long nonceGroup, final long nonce) { + this.master = master; + this.nonceKey = getProcedureExecutor().createNonceKey(nonceGroup, nonce); + } + + protected NonceKey getNonceKey() { + return nonceKey; + } + + protected MasterServices getMaster() { + return master; + } + + protected ProcedureExecutor<MasterProcedureEnv> getProcedureExecutor() { + return master.getMasterProcedureExecutor(); + } + + protected long getProcId() { + return procId != null ? procId.longValue() : -1; + } + + protected long setProcId(final long procId) { + this.procId = procId; + return procId; + } + + protected abstract void run() throws IOException; + protected abstract String getDescription(); + + protected long submitProcedure(final Procedure proc) { + assert procId == null : "submitProcedure() was already called, running procId=" + procId; + procId = getProcedureExecutor().submitProcedure(proc, nonceKey); + return procId; + } + } + + /** + * Helper used to deal with submitting procs with nonce. + * Internally the NonceProcedureRunnable.run() will be called only if no one else + * registered the nonce. any Exception thrown by the run() method will be + * collected/handled and rethrown. 
+ * <code> + * long procId = MasterProcedureUtil.submitProcedure( + * new NonceProcedureRunnable(procExec, nonceGroup, nonce) { + * {@literal @}Override + * public void run() { + * cpHost.preOperation(); + * submitProcedure(new MyProc()); + * cpHost.postOperation(); + * } + * }); + * </code> + */ + public static long submitProcedure(final NonceProcedureRunnable runnable) throws IOException { + final ProcedureExecutor<MasterProcedureEnv> procExec = runnable.getProcedureExecutor(); + final long procId = procExec.registerNonce(runnable.getNonceKey()); + if (procId >= 0) return procId; // someone already registered the nonce + try { + runnable.run(); + } catch (IOException e) { + procExec.setFailureResultForNonce(runnable.getNonceKey(), + runnable.getDescription(), + procExec.getEnvironment().getRequestUser(), e); + throw e; + } finally { + procExec.unregisterNonceIfProcedureWasNotSubmitted(runnable.getNonceKey()); + } + return runnable.getProcId(); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/da356069/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java index 7c7c511..b950079 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java @@ -85,6 +85,7 @@ import org.apache.hadoop.hbase.snapshot.UnknownSnapshotException; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.KeyLocker; +import org.apache.hadoop.hbase.util.NonceKey; import org.apache.zookeeper.KeeperException; /** @@ -674,18 +675,13 @@ public class SnapshotManager extends 
MasterProcedureManager implements Stoppable * @param tableName table to clone * @param snapshot Snapshot Descriptor * @param snapshotTableDesc Table Descriptor - * @param nonceGroup unique value to prevent duplicated RPC - * @param nonce unique value to prevent duplicated RPC + * @param nonceKey unique identifier to prevent duplicated RPC * @return procId the ID of the clone snapshot procedure * @throws IOException */ - private long cloneSnapshot( - final SnapshotDescription reqSnapshot, - final TableName tableName, - final SnapshotDescription snapshot, - final HTableDescriptor snapshotTableDesc, - final long nonceGroup, - final long nonce) throws IOException { + private long cloneSnapshot(final SnapshotDescription reqSnapshot, final TableName tableName, + final SnapshotDescription snapshot, final HTableDescriptor snapshotTableDesc, + final NonceKey nonceKey) throws IOException { MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost(); HTableDescriptor htd = new HTableDescriptor(tableName, snapshotTableDesc); if (cpHost != null) { @@ -693,7 +689,7 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable } long procId; try { - procId = cloneSnapshot(snapshot, htd, nonceGroup, nonce); + procId = cloneSnapshot(snapshot, htd, nonceKey); } catch (IOException e) { LOG.error("Exception occurred while cloning the snapshot " + snapshot.getName() + " as table " + tableName.getNameAsString(), e); @@ -713,15 +709,12 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable * * @param snapshot Snapshot Descriptor * @param hTableDescriptor Table Descriptor of the table to create - * @param nonceGroup unique value to prevent duplicated RPC - * @param nonce unique value to prevent duplicated RPC + * @param nonceKey unique identifier to prevent duplicated RPC * @return procId the ID of the clone snapshot procedure */ - synchronized long cloneSnapshot( - final SnapshotDescription snapshot, - final HTableDescriptor 
hTableDescriptor, - final long nonceGroup, - final long nonce) throws HBaseSnapshotException { + synchronized long cloneSnapshot(final SnapshotDescription snapshot, + final HTableDescriptor hTableDescriptor, final NonceKey nonceKey) + throws HBaseSnapshotException { TableName tableName = hTableDescriptor.getTableName(); // make sure we aren't running a snapshot on the same table @@ -738,8 +731,7 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable long procId = master.getMasterProcedureExecutor().submitProcedure( new CloneSnapshotProcedure( master.getMasterProcedureExecutor().getEnvironment(), hTableDescriptor, snapshot), - nonceGroup, - nonce); + nonceKey); this.restoreTableToProcIdMap.put(tableName, procId); return procId; } catch (Exception e) { @@ -753,14 +745,11 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable /** * Restore or Clone the specified snapshot * @param reqSnapshot - * @param nonceGroup unique value to prevent duplicated RPC - * @param nonce unique value to prevent duplicated RPC + * @param nonceKey unique identifier to prevent duplicated RPC * @throws IOException */ - public long restoreOrCloneSnapshot( - SnapshotDescription reqSnapshot, - final long nonceGroup, - final long nonce) throws IOException { + public long restoreOrCloneSnapshot(final SnapshotDescription reqSnapshot, final NonceKey nonceKey) + throws IOException { FileSystem fs = master.getMasterFileSystem().getFileSystem(); Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(reqSnapshot, rootDir); @@ -789,11 +778,9 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable // Execute the restore/clone operation long procId; if (MetaTableAccessor.tableExists(master.getConnection(), tableName)) { - procId = restoreSnapshot( - reqSnapshot, tableName, snapshot, snapshotTableDesc, nonceGroup, nonce); + procId = restoreSnapshot(reqSnapshot, tableName, snapshot, snapshotTableDesc, 
nonceKey); } else { - procId = cloneSnapshot( - reqSnapshot, tableName, snapshot, snapshotTableDesc, nonceGroup, nonce); + procId = cloneSnapshot(reqSnapshot, tableName, snapshot, snapshotTableDesc, nonceKey); } return procId; } @@ -806,18 +793,13 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable * @param tableName table to restore * @param snapshot Snapshot Descriptor * @param snapshotTableDesc Table Descriptor - * @param nonceGroup unique value to prevent duplicated RPC - * @param nonce unique value to prevent duplicated RPC + * @param nonceKey unique identifier to prevent duplicated RPC * @return procId the ID of the restore snapshot procedure * @throws IOException */ - private long restoreSnapshot( - final SnapshotDescription reqSnapshot, - final TableName tableName, - final SnapshotDescription snapshot, - final HTableDescriptor snapshotTableDesc, - final long nonceGroup, - final long nonce) throws IOException { + private long restoreSnapshot(final SnapshotDescription reqSnapshot, final TableName tableName, + final SnapshotDescription snapshot, final HTableDescriptor snapshotTableDesc, + final NonceKey nonceKey) throws IOException { MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost(); if (master.getTableStateManager().isTableState( @@ -834,7 +816,7 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable long procId; try { - procId = restoreSnapshot(snapshot, snapshotTableDesc, nonceGroup, nonce); + procId = restoreSnapshot(snapshot, snapshotTableDesc, nonceKey); } catch (IOException e) { LOG.error("Exception occurred while restoring the snapshot " + snapshot.getName() + " as table " + tableName.getNameAsString(), e); @@ -855,16 +837,13 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable * * @param snapshot Snapshot Descriptor * @param hTableDescriptor Table Descriptor - * @param nonceGroup unique value to prevent duplicated RPC - * @param nonce unique 
value to prevent duplicated RPC + * @param nonceKey unique identifier to prevent duplicated RPC * @return procId the ID of the restore snapshot procedure */ - private synchronized long restoreSnapshot( - final SnapshotDescription snapshot, - final HTableDescriptor hTableDescriptor, - final long nonceGroup, - final long nonce) throws HBaseSnapshotException { - TableName tableName = hTableDescriptor.getTableName(); + private synchronized long restoreSnapshot(final SnapshotDescription snapshot, + final HTableDescriptor hTableDescriptor, final NonceKey nonceKey) + throws HBaseSnapshotException { + final TableName tableName = hTableDescriptor.getTableName(); // make sure we aren't running a snapshot on the same table if (isTakingSnapshot(tableName)) { @@ -880,8 +859,7 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable long procId = master.getMasterProcedureExecutor().submitProcedure( new RestoreSnapshotProcedure( master.getMasterProcedureExecutor().getEnvironment(), hTableDescriptor, snapshot), - nonceGroup, - nonce); + nonceKey); this.restoreTableToProcIdMap.put(tableName, procId); return procId; } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/hbase/blob/da356069/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestAddColumnFamilyProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestAddColumnFamilyProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestAddColumnFamilyProcedure.java index 2840e4d..efe63ea 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestAddColumnFamilyProcedure.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestAddColumnFamilyProcedure.java @@ -51,9 +51,7 @@ public class TestAddColumnFamilyProcedure extends TestTableDDLProcedureBase { // Test 1: Add a column family online 
long procId1 = procExec.submitProcedure( - new AddColumnFamilyProcedure(procExec.getEnvironment(), tableName, columnDescriptor1), - nonceGroup, - nonce); + new AddColumnFamilyProcedure(procExec.getEnvironment(), tableName, columnDescriptor1)); // Wait the completion ProcedureTestingUtility.waitProcedure(procExec, procId1); ProcedureTestingUtility.assertProcNotFailed(procExec, procId1); @@ -64,9 +62,7 @@ public class TestAddColumnFamilyProcedure extends TestTableDDLProcedureBase { // Test 2: Add a column family offline UTIL.getHBaseAdmin().disableTable(tableName); long procId2 = procExec.submitProcedure( - new AddColumnFamilyProcedure(procExec.getEnvironment(), tableName, columnDescriptor2), - nonceGroup + 1, - nonce + 1); + new AddColumnFamilyProcedure(procExec.getEnvironment(), tableName, columnDescriptor2)); // Wait the completion ProcedureTestingUtility.waitProcedure(procExec, procId2); ProcedureTestingUtility.assertProcNotFailed(procExec, procId2); @@ -86,9 +82,7 @@ public class TestAddColumnFamilyProcedure extends TestTableDDLProcedureBase { // add the column family long procId1 = procExec.submitProcedure( - new AddColumnFamilyProcedure(procExec.getEnvironment(), tableName, columnDescriptor), - nonceGroup, - nonce); + new AddColumnFamilyProcedure(procExec.getEnvironment(), tableName, columnDescriptor)); // Wait the completion ProcedureTestingUtility.waitProcedure(procExec, procId1); ProcedureTestingUtility.assertProcNotFailed(procExec, procId1); @@ -97,9 +91,7 @@ public class TestAddColumnFamilyProcedure extends TestTableDDLProcedureBase { // add the column family that exists long procId2 = procExec.submitProcedure( - new AddColumnFamilyProcedure(procExec.getEnvironment(), tableName, columnDescriptor), - nonceGroup + 1, - nonce + 1); + new AddColumnFamilyProcedure(procExec.getEnvironment(), tableName, columnDescriptor)); // Wait the completion ProcedureTestingUtility.waitProcedure(procExec, procId2); @@ -113,9 +105,7 @@ public class 
TestAddColumnFamilyProcedure extends TestTableDDLProcedureBase { // Do the same add the existing column family - this time offline UTIL.getHBaseAdmin().disableTable(tableName); long procId3 = procExec.submitProcedure( - new AddColumnFamilyProcedure(procExec.getEnvironment(), tableName, columnDescriptor), - nonceGroup + 2, - nonce + 2); + new AddColumnFamilyProcedure(procExec.getEnvironment(), tableName, columnDescriptor)); // Wait the completion ProcedureTestingUtility.waitProcedure(procExec, procId3); @@ -127,37 +117,6 @@ public class TestAddColumnFamilyProcedure extends TestTableDDLProcedureBase { ProcedureTestingUtility.getExceptionCause(result) instanceof InvalidFamilyOperationException); } - @Test(timeout=60000) - public void testAddSameColumnFamilyTwiceWithSameNonce() throws Exception { - final TableName tableName = TableName.valueOf("testAddSameColumnFamilyTwiceWithSameNonce"); - final String cf2 = "cf2"; - final HColumnDescriptor columnDescriptor = new HColumnDescriptor(cf2); - - final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor(); - - MasterProcedureTestingUtility.createTable(procExec, tableName, null, "f1"); - - // add the column family - long procId1 = procExec.submitProcedure( - new AddColumnFamilyProcedure(procExec.getEnvironment(), tableName, columnDescriptor), - nonceGroup, - nonce); - long procId2 = procExec.submitProcedure( - new AddColumnFamilyProcedure(procExec.getEnvironment(), tableName, columnDescriptor), - nonceGroup, - nonce); - // Wait the completion - ProcedureTestingUtility.waitProcedure(procExec, procId1); - ProcedureTestingUtility.assertProcNotFailed(procExec, procId1); - MasterProcedureTestingUtility.validateColumnFamilyAddition(UTIL.getHBaseCluster().getMaster(), - tableName, cf2); - - // Wait the completion and expect not fail - because it is the same proc - ProcedureTestingUtility.waitProcedure(procExec, procId2); - ProcedureTestingUtility.assertProcNotFailed(procExec, procId2); - assertTrue(procId1 
== procId2); - } - @Test(timeout = 60000) public void testRecoveryAndDoubleExecutionOffline() throws Exception { final TableName tableName = TableName.valueOf("testRecoveryAndDoubleExecutionOffline"); @@ -173,9 +132,7 @@ public class TestAddColumnFamilyProcedure extends TestTableDDLProcedureBase { // Start the AddColumnFamily procedure && kill the executor long procId = procExec.submitProcedure( - new AddColumnFamilyProcedure(procExec.getEnvironment(), tableName, columnDescriptor), - nonceGroup, - nonce); + new AddColumnFamilyProcedure(procExec.getEnvironment(), tableName, columnDescriptor)); // Restart the executor and execute the step twice int numberOfSteps = AddColumnFamilyState.values().length; @@ -199,9 +156,7 @@ public class TestAddColumnFamilyProcedure extends TestTableDDLProcedureBase { // Start the AddColumnFamily procedure && kill the executor long procId = procExec.submitProcedure( - new AddColumnFamilyProcedure(procExec.getEnvironment(), tableName, columnDescriptor), - nonceGroup, - nonce); + new AddColumnFamilyProcedure(procExec.getEnvironment(), tableName, columnDescriptor)); // Restart the executor and execute the step twice int numberOfSteps = AddColumnFamilyState.values().length; @@ -225,9 +180,7 @@ public class TestAddColumnFamilyProcedure extends TestTableDDLProcedureBase { // Start the AddColumnFamily procedure && kill the executor long procId = procExec.submitProcedure( - new AddColumnFamilyProcedure(procExec.getEnvironment(), tableName, columnDescriptor), - nonceGroup, - nonce); + new AddColumnFamilyProcedure(procExec.getEnvironment(), tableName, columnDescriptor)); int numberOfSteps = 1; // failing at "pre operations" MasterProcedureTestingUtility.testRollbackAndDoubleExecution(procExec, procId, numberOfSteps);
