Author: zjushch
Date: Tue Feb 26 02:38:57 2013
New Revision: 1450001

URL: http://svn.apache.org/r1450001
Log:
HBASE-7671 Flushing memstore again after last failure could cause data loss
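Context for the log message, as a deliberately simplified model (this is not HBase code; every name below, FlushWatermarkToy, brokenFlush, and so on, is hypothetical): if a flush can run, or be retried, after a failure on a dying server, the flushed-sequence-id watermark can end up ahead of edits that never reached a store file, and log replay will then skip those edits. The fix in this commit is to refuse the flush entirely on an aborting server, so recovery falls back to the intact WAL.

import java.util.ArrayList;
import java.util.List;

// Toy model of the data-loss shape: a watermark advanced before the
// flushed data is durable makes replay skip edits that only exist in the WAL.
public class FlushWatermarkToy {
  static final List<String> wal = new ArrayList<String>();        // durable log
  static final List<String> storeFiles = new ArrayList<String>(); // "disk"
  static final List<String> memstore = new ArrayList<String>();
  static long flushedSeqId = -1; // replay skips edits at or below this

  static void put(long seqId, String edit) {
    wal.add(seqId + ":" + edit);
    memstore.add(seqId + ":" + edit);
  }

  // A flush that advances the watermark before the write is durable: if the
  // write then fails, everything at or below the watermark is unrecoverable.
  static void brokenFlush(long upToSeqId, boolean failWrite) {
    flushedSeqId = upToSeqId;   // watermark moved too early
    if (failWrite) {
      return;                   // simulated I/O failure: nothing reached disk
    }
    storeFiles.addAll(memstore);
    memstore.clear();
  }

  static List<String> recover() {
    List<String> visible = new ArrayList<String>(storeFiles);
    for (String entry : wal) {
      long seq = Long.parseLong(entry.substring(0, entry.indexOf(':')));
      if (seq > flushedSeqId) {
        visible.add(entry);     // replay only edits above the watermark
      }
    }
    return visible;
  }

  public static void main(String[] args) {
    put(1, "row1");
    put(2, "row2");
    brokenFlush(2, true);       // failed flush, watermark already advanced
    System.out.println(recover()); // [] -- rows lost despite being in the WAL
  }
}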
Modified:
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1450001&r1=1450000&r2=1450001&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Feb 26 02:38:57 2013
@@ -1448,6 +1448,10 @@ public class HRegion implements HeapSize
   protected boolean internalFlushcache(
       final HLog wal, final long myseqid, MonitoredTask status)
   throws IOException {
+    if (this.rsServices != null && this.rsServices.isAborted()) {
+      // Don't flush when the server is aborting; it's unsafe
+      throw new IOException("Aborting flush because server is aborted...");
+    }
     final long startTime = EnvironmentEdgeManager.currentTimeMillis();
     // Clear flush flag.
     // Record latest flush time

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1450001&r1=1450000&r2=1450001&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Tue Feb 26 02:38:57 2013
@@ -729,7 +729,7 @@ public class Store extends SchemaConfigu
    * @return Path The path name of the tmp file to which the store was flushed
    * @throws IOException
    */
-  private Path flushCache(final long logCacheFlushId,
+  protected Path flushCache(final long logCacheFlushId,
       SortedSet<KeyValue> snapshot,
       TimeRangeTracker snapshotTimeRangeTracker,
       AtomicLong flushedSize,

Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java?rev=1450001&r1=1450000&r2=1450001&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java Tue Feb 26 02:38:57 2013
@@ -21,12 +21,16 @@ package org.apache.hadoop.hbase.regionse
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.SortedSet;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -34,21 +38,31 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.regionserver.FlushRequester;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdge;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.Strings;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -428,6 +442,129 @@ public class TestWALReplay {
   }
 
   /**
+   * Test that we can recover the data correctly after an aborted flush. The
+   * test writes some data, aborts a flush, writes more data and flushes
+   * again, then verifies that every row survives WAL replay.
+   * @throws IOException
+   */
+  @Test
+  public void testReplayEditsAfterAbortingFlush() throws IOException {
+    final String tableNameStr = "testReplayEditsAfterAbortingFlush";
+    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableNameStr);
+    final Path basedir = new Path(this.hbaseRootDir, tableNameStr);
+    deleteDir(basedir);
+    final HTableDescriptor htd = createBasic3FamilyHTD(tableNameStr);
+    HRegion region3 = HRegion.createHRegion(hri, hbaseRootDir, this.conf, htd);
+    region3.close();
+    region3.getLog().closeAndDelete();
+    // Create a region whose stores fail flushCache() on demand, so the test
+    // can simulate a failed flush followed by a server abort.
+    HLog wal = createWAL(this.conf);
+    final AtomicBoolean throwExceptionWhenFlushing = new AtomicBoolean(false);
+    RegionServerServices rsServices = Mockito.mock(RegionServerServices.class);
+    Mockito.doReturn(false).when(rsServices).isAborted();
+    HRegion region = new HRegion(basedir, wal, this.fs, this.conf, hri, htd,
+        rsServices) {
+      @Override
+      protected Store instantiateHStore(Path tableDir, HColumnDescriptor c)
+          throws IOException {
+        return new Store(tableDir, this, c, fs, conf) {
+          @Override
+          protected Path flushCache(final long logCacheFlushId,
+              SortedSet<KeyValue> snapshot,
+              TimeRangeTracker snapshotTimeRangeTracker,
+              AtomicLong flushedSize, MonitoredTask status) throws IOException {
+            if (throwExceptionWhenFlushing.get()) {
+              throw new IOException("Simulated exception by tests");
+            }
+            return super.flushCache(logCacheFlushId, snapshot,
+                snapshotTimeRangeTracker, flushedSize, status);
+          }
+        };
+      }
+    };
+    long seqid = region.initialize();
+    // HRegionServer usually does this. It knows the largest seqid across all
+    // regions.
+    wal.setSequenceNumber(seqid);
+
+    int writtenRowCount = 10;
+    List<HColumnDescriptor> families = new ArrayList<HColumnDescriptor>(
+        htd.getFamilies());
+    for (int i = 0; i < writtenRowCount; i++) {
+      Put put = new Put(Bytes.toBytes(tableNameStr + Integer.toString(i)));
+      put.add(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
+          Bytes.toBytes("val"));
+      region.put(put);
+    }
+
+    // Now assert edits made it in.
+    RegionScanner scanner = region.getScanner(new Scan());
+    assertEquals(writtenRowCount, getScannedCount(scanner));
+
+    // Flush the region; the injected fault makes this flush fail.
+    throwExceptionWhenFlushing.set(true);
+    try {
+      region.flushcache();
+      fail("Injected exception hasn't been thrown");
+    } catch (Throwable t) {
+      LOG.info("Expected simulated exception when flushing region: "
+          + t.getMessage());
+      // Simulate a server abort so further flushes are rejected.
+      Mockito.doReturn(true).when(rsServices).isAborted();
+    }
+    // Write more data after the aborted flush.
+    int moreRow = 10;
+    for (int i = writtenRowCount; i < writtenRowCount + moreRow; i++) {
+      Put put = new Put(Bytes.toBytes(tableNameStr + Integer.toString(i)));
+      put.add(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
+          Bytes.toBytes("val"));
+      region.put(put);
+    }
+    writtenRowCount += moreRow;
+    // Call flush again; it must fail because the server is marked aborted.
+    throwExceptionWhenFlushing.set(false);
+    try {
+      region.flushcache();
+    } catch (IOException t) {
+      LOG.info("Expected exception when flushing region because server is aborted: "
+          + t.getMessage());
+    }
+
+    region.close(true);
+    wal.close();
+
+    // Let us try to split and recover
+    runWALSplit(this.conf);
+    HLog wal2 = createWAL(this.conf);
+    Mockito.doReturn(false).when(rsServices).isAborted();
+    HRegion region2 = new HRegion(basedir, wal2, this.fs, this.conf, hri, htd,
+        rsServices);
+    long seqid2 = region2.initialize();
+    // HRegionServer usually does this. It knows the largest seqid across all
+    // regions.
+    wal2.setSequenceNumber(seqid2);
+
+    scanner = region2.getScanner(new Scan());
+    assertEquals(writtenRowCount, getScannedCount(scanner));
+  }
+
+  private int getScannedCount(RegionScanner scanner) throws IOException {
+    int scannedCount = 0;
+    List<KeyValue> results = new ArrayList<KeyValue>();
+    while (true) {
+      // next() fills results with the next row's KeyValues and returns
+      // whether more rows remain.
+      boolean existMore = scanner.next(results);
+      if (!results.isEmpty()) {
+        scannedCount++;
+      }
+      if (!existMore) {
+        break;
+      }
+      results.clear();
+    }
+    return scannedCount;
+  }
+
+  /**
    * Create an HRegion with the result of a HLog split and test we only see the
    * good edits
    * @throws Exception
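Two testing techniques in the diff above deserve a note. First, Store.flushCache() was widened from private to protected with no behavioral change, purely so the test's anonymous Store subclass can intercept it and inject a failure. A minimal stand-alone sketch of that override-for-fault-injection pattern, using hypothetical Flusher/FaultyFlusher stand-ins rather than the real Store:

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for Store: protected (rather than private)
// visibility on flushCache() is what lets the subclass intercept the call.
class Flusher {
  protected String flushCache(long flushId) throws IOException {
    return "/tmp/store-" + flushId; // pretend the snapshot reached disk
  }
}

public class FaultyFlusher extends Flusher {
  // Flipped while the object is live, like the test's
  // throwExceptionWhenFlushing flag.
  final AtomicBoolean failNext = new AtomicBoolean(false);

  @Override
  protected String flushCache(long flushId) throws IOException {
    if (failNext.get()) {
      throw new IOException("Simulated exception by tests");
    }
    return super.flushCache(flushId);
  }

  public static void main(String[] args) throws IOException {
    FaultyFlusher f = new FaultyFlusher();
    System.out.println(f.flushCache(1)); // succeeds: /tmp/store-1
    f.failNext.set(true);
    try {
      f.flushCache(2);
    } catch (IOException e) {
      System.out.println("injected: " + e.getMessage());
    }
  }
}

An AtomicBoolean is used rather than a plain field so the flag can be flipped safely even if flushes happen on another thread.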
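Second, the test re-stubs the mocked RegionServerServices mid-test to flip the server from healthy to aborted. A self-contained sketch of that Mockito re-stubbing behavior (requires only mockito-core; Server here is a hypothetical stand-in for RegionServerServices):

import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;

public class MockFlipDemo {
  // Stand-in for RegionServerServices; only the aborted flag matters here.
  public interface Server {
    boolean isAborted();
  }

  public static void main(String[] args) {
    Server server = mock(Server.class);
    doReturn(false).when(server).isAborted();
    System.out.println(server.isAborted()); // false -- healthy server

    // The most recent stubbing wins, so one mock can model a state change
    // (healthy -> aborted) across phases of a single test.
    doReturn(true).when(server).isAborted();
    System.out.println(server.isAborted()); // true -- simulated abort
  }
}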