Repository: hbase Updated Branches: refs/heads/branch-1 440e0bc6e -> 9135ad1be refs/heads/branch-1.2 e65fda7b1 -> 229965646 refs/heads/master b98598f36 -> 7382f8e04
HBASE-14106 TestProcedureRecovery is flaky Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7382f8e0 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7382f8e0 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7382f8e0 Branch: refs/heads/master Commit: 7382f8e0459ca5f25965e46ab1ea902ad7463713 Parents: b98598f Author: Matteo Bertozzi <[email protected]> Authored: Fri Jul 17 10:20:33 2015 -0700 Committer: Matteo Bertozzi <[email protected]> Committed: Fri Jul 17 10:20:33 2015 -0700 ---------------------------------------------------------------------- .../hbase/procedure2/TestProcedureRecovery.java | 71 ++++++++++++++------ 1 file changed, 49 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/7382f8e0/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java index 24a448e..1a4845c 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.CountDownLatch; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -53,7 +54,8 @@ public class TestProcedureRecovery { private static final int PROCEDURE_EXECUTOR_SLOTS = 1; private static final Procedure NULL_PROC = null; - private static ProcedureExecutor<Void> procExecutor; + private static TestProcEnv procEnv; + private static ProcedureExecutor<TestProcEnv> procExecutor; private static ProcedureStore procStore; private static int procSleepInterval; @@ -70,15 +72,13 @@ public class TestProcedureRecovery { assertTrue(testDir.depth() > 1); logDir = new Path(testDir, "proc-logs"); + procEnv = new TestProcEnv(); procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), fs, logDir); - procExecutor = new ProcedureExecutor(htu.getConfiguration(), null, procStore); + procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore); procExecutor.testing = new ProcedureExecutor.Testing(); procStore.start(PROCEDURE_EXECUTOR_SLOTS); procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true); procSleepInterval = 0; - - ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, false); - ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, false); } @After @@ -94,13 +94,14 @@ public class TestProcedureRecovery { dumpLogDirState(); } - public static class TestSingleStepProcedure extends SequentialProcedure<Void> { + public static class TestSingleStepProcedure extends SequentialProcedure<TestProcEnv> { private int step = 0; public TestSingleStepProcedure() { } @Override - protected Procedure[] execute(Void env) { + protected Procedure[] execute(TestProcEnv env) throws InterruptedException { + env.waitOnLatch(); LOG.debug("execute procedure " + this + " step=" + step); step++; setResult(Bytes.toBytes(step)); @@ -108,18 +109,19 @@ public class TestProcedureRecovery { } @Override - protected void rollback(Void env) { } + protected void rollback(TestProcEnv env) { } @Override - protected boolean abort(Void env) { return true; } + protected boolean abort(TestProcEnv env) { return true; } } - public static class BaseTestStepProcedure extends SequentialProcedure<Void> { + public static class BaseTestStepProcedure extends SequentialProcedure<TestProcEnv> { private AtomicBoolean abort = new AtomicBoolean(false); private int step = 0; @Override - protected Procedure[] execute(Void env) { + protected Procedure[] execute(TestProcEnv env) throws InterruptedException { + env.waitOnLatch(); LOG.debug("execute procedure " + this + " step=" + step); ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor); step++; @@ -134,14 +136,14 @@ public class TestProcedureRecovery { } @Override - protected void rollback(Void env) { + protected void rollback(TestProcEnv env) { LOG.debug("rollback procedure " + this + " step=" + step); ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor); step++; } @Override - protected boolean abort(Void env) { + protected boolean abort(TestProcEnv env) { abort.set(true); return true; } @@ -161,7 +163,7 @@ public class TestProcedureRecovery { public TestMultiStepProcedure() { } @Override - public Procedure[] execute(Void env) { + public Procedure[] execute(TestProcEnv env) throws InterruptedException { super.execute(env); return isFailed() ? null : new Procedure[] { new Step1Procedure() }; } @@ -170,7 +172,7 @@ public class TestProcedureRecovery { public Step1Procedure() { } @Override - protected Procedure[] execute(Void env) { + protected Procedure[] execute(TestProcEnv env) throws InterruptedException { super.execute(env); return isFailed() ? null : new Procedure[] { new Step2Procedure() }; } @@ -298,6 +300,8 @@ public class TestProcedureRecovery { // Restart restart(); + waitProcedure(procId); + Procedure proc2 = new TestSingleStepProcedure(); // Submit a procedure with the same nonce and expect the same procedure would return. long procId2 = ProcedureTestingUtility.submitAndWait(procExecutor, proc2, nonceGroup, nonce); @@ -314,17 +318,23 @@ public class TestProcedureRecovery { Procedure proc = new TestMultiStepProcedure(); long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc, nonceGroup, nonce); - // Restart + // Restart (use a latch to prevent the step execution until we submitted proc2) + CountDownLatch latch = new CountDownLatch(1); + procEnv.setWaitLatch(latch); restart(); - Procedure proc2 = new TestMultiStepProcedure(); // Submit a procedure with the same nonce and expect the same procedure would return. - long procId2 = ProcedureTestingUtility.submitAndWait(procExecutor, proc2, nonceGroup, nonce); + Procedure proc2 = new TestMultiStepProcedure(); + long procId2 = procExecutor.submitProcedure(proc2, nonceGroup, nonce); + latch.countDown(); + procEnv.setWaitLatch(null); + // The original proc is not completed and the new submission should have the same proc Id. assertTrue(procId == procId2); } + public static class TestStateMachineProcedure - extends StateMachineProcedure<Void, TestStateMachineProcedure.State> { + extends StateMachineProcedure<TestProcEnv, TestStateMachineProcedure.State> { enum State { STATE_1, STATE_2, STATE_3, DONE } public TestStateMachineProcedure() {} @@ -333,7 +343,7 @@ public class TestProcedureRecovery { private int iResult = 0; @Override - protected StateMachineProcedure.Flow executeFromState(Void env, State state) { + protected StateMachineProcedure.Flow executeFromState(TestProcEnv env, State state) { switch (state) { case STATE_1: LOG.info("execute step 1 " + this); @@ -364,7 +374,7 @@ public class TestProcedureRecovery { } @Override - protected void rollbackState(Void env, final State state) { + protected void rollbackState(TestProcEnv env, final State state) { switch (state) { case STATE_1: LOG.info("rollback step 1 " + this); @@ -396,7 +406,7 @@ public class TestProcedureRecovery { } @Override - protected boolean abort(Void env) { + protected boolean abort(TestProcEnv env) { aborted.set(true); return true; } @@ -522,4 +532,21 @@ public class TestProcedureRecovery { LOG.warn("Unable to dump " + logDir, e); } } + + private static class TestProcEnv { + private CountDownLatch latch = null; + + /** + * set/unset a latch. every procedure execute() step will wait on the latch if any. + */ + public void setWaitLatch(CountDownLatch latch) { + this.latch = latch; + } + + public void waitOnLatch() throws InterruptedException { + if (latch != null) { + latch.await(); + } + } + } } \ No newline at end of file
