HBASE-14648 Reenable TestWALProcedureStoreOnHDFS#testWalRollOnLowReplication (Heng Chen)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/46c646d6 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/46c646d6 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/46c646d6 Branch: refs/heads/hbase-12439 Commit: 46c646d6154d6a5291c308e065802248f4e4e7b9 Parents: 9597847 Author: stack <st...@apache.org> Authored: Sat Oct 24 20:55:37 2015 -0700 Committer: stack <st...@apache.org> Committed: Sat Oct 24 20:55:47 2015 -0700 ---------------------------------------------------------------------- .../procedure/TestWALProcedureStoreOnHDFS.java | 167 +++++++++---------- 1 file changed, 79 insertions(+), 88 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/46c646d6/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java index b360966..2e80b69 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java @@ -66,7 +66,7 @@ public class TestWALProcedureStoreOnHDFS { } }; - private static void setupConf(Configuration conf) { + private static void initConfig(Configuration conf) { conf.setInt("dfs.replication", 3); conf.setInt("dfs.namenode.replication.min", 3); @@ -76,20 +76,16 @@ public class TestWALProcedureStoreOnHDFS { conf.setInt("hbase.procedure.store.wal.sync.failure.roll.max", 10); } - @Before public void setup() throws Exception { - setupConf(UTIL.getConfiguration()); MiniDFSCluster dfs = UTIL.startMiniDFSCluster(3); Path logDir = new Path(new Path(dfs.getFileSystem().getUri()), "/test-logs"); - store = ProcedureTestingUtility.createWalStore( - UTIL.getConfiguration(), dfs.getFileSystem(), logDir); + store = ProcedureTestingUtility.createWalStore(UTIL.getConfiguration(), dfs.getFileSystem(), logDir); store.registerListener(stopProcedureListener); store.start(8); store.recoverLease(); } - @After public void tearDown() throws Exception { store.stop(false); UTIL.getDFSCluster().getFileSystem().delete(store.getLogDir(), true); @@ -103,107 +99,102 @@ public class TestWALProcedureStoreOnHDFS { @Test(timeout=60000, expected=RuntimeException.class) public void testWalAbortOnLowReplication() throws Exception { - assertEquals(3, UTIL.getDFSCluster().getDataNodes().size()); - - LOG.info("Stop DataNode"); - UTIL.getDFSCluster().stopDataNode(0); - assertEquals(2, UTIL.getDFSCluster().getDataNodes().size()); + initConfig(UTIL.getConfiguration()); + setup(); + try { + assertEquals(3, UTIL.getDFSCluster().getDataNodes().size()); - store.insert(new TestProcedure(1, -1), null); - for (long i = 2; store.isRunning(); ++i) { + LOG.info("Stop DataNode"); + UTIL.getDFSCluster().stopDataNode(0); assertEquals(2, UTIL.getDFSCluster().getDataNodes().size()); - store.insert(new TestProcedure(i, -1), null); - Thread.sleep(100); + + store.insert(new TestProcedure(1, -1), null); + for (long i = 2; store.isRunning(); ++i) { + assertEquals(2, UTIL.getDFSCluster().getDataNodes().size()); + store.insert(new TestProcedure(i, -1), null); + Thread.sleep(100); + } + assertFalse(store.isRunning()); + fail("The store.insert() should throw an exeption"); + } finally { + tearDown(); } - assertFalse(store.isRunning()); - fail("The store.insert() should throw an exeption"); } @Test(timeout=60000) public void testWalAbortOnLowReplicationWithQueuedWriters() throws Exception { - assertEquals(3, UTIL.getDFSCluster().getDataNodes().size()); - store.registerListener(new ProcedureStore.ProcedureStoreListener() { - @Override - public void postSync() { - Threads.sleepWithoutInterrupt(2000); - } + initConfig(UTIL.getConfiguration()); + setup(); + try { + assertEquals(3, UTIL.getDFSCluster().getDataNodes().size()); + store.registerListener(new ProcedureStore.ProcedureStoreListener() { + @Override + public void postSync() { + Threads.sleepWithoutInterrupt(2000); + } - @Override - public void abortProcess() {} - }); - - final AtomicInteger reCount = new AtomicInteger(0); - Thread[] thread = new Thread[store.getNumThreads() * 2 + 1]; - for (int i = 0; i < thread.length; ++i) { - final long procId = i + 1; - thread[i] = new Thread() { - public void run() { - try { - LOG.debug("[S] INSERT " + procId); - store.insert(new TestProcedure(procId, -1), null); - LOG.debug("[E] INSERT " + procId); - } catch (RuntimeException e) { - reCount.incrementAndGet(); - LOG.debug("[F] INSERT " + procId + ": " + e.getMessage()); + @Override + public void abortProcess() {} + }); + + final AtomicInteger reCount = new AtomicInteger(0); + Thread[] thread = new Thread[store.getNumThreads() * 2 + 1]; + for (int i = 0; i < thread.length; ++i) { + final long procId = i + 1; + thread[i] = new Thread() { + public void run() { + try { + LOG.debug("[S] INSERT " + procId); + store.insert(new TestProcedure(procId, -1), null); + LOG.debug("[E] INSERT " + procId); + } catch (RuntimeException e) { + reCount.incrementAndGet(); + LOG.debug("[F] INSERT " + procId + ": " + e.getMessage()); + } } - } - }; - thread[i].start(); - } + }; + thread[i].start(); + } - Thread.sleep(1000); - LOG.info("Stop DataNode"); - UTIL.getDFSCluster().stopDataNode(0); - assertEquals(2, UTIL.getDFSCluster().getDataNodes().size()); + Thread.sleep(1000); + LOG.info("Stop DataNode"); + UTIL.getDFSCluster().stopDataNode(0); + assertEquals(2, UTIL.getDFSCluster().getDataNodes().size()); - for (int i = 0; i < thread.length; ++i) { - thread[i].join(); - } + for (int i = 0; i < thread.length; ++i) { + thread[i].join(); + } - assertFalse(store.isRunning()); - assertTrue(reCount.toString(), reCount.get() >= store.getNumThreads() && - reCount.get() < thread.length); + assertFalse(store.isRunning()); + assertTrue(reCount.toString(), reCount.get() >= store.getNumThreads() && + reCount.get() < thread.length); + } finally { + tearDown(); + } } - @Ignore ("Needs work") @Test(timeout=60000) + @Test(timeout=60000) public void testWalRollOnLowReplication() throws Exception { - store.unregisterListener(stopProcedureListener); - store.registerListener(new ProcedureStore.ProcedureStoreListener() { - @Override - public void postSync() {} - - @Override - public void abortProcess() { - LOG.info("Aborted!!!!"); - } - }); - int dnCount = 0; - store.insert(new TestProcedure(1, -1), null); - UTIL.getDFSCluster().restartDataNode(dnCount); - for (long i = 2; i < 100; ++i) { - try { + initConfig(UTIL.getConfiguration()); + UTIL.getConfiguration().setInt("dfs.namenode.replication.min", 1); + setup(); + try { + int dnCount = 0; + store.insert(new TestProcedure(1, -1), null); + UTIL.getDFSCluster().restartDataNode(dnCount); + for (long i = 2; i < 100; ++i) { store.insert(new TestProcedure(i, -1), null); - } catch (RuntimeException re) { - String msg = re.getMessage(); - // We could get a sync failed here...if the test cluster is crawling such that DN recovery - // is taking a long time. If we've done enough passes, just finish up the test as a 'pass' - if (msg != null && msg.toLowerCase().contains("sync aborted")) { - LOG.info("i=" + i, re); - if (i > 50) { - LOG.info("Returning early... i=" + i + "...We ran enough of this test", re); - return; - } + waitForNumReplicas(3); + Thread.sleep(100); + if ((i % 30) == 0) { + LOG.info("Restart Data Node"); + UTIL.getDFSCluster().restartDataNode(++dnCount % 3); } - throw re; - } - waitForNumReplicas(3); - Thread.sleep(100); - if ((i % 30) == 0) { - LOG.info("Restart Data Node"); - UTIL.getDFSCluster().restartDataNode(++dnCount % 3); } + assertTrue(store.isRunning()); + } finally { + tearDown(); } - assertTrue(store.isRunning()); } public void waitForNumReplicas(int numReplicas) throws Exception {