Author: mbautin
Date: Wed Sep 19 01:51:45 2012
New Revision: 1387429

URL: http://svn.apache.org/viewvc?rev=1387429&view=rev
Log:
[HBASE-6059] Fixing WAL Replay bug
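In short, HRegion now records the maximum flushed sequence id of each store in a per-family map (maxSeqIdInStores) when the region opens, and WAL replay skips an edit only when its log sequence number is at or below the flushed seqId of the store that edit targets, instead of comparing every edit against the single minimum across all stores. The small sketch below illustrates just that skip rule; the class and method names are invented for illustration and are not part of this revision.

import java.util.Map;

// Illustrative sketch only (not code from this revision): the per-store skip
// rule that replayRecoveredEdits() applies to each KeyValue. The map mirrors
// maxSeqIdInStores as built in HRegion.initialize(): keyed by column family
// name and, in the real code, a TreeMap ordered by Bytes.BYTES_COMPARATOR so
// byte[] keys compare by content.
final class PerStoreSkipRule {
  static boolean shouldSkip(long editLogSeqNum, byte[] family,
      Map<byte[], Long> maxSeqIdInStores) {
    Long maxFlushed = maxSeqIdInStores.get(family);
    if (maxFlushed == null) {
      // Unknown family (e.g. schema changed between crash and redeploy):
      // the patch logs a warning and skips the edit.
      return true;
    }
    // Edits at or below the store's flushed seqId are already persisted in
    // that store's files and must not be replayed again.
    return editLogSeqNum <= maxFlushed;
  }
}

In the patch itself this comparison happens inline, inside the per-KeyValue loop of replayRecoveredEdits(), after the target Store has been resolved.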
Author: lipi Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionClose.java hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1387429&r1=1387428&r2=1387429&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Wed Sep 19 01:51:45 2012 @@ -54,7 +54,6 @@ import java.util.concurrent.atomic.Atomi import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.Date; -import java.util.Calendar; import java.text.SimpleDateFormat; import org.apache.commons.logging.Log; @@ -70,7 +69,6 @@ import org.apache.hadoop.hbase.HColumnDe import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants.OperationStatusCode; import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HServerInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.NotServingRegionException; @@ -563,22 +561,19 @@ public class HRegion implements HeapSize cleanupTmpDir(); // Load in all the HStores. - // Get minimum of the maxSeqId across all the store. // // Context: During replay we want to ensure that we do not lose any data. So, we // have to be conservative in how we replay logs. For each store, we calculate - // the maxSeqId up to which the store was flushed. But, since different stores - // could have a different maxSeqId, we choose the - // minimum across all the stores. - // This could potentially result in duplication of data for stores that are ahead - // of others. ColumnTrackers in the ScanQueryMatchers do the de-duplication, so we - // do not have to worry. - // TODO: If there is a store that was never flushed in a long time, we could replay - // a lot of data. Currently, this is not a problem because we flush all the stores at - // the same time. If we move to per-cf flushing, we might want to revisit this and send - // in a vector of maxSeqIds instead of sending in a single number, which has to be the - // min across all the max. - long minSeqId = -1; + // the maximum seqId up to which the store was flushed. We skip the edits which + // are equal to or lower than the maxSeqId for each store. + // + // We cannot just choose the minimum maxSeqId for all stores, because doing so + // can cause correctness issues if redundant edits replayed from the logs are + // not contiguous. 
+ + Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>( + Bytes.BYTES_COMPARATOR); + long maxSeqId = -1; // initialized to -1 so that we pick up MemstoreTS from column families long maxMemstoreTS = -1; @@ -609,10 +604,10 @@ public class HRegion implements HeapSize this.stores.put(store.getColumnFamilyName().getBytes(), store); // Do not include bulk loaded files when determining seqIdForReplay - long storeSeqIdForReplay = store.getMaxSequenceId(false); - if (minSeqId == -1 || storeSeqIdForReplay < minSeqId) { - minSeqId = storeSeqIdForReplay; - } + long storeSeqIdforReplay = store.getMaxSequenceId(false); + maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(), + storeSeqIdforReplay); + // Include bulk loaded files when determining seqIdForAssignment long storeSeqIdForAssignment = store.getMaxSequenceId(true); if (maxSeqId == -1 || storeSeqIdForAssignment > maxSeqId) { @@ -634,7 +629,7 @@ public class HRegion implements HeapSize mvcc.initialize(maxMemstoreTS + 1); // Recover any edits if available. maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny( - this.regiondir, minSeqId, reporter, status)); + this.regiondir, maxSeqIdInStores, reporter, status)); // Get rid of any splits or merges that were lost in-progress. Clean out // these directories here on open. We may be opening a region that was @@ -1540,13 +1535,6 @@ public class HRegion implements HeapSize this.getRegionInfo().isMetaRegion()); } - // Update the last flushed sequence id for region - if (this.regionServer != null) { - this.regionServer.getServerInfo().setFlushedSequenceIdForRegion( - getRegionName(), - completeSequenceId); - } - // C. Finally notify anyone waiting on memstore to clear: // e.g. checkResources(). synchronized (this) { @@ -2516,19 +2504,21 @@ public class HRegion implements HeapSize * make sense in a this single region context only -- until we online. * * @param regiondir - * @param minSeqId Any edit found in split editlogs needs to be in excess of - * this minSeqId to be applied, else its skipped. + * @param maxSeqIdInStores Any edit found in split editlogs needs to be in + * excess of the maxSeqId for the store to be applied, else its skipped. * @param reporter * @param status - * @return the sequence id of the last edit added to this region out of the - * recovered edits log or <code>minSeqId</code> if nothing added from editlogs. + * @return the maximum sequence id of the edits replayed or -1 if nothing + * added from editlogs. 
* @throws UnsupportedEncodingException * @throws IOException */ protected long replayRecoveredEditsIfAny(final Path regiondir, - final long minSeqId, final Progressable reporter, MonitoredTask status) + final Map<byte[], Long> maxSeqIdInStores, final Progressable reporter, MonitoredTask status) throws UnsupportedEncodingException, IOException { - long seqid = minSeqId; + + long seqid = -1; + NavigableSet<Path> files = HLog.getSplitEditFilesSorted(this.fs, regiondir); if (files == null || files.isEmpty()) return seqid; for (Path edits: files) { @@ -2536,9 +2526,11 @@ public class HRegion implements HeapSize LOG.warn("Null or non-existent edits file: " + edits); continue; } - if (isZeroLengthThenDelete(this.fs, edits)) continue; + if (isZeroLengthThenDelete(this.fs, edits)) { + continue; + } try { - seqid = replayRecoveredEdits(edits, seqid, reporter); + seqid = Math.max(seqid, replayRecoveredEdits(edits, maxSeqIdInStores, reporter)); } catch (IOException e) { boolean skipErrors = conf.getBoolean("hbase.skip.errors", false); if (skipErrors) { @@ -2550,10 +2542,12 @@ public class HRegion implements HeapSize } } } - if (seqid > minSeqId) { - // Then we added some edits to memory. Flush and cleanup split edit files. + + if (seqid > -1){ + // In case we added some edits to memory, we should flush internalFlushcache(null, seqid, status); } + // Now delete the content of recovered edits. We're done w/ them. for (Path file: files) { if (!this.fs.delete(file, false)) { @@ -2567,112 +2561,117 @@ public class HRegion implements HeapSize /* * @param edits File of recovered edits. - * @param minSeqId Minimum sequenceid found in a store file. Edits in log - * must be larger than this to be replayed. + * @param maxSeqIdInStores Maximum sequenceid found in each store. Edits in log + * must be larger than this to be replayed for each store. * @param reporter - * @return the sequence id of the last edit added to this region out of the - * recovered edits log or <code>minSeqId</code> if nothing added from editlogs. + * @return the sequence id of the max edit log entry seen in the HLog or -1 + * if no edits have been replayed * @throws IOException */ private long replayRecoveredEdits(final Path edits, - final long minSeqId, final Progressable reporter) + final Map<byte[], Long> maxSeqIdInStores, final Progressable reporter) throws IOException { - String msg = "Replaying edits from " + edits + "; minSeqId=" + minSeqId; + String msg = "Replaying edits from " + edits; LOG.info(msg); MonitoredTask status = TaskMonitor.get().createStatus(msg); status.setStatus("Opening logs"); HLog.Reader reader = HLog.getReader(this.fs, edits, conf); try { - long currentEditSeqId = minSeqId; - long firstSeqIdInLog = -1; - long skippedEdits = 0; - long editsCount = 0; - HLog.Entry entry; - Store store = null; + long currentEditSeqId = -1; + long firstSeqIdInLog = -1; + long skippedEdits = 0; + long editsCount = 0; + HLog.Entry entry; + Store store = null; - try { - // How many edits to apply before we send a progress report. - int interval = this.conf.getInt("hbase.hstore.report.interval.edits", 2000); - while ((entry = reader.next()) != null) { - HLogKey key = entry.getKey(); - WALEdit val = entry.getEdit(); - if (firstSeqIdInLog == -1) { - firstSeqIdInLog = key.getLogSeqNum(); - } - // Now, figure if we should skip this edit. 
- if (key.getLogSeqNum() <= currentEditSeqId) { - skippedEdits++; - continue; - } - currentEditSeqId = key.getLogSeqNum(); - boolean flush = false; - for (KeyValue kv: val.getKeyValues()) { - // Check this edit is for me. Also, guard against writing the special - // METACOLUMN info such as HBASE::CACHEFLUSH entries - if (kv.matchingFamily(HLog.METAFAMILY) || - !Bytes.equals(key.getRegionName(), this.regionInfo.getRegionName())) { - skippedEdits++; - continue; + try { + // How many edits to apply before we send a progress report. + int interval = this.conf.getInt("hbase.hstore.report.interval.edits", 2000); + while ((entry = reader.next()) != null) { + HLogKey key = entry.getKey(); + WALEdit val = entry.getEdit(); + if (firstSeqIdInLog == -1) { + firstSeqIdInLog = key.getLogSeqNum(); } - // Figure which store the edit is meant for. - if (store == null || !kv.matchingFamily(store.getFamily().getName())) { - store = this.stores.get(kv.getFamily()); + currentEditSeqId = key.getLogSeqNum(); + boolean flush = false; + for (KeyValue kv: val.getKeyValues()) { + // Check this edit is for me. Also, guard against writing the special + // METACOLUMN info such as HBASE::CACHEFLUSH entries + if (kv.matchingFamily(HLog.METAFAMILY) || + !Bytes.equals(key.getRegionName(), this.regionInfo.getRegionName())) { + skippedEdits++; + continue; + } + // Figure which store the edit is meant for. + if (store == null || !kv.matchingFamily(store.getFamily().getName())) { + store = this.stores.get(kv.getFamily()); + } + if (store == null) { + // This should never happen. Perhaps schema was changed between + // crash and redeploy? + LOG.warn("No family for " + kv); + skippedEdits++; + continue; + } + // Now, figure if we should skip this edit. + if (key.getLogSeqNum() <= maxSeqIdInStores.get(store.getFamily() + .getName())) { + skippedEdits++; + continue; + } + // Once we are over the limit, restoreEdit will keep returning true to + // flush -- but don't flush until we've played all the kvs that make up + // the WALEdit. + flush = restoreEdit(store, kv); + editsCount++; } - if (store == null) { - // This should never happen. Perhaps schema was changed between - // crash and redeploy? - LOG.warn("No family for " + kv); - skippedEdits++; - continue; + if (flush) internalFlushcache(null, currentEditSeqId, status); + + // Every 'interval' edits, tell the reporter we're making progress. + // Have seen 60k edits taking 3minutes to complete. + if (reporter != null && (editsCount % interval) == 0) { + status.setStatus("Replaying edits..." + + " skipped=" + skippedEdits + + " edits=" + editsCount); + reporter.progress(); } - // Once we are over the limit, restoreEdit will keep returning true to - // flush -- but don't flush until we've played all the kvs that make up - // the WALEdit. - flush = restoreEdit(store, kv); - editsCount++; - } - if (flush) internalFlushcache(null, currentEditSeqId, status); - - // Every 'interval' edits, tell the reporter we're making progress. - // Have seen 60k edits taking 3minutes to complete. - if (reporter != null && (editsCount % interval) == 0) { - status.setStatus("Replaying edits..." + - " skipped=" + skippedEdits + - " edits=" + editsCount); - reporter.progress(); - } - } - } catch (EOFException eof) { - Path p = HLog.moveAsideBadEditsFile(fs, edits); - msg = "Encountered EOF. Most likely due to Master failure during " + - "log spliting, so we have this data in another edit. 
" + - "Continuing, but renaming " + edits + " as " + p; - LOG.warn(msg, eof); - status.setStatus(msg); - } catch (IOException ioe) { - // If the IOE resulted from bad file format, - // then this problem is idempotent and retrying won't help - if (ioe.getCause() instanceof ParseException) { + } + } catch (EOFException eof) { Path p = HLog.moveAsideBadEditsFile(fs, edits); - msg = "File corruption encountered! " + + msg = "Encountered EOF. Most likely due to Master failure during " + + "log spliting, so we have this data in another edit. " + "Continuing, but renaming " + edits + " as " + p; - LOG.warn(msg, ioe); + LOG.warn(msg, eof); status.setStatus(msg); + } catch (IOException ioe) { + // If the IOE resulted from bad file format, + // then this problem is idempotent and retrying won't help + if (ioe.getCause() instanceof ParseException) { + Path p = HLog.moveAsideBadEditsFile(fs, edits); + msg = "File corruption encountered! " + + "Continuing, but renaming " + edits + " as " + p; + LOG.warn(msg, ioe); + status.setStatus(msg); + } else { + // other IO errors may be transient (bad network connection, + // checksum exception on one datanode, etc). throw & retry + status.abort(StringUtils.stringifyException(ioe)); + throw ioe; + } + } + msg = "Applied " + editsCount + ", skipped " + skippedEdits + + ", firstSeqIdInLog=" + firstSeqIdInLog + + ", maxSeqIdInLog=" + currentEditSeqId; + status.markComplete(msg); + if (LOG.isDebugEnabled()) { + LOG.debug(msg); + } + if(editsCount == 0){ + return -1; } else { - // other IO errors may be transient (bad network connection, - // checksum exception on one datanode, etc). throw & retry - status.abort(StringUtils.stringifyException(ioe)); - throw ioe; + return currentEditSeqId; } - } - msg = "Applied " + editsCount + ", skipped " + skippedEdits + - ", firstSeqIdInLog=" + firstSeqIdInLog + - ", maxSeqIdInLog=" + currentEditSeqId; - status.markComplete(msg); - if (LOG.isDebugEnabled()) { - LOG.debug(msg); - } - return currentEditSeqId; } finally { reader.close(); status.cleanup(); Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1387429&r1=1387428&r2=1387429&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Wed Sep 19 01:51:45 2012 @@ -1159,10 +1159,13 @@ public class Store extends SchemaConfigu boolean hasMore; do { hasMore = scanner.next(kvs, 1); + // Create the writer even if no kv(Empty store file is also ok), + // because we need record the max seq id for the store file, see + // HBASE-6059 + if (writer == null) { + writer = createWriterInTmp(maxKeyCount, compression, true); + } if (!kvs.isEmpty()) { - if (writer == null) { - writer = createWriterInTmp(maxKeyCount, compression, true); - } // output to writer: for (KeyValue kv : kvs) { if (kv.getMemstoreTS() <= smallestReadPoint) { Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionClose.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionClose.java?rev=1387429&r1=1387428&r2=1387429&view=diff ============================================================================== --- 
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionClose.java (original) +++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionClose.java Wed Sep 19 01:51:45 2012 @@ -19,8 +19,6 @@ */ package org.apache.hadoop.hbase.regionserver; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HRegionInfo; Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java?rev=1387429&r1=1387428&r2=1387429&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java (original) +++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java Wed Sep 19 01:51:45 2012 @@ -38,16 +38,24 @@ import org.apache.hadoop.hbase.HConstant import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.MasterNotRunningException; +import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.ResultScanner; + import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -75,7 +83,7 @@ public class TestWALReplay { // The below config supported by 0.20-append and CDH3b2 conf.setInt("dfs.client.block.recovery.retries", 2); conf.setInt("hbase.regionserver.flushlogentries", 1); - TEST_UTIL.startMiniDFSCluster(3); + TEST_UTIL.startMiniCluster(3); TEST_UTIL.setNameNodeNameSystemLeasePeriod(100, 10000); Path hbaseRootDir = TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase")); @@ -85,7 +93,7 @@ public class TestWALReplay { @AfterClass public static void tearDownAfterClass() throws Exception { - TEST_UTIL.shutdownMiniDFSCluster(); + TEST_UTIL.shutdownMiniCluster(); } @Before @@ -476,6 +484,97 @@ public class TestWALReplay { } } + /** + * + * @throws Exception + */ + @Test + public void testReplayEditsAfterRegionMovedWithMultiCF() throws Exception { + final byte[] tableName = Bytes + .toBytes("testReplayEditsAfterRegionMovedWithMultiCF"); + byte[] family1 = Bytes.toBytes("cf1"); + byte[] family2 = Bytes.toBytes("cf2"); + byte[] qualifier = Bytes.toBytes("q"); + byte[] value = Bytes.toBytes("testV"); + byte[][] familys = { family1, family2 }; + TEST_UTIL.createTable(tableName, familys); + HTable htable = new HTable(TEST_UTIL.getConfiguration(), tableName); + Put put = new Put(Bytes.toBytes("r1")); + put.add(family1, qualifier, 
value); + htable.put(put); + ResultScanner resultScanner = htable.getScanner(new Scan()); + int count = 0; + while (resultScanner.next() != null) { + count++; + } + resultScanner.close(); + assertEquals(1, count); + + MiniHBaseCluster hbaseCluster = TEST_UTIL.getMiniHBaseCluster(); + List<HRegion> regions = hbaseCluster.getRegions(tableName); + assertEquals(1, regions.size()); + + // move region to another regionserver + HRegion regionToMove = regions.get(0); + int originServerNum = hbaseCluster + .getServerWith(regionToMove.getRegionName()); + assertTrue("Please start more than 1 regionserver", hbaseCluster + .getRegionServerThreads().size() > 1); + int destServerNum = 0; + while (destServerNum == originServerNum) { + destServerNum++; + } + HRegionServer originServer = hbaseCluster.getRegionServer(originServerNum); + HRegionServer destServer = hbaseCluster.getRegionServer(destServerNum); + // move region to destination regionserver + moveRegionAndWait(regionToMove, destServer); + + // delete the row + Delete del = new Delete(Bytes.toBytes("r1")); + htable.delete(del); + resultScanner = htable.getScanner(new Scan()); + count = 0; + while (resultScanner.next() != null) { + count++; + } + resultScanner.close(); + assertEquals(0, count); + + // flush region and make major compaction + destServer.getOnlineRegion(regionToMove.getRegionName()).flushcache(); + // wait to complete major compaction + for (Store store : destServer.getOnlineRegion(regionToMove.getRegionName()) + .getStores().values()) { + store.triggerMajorCompaction(); + } + destServer.getOnlineRegion(regionToMove.getRegionName()).compactStores(); + + // move region to origin regionserver + moveRegionAndWait(regionToMove, originServer); + // abort the origin regionserver + originServer.abort("testing"); + + // see what we get + Result result = htable.get(new Get(Bytes.toBytes("r1"))); + if (result != null) { + assertTrue("Row is deleted, but we get" + result.toString(), + (result == null) || result.isEmpty()); + } + resultScanner.close(); + } + + private void moveRegionAndWait(HRegion regionToMove, HRegionServer destServer) + throws InterruptedException, MasterNotRunningException, + IOException { + TEST_UTIL.getHBaseAdmin().moveRegion( + regionToMove.getRegionName(), + destServer.getServerInfo().getHostnamePort()); + while (destServer.getOnlineRegion(regionToMove.getRegionName()) == null){ + //Wait for this move to complete. + Thread.sleep(10); + } + } + private void addWALEdits (final byte [] tableName, final HRegionInfo hri, final byte [] rowName, final byte [] family, final int count, EnvironmentEdge ee, final HLog wal)
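Tying the pieces together: Store now creates the flush writer even when the memstore snapshot has no KeyValues, so an empty store file still records the flushed sequence id (see the comment referencing HBASE-6059), and the new testReplayEditsAfterRegionMovedWithMultiCF moves a multi-family region, deletes a row, flushes and major-compacts, moves the region back and aborts the server to verify that the deleted row does not reappear after replay. The self-contained sketch below is a rough, hypothetical model of why the old single-minimum rule could resurrect such a row while the per-store rule does not; the families, sequence ids, and class are invented for illustration and appear nowhere in the patch.

import java.util.Map;
import java.util.TreeMap;

// Hypothetical walk-through, not patch code. Assume cf1 has been flushed up
// to seq 9 and a major compaction has already dropped the delete marker from
// its files, while cf2 has only been flushed up to seq 3. The old rule skips
// edits <= min(9, 3) = 3, so the cf1 put at seq 5 is replayed even though cf1
// persisted (and then compacted past) it, which can bring a deleted row back.
// The per-store rule replays only the genuinely missing cf2 edit at seq 7.
public final class PerStoreReplayDemo {
  public static void main(String[] args) {
    Map<String, Long> maxSeqIdInStores = new TreeMap<String, Long>();
    maxSeqIdInStores.put("cf1", 9L);
    maxSeqIdInStores.put("cf2", 3L);
    long minAcrossStores = 3L; // what the pre-patch code effectively used

    long[] editSeqIds = { 5L, 7L };
    String[] editFamilies = { "cf1", "cf2" };

    for (int i = 0; i < editSeqIds.length; i++) {
      long seq = editSeqIds[i];
      String family = editFamilies[i];
      String oldRule = seq > minAcrossStores ? "replay" : "skip";
      String perStore = seq > maxSeqIdInStores.get(family) ? "replay" : "skip";
      System.out.println("seq=" + seq + " family=" + family
          + " old-rule=" + oldRule + " per-store=" + perStore);
    }
  }
}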