Author: stack
Date: Wed Apr 16 20:21:11 2008
New Revision: 648940

URL: http://svn.apache.org/viewvc?rev=648940&view=rev
Log:
HBASE-532 Odd interaction between HRegion.get, HRegion.deleteAll and compactions
Backport to branch.

Modified:
    hadoop/hbase/branches/0.1/CHANGES.txt
    
hadoop/hbase/branches/0.1/src/java/org/apache/hadoop/hbase/HAbstractScanner.java
    hadoop/hbase/branches/0.1/src/java/org/apache/hadoop/hbase/HStore.java
    
hadoop/hbase/branches/0.1/src/test/org/apache/hadoop/hbase/TestHMemcache.java

Modified: hadoop/hbase/branches/0.1/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/hbase/branches/0.1/CHANGES.txt?rev=648940&r1=648939&r2=648940&view=diff
==============================================================================
--- hadoop/hbase/branches/0.1/CHANGES.txt (original)
+++ hadoop/hbase/branches/0.1/CHANGES.txt Wed Apr 16 20:21:11 2008
@@ -14,6 +14,7 @@
    HBASE-500   Regionserver stuck on exit
    HBASE-582   HBase 554 forgot to clear results on each iteration caused by a filter
                (Clint Morgan via Stack)
+   HBASE-532   Odd interaction between HRegion.get, HRegion.deleteAll and compactions
 
   IMPROVEMENTS
    HBASE-559   MR example job to count table rows

Modified: 
hadoop/hbase/branches/0.1/src/java/org/apache/hadoop/hbase/HAbstractScanner.java
URL: 
http://svn.apache.org/viewvc/hadoop/hbase/branches/0.1/src/java/org/apache/hadoop/hbase/HAbstractScanner.java?rev=648940&r1=648939&r2=648940&view=diff
==============================================================================
--- hadoop/hbase/branches/0.1/src/java/org/apache/hadoop/hbase/HAbstractScanner.java (original)
+++ hadoop/hbase/branches/0.1/src/java/org/apache/hadoop/hbase/HAbstractScanner.java Wed Apr 16 20:21:11 2008
@@ -33,7 +33,6 @@
 
 /**
  * Abstract base class that implements the HScannerInterface.
- * Used by the concrete HMemcacheScanner and HStoreScanners
  */
 public abstract class HAbstractScanner implements HInternalScannerInterface {
   final Log LOG = LogFactory.getLog(this.getClass().getName());
@@ -111,11 +110,6 @@
   
   protected boolean scannerClosed = false;                      // True when scanning is done
   
-  // Keys retrieved from the sources
-  protected HStoreKey keys[];
-  // Values that correspond to those keys
-  protected byte [][] vals;
-  
   protected long timestamp;                                     // The timestamp to match entries against
   private boolean wildcardMatch;
   private boolean multipleMatchers;
@@ -145,18 +139,16 @@
   }
 
   /**
-   * For a particular column i, find all the matchers defined for the column.
+   * For a particular column, find all the matchers defined for the column.
    * Compare the column family and column key using the matchers. The first one
    * that matches returns true. If no matchers are successful, return false.
    * 
-   * @param i index into the keys array
-   * @return true  - if any of the matchers for the column match the column family
-   *                 and the column key.
-   *                 
+   * @param column Column to test
+   * @return true if any of the matchers for the column match the column family
+   * and the column key.
    * @throws IOException
    */
