Author: stack
Date: Sat Oct  3 05:05:53 2009
New Revision: 821247

URL: http://svn.apache.org/viewvc?rev=821247&view=rev
Log:
HBASE-1883 HRegion passes the wrong minSequenceNumber to doReconstructionLog
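
Before the per-file diffs, a minimal, self-contained sketch of the sequence-id bookkeeping this patch corrects (the Store class below is a hypothetical stand-in, not the HBase type). Previously HRegion took the minimum over each store's post-recovery getMaxSequenceId(); the fix tracks each store's pre-recovery maximum separately and takes the minimum over that, so region-level log replay starts early enough:

import java.util.Arrays;
import java.util.List;

final class SeqIdSketch {
  /** Hypothetical stand-in for the two ids each Store now tracks. */
  static final class Store {
    final long maxSeqId;                  // highest id after store-level log replay
    final long maxSeqIdBeforeLogRecovery; // highest id present in flushed store files
    Store(long max, long beforeRecovery) {
      this.maxSeqId = max;
      this.maxSeqIdBeforeLogRecovery = beforeRecovery;
    }
  }

  public static void main(String[] args) {
    List<Store> stores = Arrays.asList(new Store(120, 80), new Store(120, 110));
    long maxSeqId = -1;
    long minSeqIdToRecover = Long.MAX_VALUE;
    for (Store s : stores) {
      maxSeqId = Math.max(maxSeqId, s.maxSeqId);
      // The fix: start region-level replay from the smallest PRE-recovery id;
      // the old minimum over post-recovery ids could start too late and skip edits.
      minSeqIdToRecover = Math.min(minSeqIdToRecover, s.maxSeqIdBeforeLogRecovery);
    }
    // The old code would compute min(120, 120) = 120; the fix yields 80, so
    // the region replays every edit with sequence id > 80.
    System.out.println("replay edits with id > " + minSeqIdToRecover
        + "; next ids issued after " + maxSeqId);
  }
}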

Modified:
    hadoop/hbase/trunk/CHANGES.txt
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java

Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=821247&r1=821246&r2=821247&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Sat Oct  3 05:05:53 2009
@@ -54,6 +54,8 @@
                Andrew Purtell)
    HBASE-1871  Wrong type used in TableMapReduceUtil.initTableReduceJob()
                (Lars George via Stack)
+   HBASE-1883  HRegion passes the wrong minSequenceNumber to
+               doReconstructionLog (Clint Morgan via Stack)
 
   IMPROVEMENTS
    HBASE-1760  Cleanup TODOs in HTable

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=821247&r1=821246&r2=821247&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java Sat Oct  3 05:05:53 2009
@@ -22,6 +22,7 @@
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
@@ -298,7 +299,8 @@
 
     // Load in all the HStores.
     long maxSeqId = -1;
-    long minSeqId = Integer.MAX_VALUE;
+    long minSeqIdToRecover = Integer.MAX_VALUE;
+    
     for (HColumnDescriptor c : this.regionInfo.getTableDesc().getFamilies()) {
       Store store = instantiateHStore(this.basedir, c, oldLogFile, reporter);
       this.stores.put(c.getName(), store);
@@ -306,13 +308,15 @@
       if (storeSeqId > maxSeqId) {
         maxSeqId = storeSeqId;
       }
-      if (storeSeqId < minSeqId) {
-        minSeqId = storeSeqId;
+      
+      long storeSeqIdBeforeRecovery = store.getMaxSeqIdBeforeLogRecovery();
+      if (storeSeqIdBeforeRecovery < minSeqIdToRecover) {
+        minSeqIdToRecover = storeSeqIdBeforeRecovery;
       }
     }
 
     // Play log if one.  Delete when done.
-    doReconstructionLog(oldLogFile, minSeqId, maxSeqId, reporter);
+    doReconstructionLog(oldLogFile, minSeqIdToRecover, maxSeqId, reporter);
     if (fs.exists(oldLogFile)) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Deleting old log file: " + oldLogFile);
@@ -1133,14 +1137,11 @@
     this.updatesLock.readLock().lock();
 
     try {
-      if (writeToWAL) {
-        this.log.append(regionInfo.getRegionName(),
-          regionInfo.getTableDesc().getName(), kvs,
-          (regionInfo.isMetaRegion() || regionInfo.isRootRegion()), now);
-      }
       long size = 0;
       Store store = getStore(family);
-      for (KeyValue kv: kvs) {
+      Iterator<KeyValue> kvIterator = kvs.iterator();
+      while(kvIterator.hasNext()) {
+        KeyValue kv = kvIterator.next();
        // Check if time is LATEST, change to time of most recent addition if so
         // This is expensive.
         if (kv.isLatestTimestamp() && kv.isDeleteType()) {
@@ -1154,6 +1155,7 @@
           get(store, g, qualifiers, result);
           if (result.isEmpty()) {
             // Nothing to delete
+            kvIterator.remove();
             continue;
           }
           if (result.size() > 1) {
@@ -1165,9 +1167,20 @@
         } else {
           kv.updateLatestStamp(byteNow);
         }
-
-        size = this.memstoreSize.addAndGet(store.delete(kv));
+        
+        // We must do this in this loop because it could affect 
+        // the above get to find the next timestamp to remove. 
+        // This is the case when there are multiple deletes for the same column.
+        size = this.memstoreSize.addAndGet(store.delete(kv));  
       }
+      
+      if (writeToWAL) {
+        this.log.append(regionInfo.getRegionName(),
+          regionInfo.getTableDesc().getName(), kvs,
+          (regionInfo.isMetaRegion() || regionInfo.isRootRegion()), now);
+      }
+
+      
       flush = isFlushSize(size);
     } finally {
       this.updatesLock.readLock().unlock();
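
Besides the sequence-id fix, the HRegion.java hunks above reorder the delete path: each delete is applied to the memstore inside the loop (so consecutive deletes on the same column each see the previous one's effect), no-op deletes are dropped from the batch, and the WAL append now happens afterwards with only the surviving edits. A simplified, self-contained illustration of that shape, using hypothetical Memstore and Wal interfaces rather than the real HBase types:

import java.util.Iterator;
import java.util.List;

final class DeletePathSketch {
  /** Hypothetical key-value; ts == Long.MAX_VALUE stands for LATEST_TIMESTAMP. */
  static final class KeyValue {
    final String column;
    long ts;
    KeyValue(String column, long ts) { this.column = column; this.ts = ts; }
  }
  interface Memstore {
    Long latestTimestamp(String column); // null when the column has no cell
    long delete(KeyValue kv);            // returns the memstore size delta
  }
  interface Wal {
    void append(List<KeyValue> edits);
  }

  static long delete(List<KeyValue> kvs, Memstore store, Wal wal, boolean writeToWAL) {
    long size = 0;
    Iterator<KeyValue> it = kvs.iterator();
    while (it.hasNext()) {
      KeyValue kv = it.next();
      if (kv.ts == Long.MAX_VALUE) {
        Long latest = store.latestTimestamp(kv.column);
        if (latest == null) {
          it.remove();   // nothing to delete, so keep it out of the WAL too
          continue;
        }
        kv.ts = latest;
      }
      // Apply inside the loop: with several deletes on one column, each
      // delete changes what the next timestamp lookup should resolve to.
      size += store.delete(kv);
    }
    if (writeToWAL) {
      wal.append(kvs); // only the edits that actually deleted something remain
    }
    return size; // the caller would check isFlushSize(size), as in the patch
  }
}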

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=821247&r1=821246&r2=821247&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java Sat Oct  3 05:05:53 2009
@@ -129,6 +129,9 @@
   // reflected in the TreeMaps).
   private volatile long maxSeqId = -1;
 
+  // The most-recent log-seq-id before we recovered from the LOG.
+  private long maxSeqIdBeforeLogRecovery = -1;
+
   private final Path regionCompactionDir;
   private final Object compactLock = new Object();
   private final int compactionThreshold;
@@ -216,13 +219,15 @@
     // loadStoreFiles calculates this.maxSeqId. as side-effect.
     this.storefiles.putAll(loadStoreFiles());
 
+    this.maxSeqIdBeforeLogRecovery = this.maxSeqId;
+
     // Do reconstruction log.
     long newId = runReconstructionLog(reconstructionLog, this.maxSeqId, reporter);
     if (newId != -1) {
       this.maxSeqId = newId; // start with the log id we just recovered.
     }
   }
-
+    
   HColumnDescriptor getFamily() {
     return this.family;
   }
@@ -230,6 +235,10 @@
   long getMaxSequenceId() {
     return this.maxSeqId;
   }
+  
+  long getMaxSeqIdBeforeLogRecovery() {
+    return maxSeqIdBeforeLogRecovery;
+  }
 
   /**
    * @param tabledir
@@ -276,8 +285,7 @@
   }
 
   /*
-   * Read the reconstructionLog to see whether we need to build a brand-new 
-   * file out of non-flushed log entries.  
+   * Read the reconstructionLog and put into memstore. 
    *
    * We can ignore any log message that has a sequence ID that's equal to or 
    * lower than maxSeqID.  (Because we know such log messages are already 
@@ -303,9 +311,6 @@
     // general memory usage accounting.
     long maxSeqIdInLog = -1;
     long firstSeqIdInLog = -1;
-    // TODO: Move this memstoring over into MemStore.
-    KeyValueSkipListSet reconstructedCache =
-      new KeyValueSkipListSet(this.comparator);
     SequenceFile.Reader logReader = new SequenceFile.Reader(this.fs,
       reconstructionLog, this.conf);
     try {
@@ -332,8 +337,12 @@
           !val.matchingFamily(family.getName())) {
           continue;
         }
-        // Add anything as value as long as we use same instance each time.
-        reconstructedCache.add(val);
+
+        if (val.isDelete()) {
+          this.memstore.delete(val);
+        } else {
+          this.memstore.add(val);
+        }
         editsCount++;
         // Every 2k edits, tell the reporter we're making progress.
         // Have seen 60k edits taking 3minutes to complete.
@@ -353,26 +362,15 @@
       logReader.close();
     }
     
-    if (reconstructedCache.size() > 0) {
-      // We create a "virtual flush" at maxSeqIdInLog+1.
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("flushing reconstructionCache");
+    if (maxSeqIdInLog > -1) {
+      // We read some edits, so we should flush the memstore
+      this.snapshot();
+      boolean needCompaction = this.flushCache(maxSeqIdInLog);
+      if (needCompaction) {
+        this.compact(false);
       }
-
-      long newFileSeqNo = maxSeqIdInLog + 1;
-      StoreFile sf = internalFlushCache(reconstructedCache, newFileSeqNo);
-      // add it to the list of store files with maxSeqIdInLog+1
-      if (sf == null) {
-        throw new IOException("Flush failed with a null store file");
-      }
-      // Add new file to store files.  Clear snapshot too while we have the
-      // Store write lock.
-      this.storefiles.put(newFileSeqNo, sf);
-      notifyChangedReadersObservers();
-
-      return newFileSeqNo;
     }
-    return -1; // the reconstructed cache was 0 sized
+    return maxSeqIdInLog;
   }
 
   /*
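
The Store.java change drops the separate reconstructedCache: qualifying log edits are now applied directly to the memstore as they are read (deletes via memstore.delete, puts via memstore.add), with a single snapshot/flush, plus an optional compaction, at the end, and the method returns maxSeqIdInLog. A minimal sketch of that control flow, with hypothetical types in place of the real Store internals:

import java.io.IOException;
import java.util.List;

final class ReplaySketch {
  /** Hypothetical log edit: a sequence id plus a put/delete flag. */
  static final class Edit {
    final long seqId;
    final boolean isDelete;
    Edit(long seqId, boolean isDelete) { this.seqId = seqId; this.isDelete = isDelete; }
  }
  interface Memstore {
    void add(Edit e);
    void delete(Edit e);
    void snapshot();
    boolean flush(long upToSeqId) throws IOException; // true when compaction is needed
    void compact() throws IOException;
  }

  /**
   * Replays edits newer than maxSeqId straight into the memstore and flushes
   * once at the end; returns the highest id replayed, or -1 if none qualified.
   */
  static long replay(List<Edit> log, long maxSeqId, Memstore memstore) throws IOException {
    long maxSeqIdInLog = -1;
    for (Edit e : log) {
      if (e.seqId <= maxSeqId) {
        continue; // already persisted in a flushed store file
      }
      maxSeqIdInLog = Math.max(maxSeqIdInLog, e.seqId);
      if (e.isDelete) {
        memstore.delete(e);
      } else {
        memstore.add(e);
      }
    }
    if (maxSeqIdInLog > -1) {
      memstore.snapshot();                 // flush everything just replayed
      if (memstore.flush(maxSeqIdInLog)) {
        memstore.compact();
      }
    }
    return maxSeqIdInLog;
  }
}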

