HBASE-15579 Procedure v2 - Remove synchronized around nonce in Procedure submit
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/bfca2a46 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/bfca2a46 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/bfca2a46 Branch: refs/heads/HBASE-14850 Commit: bfca2a460694bb2abc720a582318bee4ddc29c0f Parents: 57e1dbc Author: Matteo Bertozzi <[email protected]> Authored: Fri Apr 22 10:13:13 2016 -0700 Committer: Matteo Bertozzi <[email protected]> Committed: Fri Apr 22 10:15:58 2016 -0700 ---------------------------------------------------------------------- .../hbase/procedure2/ProcedureExecutor.java | 43 ++++++++------------ .../store/wal/TestWALProcedureStore.java | 26 ++++++++++++ 2 files changed, 42 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/bfca2a46/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index 3f0ba37..f43b65f 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -635,34 +635,23 @@ public class ProcedureExecutor<TEnvironment> { Preconditions.checkArgument(lastProcId.get() >= 0); Preconditions.checkArgument(!proc.hasParent()); - Long currentProcId; - - // The following part of the code has to be synchronized to prevent multiple request - // with the same nonce to execute at the same time. - synchronized (this) { - // Check whether the proc exists. If exist, just return the proc id. - // This is to prevent the same proc to submit multiple times (it could happen - // when client could not talk to server and resubmit the same request). - NonceKey noncekey = null; - if (nonce != HConstants.NO_NONCE) { - noncekey = new NonceKey(nonceGroup, nonce); - currentProcId = nonceKeysToProcIdsMap.get(noncekey); - if (currentProcId != null) { - // Found the proc - return currentProcId; - } + // Initialize the Procedure ID + long currentProcId = nextProcId(); + proc.setProcId(currentProcId); + + // Check whether the proc exists. If exist, just return the proc id. + // This is to prevent the same proc to submit multiple times (it could happen + // when client could not talk to server and resubmit the same request). + if (nonce != HConstants.NO_NONCE) { + NonceKey noncekey = new NonceKey(nonceGroup, nonce); + proc.setNonceKey(noncekey); + + Long oldProcId = nonceKeysToProcIdsMap.putIfAbsent(noncekey, currentProcId); + if (oldProcId != null) { + // Found the proc + return oldProcId.longValue(); } - - // Initialize the Procedure ID - currentProcId = nextProcId(); - proc.setProcId(currentProcId); - - // This is new procedure. Set the noncekey and insert into the map. - if (noncekey != null) { - proc.setNonceKey(noncekey); - nonceKeysToProcIdsMap.put(noncekey, currentProcId); - } - } // end of synchronized (this) + } // Commit the transaction store.insert(proc, null); http://git-wip-us.apache.org/repos/asf/hbase/blob/bfca2a46/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java index b9a439a..88c85ba 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java @@ -208,6 +208,32 @@ public class TestWALProcedureStore { } @Test + public void testProcIdHoles() throws Exception { + // Insert + for (int i = 0; i < 100; i += 2) { + procStore.insert(new TestProcedure(i), null); + if (i > 0 && (i % 10) == 0) { + LoadCounter loader = new LoadCounter(); + storeRestart(loader); + assertEquals(0, loader.getCorruptedCount()); + assertEquals((i / 2) + 1, loader.getLoadedCount()); + } + } + assertEquals(10, procStore.getActiveLogs().size()); + + // Delete + for (int i = 0; i < 100; i += 2) { + procStore.delete(i); + } + assertEquals(1, procStore.getActiveLogs().size()); + + LoadCounter loader = new LoadCounter(); + storeRestart(loader); + assertEquals(0, loader.getLoadedCount()); + assertEquals(0, loader.getCorruptedCount()); + } + + @Test public void testCorruptedTrailer() throws Exception { // Insert something for (int i = 0; i < 100; ++i) {
