Repository: hbase Updated Branches: refs/heads/master 0207da8ce -> e82ccb900
HBASE-14794 Cleanup TestAtomicOperation hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaWithReplicas.java Fix a few missing table closes (this suite seems to leave loads of threads behind when the test is done, but I have not figured out how yet). hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java Fix some missing table closes. We were leaving client resources around. hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java Close up WALs when done. Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e82ccb90 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e82ccb90 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e82ccb90 Branch: refs/heads/master Commit: e82ccb900ef8165a94f4a9e2bfe849fbe89e2e9c Parents: 0207da8 Author: stack <[email protected]> Authored: Tue Nov 10 15:08:10 2015 -1000 Committer: stack <[email protected]> Committed: Tue Nov 10 15:08:18 2015 -1000 ---------------------------------------------------------------------- .../hbase/client/TestMetaWithReplicas.java | 170 ++++----- .../hbase/mapreduce/TestImportExport.java | 341 +++++++++---------- .../hbase/regionserver/TestAtomicOperation.java | 42 ++- 3 files changed, 283 insertions(+), 270 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/e82ccb90/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaWithReplicas.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaWithReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaWithReplicas.java index 831738c..7278892 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaWithReplicas.java +++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaWithReplicas.java @@ -170,63 +170,67 @@ public class TestMetaWithReplicas { util.getHBaseAdmin().disableTable(TABLE); util.getHBaseAdmin().deleteTable(TABLE); } - Table htable = util.createTable(TABLE, FAMILIES); - - util.getHBaseAdmin().flush(TableName.META_TABLE_NAME); - Thread.sleep(conf.getInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, - 30000) * 6); - Connection c = ConnectionFactory.createConnection(util.getConfiguration()); - List<HRegionInfo> regions = MetaTableAccessor.getTableRegions(c, TABLE); - HRegionLocation hrl = MetaTableAccessor.getRegionLocation(c, regions.get(0)); - // Ensure that the primary server for test table is not the same one as the primary - // of the meta region since we will be killing the srv holding the meta's primary... - // We want to be able to write to the test table even when the meta is not present .. - // If the servers are the same, then move the test table's region out of the server - // to another random server - if (hrl.getServerName().equals(primary)) { - util.getHBaseAdmin().move(hrl.getRegionInfo().getEncodedNameAsBytes(), null); - // wait for the move to complete - do { - Thread.sleep(10); - hrl = MetaTableAccessor.getRegionLocation(c, regions.get(0)); - } while (primary.equals(hrl.getServerName())); - util.getHBaseAdmin().flush(TableName.META_TABLE_NAME); - Thread.sleep(conf.getInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, - 30000) * 3); - } - ServerName master = util.getHBaseClusterInterface().getClusterStatus().getMaster(); - // kill the master so that regionserver recovery is not triggered at all - // for the meta server - util.getHBaseClusterInterface().stopMaster(master); - util.getHBaseClusterInterface().waitForMasterToStop(master, 60000); - if (!master.equals(primary)) { - util.getHBaseClusterInterface().killRegionServer(primary); - util.getHBaseClusterInterface().waitForRegionServerToStop(primary, 
60000); + ServerName master = null; + try (Connection c = ConnectionFactory.createConnection(util.getConfiguration());) { + try (Table htable = util.createTable(TABLE, FAMILIES);) { + util.getHBaseAdmin().flush(TableName.META_TABLE_NAME); + Thread.sleep(conf.getInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, + 30000) * 6); + List<HRegionInfo> regions = MetaTableAccessor.getTableRegions(c, TABLE); + HRegionLocation hrl = MetaTableAccessor.getRegionLocation(c, regions.get(0)); + // Ensure that the primary server for test table is not the same one as the primary + // of the meta region since we will be killing the srv holding the meta's primary... + // We want to be able to write to the test table even when the meta is not present .. + // If the servers are the same, then move the test table's region out of the server + // to another random server + if (hrl.getServerName().equals(primary)) { + util.getHBaseAdmin().move(hrl.getRegionInfo().getEncodedNameAsBytes(), null); + // wait for the move to complete + do { + Thread.sleep(10); + hrl = MetaTableAccessor.getRegionLocation(c, regions.get(0)); + } while (primary.equals(hrl.getServerName())); + util.getHBaseAdmin().flush(TableName.META_TABLE_NAME); + Thread.sleep(conf.getInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, + 30000) * 3); + } + master = util.getHBaseClusterInterface().getClusterStatus().getMaster(); + // kill the master so that regionserver recovery is not triggered at all + // for the meta server + util.getHBaseClusterInterface().stopMaster(master); + util.getHBaseClusterInterface().waitForMasterToStop(master, 60000); + if (!master.equals(primary)) { + util.getHBaseClusterInterface().killRegionServer(primary); + util.getHBaseClusterInterface().waitForRegionServerToStop(primary, 60000); + } + ((ClusterConnection)c).clearRegionCache(); + } + Get get = null; + Result r = null; + byte[] row = "test".getBytes(); + try (Table htable = c.getTable(TABLE);) { + Put put = new 
Put(row); + put.addColumn("foo".getBytes(), row, row); + BufferedMutator m = c.getBufferedMutator(TABLE); + m.mutate(put); + m.flush(); + // Try to do a get of the row that was just put + get = new Get(row); + r = htable.get(get); + assertTrue(Arrays.equals(r.getRow(), row)); + // now start back the killed servers and disable use of replicas. That would mean + // calls go to the primary + util.getHBaseClusterInterface().startMaster(master.getHostname(), 0); + util.getHBaseClusterInterface().startRegionServer(primary.getHostname(), 0); + util.getHBaseClusterInterface().waitForActiveAndReadyMaster(); + ((ClusterConnection)c).clearRegionCache(); + } + conf.setBoolean(HConstants.USE_META_REPLICAS, false); + try (Table htable = c.getTable(TABLE);) { + r = htable.get(get); + assertTrue(Arrays.equals(r.getRow(), row)); + } } - ((ClusterConnection)c).clearRegionCache(); - htable.close(); - htable = c.getTable(TABLE); - byte[] row = "test".getBytes(); - Put put = new Put(row); - put.addColumn("foo".getBytes(), row, row); - BufferedMutator m = c.getBufferedMutator(TABLE); - m.mutate(put); - m.flush(); - // Try to do a get of the row that was just put - Get get = new Get(row); - Result r = htable.get(get); - assertTrue(Arrays.equals(r.getRow(), row)); - // now start back the killed servers and disable use of replicas. 
That would mean - // calls go to the primary - util.getHBaseClusterInterface().startMaster(master.getHostname(), 0); - util.getHBaseClusterInterface().startRegionServer(primary.getHostname(), 0); - util.getHBaseClusterInterface().waitForActiveAndReadyMaster(); - ((ClusterConnection)c).clearRegionCache(); - htable.close(); - conf.setBoolean(HConstants.USE_META_REPLICAS, false); - htable = c.getTable(TABLE); - r = htable.get(get); - assertTrue(Arrays.equals(r.getRow(), row)); } @Test @@ -237,13 +241,14 @@ public class TestMetaWithReplicas { TEST_UTIL.getHBaseAdmin().disableTable(TABLE); TEST_UTIL.getHBaseAdmin().deleteTable(TABLE); } - Table htable = TEST_UTIL.createTable(TABLE, FAMILIES); - byte[] row = "test".getBytes(); - ConnectionImplementation c = ((ConnectionImplementation) TEST_UTIL.getConnection()); - // check that metalookup pool would get created - c.relocateRegion(TABLE, row); - ExecutorService ex = c.getCurrentMetaLookupPool(); - assert(ex != null); + try (Table htable = TEST_UTIL.createTable(TABLE, FAMILIES);) { + byte[] row = "test".getBytes(); + ConnectionImplementation c = ((ConnectionImplementation) TEST_UTIL.getConnection()); + // check that metalookup pool would get created + c.relocateRegion(TABLE, row); + ExecutorService ex = c.getCurrentMetaLookupPool(); + assert(ex != null); + } } @Test @@ -416,28 +421,27 @@ public class TestMetaWithReplicas { public void testShutdownOfReplicaHolder() throws Exception { // checks that the when the server holding meta replica is shut down, the meta replica // can be recovered - ClusterConnection conn = (ClusterConnection) - ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()); - RegionLocations rl = conn. 
- locateRegion(TableName.META_TABLE_NAME, Bytes.toBytes(""), false, true); - HRegionLocation hrl = rl.getRegionLocation(1); - ServerName oldServer = hrl.getServerName(); - TEST_UTIL.getHBaseClusterInterface().killRegionServer(oldServer); - int i = 0; - do { - LOG.debug("Waiting for the replica " + hrl.getRegionInfo() + " to come up"); - Thread.sleep(30000); //wait for the detection/recovery - rl = conn.locateRegion(TableName.META_TABLE_NAME, Bytes.toBytes(""), false, true); - hrl = rl.getRegionLocation(1); - i++; - } while ((hrl == null || hrl.getServerName().equals(oldServer)) && i < 3); - assertTrue(i != 3); - conn.close(); + try (ClusterConnection conn = (ClusterConnection) + ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());) { + RegionLocations rl = conn. + locateRegion(TableName.META_TABLE_NAME, Bytes.toBytes(""), false, true); + HRegionLocation hrl = rl.getRegionLocation(1); + ServerName oldServer = hrl.getServerName(); + TEST_UTIL.getHBaseClusterInterface().killRegionServer(oldServer); + int i = 0; + do { + LOG.debug("Waiting for the replica " + hrl.getRegionInfo() + " to come up"); + Thread.sleep(30000); //wait for the detection/recovery + rl = conn.locateRegion(TableName.META_TABLE_NAME, Bytes.toBytes(""), false, true); + hrl = rl.getRegionLocation(1); + i++; + } while ((hrl == null || hrl.getServerName().equals(oldServer)) && i < 3); + assertTrue(i != 3); + } } @Test public void testHBaseFsckWithExcessMetaReplicas() throws Exception { - HBaseFsck hbck = new HBaseFsck(TEST_UTIL.getConfiguration()); // Create a meta replica (this will be the 4th one) and assign it HRegionInfo h = RegionReplicaUtil.getRegionInfoForReplica( HRegionInfo.FIRST_META_REGIONINFO, 3); @@ -447,7 +451,7 @@ public class TestMetaWithReplicas { TEST_UTIL.getMiniHBaseCluster().getMaster().assignRegion(h); HBaseFsckRepair.waitUntilAssigned(TEST_UTIL.getHBaseAdmin(), h); // check that problem exists - hbck = doFsck(TEST_UTIL.getConfiguration(), false); + HBaseFsck hbck = 
doFsck(TEST_UTIL.getConfiguration(), false); assertErrors(hbck, new ERROR_CODE[]{ERROR_CODE.UNKNOWN, ERROR_CODE.SHOULD_NOT_BE_DEPLOYED}); // fix the problem hbck = doFsck(TEST_UTIL.getConfiguration(), true); @@ -455,4 +459,4 @@ public class TestMetaWithReplicas { hbck = doFsck(TEST_UTIL.getConfiguration(), false); assertErrors(hbck, new ERROR_CODE[]{}); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/e82ccb90/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java index cb8b06f..05f9f36 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java @@ -169,42 +169,44 @@ public class TestImportExport { @Test public void testSimpleCase() throws Exception { String EXPORT_TABLE = "exportSimpleCase"; - Table t = UTIL.createTable(TableName.valueOf(EXPORT_TABLE), FAMILYA, 3); - Put p = new Put(ROW1); - p.addColumn(FAMILYA, QUAL, now, QUAL); - p.addColumn(FAMILYA, QUAL, now + 1, QUAL); - p.addColumn(FAMILYA, QUAL, now + 2, QUAL); - t.put(p); - p = new Put(ROW2); - p.addColumn(FAMILYA, QUAL, now, QUAL); - p.addColumn(FAMILYA, QUAL, now + 1, QUAL); - p.addColumn(FAMILYA, QUAL, now + 2, QUAL); - t.put(p); - - String[] args = new String[] { - EXPORT_TABLE, - FQ_OUTPUT_DIR, - "1000", // max number of key versions per key to export - }; - assertTrue(runExport(args)); - - String IMPORT_TABLE = "importTableSimpleCase"; - t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), FAMILYB, 3); - args = new String[] { - "-D" + Import.CF_RENAME_PROP + "="+FAMILYA_STRING+":"+FAMILYB_STRING, - IMPORT_TABLE, - FQ_OUTPUT_DIR - }; - assertTrue(runImport(args)); + 
try (Table t = UTIL.createTable(TableName.valueOf(EXPORT_TABLE), FAMILYA, 3);) { + Put p = new Put(ROW1); + p.addColumn(FAMILYA, QUAL, now, QUAL); + p.addColumn(FAMILYA, QUAL, now + 1, QUAL); + p.addColumn(FAMILYA, QUAL, now + 2, QUAL); + t.put(p); + p = new Put(ROW2); + p.addColumn(FAMILYA, QUAL, now, QUAL); + p.addColumn(FAMILYA, QUAL, now + 1, QUAL); + p.addColumn(FAMILYA, QUAL, now + 2, QUAL); + t.put(p); + } - Get g = new Get(ROW1); - g.setMaxVersions(); - Result r = t.get(g); - assertEquals(3, r.size()); - g = new Get(ROW2); - g.setMaxVersions(); - r = t.get(g); - assertEquals(3, r.size()); + String[] args = new String[] { + EXPORT_TABLE, + FQ_OUTPUT_DIR, + "1000", // max number of key versions per key to export + }; + assertTrue(runExport(args)); + + String IMPORT_TABLE = "importTableSimpleCase"; + try (Table t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), FAMILYB, 3);) { + args = new String[] { + "-D" + Import.CF_RENAME_PROP + "="+FAMILYA_STRING+":"+FAMILYB_STRING, + IMPORT_TABLE, + FQ_OUTPUT_DIR + }; + assertTrue(runImport(args)); + + Get g = new Get(ROW1); + g.setMaxVersions(); + Result r = t.get(g); + assertEquals(3, r.size()); + g = new Get(ROW2); + g.setMaxVersions(); + r = t.get(g); + assertEquals(3, r.size()); + } } /** @@ -238,23 +240,22 @@ public class TestImportExport { FileSystem fs = FileSystem.get(UTIL.getConfiguration()); fs.copyFromLocalFile(importPath, new Path(FQ_OUTPUT_DIR + Path.SEPARATOR + name)); String IMPORT_TABLE = name; - Table t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), Bytes.toBytes("f1"), 3); - String[] args = new String[] { - "-Dhbase.import.version=0.94" , - IMPORT_TABLE, FQ_OUTPUT_DIR - }; - assertTrue(runImport(args)); - - /* exportedTableIn94Format contains 5 rows - ROW COLUMN+CELL - r1 column=f1:c1, timestamp=1383766761171, value=val1 - r2 column=f1:c1, timestamp=1383766771642, value=val2 - r3 column=f1:c1, timestamp=1383766777615, value=val3 - r4 column=f1:c1, timestamp=1383766785146, value=val4 - r5 
column=f1:c1, timestamp=1383766791506, value=val5 - */ - assertEquals(5, UTIL.countRows(t)); - t.close(); + try (Table t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), Bytes.toBytes("f1"), 3);) { + String[] args = new String[] { + "-Dhbase.import.version=0.94" , + IMPORT_TABLE, FQ_OUTPUT_DIR + }; + assertTrue(runImport(args)); + /* exportedTableIn94Format contains 5 rows + ROW COLUMN+CELL + r1 column=f1:c1, timestamp=1383766761171, value=val1 + r2 column=f1:c1, timestamp=1383766771642, value=val2 + r3 column=f1:c1, timestamp=1383766777615, value=val3 + r4 column=f1:c1, timestamp=1383766785146, value=val4 + r5 column=f1:c1, timestamp=1383766791506, value=val5 + */ + assertEquals(5, UTIL.countRows(t)); + } } /** @@ -268,26 +269,26 @@ public class TestImportExport { .setMaxVersions(1) ); UTIL.getHBaseAdmin().createTable(desc); - Table t = UTIL.getConnection().getTable(desc.getTableName()); - - Put p = new Put(ROW1); - p.addColumn(FAMILYA, QUAL, now, QUAL); - p.addColumn(FAMILYA, QUAL, now + 1, QUAL); - p.addColumn(FAMILYA, QUAL, now + 2, QUAL); - p.addColumn(FAMILYA, QUAL, now + 3, QUAL); - p.addColumn(FAMILYA, QUAL, now + 4, QUAL); - t.put(p); - - String[] args = new String[] { - "-D" + Export.EXPORT_BATCHING + "=" + EXPORT_BATCH_SIZE, // added scanner batching arg. - BATCH_TABLE, - FQ_OUTPUT_DIR - }; - assertTrue(runExport(args)); - - FileSystem fs = FileSystem.get(UTIL.getConfiguration()); - fs.delete(new Path(FQ_OUTPUT_DIR), true); - t.close(); + try (Table t = UTIL.getConnection().getTable(desc.getTableName());) { + + Put p = new Put(ROW1); + p.addColumn(FAMILYA, QUAL, now, QUAL); + p.addColumn(FAMILYA, QUAL, now + 1, QUAL); + p.addColumn(FAMILYA, QUAL, now + 2, QUAL); + p.addColumn(FAMILYA, QUAL, now + 3, QUAL); + p.addColumn(FAMILYA, QUAL, now + 4, QUAL); + t.put(p); + + String[] args = new String[] { + "-D" + Export.EXPORT_BATCHING + "=" + EXPORT_BATCH_SIZE, // added scanner batching arg. 
+ BATCH_TABLE, + FQ_OUTPUT_DIR + }; + assertTrue(runExport(args)); + + FileSystem fs = FileSystem.get(UTIL.getConfiguration()); + fs.delete(new Path(FQ_OUTPUT_DIR), true); + } } @Test @@ -299,21 +300,22 @@ public class TestImportExport { .setKeepDeletedCells(KeepDeletedCells.TRUE) ); UTIL.getHBaseAdmin().createTable(desc); - Table t = UTIL.getConnection().getTable(desc.getTableName()); - - Put p = new Put(ROW1); - p.addColumn(FAMILYA, QUAL, now, QUAL); - p.addColumn(FAMILYA, QUAL, now + 1, QUAL); - p.addColumn(FAMILYA, QUAL, now + 2, QUAL); - p.addColumn(FAMILYA, QUAL, now + 3, QUAL); - p.addColumn(FAMILYA, QUAL, now + 4, QUAL); - t.put(p); - - Delete d = new Delete(ROW1, now+3); - t.delete(d); - d = new Delete(ROW1); - d.addColumns(FAMILYA, QUAL, now+2); - t.delete(d); + try (Table t = UTIL.getConnection().getTable(desc.getTableName());) { + + Put p = new Put(ROW1); + p.addColumn(FAMILYA, QUAL, now, QUAL); + p.addColumn(FAMILYA, QUAL, now + 1, QUAL); + p.addColumn(FAMILYA, QUAL, now + 2, QUAL); + p.addColumn(FAMILYA, QUAL, now + 3, QUAL); + p.addColumn(FAMILYA, QUAL, now + 4, QUAL); + t.put(p); + + Delete d = new Delete(ROW1, now+3); + t.delete(d); + d = new Delete(ROW1); + d.addColumns(FAMILYA, QUAL, now+2); + t.delete(d); + } String[] args = new String[] { "-D" + Export.RAW_SCAN + "=true", @@ -330,28 +332,27 @@ public class TestImportExport { .setKeepDeletedCells(KeepDeletedCells.TRUE) ); UTIL.getHBaseAdmin().createTable(desc); - t.close(); - t = UTIL.getConnection().getTable(desc.getTableName()); - args = new String[] { - IMPORT_TABLE, - FQ_OUTPUT_DIR - }; - assertTrue(runImport(args)); - - Scan s = new Scan(); - s.setMaxVersions(); - s.setRaw(true); - ResultScanner scanner = t.getScanner(s); - Result r = scanner.next(); - Cell[] res = r.rawCells(); - assertTrue(CellUtil.isDeleteFamily(res[0])); - assertEquals(now+4, res[1].getTimestamp()); - assertEquals(now+3, res[2].getTimestamp()); - assertTrue(CellUtil.isDelete(res[3])); - assertEquals(now+2, 
res[4].getTimestamp()); - assertEquals(now+1, res[5].getTimestamp()); - assertEquals(now, res[6].getTimestamp()); - t.close(); + try (Table t = UTIL.getConnection().getTable(desc.getTableName());) { + args = new String[] { + IMPORT_TABLE, + FQ_OUTPUT_DIR + }; + assertTrue(runImport(args)); + + Scan s = new Scan(); + s.setMaxVersions(); + s.setRaw(true); + ResultScanner scanner = t.getScanner(s); + Result r = scanner.next(); + Cell[] res = r.rawCells(); + assertTrue(CellUtil.isDeleteFamily(res[0])); + assertEquals(now+4, res[1].getTimestamp()); + assertEquals(now+3, res[2].getTimestamp()); + assertTrue(CellUtil.isDelete(res[3])); + assertEquals(now+2, res[4].getTimestamp()); + assertEquals(now+1, res[5].getTimestamp()); + assertEquals(now, res[6].getTimestamp()); + } } @@ -418,15 +419,11 @@ public class TestImportExport { ResultScanner exportedTScanner = exportT.getScanner(s); Result exportedTResult = exportedTScanner.next(); - try - { + try { Result.compareResults(exportedTResult, importedTResult); - } - catch (Exception e) { + } catch (Exception e) { fail("Original and imported tables data comparision failed with error:"+e.getMessage()); - } - finally - { + } finally { exportT.close(); importT.close(); } @@ -470,7 +467,8 @@ public class TestImportExport { Table importTable = UTIL.getConnection().getTable(desc.getTableName()); args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + PrefixFilter.class.getName(), - "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1), IMPORT_TABLE, FQ_OUTPUT_DIR, + "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1), IMPORT_TABLE, + FQ_OUTPUT_DIR, "1000" }; assertTrue(runImport(args)); @@ -634,60 +632,61 @@ public class TestImportExport { public void testDurability() throws Exception { // Create an export table. 
String exportTableName = "exporttestDurability"; - Table exportTable = UTIL.createTable(TableName.valueOf(exportTableName), FAMILYA, 3); - - // Insert some data - Put put = new Put(ROW1); - put.addColumn(FAMILYA, QUAL, now, QUAL); - put.addColumn(FAMILYA, QUAL, now + 1, QUAL); - put.addColumn(FAMILYA, QUAL, now + 2, QUAL); - exportTable.put(put); - - put = new Put(ROW2); - put.addColumn(FAMILYA, QUAL, now, QUAL); - put.addColumn(FAMILYA, QUAL, now + 1, QUAL); - put.addColumn(FAMILYA, QUAL, now + 2, QUAL); - exportTable.put(put); - - // Run the export - String[] args = new String[] { exportTableName, FQ_OUTPUT_DIR, "1000"}; - assertTrue(runExport(args)); - - // Create the table for import - String importTableName = "importTestDurability1"; - Table importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3); - - // Register the wal listener for the import table - TableWALActionListener walListener = new TableWALActionListener(importTableName); - HRegionInfo region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer() - .getOnlineRegions(importTable.getName()).get(0).getRegionInfo(); - WAL wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region); - wal.registerWALActionsListener(walListener); - - // Run the import with SKIP_WAL - args = - new String[] { "-D" + Import.WAL_DURABILITY + "=" + Durability.SKIP_WAL.name(), - importTableName, FQ_OUTPUT_DIR }; - assertTrue(runImport(args)); - //Assert that the wal is not visisted - assertTrue(!walListener.isWALVisited()); - //Ensure that the count is 2 (only one version of key value is obtained) - assertTrue(getCount(importTable, null) == 2); - - // Run the import with the default durability option - importTableName = "importTestDurability2"; - importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3); - region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer() - .getOnlineRegions(importTable.getName()).get(0).getRegionInfo(); - wal 
= UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region); - walListener = new TableWALActionListener(importTableName); - wal.registerWALActionsListener(walListener); - args = new String[] { importTableName, FQ_OUTPUT_DIR }; - assertTrue(runImport(args)); - //Assert that the wal is visisted - assertTrue(walListener.isWALVisited()); - //Ensure that the count is 2 (only one version of key value is obtained) - assertTrue(getCount(importTable, null) == 2); + try (Table exportTable = UTIL.createTable(TableName.valueOf(exportTableName), FAMILYA, 3);) { + + // Insert some data + Put put = new Put(ROW1); + put.addColumn(FAMILYA, QUAL, now, QUAL); + put.addColumn(FAMILYA, QUAL, now + 1, QUAL); + put.addColumn(FAMILYA, QUAL, now + 2, QUAL); + exportTable.put(put); + + put = new Put(ROW2); + put.addColumn(FAMILYA, QUAL, now, QUAL); + put.addColumn(FAMILYA, QUAL, now + 1, QUAL); + put.addColumn(FAMILYA, QUAL, now + 2, QUAL); + exportTable.put(put); + + // Run the export + String[] args = new String[] { exportTableName, FQ_OUTPUT_DIR, "1000"}; + assertTrue(runExport(args)); + + // Create the table for import + String importTableName = "importTestDurability1"; + Table importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3); + + // Register the wal listener for the import table + TableWALActionListener walListener = new TableWALActionListener(importTableName); + HRegionInfo region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer() + .getOnlineRegions(importTable.getName()).get(0).getRegionInfo(); + WAL wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region); + wal.registerWALActionsListener(walListener); + + // Run the import with SKIP_WAL + args = + new String[] { "-D" + Import.WAL_DURABILITY + "=" + Durability.SKIP_WAL.name(), + importTableName, FQ_OUTPUT_DIR }; + assertTrue(runImport(args)); + //Assert that the wal is not visisted + assertTrue(!walListener.isWALVisited()); + //Ensure that the count is 2 (only one 
version of key value is obtained) + assertTrue(getCount(importTable, null) == 2); + + // Run the import with the default durability option + importTableName = "importTestDurability2"; + importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3); + region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer() + .getOnlineRegions(importTable.getName()).get(0).getRegionInfo(); + wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region); + walListener = new TableWALActionListener(importTableName); + wal.registerWALActionsListener(walListener); + args = new String[] { importTableName, FQ_OUTPUT_DIR }; + assertTrue(runImport(args)); + //Assert that the wal is visisted + assertTrue(walListener.isWALVisited()); + //Ensure that the count is 2 (only one version of key value is obtained) + assertTrue(getCount(importTable, null) == 2); + } } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/e82ccb90/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java index 5ea219a..d15a7f4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver; import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1; import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertNull; import static org.junit.Assert.fail; @@ -60,9 +61,11 @@ import org.apache.hadoop.hbase.client.Scan; import 
org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.io.HeapSize; +import org.apache.hadoop.hbase.io.hfile.BlockCache; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL; import org.junit.After; import org.junit.Before; @@ -97,11 +100,15 @@ public class TestAtomicOperation { public void setup() { tableName = Bytes.toBytes(name.getMethodName()); } - + @After public void teardown() throws IOException { if (region != null) { + BlockCache bc = region.getStores().get(0).getCacheConfig().getBlockCache(); ((HRegion)region).close(); + WAL wal = ((HRegion)region).getWAL(); + if (wal != null) wal.close(); + if (bc != null) bc.shutdown(); region = null; } } @@ -176,17 +183,15 @@ public class TestAtomicOperation { */ @Test public void testIncrementMultiThreads() throws IOException { - LOG.info("Starting test testIncrementMultiThreads"); // run a with mixed column families (1 and 3 versions) initHRegion(tableName, name.getMethodName(), new int[] {1,3}, fam1, fam2); - // create 100 threads, each will increment by its own quantity - int numThreads = 100; + // create 25 threads, each will increment by its own quantity + int numThreads = 25; int incrementsPerThread = 1000; Incrementer[] all = new Incrementer[numThreads]; int expectedTotal = 0; - // create all threads for (int i = 0; i < numThreads; i++) { all[i] = new Incrementer(region, i, i, incrementsPerThread); @@ -203,13 +208,13 @@ public class TestAtomicOperation { try { all[i].join(); } catch (InterruptedException e) { + LOG.info("Ignored", e); } } assertICV(row, fam1, qual1, expectedTotal); assertICV(row, fam1, qual2, expectedTotal*2); assertICV(row, fam2, qual3, expectedTotal*3); - LOG.info("testIncrementMultiThreads 
successfully verified that total is " + - expectedTotal); + LOG.info("testIncrementMultiThreads successfully verified that total is " + expectedTotal); } @@ -260,6 +265,7 @@ public class TestAtomicOperation { public Incrementer(Region region, int threadNumber, int amount, int numIncrements) { + super("incrementer." + threadNumber); this.region = region; this.numIncrements = numIncrements; this.amount = amount; @@ -268,7 +274,7 @@ public class TestAtomicOperation { @Override public void run() { - for (int i=0; i<numIncrements; i++) { + for (int i = 0; i < numIncrements; i++) { try { Increment inc = new Increment(row); inc.addColumn(fam1, qual1, amount); @@ -280,8 +286,15 @@ public class TestAtomicOperation { // verify: Make sure we only see completed increments Get g = new Get(row); Result result = region.get(g); - assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*2, Bytes.toLong(result.getValue(fam1, qual2))); - assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*3, Bytes.toLong(result.getValue(fam2, qual3))); + if (result != null) { + assertTrue(result.getValue(fam1, qual1) != null); + assertTrue(result.getValue(fam1, qual2) != null); + assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*2, + Bytes.toLong(result.getValue(fam1, qual2))); + assertTrue(result.getValue(fam2, qual3) != null); + assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*3, + Bytes.toLong(result.getValue(fam2, qual3))); + } } catch (IOException e) { e.printStackTrace(); } @@ -359,7 +372,7 @@ public class TestAtomicOperation { // create 10 threads, each will alternate between adding and // removing a column int numThreads = 10; - int opsPerThread = 500; + int opsPerThread = 250; AtomicOperation[] all = new AtomicOperation[numThreads]; AtomicLong timeStamps = new AtomicLong(0); @@ -451,7 +464,7 @@ public class TestAtomicOperation { // create 10 threads, each will alternate between adding and // removing a column int numThreads = 10; - int opsPerThread = 500; + int opsPerThread = 
250; AtomicOperation[] all = new AtomicOperation[numThreads]; AtomicLong timeStamps = new AtomicLong(0); @@ -571,14 +584,12 @@ public class TestAtomicOperation { */ @Test public void testPutAndCheckAndPutInParallel() throws Exception { - final String tableName = "testPutAndCheckAndPut"; Configuration conf = TEST_UTIL.getConfiguration(); conf.setClass(HConstants.REGION_IMPL, MockHRegion.class, HeapSize.class); HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName)) .addFamily(new HColumnDescriptor(family)); - final Region region = TEST_UTIL.createLocalHRegion(htd, null, null); - + this.region = TEST_UTIL.createLocalHRegion(htd, null, null); Put[] puts = new Put[1]; Put put = new Put(Bytes.toBytes("r1")); put.addColumn(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("10")); @@ -602,7 +613,6 @@ public class TestAtomicOperation { for (Cell keyValue : results) { assertEquals("50",Bytes.toString(CellUtil.cloneValue(keyValue))); } - } private class PutThread extends TestThread {