-  boolean columnMatch(int i) throws IOException {
-    Text column = keys[i].getColumn();
+  protected boolean columnMatch(final Text column) throws IOException {
     Vector<ColumnMatcher> matchers =
       okCols.get(HStoreKey.extractFamily(column));
     if(matchers == null) {
@@ -170,18 +162,6 @@
     return false;
   }
   
-  /**
-   * If the user didn't want to start scanning at the first row, this method
-   * seeks to the requested row.
-   */
-  abstract boolean findFirstRow(int i, Text firstRow) throws IOException;
-  
-  /** The concrete implementations provide a mechanism to find the next set of values */
-  abstract boolean getNext(int i) throws IOException;
-  
-  /** Mechanism used by concrete implementation to shut down a particular scanner */
-  abstract void closeSubScanner(int i);
-  
   /** {@inheritDoc} */
   public boolean isWildcardScanner() {
     return this.wildcardMatch;
@@ -192,89 +172,11 @@
     return this.multipleMatchers;
   }
   
-  /**
-   * Get the next set of values for this scanner.
-   * 
-   * @param key The key that matched
-   * @param results All the results for <code>key</code>
-   * @return true if a match was found
-   * @throws IOException
-   * 
-   * @see org.apache.hadoop.hbase.HScannerInterface#next(org.apache.hadoop.hbase.HStoreKey, java.util.SortedMap)
-   */
-  public boolean next(HStoreKey key, SortedMap<Text, byte []> results)
-  throws IOException {
-    if (scannerClosed) {
-      return false;
-    }
-    // Find the next row label (and timestamp)
-    Text chosenRow = null;
-    long chosenTimestamp = -1;
-    for(int i = 0; i < keys.length; i++) {
-      if((keys[i] != null)
-          && (columnMatch(i))
-          && (keys[i].getTimestamp() <= this.timestamp)
-          && ((chosenRow == null)
-              || (keys[i].getRow().compareTo(chosenRow) < 0)
-              || ((keys[i].getRow().compareTo(chosenRow) == 0)
-                  && (keys[i].getTimestamp() > chosenTimestamp)))) {
-        chosenRow = new Text(keys[i].getRow());
-        chosenTimestamp = keys[i].getTimestamp();
-      }
-    }
+  public abstract boolean next(HStoreKey key, SortedMap<Text, byte []> results)
+  throws IOException;
 
-    // Grab all the values that match this row/timestamp
-    boolean insertedItem = false;
-    if(chosenRow != null) {
-      key.setRow(chosenRow);
-      key.setVersion(chosenTimestamp);
-      key.setColumn(new Text(""));
-
-      for(int i = 0; i < keys.length; i++) {
-        // Fetch the data
-        while((keys[i] != null)
-            && (keys[i].getRow().compareTo(chosenRow) == 0)) {
-
-          // If we are doing a wild card match or there are multiple matchers
-          // per column, we need to scan all the older versions of this row
-          // to pick up the rest of the family members
-          
-          if(!wildcardMatch
-              && !multipleMatchers
-              && (keys[i].getTimestamp() != chosenTimestamp)) {
-            break;
-          }
-
-          if(columnMatch(i)) {              
-            // We only want the first result for any specific family member
-            if(!results.containsKey(keys[i].getColumn())) {
-              results.put(new Text(keys[i].getColumn()), vals[i]);
-              insertedItem = true;
-            }
-          }
-
-          if(!getNext(i)) {
-            closeSubScanner(i);
-          }
-        }
-
-        // Advance the current scanner beyond the chosen row, to
-        // a valid timestamp, so we're ready next time.
-        
-        while((keys[i] != null)
-            && ((keys[i].getRow().compareTo(chosenRow) <= 0)
-                || (keys[i].getTimestamp() > this.timestamp)
-                || (! columnMatch(i)))) {
-          getNext(i);
-        }
-      }
-    }
-    return insertedItem;
-  }
-  
-  /** {@inheritDoc} */
   public Iterator<Entry<HStoreKey, SortedMap<Text, byte[]>>> iterator() {
     throw new UnsupportedOperationException("Unimplemented serverside. " +
       "next(HStoreKey, StortedMap(...) is more efficient");
   }
-}
\ No newline at end of file
+}

