Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCompaction.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCompaction.java?rev=596835&r1=596834&r2=596835&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCompaction.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCompaction.java Tue Nov 20 13:53:30 2007 @@ -43,7 +43,11 @@ /** constructor */ public TestCompaction() { + super(); STARTROW = new Text(START_KEY); + + // Set cache flush size to 1MB + conf.setInt("hbase.hregion.memcache.flush.size", 1024*1024); } /** [EMAIL PROTECTED] */ @@ -71,11 +75,10 @@ */ public void testCompaction() throws Exception { createStoreFile(r); - assertFalse(r.needsCompaction()); + assertFalse(r.compactIfNeeded()); for (int i = 0; i < COMPACTION_THRESHOLD; i++) { createStoreFile(r); } - assertTrue(r.needsCompaction()); // Add more content. Now there are about 5 versions of each column. // Default is that there only 3 (MAXVERSIONS) versions allowed per column. // Assert > 3 and then after compaction, assert that only 3 versions @@ -91,7 +94,7 @@ @Override public void run() { try { - region.flushcache(false); + region.flushcache(); } catch (IOException e) { e.printStackTrace(); } @@ -101,7 +104,7 @@ @Override public void run() { try { - assertTrue(region.compactStores()); + assertTrue(region.compactIfNeeded()); } catch (IOException e) { e.printStackTrace(); } @@ -140,16 +143,15 @@ // verify that it is removed as we compact. // Assert all delted. assertNull(this.r.get(STARTROW, COLUMN_FAMILY_TEXT, 100 /*Too many*/)); - this.r.flushcache(false); + this.r.flushcache(); assertNull(this.r.get(STARTROW, COLUMN_FAMILY_TEXT, 100 /*Too many*/)); // Add a bit of data and flush it so we for sure have the compaction limit // for store files. Usually by this time we will have but if compaction // included the flush that ran 'concurrently', there may be just the // compacted store and the flush above when we added deletes. Add more // content to be certain. - createBunchOfSmallStoreFiles(this.r); - assertTrue(this.r.needsCompaction()); - this.r.compactStores(); + createSmallerStoreFile(this.r); + assertTrue(this.r.compactIfNeeded()); // Assert that the first row is still deleted. bytes = this.r.get(STARTROW, COLUMN_FAMILY_TEXT, 100 /*Too many*/); assertNull(bytes); @@ -167,21 +169,14 @@ private void createStoreFile(final HRegion region) throws IOException { HRegionIncommon loader = new HRegionIncommon(region); - for (int i = 0; i < 1; i++) { - addContent(loader, COLUMN_FAMILY); - } - region.flushcache(false); + addContent(loader, COLUMN_FAMILY); + loader.flushcache(); } - private void createBunchOfSmallStoreFiles(final HRegion region) - throws IOException { - final String xyz = new String("xyz"); - byte [] bytes = xyz.getBytes(); - for (int i = 0; i < 1; i++) { - long lid = region.startUpdate(new Text(xyz)); - region.put(lid, COLUMN_FAMILY_TEXT, bytes); - region.commit(lid); - region.flushcache(false); - } + private void createSmallerStoreFile(final HRegion region) throws IOException { + HRegionIncommon loader = new HRegionIncommon(region); + addContent(loader, COLUMN_FAMILY, + ("bbb" + PUNCTUATION).getBytes(), null); + loader.flushcache(); } }
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestGet.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestGet.java?rev=596835&r1=596834&r2=596835&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestGet.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestGet.java Tue Nov 20 13:53:30 2007 @@ -23,7 +23,7 @@ import java.io.DataOutputStream; import java.io.IOException; import java.util.Iterator; -import java.util.TreeMap; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -44,7 +44,7 @@ private static final String SERVER_ADDRESS = "foo.bar.com:1234"; - private void verifyGet(final HRegion r, final String expectedServer) + private void verifyGet(final HRegionIncommon r, final String expectedServer) throws IOException { // This should return a value because there is only one family member byte [] value = r.get(ROW_KEY, CONTENTS); @@ -55,7 +55,7 @@ assertNull(value); // Find out what getFull returns - TreeMap<Text, byte []> values = r.getFull(ROW_KEY); + Map<Text, byte []> values = r.getFull(ROW_KEY); // assertEquals(4, values.keySet().size()); for(Iterator<Text> i = values.keySet().iterator(); i.hasNext(); ) { @@ -95,7 +95,8 @@ HLog log = new HLog(fs, new Path(regionDir, "log"), conf); - HRegion r = new HRegion(dir, log, fs, conf, info, null); + HRegion region = new HRegion(dir, log, fs, conf, info, null); + HRegionIncommon r = new HRegionIncommon(region); // Write information to the table @@ -132,9 +133,10 @@ // Close and re-open region, forcing updates to disk - r.close(); + region.close(); log.rollWriter(); - r = new HRegion(dir, log, fs, conf, info, null); + region = new HRegion(dir, log, fs, conf, info, null); + r = new HRegionIncommon(region); // Read it back @@ -160,9 +162,10 @@ // Close region and re-open it - r.close(); + region.close(); log.rollWriter(); - r = new HRegion(dir, log, fs, conf, info, null); + region = new HRegion(dir, log, fs, conf, info, null); + r = new HRegionIncommon(region); // Read it back @@ -170,13 +173,11 @@ // Close region once and for all - r.close(); + region.close(); log.closeAndDelete(); } finally { - if(cluster != null) { - cluster.shutdown(); - } + StaticTestEnvironment.shutdownDfs(cluster); } } } Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHLog.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHLog.java?rev=596835&r1=596834&r2=596835&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHLog.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHLog.java Tue Nov 20 13:53:30 2007 @@ -49,13 +49,13 @@ try { // Write columns named 1, 2, 3, etc. and then values of single byte // 1, 2, 3... - TreeMap<Text, byte []> cols = new TreeMap<Text, byte []>(); + long timestamp = System.currentTimeMillis(); + TreeMap<HStoreKey, byte []> cols = new TreeMap<HStoreKey, byte []>(); for (int i = 0; i < COL_COUNT; i++) { - cols.put(new Text(Integer.toString(i)), + cols.put(new HStoreKey(row, new Text(Integer.toString(i)), timestamp), new byte[] { (byte)(i + '0') }); } - long timestamp = System.currentTimeMillis(); - log.append(regionName, tableName, row, cols, timestamp); + log.append(regionName, tableName, cols); long logSeqId = log.startCacheFlush(); log.completeCacheFlush(regionName, tableName, logSeqId); log.close(); Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHMemcache.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHMemcache.java?rev=596835&r1=596834&r2=596835&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHMemcache.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHMemcache.java Tue Nov 20 13:53:30 2007 @@ -22,23 +22,16 @@ import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.Map; -import java.util.SortedMap; import java.util.TreeMap; import junit.framework.TestCase; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HMemcache.Snapshot; import org.apache.hadoop.io.Text; /** memcache test case */ public class TestHMemcache extends TestCase { - private HMemcache hmemcache; - - private Configuration conf; + private HStore.Memcache hmemcache; private static final int ROW_COUNT = 3; @@ -50,10 +43,7 @@ @Override public void setUp() throws Exception { super.setUp(); - this.hmemcache = new HMemcache(); - // Set up a configuration that has configuration for a file - // filesystem implementation. - this.conf = new HBaseConfiguration(); + this.hmemcache = new HStore.Memcache(); } private Text getRowName(final int index) { @@ -69,48 +59,26 @@ * Adds [EMAIL PROTECTED] #ROW_COUNT} rows and [EMAIL PROTECTED] #COLUMNS_COUNT} * @param hmc Instance to add rows to. */ - private void addRows(final HMemcache hmc) throws UnsupportedEncodingException { + private void addRows(final HStore.Memcache hmc) + throws UnsupportedEncodingException { + for (int i = 0; i < ROW_COUNT; i++) { - TreeMap<Text, byte []> columns = new TreeMap<Text, byte []>(); + long timestamp = System.currentTimeMillis(); for (int ii = 0; ii < COLUMNS_COUNT; ii++) { Text k = getColumnName(i, ii); - columns.put(k, k.toString().getBytes(HConstants.UTF8_ENCODING)); + hmc.add(new HStoreKey(getRowName(i), k, timestamp), + k.toString().getBytes(HConstants.UTF8_ENCODING)); } - hmc.add(getRowName(i), columns, System.currentTimeMillis()); } } - private HLog getLogfile() throws IOException { - // Create a log file. - Path testDir = new Path(conf.get("hadoop.tmp.dir", - System.getProperty("java.tmp.dir")), "hbase"); - Path logFile = new Path(testDir, this.getName()); - FileSystem fs = testDir.getFileSystem(conf); - // Cleanup any old log file. - if (fs.exists(logFile)) { - fs.delete(logFile); - } - return new HLog(fs, logFile, this.conf); - } - - private Snapshot runSnapshot(final HMemcache hmc, final HLog log) - throws IOException { - + private void runSnapshot(final HStore.Memcache hmc) { // Save off old state. - int oldHistorySize = hmc.history.size(); - SortedMap<HStoreKey, byte []> oldMemcache = hmc.memcache; - // Run snapshot. - Snapshot s = hmc.snapshotMemcacheForLog(log); + int oldHistorySize = hmc.snapshot.size(); + hmc.snapshot(); // Make some assertions about what just happened. - assertEquals("Snapshot equals old memcache", hmc.snapshot, - oldMemcache); - assertEquals("Returned snapshot holds old memcache", - s.memcacheSnapshot, oldMemcache); - assertEquals("History has been incremented", - oldHistorySize + 1, hmc.history.size()); - assertEquals("History holds old snapshot", - hmc.history.get(oldHistorySize), oldMemcache); - return s; + assertTrue("History size has not increased", + oldHistorySize < hmc.snapshot.size()); } /** @@ -119,21 +87,14 @@ */ public void testSnapshotting() throws IOException { final int snapshotCount = 5; - final Text tableName = new Text(getName()); - HLog log = getLogfile(); // Add some rows, run a snapshot. Do it a few times. for (int i = 0; i < snapshotCount; i++) { addRows(this.hmemcache); - int historyInitialSize = this.hmemcache.history.size(); - Snapshot s = runSnapshot(this.hmemcache, log); - log.completeCacheFlush(new Text(Integer.toString(i)), - tableName, s.sequenceId); - // Clean up snapshot now we are done with it. - this.hmemcache.deleteSnapshot(); - assertTrue("History not being cleared", - historyInitialSize == this.hmemcache.history.size()); + runSnapshot(this.hmemcache); + this.hmemcache.getSnapshot(); + assertEquals("History not being cleared", 0, + this.hmemcache.snapshot.size()); } - log.closeAndDelete(); } private void isExpectedRow(final int rowIndex, TreeMap<Text, byte []> row) @@ -161,7 +122,8 @@ addRows(this.hmemcache); for (int i = 0; i < ROW_COUNT; i++) { HStoreKey hsk = new HStoreKey(getRowName(i)); - TreeMap<Text, byte []> all = this.hmemcache.getFull(hsk); + TreeMap<Text, byte []> all = new TreeMap<Text, byte[]>(); + this.hmemcache.getFull(hsk, all); isExpectedRow(i, all); } } Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHRegion.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHRegion.java?rev=596835&r1=596834&r2=596835&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHRegion.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHRegion.java Tue Nov 20 13:53:30 2007 @@ -61,9 +61,7 @@ read(); cleanup(); } finally { - if(cluster != null) { - cluster.shutdown(); - } + StaticTestEnvironment.shutdownDfs(cluster); } } @@ -78,14 +76,15 @@ private static final Text CONTENTS_BODY = new Text("contents:body"); private static final Text CONTENTS_FIRSTCOL = new Text("contents:firstcol"); private static final Text ANCHOR_SECONDCOL = new Text("anchor:secondcol"); - + private MiniDFSCluster cluster = null; private FileSystem fs = null; private Path parentdir = null; private Path newlogdir = null; private HLog log = null; private HTableDescriptor desc = null; - HRegion region = null; + HRegion r = null; + HRegionIncommon region = null; private static int numInserted = 0; @@ -103,8 +102,9 @@ desc = new HTableDescriptor("test"); desc.addFamily(new HColumnDescriptor("contents:")); desc.addFamily(new HColumnDescriptor("anchor:")); - region = new HRegion(parentdir, log, fs, conf, + r = new HRegion(parentdir, log, fs, conf, new HRegionInfo(desc, null, null), null); + region = new HRegionIncommon(r); } // Test basic functionality. Writes to contents:basic and anchor:anchornum-* @@ -129,7 +129,7 @@ startTime = System.currentTimeMillis(); - region.flushcache(false); + region.flushcache(); System.out.println("Cache flush elapsed time: " + ((System.currentTimeMillis() - startTime) / 1000.0)); @@ -169,7 +169,7 @@ try { region.put(-1, CONTENTS_BASIC, "bad input".getBytes(HConstants.UTF8_ENCODING)); - } catch (LockException e) { + } catch (Exception e) { exceptionThrown = true; } assertTrue("Bad lock id", exceptionThrown); @@ -182,6 +182,7 @@ String unregisteredColName = "FamilyGroup:FamilyLabel"; region.put(lockid, new Text(unregisteredColName), unregisteredColName.getBytes(HConstants.UTF8_ENCODING)); + region.commit(lockid); } catch (IOException e) { exceptionThrown = true; } finally { @@ -209,8 +210,8 @@ for (int i = 0; i < lockCount; i++) { try { Text rowid = new Text(Integer.toString(i)); - lockids[i] = region.obtainRowLock(rowid); - rowid.equals(region.getRowFromLock(lockids[i])); + lockids[i] = r.obtainRowLock(rowid); + rowid.equals(r.getRowFromLock(lockids[i])); LOG.debug(getName() + " locked " + rowid.toString()); } catch (IOException e) { e.printStackTrace(); @@ -221,13 +222,8 @@ // Abort outstanding locks. for (int i = lockCount - 1; i >= 0; i--) { - try { - region.abort(lockids[i]); - LOG.debug(getName() + " unlocked " + - Integer.toString(i)); - } catch (IOException e) { - e.printStackTrace(); - } + r.releaseRowLock(r.getRowFromLock(lockids[i])); + LOG.debug(getName() + " unlocked " + i); } LOG.debug(getName() + " released " + Integer.toString(lockCount) + " locks"); @@ -288,7 +284,7 @@ startTime = System.currentTimeMillis(); HInternalScannerInterface s = - region.getScanner(cols, new Text(), System.currentTimeMillis(), null); + r.getScanner(cols, new Text(), System.currentTimeMillis(), null); int numFetched = 0; try { HStoreKey curKey = new HStoreKey(); @@ -326,7 +322,7 @@ startTime = System.currentTimeMillis(); - region.flushcache(false); + region.flushcache(); System.out.println("Cache flush elapsed time: " + ((System.currentTimeMillis() - startTime) / 1000.0)); @@ -335,7 +331,7 @@ startTime = System.currentTimeMillis(); - s = region.getScanner(cols, new Text(), System.currentTimeMillis(), null); + s = r.getScanner(cols, new Text(), System.currentTimeMillis(), null); numFetched = 0; try { HStoreKey curKey = new HStoreKey(); @@ -390,7 +386,7 @@ startTime = System.currentTimeMillis(); - s = region.getScanner(cols, new Text(), System.currentTimeMillis(), null); + s = r.getScanner(cols, new Text(), System.currentTimeMillis(), null); numFetched = 0; try { HStoreKey curKey = new HStoreKey(); @@ -428,7 +424,7 @@ startTime = System.currentTimeMillis(); - region.flushcache(false); + region.flushcache(); System.out.println("Cache flush elapsed time: " + ((System.currentTimeMillis() - startTime) / 1000.0)); @@ -437,7 +433,7 @@ startTime = System.currentTimeMillis(); - s = region.getScanner(cols, new Text(), System.currentTimeMillis(), null); + s = r.getScanner(cols, new Text(), System.currentTimeMillis(), null); numFetched = 0; try { HStoreKey curKey = new HStoreKey(); @@ -473,7 +469,7 @@ startTime = System.currentTimeMillis(); - s = region.getScanner(cols, new Text("row_vals1_500"), + s = r.getScanner(cols, new Text("row_vals1_500"), System.currentTimeMillis(), null); numFetched = 0; @@ -542,7 +538,7 @@ System.out.println("Flushing write #" + k); long flushStart = System.currentTimeMillis(); - region.flushcache(false); + region.flushcache(); long flushEnd = System.currentTimeMillis(); totalFlush += (flushEnd - flushStart); @@ -557,7 +553,7 @@ } } long startCompact = System.currentTimeMillis(); - if(region.compactStores()) { + if(r.compactIfNeeded()) { totalCompact = System.currentTimeMillis() - startCompact; System.out.println("Region compacted - elapsedTime: " + (totalCompact / 1000.0)); @@ -583,43 +579,28 @@ // NOTE: This test depends on testBatchWrite succeeding private void splitAndMerge() throws IOException { - Text midKey = new Text(); - - if(region.needsSplit(midKey)) { - System.out.println("Needs split"); - } - - // Split it anyway - - Text midkey = new Text("row_" - + (StaticTestEnvironment.debugging ? (N_ROWS / 2) : (NUM_VALS/2))); - - Path oldRegionPath = region.getRegionDir(); - + Path oldRegionPath = r.getRegionDir(); long startTime = System.currentTimeMillis(); + HRegion subregions[] = r.splitRegion(this); + if (subregions != null) { + System.out.println("Split region elapsed time: " + + ((System.currentTimeMillis() - startTime) / 1000.0)); - HRegion subregions[] = region.closeAndSplit(midkey, this); - - System.out.println("Split region elapsed time: " - + ((System.currentTimeMillis() - startTime) / 1000.0)); - - assertEquals("Number of subregions", subregions.length, 2); - - // Now merge it back together - - Path oldRegion1 = subregions[0].getRegionDir(); - Path oldRegion2 = subregions[1].getRegionDir(); - - startTime = System.currentTimeMillis(); + assertEquals("Number of subregions", subregions.length, 2); - region = HRegion.closeAndMerge(subregions[0], subregions[1]); + // Now merge it back together - System.out.println("Merge regions elapsed time: " - + ((System.currentTimeMillis() - startTime) / 1000.0)); - - fs.delete(oldRegionPath); - fs.delete(oldRegion1); - fs.delete(oldRegion2); + Path oldRegion1 = subregions[0].getRegionDir(); + Path oldRegion2 = subregions[1].getRegionDir(); + startTime = System.currentTimeMillis(); + r = HRegion.closeAndMerge(subregions[0], subregions[1]); + region = new HRegionIncommon(r); + System.out.println("Merge regions elapsed time: " + + ((System.currentTimeMillis() - startTime) / 1000.0)); + fs.delete(oldRegion1); + fs.delete(oldRegion2); + fs.delete(oldRegionPath); + } } /** @@ -650,7 +631,7 @@ long startTime = System.currentTimeMillis(); HInternalScannerInterface s = - region.getScanner(cols, new Text(), System.currentTimeMillis(), null); + r.getScanner(cols, new Text(), System.currentTimeMillis(), null); try { @@ -706,7 +687,7 @@ startTime = System.currentTimeMillis(); - s = region.getScanner(cols, new Text(), System.currentTimeMillis(), null); + s = r.getScanner(cols, new Text(), System.currentTimeMillis(), null); try { int numFetched = 0; HStoreKey curKey = new HStoreKey(); @@ -744,7 +725,7 @@ if(StaticTestEnvironment.debugging) { startTime = System.currentTimeMillis(); - s = region.getScanner(new Text[] { CONTENTS_BODY }, new Text(), + s = r.getScanner(new Text[] { CONTENTS_BODY }, new Text(), System.currentTimeMillis(), null); try { @@ -782,7 +763,7 @@ startTime = System.currentTimeMillis(); - s = region.getScanner(cols, new Text(), System.currentTimeMillis(), null); + s = r.getScanner(cols, new Text(), System.currentTimeMillis(), null); try { int fetched = 0; @@ -817,21 +798,10 @@ } private void cleanup() { - - // Shut down the mini cluster try { log.closeAndDelete(); } catch (IOException e) { e.printStackTrace(); - } - if (cluster != null) { - try { - fs.close(); - } catch (IOException e) { - e.printStackTrace(); - } - cluster.shutdown(); - cluster = null; } // Delete all the DFS files Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHStoreFile.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHStoreFile.java?rev=596835&r1=596834&r2=596835&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHStoreFile.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHStoreFile.java Tue Nov 20 13:53:30 2007 @@ -45,9 +45,13 @@ /** [EMAIL PROTECTED] */ @Override public void setUp() throws Exception { - this.cluster = new MiniDFSCluster(this.conf, 2, true, (String[])null); - this.fs = cluster.getFileSystem(); - this.dir = new Path(DIR, getName()); + try { + this.cluster = new MiniDFSCluster(this.conf, 2, true, (String[])null); + this.fs = cluster.getFileSystem(); + this.dir = new Path(DIR, getName()); + } catch (IOException e) { + StaticTestEnvironment.shutdownDfs(cluster); + } super.setUp(); } @@ -55,13 +59,7 @@ @Override public void tearDown() throws Exception { super.tearDown(); - if (this.cluster != null) { - try { - this.cluster.shutdown(); - } catch (Exception e) { - LOG.warn("Closing down mini DFS", e); - } - } + StaticTestEnvironment.shutdownDfs(cluster); // ReflectionUtils.printThreadInfo(new PrintWriter(System.out), // "Temporary end-of-test thread dump debugging HADOOP-2040: " + getName()); } Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestLogRolling.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestLogRolling.java?rev=596835&r1=596834&r2=596835&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestLogRolling.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestLogRolling.java Tue Nov 20 13:53:30 2007 @@ -22,7 +22,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.dfs.MiniDFSCluster; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; @@ -90,6 +89,7 @@ super.setUp(); dfs = new MiniDFSCluster(conf, 2, true, (String[]) null); } catch (Exception e) { + StaticTestEnvironment.shutdownDfs(dfs); LOG.fatal("error during setUp: ", e); throw e; } @@ -100,21 +100,10 @@ public void tearDown() throws Exception { try { super.tearDown(); - if (cluster != null) { // shutdown mini HBase cluster cluster.shutdown(); } - - if (dfs != null) { - FileSystem fs = dfs.getFileSystem(); - try { - dfs.shutdown(); - } finally { - if (fs != null) { - fs.close(); - } - } - } + StaticTestEnvironment.shutdownDfs(dfs); } catch (Exception e) { LOG.fatal("error in tearDown", e); throw e; Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestMasterAdmin.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestMasterAdmin.java?rev=596835&r1=596834&r2=596835&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestMasterAdmin.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestMasterAdmin.java Tue Nov 20 13:53:30 2007 @@ -36,6 +36,10 @@ public TestMasterAdmin() { super(true); admin = null; + + // Make the thread wake frequency a little slower so other threads + // can run + conf.setInt("hbase.server.thread.wakefrequency", 2000); } /** @throws Exception */ Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner.java?rev=596835&r1=596834&r2=596835&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner.java Tue Nov 20 13:53:30 2007 @@ -50,7 +50,8 @@ private static final long START_CODE = Long.MAX_VALUE; - private HRegion region; + private HRegion r; + private HRegionIncommon region; /** Compare the HRegionInfo we read from HBase to what we stored */ private void validateRegionInfo(byte [] regionBytes) throws IOException { @@ -79,7 +80,7 @@ for(int i = 0; i < scanColumns.length; i++) { try { - scanner = region.getScanner(scanColumns[i], FIRST_ROW, + scanner = r.getScanner(scanColumns[i], FIRST_ROW, System.currentTimeMillis(), null); while(scanner.next(key, results)) { @@ -145,7 +146,8 @@ HLog log = new HLog(fs, new Path(regionDir, "log"), conf); - region = new HRegion(dir, log, fs, conf, REGION_INFO, null); + r = new HRegion(dir, log, fs, conf, REGION_INFO, null); + region = new HRegionIncommon(r); // Write information to the meta table @@ -165,9 +167,10 @@ // Close and re-open - region.close(); + r.close(); log.rollWriter(); - region = new HRegion(dir, log, fs, conf, REGION_INFO, null); + r = new HRegion(dir, log, fs, conf, REGION_INFO, null); + region = new HRegionIncommon(r); // Verify we can get the data back now that it is on disk. @@ -196,7 +199,7 @@ // flush cache - region.flushcache(false); + region.flushcache(); // Validate again @@ -205,9 +208,10 @@ // Close and reopen - region.close(); + r.close(); log.rollWriter(); - region = new HRegion(dir, log, fs, conf, REGION_INFO, null); + r = new HRegion(dir, log, fs, conf, REGION_INFO, null); + region = new HRegionIncommon(r); // Validate again @@ -232,7 +236,7 @@ // flush cache - region.flushcache(false); + region.flushcache(); // Validate again @@ -241,9 +245,10 @@ // Close and reopen - region.close(); + r.close(); log.rollWriter(); - region = new HRegion(dir, log, fs, conf, REGION_INFO, null); + r = new HRegion(dir, log, fs, conf, REGION_INFO, null); + region = new HRegionIncommon(r); // Validate again @@ -252,13 +257,11 @@ // clean up - region.close(); + r.close(); log.closeAndDelete(); } finally { - if(cluster != null) { - cluster.shutdown(); - } + StaticTestEnvironment.shutdownDfs(cluster); } } } Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java?rev=596835&r1=596834&r2=596835&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java Tue Nov 20 13:53:30 2007 @@ -71,6 +71,9 @@ } } + /** + * @throws Exception + */ public void testStopRow() throws Exception { Text tableName = new Text(getName()); createTable(new HBaseAdmin(this.conf), tableName); @@ -86,6 +89,9 @@ } } + /** + * @throws Exception + */ public void testIterator() throws Exception { HTable table = new HTable(this.conf, HConstants.ROOT_TABLE_NAME); HScannerInterface scanner = @@ -139,11 +145,12 @@ int count = 0; while (scanner.next(key, results)) { for (Text c: results.keySet()) { + System.out.println(c); assertTrue(c.toString().matches(regexColumnname)); count++; } } - assertTrue(count == 1); + assertEquals(1, count); scanner.close(); } @@ -267,7 +274,7 @@ Text tableName = new Text(getName()); admin.createTable(new HTableDescriptor(tableName.toString())); List<HRegionInfo> regions = scan(metaTable); - assertEquals("Expected one region", regions.size(), 1); + assertEquals("Expected one region", 1, regions.size()); HRegionInfo region = regions.get(0); assertTrue("Expected region named for test", region.getRegionName().toString().startsWith(getName())); @@ -365,7 +372,7 @@ // Assert added. byte [] bytes = t.get(region.getRegionName(), HConstants.COL_REGIONINFO); HRegionInfo hri = Writables.getHRegionInfo(bytes); - assertEquals(hri.getRegionId(), region.getRegionId()); + assertEquals(region.getRegionId(), hri.getRegionId()); if (LOG.isDebugEnabled()) { LOG.info("Added region " + region.getRegionName() + " to table " + t.getTableName()); Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestSplit.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestSplit.java?rev=596835&r1=596834&r2=596835&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestSplit.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestSplit.java Tue Nov 20 13:53:30 2007 @@ -39,6 +39,9 @@ /** constructor */ public TestSplit() { super(); + + // Always compact if there is more than one store file. + conf.setInt("hbase.hstore.compactionThreshold", 2); // Make lease timeout longer, lease checks less frequent conf.setInt("hbase.master.lease.period", 10 * 1000); @@ -47,20 +50,15 @@ // Increase the amount of time between client retries conf.setLong("hbase.client.pause", 15 * 1000); + // This size should make it so we always split using the addContent + // below. After adding all data, the first region is 1.3M + conf.setLong("hbase.hregion.max.filesize", 1024 * 128); + Logger.getRootLogger().setLevel(Level.WARN); Logger.getLogger(this.getClass().getPackage().getName()). setLevel(Level.DEBUG); } - /** [EMAIL PROTECTED] */ - @Override - public void setUp() throws Exception { - super.setUp(); - // This size should make it so we always split using the addContent - // below. After adding all data, the first region is 1.3M - conf.setLong("hbase.hregion.max.filesize", 1024 * 128); - } - /** * Splits twice and verifies getting from each of the split regions. * @throws Exception @@ -83,7 +81,7 @@ private void basicSplit(final HRegion region) throws Exception { addContent(region, COLFAMILY_NAME3); - region.internalFlushcache(); + region.internalFlushcache(region.snapshotMemcaches()); Text midkey = new Text(); assertTrue(region.needsSplit(midkey)); HRegion [] regions = split(region); @@ -110,7 +108,12 @@ } addContent(regions[i], COLFAMILY_NAME2); addContent(regions[i], COLFAMILY_NAME1); - regions[i].internalFlushcache(); + long startTime = region.snapshotMemcaches(); + if (startTime == -1) { + LOG.info("cache flush not needed"); + } else { + regions[i].internalFlushcache(startTime); + } } // Assert that even if one store file is larger than a reference, the @@ -126,7 +129,7 @@ // To make regions splitable force compaction. for (int i = 0; i < regions.length; i++) { - assertTrue(regions[i].compactStores()); + regions[i].compactStores(); } TreeMap<String, HRegion> sortedMap = new TreeMap<String, HRegion>(); @@ -156,6 +159,12 @@ * @throws Exception */ public void testSplitRegionIsDeleted() throws Exception { + // Make sure the cache gets flushed so we trigger a compaction(s) and + // hence splits. This is done here rather than in the constructor because + // the first test runs without a cluster, and will block when the cache + // fills up. + conf.setInt("hbase.hregion.memcache.flush.size", 1024 * 1024); + try { // Start up a hbase cluster MiniHBaseCluster cluster = new MiniHBaseCluster(conf, 1, true); @@ -228,7 +237,7 @@ assertTrue(r.needsSplit(midKey)); // Assert can get mid key from passed region. assertGet(r, COLFAMILY_NAME3, midKey); - HRegion [] regions = r.closeAndSplit(midKey, null); + HRegion [] regions = r.splitRegion(null); assertEquals(regions.length, 2); return regions; } Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTimestamp.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTimestamp.java?rev=596835&r1=596834&r2=596835&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTimestamp.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTimestamp.java Tue Nov 20 13:53:30 2007 @@ -38,11 +38,11 @@ private static final String COLUMN_NAME = "contents:"; private static final Text COLUMN = new Text(COLUMN_NAME); - private static final Text[] COLUMNS = {COLUMN}; private static final Text ROW = new Text("row"); // When creating column descriptor, how many versions of a cell to allow. private static final int VERSIONS = 3; + /** * Test that delete works according to description in <a @@ -52,11 +52,8 @@ public void testDelete() throws IOException { final HRegion r = createRegion(); try { - doTestDelete(new HRegionIncommon(r), new FlushCache() { - public void flushcache() throws IOException { - r.flushcache(false); - } - }); + final HRegionIncommon region = new HRegionIncommon(r); + doTestDelete(region, region); } finally { r.close(); r.getLog().closeAndDelete(); @@ -70,11 +67,8 @@ public void testTimestampScanning() throws IOException { final HRegion r = createRegion(); try { - doTestTimestampScanning(new HRegionIncommon(r), new FlushCache() { - public void flushcache() throws IOException { - r.flushcache(false); - } - }); + final HRegionIncommon region = new HRegionIncommon(r); + doTestTimestampScanning(region, region); } finally { r.close(); r.getLog().closeAndDelete(); @@ -187,7 +181,7 @@ // Now assert that if we ask for multiple versions, that they come out in // order. byte [][] bytesBytes = incommon.get(ROW, COLUMN, tss.length); - assertEquals(bytesBytes.length, tss.length); + assertEquals(tss.length, bytesBytes.length); for (int i = 0; i < bytesBytes.length; i++) { long ts = Writables.bytesToLong(bytesBytes[i]); assertEquals(ts, tss[i]); Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableIndex.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableIndex.java?rev=596835&r1=596834&r2=596835&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableIndex.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableIndex.java Tue Nov 20 13:53:30 2007 @@ -35,7 +35,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseAdmin; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HBaseTestCase; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HScannerInterface; @@ -44,6 +43,7 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.MultiRegionTable; +import org.apache.hadoop.hbase.StaticTestEnvironment; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; @@ -58,7 +58,7 @@ /** * Test Map/Reduce job to build index over HBase table */ -public class TestTableIndex extends HBaseTestCase { +public class TestTableIndex extends MultiRegionTable { private static final Log LOG = LogFactory.getLog(TestTableIndex.class); static final String TABLE_NAME = "moretest"; @@ -76,13 +76,21 @@ private Path dir; private MiniHBaseCluster hCluster = null; + /** [EMAIL PROTECTED] */ @Override public void setUp() throws Exception { super.setUp(); + // Make sure the cache gets flushed so we trigger a compaction(s) and + // hence splits. + conf.setInt("hbase.hregion.memcache.flush.size", 1024 * 1024); + + // Always compact if there is more than one store file. + conf.setInt("hbase.hstore.compactionThreshold", 2); + // This size should make it so we always split using the addContent // below. After adding all data, the first region is 1.3M - conf.setLong("hbase.hregion.max.filesize", 256 * 1024); + conf.setLong("hbase.hregion.max.filesize", 128 * 1024); desc = new HTableDescriptor(TABLE_NAME); desc.addFamily(new HColumnDescriptor(INPUT_COLUMN)); @@ -103,22 +111,19 @@ admin.createTable(desc); // Populate a table into multiple regions - MultiRegionTable.makeMultiRegionTable(conf, hCluster, null, TABLE_NAME, - INPUT_COLUMN); + makeMultiRegionTable(conf, hCluster, null, TABLE_NAME, INPUT_COLUMN); // Verify table indeed has multiple regions HTable table = new HTable(conf, new Text(TABLE_NAME)); Text[] startKeys = table.getStartKeys(); assertTrue(startKeys.length > 1); } catch (Exception e) { - if (dfsCluster != null) { - dfsCluster.shutdown(); - dfsCluster = null; - } + StaticTestEnvironment.shutdownDfs(dfsCluster); throw e; } } + /** [EMAIL PROTECTED] */ @Override public void tearDown() throws Exception { super.tearDown(); @@ -127,9 +132,7 @@ hCluster.shutdown(); } - if (dfsCluster != null) { - dfsCluster.shutdown(); - } + StaticTestEnvironment.shutdownDfs(dfsCluster); } /** Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java?rev=596835&r1=596834&r2=596835&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java Tue Nov 20 13:53:30 2007 @@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.MultiRegionTable; +import org.apache.hadoop.hbase.StaticTestEnvironment; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Text; @@ -86,6 +87,21 @@ public TestTableMapReduce() { super(); + // Make the thread wake frequency a little slower so other threads + // can run + conf.setInt("hbase.server.thread.wakefrequency", 2000); + + // Make sure the cache gets flushed so we trigger a compaction(s) and + // hence splits. + conf.setInt("hbase.hregion.memcache.flush.size", 1024 * 1024); + + // Always compact if there is more than one store file. + conf.setInt("hbase.hstore.compactionThreshold", 2); + + // This size should make it so we always split using the addContent + // below. After adding all data, the first region is 1.3M + conf.setLong("hbase.hregion.max.filesize", 256 * 1024); + // Make lease timeout longer, lease checks less frequent conf.setInt("hbase.master.lease.period", 10 * 1000); conf.setInt("hbase.master.lease.thread.wakefrequency", 5 * 1000); @@ -97,9 +113,6 @@ @Override public void setUp() throws Exception { super.setUp(); - // This size is picked so the table is split into two - // after addContent in testMultiRegionTableMapReduce. - conf.setLong("hbase.hregion.max.filesize", 256 * 1024); dfsCluster = new MiniDFSCluster(conf, 1, true, (String[])null); try { fs = dfsCluster.getFileSystem(); @@ -109,10 +122,7 @@ hCluster = new MiniHBaseCluster(conf, 1, dfsCluster); LOG.info("Master is at " + this.conf.get(HConstants.MASTER_ADDRESS)); } catch (Exception e) { - if (dfsCluster != null) { - dfsCluster.shutdown(); - dfsCluster = null; - } + StaticTestEnvironment.shutdownDfs(dfsCluster); throw e; } } @@ -126,18 +136,7 @@ if(hCluster != null) { hCluster.shutdown(); } - - if (dfsCluster != null) { - dfsCluster.shutdown(); - } - - if (fs != null) { - try { - fs.close(); - } catch (IOException e) { - LOG.info("During tear down got a " + e.getMessage()); - } - } + StaticTestEnvironment.shutdownDfs(dfsCluster); } /** @@ -218,49 +217,54 @@ // insert some data into the test table HTable table = new HTable(conf, new Text(SINGLE_REGION_TABLE_NAME)); - for(int i = 0; i < values.length; i++) { - long lockid = table.startUpdate(new Text("row_" - + String.format("%1$05d", i))); + try { + for(int i = 0; i < values.length; i++) { + long lockid = table.startUpdate(new Text("row_" + + String.format("%1$05d", i))); + + try { + table.put(lockid, TEXT_INPUT_COLUMN, values[i]); + table.commit(lockid, System.currentTimeMillis()); + lockid = -1; + } finally { + if (lockid != -1) + table.abort(lockid); + } + } + + LOG.info("Print table contents before map/reduce"); + scanTable(conf, SINGLE_REGION_TABLE_NAME); + + @SuppressWarnings("deprecation") + MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1); try { - table.put(lockid, TEXT_INPUT_COLUMN, values[i]); - table.commit(lockid, System.currentTimeMillis()); - lockid = -1; - } finally { - if (lockid != -1) - table.abort(lockid); - } - } + JobConf jobConf = new JobConf(conf, TestTableMapReduce.class); + jobConf.setJobName("process column contents"); + jobConf.setNumMapTasks(1); + jobConf.setNumReduceTasks(1); - LOG.info("Print table contents before map/reduce"); - scanTable(conf, SINGLE_REGION_TABLE_NAME); - - @SuppressWarnings("deprecation") - MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1); + TableMap.initJob(SINGLE_REGION_TABLE_NAME, INPUT_COLUMN, + ProcessContentsMapper.class, jobConf); - try { - JobConf jobConf = new JobConf(conf, TestTableMapReduce.class); - jobConf.setJobName("process column contents"); - jobConf.setNumMapTasks(1); - jobConf.setNumReduceTasks(1); + TableReduce.initJob(SINGLE_REGION_TABLE_NAME, + IdentityTableReduce.class, jobConf); - TableMap.initJob(SINGLE_REGION_TABLE_NAME, INPUT_COLUMN, - ProcessContentsMapper.class, jobConf); + JobClient.runJob(jobConf); - TableReduce.initJob(SINGLE_REGION_TABLE_NAME, - IdentityTableReduce.class, jobConf); + } finally { + mrCluster.shutdown(); + } + + LOG.info("Print table contents after map/reduce"); + scanTable(conf, SINGLE_REGION_TABLE_NAME); + + // verify map-reduce results + verify(conf, SINGLE_REGION_TABLE_NAME); - JobClient.runJob(jobConf); - } finally { - mrCluster.shutdown(); + table.close(); } - - LOG.info("Print table contents after map/reduce"); - scanTable(conf, SINGLE_REGION_TABLE_NAME); - - // verify map-reduce results - verify(conf, SINGLE_REGION_TABLE_NAME); } /* @@ -277,37 +281,42 @@ admin.createTable(desc); // Populate a table into multiple regions - MultiRegionTable.makeMultiRegionTable(conf, hCluster, fs, - MULTI_REGION_TABLE_NAME, INPUT_COLUMN); + makeMultiRegionTable(conf, hCluster, fs, MULTI_REGION_TABLE_NAME, + INPUT_COLUMN); // Verify table indeed has multiple regions HTable table = new HTable(conf, new Text(MULTI_REGION_TABLE_NAME)); - Text[] startKeys = table.getStartKeys(); - assertTrue(startKeys.length > 1); + try { + Text[] startKeys = table.getStartKeys(); + assertTrue(startKeys.length > 1); - @SuppressWarnings("deprecation") - MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1); + @SuppressWarnings("deprecation") + MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1); - try { - JobConf jobConf = new JobConf(conf, TestTableMapReduce.class); - jobConf.setJobName("process column contents"); - jobConf.setNumMapTasks(2); - jobConf.setNumReduceTasks(1); + try { + JobConf jobConf = new JobConf(conf, TestTableMapReduce.class); + jobConf.setJobName("process column contents"); + jobConf.setNumMapTasks(2); + jobConf.setNumReduceTasks(1); + + TableMap.initJob(MULTI_REGION_TABLE_NAME, INPUT_COLUMN, + ProcessContentsMapper.class, jobConf); - TableMap.initJob(MULTI_REGION_TABLE_NAME, INPUT_COLUMN, - ProcessContentsMapper.class, jobConf); + TableReduce.initJob(MULTI_REGION_TABLE_NAME, + IdentityTableReduce.class, jobConf); - TableReduce.initJob(MULTI_REGION_TABLE_NAME, - IdentityTableReduce.class, jobConf); + JobClient.runJob(jobConf); - JobClient.runJob(jobConf); + } finally { + mrCluster.shutdown(); + } + + // verify map-reduce results + verify(conf, MULTI_REGION_TABLE_NAME); } finally { - mrCluster.shutdown(); + table.close(); } - - // verify map-reduce results - verify(conf, MULTI_REGION_TABLE_NAME); } private void scanTable(HBaseConfiguration conf, String tableName)