Author: stack
Date: Sat Oct 3 05:05:53 2009
New Revision: 821247

URL: http://svn.apache.org/viewvc?rev=821247&view=rev
Log:
HBASE-1883 HRegion passes the wrong minSequenceNumber to doReconstructionLog
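In outline (as the HRegion.java hunk below shows), the fix changes which sequence id HRegion hands to doReconstructionLog: it now takes the minimum of each store's max sequence id as it stood before log recovery, instead of the post-recovery value, which can be higher and make replay start too late. The stand-alone sketch below illustrates only that computation; FakeStore and the sample numbers are invented for illustration and are not HBase classes.

// Illustration only: FakeStore is a stand-in, not org.apache.hadoop.hbase.regionserver.Store.
import java.util.Arrays;
import java.util.List;

public class MinSeqIdSketch {

  static class FakeStore {
    final long maxSeqIdBeforeLogRecovery; // highest id already durable in store files
    final long maxSeqId;                  // may be higher once the store has replayed log edits

    FakeStore(long before, long after) {
      this.maxSeqIdBeforeLogRecovery = before;
      this.maxSeqId = after;
    }
  }

  public static void main(String[] args) {
    List<FakeStore> stores = Arrays.asList(
        new FakeStore(10, 45),  // a store that recovered edits up to id 45
        new FakeStore(30, 30)); // a store with nothing to recover

    long oldMin = Long.MAX_VALUE;            // roughly what the pre-patch code computed
    long minSeqIdToRecover = Long.MAX_VALUE; // what the patched code passes along

    for (FakeStore s : stores) {
      oldMin = Math.min(oldMin, s.maxSeqId);
      minSeqIdToRecover = Math.min(minSeqIdToRecover, s.maxSeqIdBeforeLogRecovery);
    }

    // Prints "old min = 30, fixed min = 10": a replay that skips everything at or
    // below 30 would miss edits 11..30 that the first store still needs.
    System.out.println("old min = " + oldMin + ", fixed min = " + minSeqIdToRecover);
  }
}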
Modified:
    hadoop/hbase/trunk/CHANGES.txt
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java

Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=821247&r1=821246&r2=821247&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Sat Oct 3 05:05:53 2009
@@ -54,6 +54,8 @@
               Andrew Purtell)
   HBASE-1871  Wrong type used in TableMapReduceUtil.initTableReduceJob()
               (Lars George via Stack)
+  HBASE-1883  HRegion passes the wrong minSequenceNumber to
+              doReconstructionLog (Clint Morgan via Stack)
 
  IMPROVEMENTS
   HBASE-1760  Cleanup TODOs in HTable

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=821247&r1=821246&r2=821247&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java Sat Oct 3 05:05:53 2009
@@ -22,6 +22,7 @@
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
@@ -298,7 +299,8 @@
 
     // Load in all the HStores.
     long maxSeqId = -1;
-    long minSeqId = Integer.MAX_VALUE;
+    long minSeqIdToRecover = Integer.MAX_VALUE;
+
     for (HColumnDescriptor c : this.regionInfo.getTableDesc().getFamilies()) {
       Store store = instantiateHStore(this.basedir, c, oldLogFile, reporter);
       this.stores.put(c.getName(), store);
@@ -306,13 +308,15 @@
       if (storeSeqId > maxSeqId) {
         maxSeqId = storeSeqId;
       }
-      if (storeSeqId < minSeqId) {
-        minSeqId = storeSeqId;
+
+      long storeSeqIdBeforeRecovery = store.getMaxSeqIdBeforeLogRecovery();
+      if (storeSeqIdBeforeRecovery < minSeqIdToRecover) {
+        minSeqIdToRecover = storeSeqIdBeforeRecovery;
       }
     }
 
     // Play log if one. Delete when done.
-    doReconstructionLog(oldLogFile, minSeqId, maxSeqId, reporter);
+    doReconstructionLog(oldLogFile, minSeqIdToRecover, maxSeqId, reporter);
     if (fs.exists(oldLogFile)) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Deleting old log file: " + oldLogFile);
@@ -1133,14 +1137,11 @@
 
     this.updatesLock.readLock().lock();
     try {
-      if (writeToWAL) {
-        this.log.append(regionInfo.getRegionName(),
-          regionInfo.getTableDesc().getName(), kvs,
-          (regionInfo.isMetaRegion() || regionInfo.isRootRegion()), now);
-      }
       long size = 0;
       Store store = getStore(family);
-      for (KeyValue kv: kvs) {
+      Iterator<KeyValue> kvIterator = kvs.iterator();
+      while(kvIterator.hasNext()) {
+        KeyValue kv = kvIterator.next();
         // Check if time is LATEST, change to time of most recent addition if so
         // This is expensive.
         if (kv.isLatestTimestamp() && kv.isDeleteType()) {
@@ -1154,6 +1155,7 @@
           get(store, g, qualifiers, result);
           if (result.isEmpty()) {
             // Nothing to delete
+            kvIterator.remove();
             continue;
           }
           if (result.size() > 1) {
@@ -1165,9 +1167,20 @@
         } else {
           kv.updateLatestStamp(byteNow);
         }
-
-        size = this.memstoreSize.addAndGet(store.delete(kv));
+
+        // We must do this in this loop because it could affect
+        // the above get to find the next timestamp to remove.
+        // This is the case when there are multiple deletes for the same column.
+        size = this.memstoreSize.addAndGet(store.delete(kv));
       }
+
+      if (writeToWAL) {
+        this.log.append(regionInfo.getRegionName(),
+          regionInfo.getTableDesc().getName(), kvs,
+          (regionInfo.isMetaRegion() || regionInfo.isRootRegion()), now);
+      }
+
+
       flush = isFlushSize(size);
     } finally {
       this.updatesLock.readLock().unlock();

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=821247&r1=821246&r2=821247&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java Sat Oct 3 05:05:53 2009
@@ -129,6 +129,9 @@
   // reflected in the TreeMaps).
   private volatile long maxSeqId = -1;
 
+  // The most-recent log-seq-id before we recovered from the LOG.
+  private long maxSeqIdBeforeLogRecovery = -1;
+
   private final Path regionCompactionDir;
   private final Object compactLock = new Object();
   private final int compactionThreshold;
@@ -216,13 +219,15 @@
     // loadStoreFiles calculates this.maxSeqId. as side-effect.
     this.storefiles.putAll(loadStoreFiles());
 
+    this.maxSeqIdBeforeLogRecovery = this.maxSeqId;
+
     // Do reconstruction log.
     long newId = runReconstructionLog(reconstructionLog, this.maxSeqId,
       reporter);
     if (newId != -1) {
       this.maxSeqId = newId; // start with the log id we just recovered.
     }
   }
-
+
   HColumnDescriptor getFamily() {
     return this.family;
   }
@@ -230,6 +235,10 @@
   long getMaxSequenceId() {
     return this.maxSeqId;
   }
+
+  long getMaxSeqIdBeforeLogRecovery() {
+    return maxSeqIdBeforeLogRecovery;
+  }
 
   /**
    * @param tabledir
@@ -276,8 +285,7 @@
   }
 
   /*
-   * Read the reconstructionLog to see whether we need to build a brand-new
-   * file out of non-flushed log entries.
+   * Read the reconstructionLog and put into memstore.
    *
    * We can ignore any log message that has a sequence ID that's equal to or
    * lower than maxSeqID. (Because we know such log messages are already
@@ -303,9 +311,6 @@
     // general memory usage accounting.
     long maxSeqIdInLog = -1;
     long firstSeqIdInLog = -1;
-    // TODO: Move this memstoring over into MemStore.
-    KeyValueSkipListSet reconstructedCache =
-      new KeyValueSkipListSet(this.comparator);
     SequenceFile.Reader logReader = new SequenceFile.Reader(this.fs,
       reconstructionLog, this.conf);
     try {
@@ -332,8 +337,12 @@
             !val.matchingFamily(family.getName())) {
           continue;
         }
-        // Add anything as value as long as we use same instance each time.
-        reconstructedCache.add(val);
+
+        if (val.isDelete()) {
+          this.memstore.delete(val);
+        } else {
+          this.memstore.add(val);
+        }
         editsCount++;
         // Every 2k edits, tell the reporter we're making progress.
         // Have seen 60k edits taking 3minutes to complete.
@@ -353,26 +362,15 @@
       logReader.close();
     }
 
-    if (reconstructedCache.size() > 0) {
-      // We create a "virtual flush" at maxSeqIdInLog+1.
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("flushing reconstructionCache");
+    if (maxSeqIdInLog > -1) {
+      // We read some edits, so we should flush the memstore
+      this.snapshot();
+      boolean needCompaction = this.flushCache(maxSeqIdInLog);
+      if (needCompaction) {
+        this.compact(false);
       }
-
-      long newFileSeqNo = maxSeqIdInLog + 1;
-      StoreFile sf = internalFlushCache(reconstructedCache, newFileSeqNo);
-      // add it to the list of store files with maxSeqIdInLog+1
-      if (sf == null) {
-        throw new IOException("Flush failed with a null store file");
-      }
-      // Add new file to store files. Clear snapshot too while we have the
-      // Store write lock.
-      this.storefiles.put(newFileSeqNo, sf);
-      notifyChangedReadersObservers();
-
-      return newFileSeqNo;
     }
-    return -1; // the reconstructed cache was 0 sized
+    return maxSeqIdInLog;
   }
 
   /*
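For readers skimming the Store.java hunks: runReconstructionLog no longer builds a separate KeyValueSkipListSet and writes a "virtual flush" store file; it replays qualifying edits straight into the memstore and then snapshots and flushes. The sketch below restates that flow under stated assumptions; Edit and MemstoreLike are invented stand-ins (the real code works with KeyValue and MemStore, reports progress every 2k edits, and compacts when the flush asks for it).

// Illustration only: Edit and MemstoreLike are stand-ins, not HBase's KeyValue/MemStore.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ReplaySketch {

  static class Edit {
    final long seqId;
    final boolean delete;
    Edit(long seqId, boolean delete) { this.seqId = seqId; this.delete = delete; }
  }

  static class MemstoreLike {
    final List<Edit> applied = new ArrayList<Edit>();
    void add(Edit e)    { applied.add(e); } // stands in for MemStore.add(KeyValue)
    void delete(Edit e) { applied.add(e); } // stands in for MemStore.delete(KeyValue)
  }

  // Edits at or below maxSeqId are already durable and get skipped; everything newer
  // goes straight into the memstore, and the highest replayed id (or -1) is returned
  // so the caller can adopt it as the store's sequence id.
  static long replay(List<Edit> reconstructionLog, long maxSeqId, MemstoreLike memstore) {
    long maxSeqIdInLog = -1;
    for (Edit e : reconstructionLog) {
      if (e.seqId <= maxSeqId) {
        continue;
      }
      maxSeqIdInLog = Math.max(maxSeqIdInLog, e.seqId);
      if (e.delete) {
        memstore.delete(e);
      } else {
        memstore.add(e);
      }
    }
    // The patched Store then calls this.snapshot() and this.flushCache(maxSeqIdInLog),
    // compacting if the flush reports it is needed, before returning maxSeqIdInLog.
    return maxSeqIdInLog;
  }

  public static void main(String[] args) {
    MemstoreLike memstore = new MemstoreLike();
    List<Edit> log = Arrays.asList(new Edit(5, false), new Edit(12, false), new Edit(13, true));
    // Prints "replayed up to 13, edits applied: 2" -- the edit at sequence id 5 is skipped.
    System.out.println("replayed up to " + replay(log, 10, memstore)
        + ", edits applied: " + memstore.applied.size());
  }
}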