Modified: hadoop/hbase/branches/0.1/src/java/org/apache/hadoop/hbase/HStore.java
URL: 
http://svn.apache.org/viewvc/hadoop/hbase/branches/0.1/src/java/org/apache/hadoop/hbase/HStore.java?rev=648940&r1=648939&r2=648940&view=diff
==============================================================================
--- hadoop/hbase/branches/0.1/src/java/org/apache/hadoop/hbase/HStore.java (original)
+++ hadoop/hbase/branches/0.1/src/java/org/apache/hadoop/hbase/HStore.java Wed Apr 16 20:21:11 2008
@@ -21,12 +21,15 @@
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+import java.rmi.UnexpectedException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.Map.Entry;
@@ -39,8 +42,8 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.filter.RowFilterInterface;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -69,77 +72,105 @@
   static final Log LOG = LogFactory.getLog(HStore.class);
 
   /**
-   * The Memcache holds in-memory modifications to the HRegion.  This is really a
-   * wrapper around a TreeMap that helps us when staging the Memcache out to disk.
+   * The Memcache holds in-memory modifications to the HRegion.
+   * Keeps a current map.  When asked to flush the map, current map is moved
+   * to snapshot and is cleared.  We continue to serve edits out of new map
+   * and backing snapshot until flusher reports in that the flush succeeded. At
+   * this point we let the snapshot go.
    */
   static class Memcache {
-
     // Note that since these structures are always accessed with a lock held,
-    // no additional synchronization is required.
+    // so no additional synchronization is required.
 
-    @SuppressWarnings("hiding")
-    private final SortedMap<HStoreKey, byte[]> memcache =
-      Collections.synchronizedSortedMap(new TreeMap<HStoreKey, byte []>());
-      
-    volatile SortedMap<HStoreKey, byte[]> snapshot;
-      
-    @SuppressWarnings("hiding")
-    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    // The currently active sorted map of edits.
+    private volatile SortedMap<HStoreKey, byte[]> mc =
+      createSynchronizedSortedMap();
+ 
+    // Snapshot of memcache.  Made for flusher.
+    private volatile SortedMap<HStoreKey, byte[]> snapshot =
+      createSynchronizedSortedMap();
 
-    /**
-     * Constructor
+    private final ReentrantReadWriteLock mc_lock = new ReentrantReadWriteLock();
+
+    /*
+     * Utility method.
+     * @return synchronized sorted map of HStoreKey to byte arrays.
      */
-    Memcache() {
-      snapshot = 
-        Collections.synchronizedSortedMap(new TreeMap<HStoreKey, byte []>());
+    private static SortedMap<HStoreKey, byte[]> createSynchronizedSortedMap() {
+      return Collections.synchronizedSortedMap(new TreeMap<HStoreKey, byte []>());
     }
 
     /**
      * Creates a snapshot of the current Memcache
      */
-    void snapshot() {
-      this.lock.writeLock().lock();
+    SortedMap<HStoreKey, byte[]> snapshot() {
+      this.mc_lock.writeLock().lock();
       try {
-        synchronized (memcache) {
-          if (memcache.size() != 0) {
-            snapshot.putAll(memcache);
-            memcache.clear();
-          }
+        // If snapshot has entries, then flusher failed or didn't call cleanup.
+        if (this.snapshot.size() > 0) {
+          LOG.debug("Returning existing snapshot. Either the snapshot was run " +
+            "by the region -- normal operation but to be fixed -- or there is " +
+            "another ongoing flush or did we fail last attempt?");
+          return this.snapshot;
+        }
+        // We used to synchronize on the memcache here but we're inside a
+        // write lock so removed it. Comment is left in case removal was a
+        // mistake. St.Ack
+        if (this.mc.size() != 0) {
+          this.snapshot = this.mc;
+          this.mc = createSynchronizedSortedMap();
         }
+        return this.snapshot;
       } finally {
-        this.lock.writeLock().unlock();
+        this.mc_lock.writeLock().unlock();
       }
     }
-    
-    /**
-     * @return memcache snapshot
-     */
-    SortedMap<HStoreKey, byte[]> getSnapshot() {
-      this.lock.writeLock().lock();
-      try {
-        SortedMap<HStoreKey, byte[]> currentSnapshot = snapshot;
-        snapshot = 
-          Collections.synchronizedSortedMap(new TreeMap<HStoreKey, byte []>());
-        
-        return currentSnapshot;
 
-      } finally {
-        this.lock.writeLock().unlock();
-      }
-    }
+   /**
+    * Return the current snapshot.
+    * @return Return snapshot.
+    * @see {@link #snapshot()}
+    * @see {@link #clearSnapshot(SortedMap)}
+    */
+   SortedMap<HStoreKey, byte[]> getSnapshot() {
+     return this.snapshot;
+   }
+
+   /**
+    * The passed snapshot was successfully persisted; it can be let go.
+    * @param ss The snapshot to clean out.
+    * @throws UnexpectedException
+    * @see {@link #snapshot()}
+    */
+   void clearSnapshot(final SortedMap<HStoreKey, byte []> ss)
+   throws UnexpectedException {
+     this.mc_lock.writeLock().lock();
+     try {
+       if (this.snapshot != ss) {
+         throw new UnexpectedException("Current snapshot is " +
+           this.snapshot + ", was passed " + ss);
+       }
+       // OK. Passed in snapshot is same as current snapshot.  If not-empty,
+       // create a new snapshot and let the old one go.
+       if (ss.size() != 0) {
+         this.snapshot = createSynchronizedSortedMap();
+       }
+     } finally {
+       this.mc_lock.writeLock().unlock();
+     }
+   }
     
     /**
-     * Store a value.  
+     * Write an update
      * @param key
      * @param value
      */
     void add(final HStoreKey key, final byte[] value) {
-      this.lock.readLock().lock();
+      this.mc_lock.readLock().lock();
       try {
-        memcache.put(key, value);
-        
+        mc.put(key, value);
       } finally {
-        this.lock.readLock().unlock();
+        this.mc_lock.readLock().unlock();
       }
     }
   
@@ -150,22 +181,83 @@
      * @return An array of byte arrays ordered by timestamp.
      */
     List<byte[]> get(final HStoreKey key, final int numVersions) {
-      this.lock.readLock().lock();
+      this.mc_lock.readLock().lock();
       try {
         List<byte []> results;
-        synchronized (memcache) {
-          results = internalGet(memcache, key, numVersions);
+        // The synchronizations here are because internalGet iterates
+       synchronized (this.mc) {
+         results = internalGet(this.mc, key, numVersions);
         }
         synchronized (snapshot) {
           results.addAll(results.size(),
               internalGet(snapshot, key, numVersions - results.size()));
         }
         return results;
-        
       } finally {
-        this.lock.readLock().unlock();
+        this.mc_lock.readLock().unlock();
       }
     }
+   
+   
+   /**
+    * @param a
+    * @param b
+    * @return Return lowest of a or b or null if both a and b are null
+    */
+   @SuppressWarnings("unchecked")
+   private WritableComparable getLowest(final WritableComparable a,
+       final WritableComparable b) {
+     if (a == null) {
+       return b;
+     }
+     if (b == null) {
+       return a;
+     }
+     return a.compareTo(b) <= 0? a: b;
+   }
+ 
+   /**
+    * @param row Find the row that comes after this one.
+    * @return Next row or null if none found
+    */
+   Text getNextRow(final Text row) {
+     this.mc_lock.readLock().lock();
+     try {
+       return (Text)getLowest(getNextRow(row, this.mc),
+         getNextRow(row, this.snapshot));
+     } finally {
+       this.mc_lock.readLock().unlock();
+     }
+   }
+   
+   /*
+    * @param row Find row that follows this one.
+    * @param map Map to look in for a row beyond <code>row</code>.
+    * This method synchronizes on passed map while iterating it.
+    * @return Next row or null if none found.
+    */
+   private Text getNextRow(final Text row,
+       final SortedMap<HStoreKey, byte []> map) {
+     Text result = null;
+     // Synchronize on the map to make the tailMap making 'safe'.
+     synchronized (map) {
+       // Make an HSK with maximum timestamp so we get past most of the current
+       // rows cell entries.
+       HStoreKey hsk = new HStoreKey(row, HConstants.LATEST_TIMESTAMP);
+       SortedMap<HStoreKey, byte []> tailMap = map.tailMap(hsk);
+       // Iterate until we fall into the next row; i.e. move off current row
+       for (Map.Entry<HStoreKey, byte []> es: tailMap.entrySet()) {
+         HStoreKey itKey = es.getKey();
+         if (itKey.getRow().compareTo(row) <= 0) {
+           continue;
+         }
+         // Note: Not suppressing deletes.
+         result = itKey.getRow();
+         break;
+       }
+     }
+     return result;
+   }
 
     /**
      * Return all the available columns for the given key.  The key indicates 
a 
@@ -178,17 +270,17 @@
     void getFull(HStoreKey key, Map<Text, Long> deletes, 
       SortedMap<Text, byte[]> results) {
       
-      this.lock.readLock().lock();
+      this.mc_lock.readLock().lock();
       try {
-        synchronized (memcache) {
-          internalGetFull(memcache, key, deletes, results);
+        synchronized (mc) {
+          internalGetFull(mc, key, deletes, results);
         }
         synchronized (snapshot) {
           internalGetFull(snapshot, key, deletes, results);
         }
 
       } finally {
-        this.lock.readLock().unlock();
+        this.mc_lock.readLock().unlock();
       }
     }
 
@@ -224,20 +316,23 @@
     /**
      * Find the key that matches <i>row</i> exactly, or the one that immediately
      * preceeds it.
+     * @param row Row to look for.
+     * @param candidateKeys Map of candidate keys (Accumulation over lots of
+     * lookup over stores and memcaches)
      */
     void getRowKeyAtOrBefore(final Text row, 
       SortedMap<HStoreKey, Long> candidateKeys) {
-      this.lock.readLock().lock();
+      this.mc_lock.readLock().lock();
       
       try {
-        synchronized (memcache) {
-          internalGetRowKeyAtOrBefore(memcache, row, candidateKeys);
+        synchronized (mc) {
+          internalGetRowKeyAtOrBefore(mc, row, candidateKeys);
         }
         synchronized (snapshot) {
           internalGetRowKeyAtOrBefore(snapshot, row, candidateKeys);
         }
       } finally {
-        this.lock.readLock().unlock();
+        this.mc_lock.readLock().unlock();
       }
     }
 
@@ -396,11 +491,11 @@
      * @throws IOException
      */
     List<HStoreKey> getKeys(final HStoreKey origin, final int versions) {
-      this.lock.readLock().lock();
+      this.mc_lock.readLock().lock();
       try {
         List<HStoreKey> results;
-        synchronized (memcache) {
-          results = internalGetKeys(this.memcache, origin, versions);
+        synchronized (mc) {
+          results = internalGetKeys(this.mc, origin, versions);
         }
         synchronized (snapshot) {
           results.addAll(results.size(), internalGetKeys(snapshot, origin,
@@ -410,7 +505,7 @@
         return results;
         
       } finally {
-        this.lock.readLock().unlock();
+        this.mc_lock.readLock().unlock();
       }
     }
 
@@ -470,31 +565,20 @@
      * the cell has been deleted.
      */
     boolean isDeleted(final HStoreKey key) {
-      return HLogEdit.isDeleted(this.memcache.get(key));
+      return HLogEdit.isDeleted(this.mc.get(key));
     }
 
     /**
      * @return a scanner over the keys in the Memcache
      */
     HInternalScannerInterface getScanner(long timestamp,
-        Text targetCols[], Text firstRow) throws IOException {
-
-      // Here we rely on ReentrantReadWriteLock's ability to acquire multiple
-      // locks by the same thread and to be able to downgrade a write lock to
-      // a read lock. We need to hold a lock throughout this method, but only
-      // need the write lock while creating the memcache snapshot
-      
-      this.lock.writeLock().lock(); // hold write lock during memcache snapshot
-      snapshot();                       // snapshot memcache
-      this.lock.readLock().lock();      // acquire read lock
-      this.lock.writeLock().unlock();   // downgrade to read lock
+       Text targetCols[], Text firstRow)
+    throws IOException {
+      this.mc_lock.readLock().lock();
       try {
-        // Prevent a cache flush while we are constructing the scanner
-
         return new MemcacheScanner(timestamp, targetCols, firstRow);
-      
       } finally {
-        this.lock.readLock().unlock();
+        this.mc_lock.readLock().unlock();
       }
     }
 
@@ -503,115 +587,73 @@
     // It lets the caller scan the contents of the Memcache.
     
//////////////////////////////////////////////////////////////////////////////
 
-    class MemcacheScanner extends HAbstractScanner {
-      SortedMap<HStoreKey, byte []> backingMap;
-      Iterator<HStoreKey> keyIterator;
-
-      @SuppressWarnings("unchecked")
+    private class MemcacheScanner extends HAbstractScanner {
+       private Text currentRow;
+       private Set<Text> columns = null;
+        
       MemcacheScanner(final long timestamp, final Text targetCols[],
-          final Text firstRow) throws IOException {
-
+        final Text firstRow)
+      throws IOException {
+        // Call to super will create ColumnMatchers and whether this is a regex
+        // scanner or not.  Will also save away timestamp.  Also sorts rows.
         super(timestamp, targetCols);
-        try {
-          this.backingMap = new TreeMap<HStoreKey, byte[]>();
-          this.backingMap.putAll(snapshot);
-          this.keys = new HStoreKey[1];
-          this.vals = new byte[1][];
-
-          // Generate list of iterators
-
-          HStoreKey firstKey = new HStoreKey(firstRow);
-            if (firstRow != null && firstRow.getLength() != 0) {
-              keyIterator =
-                backingMap.tailMap(firstKey).keySet().iterator();
-
-            } else {
-              keyIterator = backingMap.keySet().iterator();
-            }
-
-            while (getNext(0)) {
-              if (!findFirstRow(0, firstRow)) {
-                continue;
-              }
-              if (columnMatch(0)) {
-                break;
-              }
-            }
-        } catch (RuntimeException ex) {
-          LOG.error("error initializing Memcache scanner: ", ex);
-          close();
-          IOException e = new IOException("error initializing Memcache scanner");
-          e.initCause(ex);
-          throw e;
-
-        } catch(IOException ex) {
-          LOG.error("error initializing Memcache scanner: ", ex);
-          close();
-          throw ex;
-        }
-      }
-
-      /**
-       * The user didn't want to start scanning at the first row. This method
-       * seeks to the requested row.
-       *
-       * @param i which iterator to advance
-       * @param firstRow seek to this row
-       * @return true if this is the first row
-       */
-      @Override
-      boolean findFirstRow(int i, Text firstRow) {
-        return firstRow.getLength() == 0 ||
-        keys[i].getRow().compareTo(firstRow) >= 0;
-      }
-
-      /**
-       * Get the next value from the specified iterator.
-       * 
-       * @param i Which iterator to fetch next value from
-       * @return true if there is more data available
-       */
-      @Override
-      boolean getNext(int i) {
-        boolean result = false;
-        while (true) {
-          if (!keyIterator.hasNext()) {
-            closeSubScanner(i);
-            break;
-          }
-          // Check key is < than passed timestamp for this scanner.
-          HStoreKey hsk = keyIterator.next();
-          if (hsk == null) {
-            throw new NullPointerException("Unexpected null key");
-          }
-          if (hsk.getTimestamp() <= this.timestamp) {
-            this.keys[i] = hsk;
-            this.vals[i] = backingMap.get(keys[i]);
-            result = true;
-            break;
+        this.currentRow = firstRow;
+        this.columns = null;
+        if (!isWildcardScanner()) {
+          this.columns = new HashSet<Text>();
+          for (int i = 0; i < targetCols.length; i++) {
+            this.columns.add(targetCols[i]);
           }
         }
-        return result;
-      }
-
-      /** Shut down an individual map iterator. */
-      @Override
-      void closeSubScanner(int i) {
-        keyIterator = null;
-        keys[i] = null;
-        vals[i] = null;
-        backingMap = null;
       }
 
-      /** Shut down map iterators */
-      public void close() {
-        if (!scannerClosed) {
-          if(keyIterator != null) {
-            closeSubScanner(0);
-          }
-          scannerClosed = true;
-        }
-      }
+      public boolean next(HStoreKey key, SortedMap<Text, byte []> results)
+      throws IOException {
+         if (this.scannerClosed) {
+           return false;
+       }
+       Map<Text, Long> deletes = new HashMap<Text, Long>();
+       // Catch all row results in here.  These results are then filtered to
+       // ensure they match column name regexes, or if none, added to results.
+       SortedMap<Text, byte []> rowResults = new TreeMap<Text, byte[]>();
+       if (results.size() > 0) {
+         results.clear();
+       }
+       while (results.size() <= 0 &&
+           (this.currentRow = getNextRow(this.currentRow)) != null) {
+         if (deletes.size() > 0) {
+           deletes.clear();
+         }
+         if (rowResults.size() > 0) {
+           rowResults.clear();
+         }
+         key.setRow(this.currentRow);
+         key.setVersion(this.timestamp);
+         getFull(key, deletes, rowResults);
+         for (Map.Entry<Text, byte[]> e: rowResults.entrySet()) {
+           Text column = e.getKey();
+           byte [] c = e.getValue();
+           if (isWildcardScanner()) {
+             // Check the results match.  We only check columns, not timestamps.
+             // We presume that timestamps have been handled properly when we
+             // called getFull.
+             if (!columnMatch(column)) {
+               continue;
+             }
+           } else if (!this.columns.contains(column)) {
+             // Don't include columns not asked for.
+             continue;
+           }
+           results.put(column, c);
+         }
+       }
+       return results.size() > 0;
+     }
+     public void close() {
+       if (!scannerClosed) {
+         scannerClosed = true;
+       }
+     }
     }
   }
   
@@ -1118,15 +1160,15 @@
   
//////////////////////////////////////////////////////////////////////////////
   // Flush changes to disk
   
//////////////////////////////////////////////////////////////////////////////
-
   /**
-   * Prior to doing a cache flush, we need to snapshot the memcache. Locking is
-   * handled by the memcache.
+   * Prior to doing a cache flush, we need to snapshot the memcache.
+   * TODO: This method is ugly.  Why let client of HStore run snapshots.  How
+   * do we know they'll be cleaned up?
    */
   void snapshotMemcache() {
     this.memcache.snapshot();
   }
-  
+    
   /**
    * Write out a brand-new set of items to the disk.
    *
@@ -1141,17 +1183,24 @@
    * @throws IOException
    */
   void flushCache(final long logCacheFlushId) throws IOException {
-      internalFlushCache(memcache.getSnapshot(), logCacheFlushId);
+    SortedMap<HStoreKey, byte []> cache = this.memcache.snapshot();
+    internalFlushCache(cache, logCacheFlushId);
+    // If an exception happens flushing, we let it out without clearing
+    // the memcache snapshot.  The old snapshot will be returned when we say
+    // 'snapshot', the next time flush comes around.
+    this.memcache.clearSnapshot(cache);
   }
   
   private void internalFlushCache(SortedMap<HStoreKey, byte []> cache,
       long logCacheFlushId) throws IOException {
     
+    // TODO:  We can fail in the below block before we complete adding this
+    // flush to list of store files.  Add cleanup of anything put on filesystem
+    // if we fail.
     synchronized(flushLock) {
       // A. Write the Maps out to the disk
       HStoreFile flushedFile = new HStoreFile(conf, fs, basedir,
         info.getEncodedName(), family.getFamilyName(), -1L, null);
-      String name = flushedFile.toString();
       MapFile.Writer out = flushedFile.getWriter(this.fs, this.compression,
         this.bloomFilter);
       
@@ -1198,7 +1247,7 @@
             flushedFile.getReader(this.fs, this.bloomFilter));
         this.storefiles.put(flushid, flushedFile);
         if(LOG.isDebugEnabled()) {
-          LOG.debug("Added " + name + " with " + entries +
+          LOG.debug("Added " + flushedFile.toString() + " with " + entries +
             " entries, sequence id " + logCacheFlushId + ", and size " +
             StringUtils.humanReadableInt(flushedFile.length()) + " for " +
             this.storeName);
@@ -1919,7 +1968,7 @@
       }
       
       // finally, check the memcache
-      memcache.getRowKeyAtOrBefore(row, candidateKeys);
+      this.memcache.getRowKeyAtOrBefore(row, candidateKeys);
       
       // return the best key from candidateKeys
       if (!candidateKeys.isEmpty()) {
@@ -2290,10 +2339,14 @@
   }
 
   /**
-   * A scanner that iterates through the HStore files
+   * A scanner that iterates through HStore files
    */
   private class StoreFileScanner extends HAbstractScanner {
-    @SuppressWarnings("hiding")
+    // Keys retrieved from the sources
+    private HStoreKey keys[];
+    // Values that correspond to those keys
+    private byte [][] vals;
+
     private MapFile.Reader[] readers;
     
     StoreFileScanner(long timestamp, Text[] targetCols, Text firstRow)
@@ -2335,6 +2388,99 @@
         throw e;
       }
     }
+   
+   /**
+    * For a particular column i, find all the matchers defined for the column.
+    * Compare the column family and column key using the matchers. The first one
+    * that matches returns true. If no matchers are successful, return false.
+    * 
+    * @param i index into the keys array
+    * @return true if any of the matchers for the column match the column family
+    * and the column key.
+    * @throws IOException
+    */
+   boolean columnMatch(int i) throws IOException {
+     return columnMatch(keys[i].getColumn());
+   }
+ 
+   /**
+    * @param key The key that matched
+    * @param results All the results for <code>key</code>
+    * @return true if a match was found
+    * @throws IOException
+    * 
+    * @see org.apache.hadoop.hbase.regionserver.InternalScanner#next(org.apache.hadoop.hbase.HStoreKey, java.util.SortedMap)
+    */
+   @Override
+   public boolean next(HStoreKey key, SortedMap<Text, byte []> results)
+   throws IOException {
+     if (scannerClosed) {
+       return false;
+     }
+     // Find the next row label (and timestamp)
+     Text chosenRow = null;
+     long chosenTimestamp = -1;
+     for(int i = 0; i < keys.length; i++) {
+       if((keys[i] != null)
+           && (columnMatch(i))
+           && (keys[i].getTimestamp() <= this.timestamp)
+           && ((chosenRow == null)
+               || (keys[i].getRow().compareTo(chosenRow) < 0)
+               || ((keys[i].getRow().compareTo(chosenRow) == 0)
+                   && (keys[i].getTimestamp() > chosenTimestamp)))) {
+         chosenRow = new Text(keys[i].getRow());
+         chosenTimestamp = keys[i].getTimestamp();
+       }
+     }
+ 
+     // Grab all the values that match this row/timestamp
+     boolean insertedItem = false;
+     if(chosenRow != null) {
+       key.setRow(chosenRow);
+       key.setVersion(chosenTimestamp);
+       key.setColumn(new Text(""));
+ 
+       for(int i = 0; i < keys.length; i++) {
+         // Fetch the data
+         while((keys[i] != null)
+             && (keys[i].getRow().compareTo(chosenRow) == 0)) {
+ 
+           // If we are doing a wild card match or there are multiple matchers
+           // per column, we need to scan all the older versions of this row
+           // to pick up the rest of the family members
+           
+           if(!isWildcardScanner()
+               && !isMultipleMatchScanner()
+               && (keys[i].getTimestamp() != chosenTimestamp)) {
+             break;
+           }
+ 
+           if(columnMatch(i)) {              
+             // We only want the first result for any specific family member
+             if(!results.containsKey(keys[i].getColumn())) {
+               results.put(new Text(keys[i].getColumn()), vals[i]);
+               insertedItem = true;
+             }
+           }
+ 
+           if(!getNext(i)) {
+             closeSubScanner(i);
+           }
+         }
+ 
+         // Advance the current scanner beyond the chosen row, to
+         // a valid timestamp, so we're ready next time.
+         
+         while((keys[i] != null)
+             && ((keys[i].getRow().compareTo(chosenRow) <= 0)
+                 || (keys[i].getTimestamp() > this.timestamp)
+                 || (! columnMatch(i)))) {
+           getNext(i);
+         }
+       }
+     }
+     return insertedItem;
+   }
 
     /**
      * The user didn't want to start scanning at the first row. This method
@@ -2344,7 +2490,6 @@
      * @param firstRow  - seek to this row
     * @return          - true if this is the first row or if the row was not found
      */
-    @Override
     boolean findFirstRow(int i, Text firstRow) throws IOException {
       ImmutableBytesWritable ibw = new ImmutableBytesWritable();
       HStoreKey firstKey
@@ -2367,7 +2512,6 @@
      * @param i - which reader to fetch next value from
      * @return - true if there is more data available
      */
-    @Override
     boolean getNext(int i) throws IOException {
       boolean result = false;
       ImmutableBytesWritable ibw = new ImmutableBytesWritable();
@@ -2386,7 +2530,6 @@
     }
     
     /** Close down the indicated reader. */
-    @Override
     void closeSubScanner(int i) {
       try {
         if(readers[i] != null) {

Modified: 
hadoop/hbase/branches/0.1/src/test/org/apache/hadoop/hbase/TestHMemcache.java
URL: 
http://svn.apache.org/viewvc/hadoop/hbase/branches/0.1/src/test/org/apache/hadoop/hbase/TestHMemcache.java?rev=648940&r1=648939&r2=648940&view=diff
==============================================================================
--- 
hadoop/hbase/branches/0.1/src/test/org/apache/hadoop/hbase/TestHMemcache.java 
(original)
+++ 
hadoop/hbase/branches/0.1/src/test/org/apache/hadoop/hbase/TestHMemcache.java 
Wed Apr 16 20:21:11 2008
@@ -21,6 +21,7 @@
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+import java.rmi.UnexpectedException;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.SortedMap;
@@ -72,13 +73,14 @@
     }
   }
 
-  private void runSnapshot(final HStore.Memcache hmc) {
+  private void runSnapshot(final HStore.Memcache hmc)
+  throws UnexpectedException {
     // Save off old state.
-    int oldHistorySize = hmc.snapshot.size();
-    hmc.snapshot();
+    int oldHistorySize = hmc.getSnapshot().size();
+    SortedMap<HStoreKey, byte[]> ss = hmc.snapshot();
     // Make some assertions about what just happened.
-    assertTrue("History size has not increased",
-        oldHistorySize < hmc.snapshot.size());
+    assertTrue("History size has not increased", oldHistorySize < ss.size());
+    hmc.clearSnapshot(ss);
   }
 
   /** 
@@ -91,9 +93,8 @@
     for (int i = 0; i < snapshotCount; i++) {
       addRows(this.hmemcache);
       runSnapshot(this.hmemcache);
-      this.hmemcache.getSnapshot();
-      assertEquals("History not being cleared", 0,
-          this.hmemcache.snapshot.size());
+      SortedMap<HStoreKey, byte[]> ss = this.hmemcache.getSnapshot();
+      assertEquals("History not being cleared", 0, ss.size());
     }
   }
   
@@ -128,7 +129,41 @@
       isExpectedRow(i, all);
     }
   }
-  
+
+  /** Test getNextRow from memcache: the empty row must yield getRowName(0), each
+   * row must yield the following row name, and the last row must yield null.
+   * @throws UnsupportedEncodingException */
+  public void testGetNextRow() throws UnsupportedEncodingException {
+    addRows(this.hmemcache);
+    Text closestToEmpty = this.hmemcache.getNextRow(HConstants.EMPTY_TEXT);
+    assertEquals(getRowName(0), closestToEmpty); // JUnit order: expected, actual
+    for (int i = 0; i < ROW_COUNT; i++) {
+      Text nr = this.hmemcache.getNextRow(getRowName(i));
+      if (i + 1 == ROW_COUNT) {
+        assertNull(nr); // nothing follows the final row
+      } else {
+        assertEquals(getRowName(i + 1), nr);
+      }
+    }
+  }
+
+  /** Test getClosest from memcache.
+   * NOTE(review): body only calls getNextRow, same as testGetNextRow; confirm intent.
+   * @throws UnsupportedEncodingException */
+  public void testGetClosest() throws UnsupportedEncodingException {
+    addRows(this.hmemcache);
+    Text closestToEmpty = this.hmemcache.getNextRow(HConstants.EMPTY_TEXT);
+    assertEquals(getRowName(0), closestToEmpty); // JUnit order: expected, actual
+    for (int i = 0; i < ROW_COUNT; i++) {
+      Text nr = this.hmemcache.getNextRow(getRowName(i));
+      if (i + 1 == ROW_COUNT) {
+        assertNull(nr); // nothing follows the final row
+      } else {
+        assertEquals(getRowName(i + 1), nr);
+      }
+    }
+  }
+
   /**
    * Test memcache scanner
    * @throws IOException


Reply via email to