Author: jimk
Date: Sun Sep 30 16:46:04 2007
New Revision: 580811

URL: http://svn.apache.org/viewvc?rev=580811&view=rev
Log:
HADOOP-1820 regionserver creates hlogs without bound
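
The heart of the fix: the regionserver now tracks, per region, the sequence number of the most recent un-flushed edit, and on each log roll deletes every rolled file whose newest entry is older than the oldest of those. A minimal sketch of that bookkeeping (class and method names here are illustrative, not the patch's own code):

import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;

class ObsoleteLogSketch {
  // Sequence number of the most recent un-flushed edit, per region.
  private final Map<String, Long> lastSeqWritten = new HashMap<String, Long>();
  // Rolled log files, keyed by the last sequence number each file contains.
  private final TreeMap<Long, String> outputFiles = new TreeMap<Long, String>();

  /** Removes and returns the rolled files that no region still needs. */
  SortedMap<Long, String> removeObsoleteFiles() {
    SortedMap<Long, String> obsolete = new TreeMap<Long, String>();
    if (lastSeqWritten.isEmpty()) {
      // Mirrors the patch: skip cleanup when no region has outstanding edits.
      return obsolete;
    }
    long oldestOutstanding = new TreeSet<Long>(lastSeqWritten.values()).first();
    // headMap is exclusive, so this selects every file whose newest edit is
    // strictly older than the oldest edit some region still needs replayed.
    obsolete.putAll(outputFiles.headMap(oldestOutstanding));
    outputFiles.keySet().removeAll(obsolete.keySet());
    return obsolete;
  }
}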

Added:
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestLogRolling.java
Modified:
    lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java

Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?rev=580811&r1=580810&r2=580811&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Sun Sep 30 16:46:04 2007
@@ -45,7 +45,7 @@
     HADOOP-1813 OOME makes zombie of region server
     HADOOP-1814        TestCleanRegionServerExit fails too often on Hudson
     HADOOP-1820 Regionserver creates hlogs without bound
-                (reverted 2007/09/25)
+                (reverted 2007/09/25) (Fixed 2007/09/30)
     HADOOP-1821 Replace all String.getBytes() with String.getBytes("UTF-8")
     HADOOP-1832 listTables() returns duplicate tables
     HADOOP-1834 Scanners ignore timestamp passed on creation

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java?rev=580811&r1=580810&r2=580811&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java Sun Sep 30 16:46:04 2007
@@ -19,91 +19,124 @@
  */
 package org.apache.hadoop.hbase;
 
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.SequenceFile.Reader;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.conf.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * HLog stores all the edits to the HStore.
- * 
- * It performs logfile-rolling, so external callers are not aware that the 
+ *
+ * It performs logfile-rolling, so external callers are not aware that the
  * underlying file is being rolled.
  *
- * <p>A single HLog is used by several HRegions simultaneously.
- * 
- * <p>Each HRegion is identified by a unique long <code>int</code>. HRegions do
+ * <p>
+ * A single HLog is used by several HRegions simultaneously.
+ *
+ * <p>
+ * Each HRegion is identified by a unique long <code>int</code>. HRegions do
  * not need to declare themselves before using the HLog; they simply include
- * their HRegion-id in the <code>append</code> or 
+ * their HRegion-id in the <code>append</code> or
  * <code>completeCacheFlush</code> calls.
  *
- * <p>An HLog consists of multiple on-disk files, which have a chronological
- * order. As data is flushed to other (better) on-disk structures, the log
- * becomes obsolete.  We can destroy all the log messages for a given
- * HRegion-id up to the most-recent CACHEFLUSH message from that HRegion.
+ * <p>
+ * An HLog consists of multiple on-disk files, which have a chronological order.
+ * As data is flushed to other (better) on-disk structures, the log becomes
+ * obsolete. We can destroy all the log messages for a given HRegion-id up to
+ * the most-recent CACHEFLUSH message from that HRegion.
+ *
+ * <p>
+ * It's only practical to delete entire files. Thus, we delete an entire on-disk
+ * file F when all of the messages in F have a log-sequence-id that's older
+ * (smaller) than the most-recent CACHEFLUSH message for every HRegion that has
+ * a message in F.
+ *
+ * <p>
+ * Synchronized methods can never execute in parallel. However, between the
+ * start of a cache flush and the completion point, appends are allowed but log
+ * rolling is not. To prevent log rolling taking place during this period, a
+ * separate reentrant lock is used.
  *
- * <p>It's only practical to delete entire files.  Thus, we delete an entire 
- * on-disk file F when all of the messages in F have a log-sequence-id that's 
- * older (smaller) than the most-recent CACHEFLUSH message for every HRegion 
- * that has a message in F.
- * 
- * <p>TODO: Vuk Ercegovac also pointed out that keeping HBase HRegion edit logs
- * in HDFS is currently flawed. HBase writes edits to logs and to a memcache.
- * The 'atomic' write to the log is meant to serve as insurance against
- * abnormal RegionServer exit: on startup, the log is rerun to reconstruct an
- * HRegion's last wholesome state. But files in HDFS do not 'exist' until they
- * are cleanly closed -- something that will not happen if RegionServer exits
- * without running its 'close'.
+ * <p>
+ * TODO: Vuk Ercegovac also pointed out that keeping HBase HRegion edit logs in
+ * HDFS is currently flawed. HBase writes edits to logs and to a memcache. The
+ * 'atomic' write to the log is meant to serve as insurance against abnormal
+ * RegionServer exit: on startup, the log is rerun to reconstruct an HRegion's
+ * last wholesome state. But files in HDFS do not 'exist' until they are cleanly
+ * closed -- something that will not happen if RegionServer exits without
+ * running its 'close'.
  */
 public class HLog implements HConstants {
   private static final Log LOG = LogFactory.getLog(HLog.class);
-  
+
   static final String HLOG_DATFILE = "hlog.dat.";
+
   static final Text METACOLUMN = new Text("METACOLUMN:");
+
   static final Text METAROW = new Text("METAROW");
 
   FileSystem fs;
+
   Path dir;
+
   Configuration conf;
 
+  final long threadWakeFrequency;
+
   SequenceFile.Writer writer;
+
   TreeMap<Long, Path> outputfiles = new TreeMap<Long, Path>();
-  volatile boolean insideCacheFlush = false;
 
-  TreeMap<Text, Long> regionToLastFlush = new TreeMap<Text, Long>();
+  HashMap<Text, Long> lastSeqWritten = new HashMap<Text, Long>();
 
   volatile boolean closed = false;
+
+  private final Integer sequenceLock = new Integer(0);
   volatile long logSeqNum = 0;
-  long filenum = 0;
-  AtomicInteger numEntries = new AtomicInteger(0);
 
-  Integer rollLock = new Integer(0);
+  volatile long filenum = 0;
+
+  volatile int numEntries = 0;
+
+  // This lock prevents starting a log roll during a cache flush.
+  // synchronized is insufficient because a cache flush spans two method calls.
+  private final Lock cacheFlushLock = new ReentrantLock();
 
   /**
-   * Split up a bunch of log files, that are no longer being written to,
-   * into new files, one per region.  Delete the old log files when ready.
+   * Split up a bunch of log files, that are no longer being written to, into
+   * new files, one per region. Delete the old log files when finished.
+   *
    * @param rootDir Root directory of the HBase instance
-   * @param srcDir Directory of log files to split:
-   * e.g. <code>${ROOTDIR}/log_HOST_PORT</code>
+   * @param srcDir Directory of log files to split: e.g.
+   *                <code>${ROOTDIR}/log_HOST_PORT</code>
    * @param fs FileSystem
    * @param conf HBaseConfiguration
    * @throws IOException
    */
   static void splitLog(Path rootDir, Path srcDir, FileSystem fs,
-    Configuration conf) throws IOException {
-    Path logfiles[] = fs.listPaths(new Path[] {srcDir});
+      Configuration conf) throws IOException {
+    Path logfiles[] = fs.listPaths(new Path[] { srcDir });
     LOG.info("splitting " + logfiles.length + " log(s) in " +
       srcDir.toString());
     HashMap<Text, SequenceFile.Writer> logWriters =
       new HashMap<Text, SequenceFile.Writer>();
     try {
-      for(int i = 0; i < logfiles.length; i++) {
+      for (int i = 0; i < logfiles.length; i++) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Splitting " + logfiles[i]);
         }
@@ -118,7 +151,7 @@
         try {
           HLogKey key = new HLogKey();
           HLogEdit val = new HLogEdit();
-          while(in.next(key, val)) {
+          while (in.next(key, val)) {
             Text regionName = key.getRegionName();
             SequenceFile.Writer w = logWriters.get(regionName);
             if (w == null) {
@@ -141,15 +174,15 @@
         }
       }
     } finally {
-      for (SequenceFile.Writer w: logWriters.values()) {
+      for (SequenceFile.Writer w : logWriters.values()) {
         w.close();
       }
     }
-    
-    if(fs.exists(srcDir)) {
-      if(! fs.delete(srcDir)) {
+
+    if (fs.exists(srcDir)) {
+      if (!fs.delete(srcDir)) {
         LOG.error("Cannot delete: " + srcDir);
-        if(! FileUtil.fullyDelete(new File(srcDir.toString()))) {
+        if (!FileUtil.fullyDelete(new File(srcDir.toString()))) {
           throw new IOException("Cannot delete: " + srcDir);
         }
       }
@@ -160,10 +193,10 @@
   /**
    * Create an edit log at the given <code>dir</code> location.
    *
-   * You should never have to load an existing log.  If there is a log
-   * at startup, it should have already been processed and deleted by 
-   * the time the HLog object is started up.
-   * 
+   * You should never have to load an existing log. If there is a log at
+   * startup, it should have already been processed and deleted by the time the
+   * HLog object is started up.
+   *
    * @param fs
    * @param dir
    * @param conf
@@ -173,6 +206,7 @@
     this.fs = fs;
     this.dir = dir;
     this.conf = conf;
+    this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
 
     if (fs.exists(dir)) {
       throw new IOException("Target HLog directory already exists: " + dir);
@@ -180,115 +214,117 @@
     fs.mkdirs(dir);
     rollWriter();
   }
-  
-  synchronized void setSequenceNumber(long newvalue) {
-    if (newvalue > logSeqNum) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("changing sequence number from " + logSeqNum + " to " +
-            newvalue);
+
+  /**
+   * Called by HRegionServer when it opens a new region to ensure that log
+   * sequence numbers are always greater than the latest sequence number of the
+   * region being brought on-line.
+   *
+   * @param newvalue
+   */
+  void setSequenceNumber(long newvalue) {
+    synchronized (sequenceLock) {
+      if (newvalue > logSeqNum) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("changing sequence number from " + logSeqNum + " to " +
+              newvalue);
+        }
+        logSeqNum = newvalue;
       }
-      logSeqNum = newvalue;
     }
   }
 
   /**
-   * Roll the log writer.  That is, start writing log messages to a new file.
+   * Roll the log writer. That is, start writing log messages to a new file.
+   *
+   * Because a log cannot be rolled during a cache flush, and a cache flush
+   * spans two method calls, a special lock needs to be obtained so that a cache
+   * flush cannot start when the log is being rolled and the log cannot be
+   * rolled during a cache flush.
    *
-   * The 'rollLock' prevents us from entering rollWriter() more than
-   * once at a time.
+   * Note that this method cannot be synchronized because it is possible that
+   * startCacheFlush runs, obtaining the cacheFlushLock, then this method could
+   * start which would obtain the lock on this but block on obtaining the
+   * cacheFlushLock and then completeCacheFlush could be called which would wait
+   * for the lock on this and consequently never release the cacheFlushLock.
    *
-   * The 'this' lock limits access to the current writer so
-   * we don't append multiple items simultaneously.
-   * 
    * @throws IOException
    */
-  void rollWriter() throws IOException {
-    synchronized(rollLock) {
+  synchronized void rollWriter() throws IOException {
+    boolean locked = false;
+    while (!locked && !closed) {
+      if (cacheFlushLock.tryLock()) {
+        locked = true;
+        break;
+      }
+      try {
+        this.wait(threadWakeFrequency);
+      } catch (InterruptedException e) {
+      }
+    }
+    if (closed) {
+      if (locked) {
+        cacheFlushLock.unlock();
+      }
+      throw new IOException("Cannot roll log; log is closed");
+    }
 
-      // Try to roll the writer to a new file.  We may have to
-      // wait for a cache-flush to complete.  In the process,
-      // compute a list of old log files that can be deleted.
-
-      Vector<Path> toDeleteList = new Vector<Path>();
-      synchronized(this) {
-        if(closed) {
-          throw new IOException("Cannot roll log; log is closed");
-        }
+    // If we get here we have locked out both cache flushes and appends
 
-        // Make sure we do not roll the log while inside a
-        // cache-flush.  Otherwise, the log sequence number for
-        // the CACHEFLUSH operation will appear in a "newer" log file
-        // than it should.
-        while(insideCacheFlush) {
-          try {
-            wait();
-          } catch (InterruptedException ie) {
-            // continue;
-          }
-        }
-
-        // Close the current writer (if any), and grab a new one.
-        if(writer != null) {
-          writer.close();
-          Path p = computeFilename(filenum - 1);
-          if(LOG.isDebugEnabled()) {
-            LOG.debug("Closing current log writer " + p.toString() +
+    try {
+      if (writer != null) {
+        // Close the current writer, get a new one.
+        writer.close();
+        Path p = computeFilename(filenum - 1);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Closing current log writer " + p.toString() +
               " to get a new one");
-          }
-          if (filenum > 0) {
+        }
+        if (filenum > 0) {
+          synchronized (sequenceLock) {
             outputfiles.put(logSeqNum - 1, p);
           }
         }
-        Path newPath = computeFilename(filenum++);
-        this.writer = SequenceFile.createWriter(fs, conf, newPath,
+      }
+      Path newPath = computeFilename(filenum++);
+      this.writer = SequenceFile.createWriter(fs, conf, newPath,
           HLogKey.class, HLogEdit.class);
-        if(LOG.isDebugEnabled()) {
-          LOG.debug("new log writer created at " + newPath);
-        }
-        
-        // Can we delete any of the old log files?
-        // First, compute the oldest relevant log operation 
-        // over all the regions.
-
-        long oldestOutstandingSeqNum = Long.MAX_VALUE;
-        for(Long l: regionToLastFlush.values()) {
-          long curSeqNum = l.longValue();
-          
-          if(curSeqNum < oldestOutstandingSeqNum) {
-            oldestOutstandingSeqNum = curSeqNum;
-          }
-        }
 
-        // Next, remove all files with a final ID that's older
-        // than the oldest pending region-operation.
-        for(Iterator<Long> it = outputfiles.keySet().iterator(); it.hasNext();) {
-          long maxSeqNum = it.next().longValue();
-          if(maxSeqNum < oldestOutstandingSeqNum) {
-            Path p = outputfiles.get(maxSeqNum);
-            it.remove();
-            toDeleteList.add(p);
-            
-          } else {
-            break;
-          }
-        }
-      }
+      LOG.info("new log writer created at " + newPath);
+
+      // Can we delete any of the old log files?
+
+      TreeSet<Long> sequenceNumbers =
+        new TreeSet<Long>(lastSeqWritten.values());
+
+      if (sequenceNumbers.size() > 0) {
+        long oldestOutstandingSeqNum = sequenceNumbers.first();
+
+        // Get the set of all log files whose final ID is older than the oldest
+        // pending region operation
+
+        sequenceNumbers.clear();
+        sequenceNumbers.addAll(outputfiles.headMap(
+            oldestOutstandingSeqNum).keySet());
 
-      // Actually delete them, if any!
-      for(Iterator<Path> it = toDeleteList.iterator(); it.hasNext(); ) {
-        Path p = it.next();
-        if(LOG.isDebugEnabled()) {
-          LOG.debug("removing old log file " + p.toString());
+        // Now remove old log files (if any)
+
+        for (Long seq : sequenceNumbers) {
+          Path p = outputfiles.remove(seq);
+          LOG.info("removing old log file " + p.toString());
+          fs.delete(p);
         }
-        fs.delete(p);
       }
-      this.numEntries.set(0);
+      this.numEntries = 0;
+
+    } finally {
+      cacheFlushLock.unlock();
     }
   }
 
   /**
-   * This is a convenience method that computes a new filename with
-   * a given file-number.
+   * This is a convenience method that computes a new filename with a given
+   * file-number.
    */
   Path computeFilename(final long fn) {
     return new Path(dir, HLOG_DATFILE + String.format("%1$03d", fn));
@@ -296,19 +332,21 @@
 
   /**
    * Shut down the log and delete the log directory
+   *
    * @throws IOException
    */
   synchronized void closeAndDelete() throws IOException {
     close();
     fs.delete(dir);
   }
-  
+
   /**
    * Shut down the log.
+   *
    * @throws IOException
    */
   synchronized void close() throws IOException {
-    if(LOG.isDebugEnabled()) {
+    if (LOG.isDebugEnabled()) {
       LOG.debug("closing log writer in " + this.dir.toString());
     }
     this.writer.close();
@@ -319,16 +357,19 @@
    * Append a set of edits to the log. Log edits are keyed by regionName,
    * rowname, and log-sequence-id.
    *
-   * Later, if we sort by these keys, we obtain all the relevant edits for
-   * a given key-range of the HRegion (TODO).  Any edits that do not have a
+   * Later, if we sort by these keys, we obtain all the relevant edits for a
+   * given key-range of the HRegion (TODO). Any edits that do not have a
-   * matching {@link HConstants#COMPLETE_CACHEFLUSH} message can be discarded.
    *
-   * <p>Logs cannot be restarted once closed, or once the HLog process dies.
-   * Each time the HLog starts, it must create a new log.  This means that
-   * other systems should process the log appropriately upon each startup
-   * (and prior to initializing HLog).
+   * <p>
+   * Logs cannot be restarted once closed, or once the HLog process dies. Each
+   * time the HLog starts, it must create a new log. This means that other
+   * systems should process the log appropriately upon each startup (and prior
+   * to initializing HLog).
+   *
+   * synchronized prevents appends during the completion of a cache flush or for
+   * the duration of a log roll.
    *
-   * We need to seize a lock on the writer so that writes are atomic.
    * @param regionName
    * @param tableName
    * @param row
@@ -337,136 +378,121 @@
    * @throws IOException
    */
   synchronized void append(Text regionName, Text tableName, Text row,
-      TreeMap<Text, byte []> columns, long timestamp)
-  throws IOException {
-    if(closed) {
+      TreeMap<Text, byte[]> columns, long timestamp) throws IOException {
+    if (closed) {
       throw new IOException("Cannot append; log is closed");
     }
-    
+
     long seqNum[] = obtainSeqNum(columns.size());
 
-    // The 'regionToLastFlush' map holds the sequence id of the
-    // most recent flush for every regionName.  However, for regions
-    // that don't have any flush yet, the relevant operation is the
-    // first one that's been added.
-    if (regionToLastFlush.get(regionName) == null) {
-      regionToLastFlush.put(regionName, seqNum[0]);
-    }
+    // The 'lastSeqWritten' map holds the sequence number of the most recent
+    // write for each region. When the cache is flushed, the entry for the
+    // region being flushed is removed if the sequence number of the flush
+    // is greater than or equal to the value in lastSeqWritten
+
+    lastSeqWritten.put(regionName, seqNum[seqNum.length - 1]);
 
     int counter = 0;
-    for (Map.Entry<Text, byte []> es: columns.entrySet()) {
+    for (Map.Entry<Text, byte[]> es : columns.entrySet()) {
       HLogKey logKey =
         new HLogKey(regionName, tableName, row, seqNum[counter++]);
       HLogEdit logEdit = new HLogEdit(es.getKey(), es.getValue(), timestamp);
       writer.append(logKey, logEdit);
-      numEntries.getAndIncrement();
+      numEntries++;
     }
   }
 
   /** @return How many items have been added to the log */
   int getNumEntries() {
-    return numEntries.get();
+    return numEntries;
   }
 
   /**
-   * Obtain a log sequence number.  This seizes the whole HLog
-   * lock, but it shouldn't last too long.
+   * Obtain a log sequence number.
    */
-  synchronized long obtainSeqNum() {
-    return logSeqNum++;
+  private long obtainSeqNum() {
+    long value;
+    synchronized (sequenceLock) {
+      value = logSeqNum++;
+    }
+    return value;
   }
-  
+
   /**
    * Obtain a specified number of sequence numbers
-   * 
-   * @param num - number of sequence numbers to obtain
-   * @return - array of sequence numbers
+   *
+   * @param num number of sequence numbers to obtain
+   * @return array of sequence numbers
    */
-  synchronized long[] obtainSeqNum(int num) {
+  private long[] obtainSeqNum(int num) {
     long[] results = new long[num];
-    for (int i = 0; i < num; i++) {
-      results[i] = logSeqNum++;
+    synchronized (sequenceLock) {
+      for (int i = 0; i < num; i++) {
+        results[i] = logSeqNum++;
+      }
     }
     return results;
   }
 
   /**
-   * By acquiring a log sequence ID, we can allow log messages
-   * to continue while we flush the cache.
+   * By acquiring a log sequence ID, we can allow log messages to continue while
+   * we flush the cache.
+   *
+   * Acquire a lock so that we do not roll the log between the start and
+   * completion of a cache-flush. Otherwise the log-seq-id for the flush will
+   * not appear in the correct logfile.
    *
-   * Set a flag so that we do not roll the log between the start
-   * and complete of a cache-flush.  Otherwise the log-seq-id for
-   * the flush will not appear in the correct logfile.
   * @return sequence ID to pass {@link #completeCacheFlush(Text, Text, long)}
    * @see #completeCacheFlush(Text, Text, long)
    * @see #abortCacheFlush()
    */
-  synchronized long startCacheFlush() {
-    while (this.insideCacheFlush) {
-      try {
-        wait();
-      } catch (InterruptedException ie) {
-        // continue
-      }
-    }
-    this.insideCacheFlush = true;
-    notifyAll();
+  long startCacheFlush() {
+    cacheFlushLock.lock();
     return obtainSeqNum();
   }
 
-  /** Complete the cache flush
+  /**
+   * Complete the cache flush
+   *
+   * Protected by this and cacheFlushLock
+   *
    * @param regionName
    * @param tableName
    * @param logSeqId
    * @throws IOException
    */
   synchronized void completeCacheFlush(final Text regionName,
-    final Text tableName, final long logSeqId)
-  throws IOException {
-    if(this.closed) {
-      return;
-    }
-    
-    if (!this.insideCacheFlush) {
-      throw new IOException("Impossible situation: inside " +
-        "completeCacheFlush(), but 'insideCacheFlush' flag is false");
-    }
-    HLogKey key = new HLogKey(regionName, tableName, HLog.METAROW, logSeqId);
-    this.writer.append(key,
-      new HLogEdit(HLog.METACOLUMN, HGlobals.completeCacheFlush.get(),
-        System.currentTimeMillis()));
-    this.numEntries.getAndIncrement();
-
-    // Remember the most-recent flush for each region.
-    // This is used to delete obsolete log files.
-    this.regionToLastFlush.put(regionName, Long.valueOf(logSeqId));
-
-    cleanup();
-  }
-  
-  /**
-   * Abort a cache flush.
-   * This method will clear waits on {@link #insideCacheFlush}.  Call if the
-   * flush fails.  Note that the only recovery for an aborted flush currently
-   * is a restart of the regionserver so the snapshot content dropped by the
-   * failure gets restored to the  memcache.
-   */
-  synchronized void abortCacheFlush() {
-    cleanup();
-  }
-  
-  private synchronized void cleanup() {
-    this.insideCacheFlush = false;
-    notifyAll();
+      final Text tableName, final long logSeqId) throws IOException {
+
+    try {
+      if (this.closed) {
+        return;
+      }
+
+      writer.append(new HLogKey(regionName, tableName, HLog.METAROW, logSeqId),
+          new HLogEdit(HLog.METACOLUMN, HGlobals.completeCacheFlush.get(),
+              System.currentTimeMillis()));
+
+      numEntries++;
+      Long seq = lastSeqWritten.get(regionName);
+      if (seq != null && logSeqId >= seq) {
+        lastSeqWritten.remove(regionName);
+      }
+
+    } finally {
+      cacheFlushLock.unlock();
+      notifyAll();              // wake up the log roller if it is waiting
+    }
   }
-  
+
   /**
-   * Abort a cache flush.
-   * This method will clear waits on {@link #insideCacheFlush} but if this
-   * method is called, we are losing data.  TODO: Fix.
+   * Abort a cache flush. This method will clear waits on
+   * {@link #insideCacheFlush}. Call if the flush fails. Note that the only
+   * recovery for an aborted flush currently is a restart of the regionserver so
+   * the snapshot content dropped by the failure gets restored to the memcache.
    */
-  synchronized void abort() {
-    this.insideCacheFlush = false;
+  synchronized void abortCacheFlush() {
+    this.cacheFlushLock.unlock();
     notifyAll();
   }
 
@@ -474,10 +500,11 @@
     System.err.println("Usage: java org.apache.hbase.HLog" +
         " {--dump <logfile>... | --split <logdir>...}");
   }
-  
+
   /**
    * Pass one or more log file names and it will either dump out a text version
    * on <code>stdout</code> or split the specified log files.
+   *
    * @param args
    * @throws IOException
    */
@@ -490,7 +517,7 @@
     if (args[0].compareTo("--dump") != 0) {
       if (args[0].compareTo("--split") == 0) {
         dump = false;
-        
+
       } else {
         usage();
         System.exit(-1);
@@ -499,7 +526,7 @@
     Configuration conf = new HBaseConfiguration();
     FileSystem fs = FileSystem.get(conf);
     Path baseDir = new Path(conf.get(HBASE_DIR, DEFAULT_HBASE_DIR));
-    
+
     for (int i = 1; i < args.length; i++) {
       Path logPath = new Path(args[i]);
       if (!fs.exists(logPath)) {
@@ -513,7 +540,7 @@
         try {
           HLogKey key = new HLogKey();
           HLogEdit val = new HLogEdit();
-          while(log.next(key, val)) {
+          while (log.next(key, val)) {
             System.out.println(key.toString() + " " + val.toString());
           }
         } finally {
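
The Javadoc in the hunks above describes the new locking scheme: the HLog monitor covers appends and single-call operations, while a separate ReentrantLock spans the two-call cache-flush sequence, and rollWriter must poll it rather than block. Condensed to its skeleton, the pattern looks roughly like this (a hand-written sketch, not the patch's code):

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class FlushRollLockSketch {
  // Held from startCacheFlush() until completeCacheFlush(); a monitor cannot
  // span two method calls, hence the explicit lock.
  private final Lock cacheFlushLock = new ReentrantLock();
  private final Object sequenceLock = new Object();
  private long logSeqNum = 0;

  long startCacheFlush() {
    cacheFlushLock.lock();        // blocks any roll until the flush completes
    return obtainSeqNum();
  }

  synchronized void completeCacheFlush() {
    try {
      // ... append the FLUSHCACHE-COMPLETE marker to the log ...
    } finally {
      cacheFlushLock.unlock();
      notifyAll();                // wake a roller parked in wait() below
    }
  }

  synchronized void rollWriter() throws InterruptedException {
    // tryLock-plus-wait rather than a blocking lock(): blocking here while
    // holding the monitor would deadlock, because completeCacheFlush() needs
    // the monitor before it can ever release cacheFlushLock.
    while (!cacheFlushLock.tryLock()) {
      wait(10 * 1000);
    }
    try {
      // ... close the current writer and open a new one ...
    } finally {
      cacheFlushLock.unlock();
    }
  }

  synchronized void append() {
    // Appends may run during a flush; the monitor only excludes them from
    // overlapping a roll or the completion of a flush.
    obtainSeqNum();
  }

  private long obtainSeqNum() {
    synchronized (sequenceLock) {
      return logSeqNum++;
    }
  }
}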

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java?rev=580811&r1=580810&r2=580811&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java Sun Sep 30 16:46:04 2007
@@ -210,6 +210,7 @@
   final int memcacheFlushSize;
   final int blockingMemcacheSize;
   protected final long threadWakeFrequency;
+  protected final int optionalFlushCount;
   private final HLocking lock = new HLocking();
   private long desiredMaxFileSize;
   private final long maxSequenceId;
@@ -247,6 +248,8 @@
     this.regionInfo = regionInfo;
     this.memcache = new HMemcache();
     this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
+    this.optionalFlushCount =
+      conf.getInt("hbase.hregion.memcache.optionalflushcount", 10);
 
    // Declare the regionName.  This is a unique string for the region, used to
     // build a unique filename.
@@ -728,11 +731,13 @@
   void optionallyFlush() throws IOException {
     if(this.memcache.getSize() > this.memcacheFlushSize) {
       flushcache(false);
-    } else if (this.memcache.getSize() > 0 && this.noFlushCount >= 10) {
-      LOG.info("Optional flush called " + this.noFlushCount +
-        " times when data present without flushing.  Forcing one.");
-      flushcache(false);
-      if (this.memcache.getSize() > 0) {
+    } else if (this.memcache.getSize() > 0) {
+      if (this.noFlushCount >= this.optionalFlushCount) {
+        LOG.info("Optional flush called " + this.noFlushCount +
+            " times when data present without flushing.  Forcing one.");
+        flushcache(false);
+        
+      } else {
         // Only increment if something in the cache.
         // Gets zero'd when a flushcache is called.
         this.noFlushCount++;
@@ -864,25 +869,31 @@
             retval.memcacheSnapshot.size());
       }
 
-      // A.  Flush memcache to all the HStores.
-      // Keep running vector of all store files that includes both old and the
-      // just-made new flush store file.
-      for (HStore hstore: stores.values()) {
-        hstore.flushCache(retval.memcacheSnapshot, retval.sequenceId);
+      try {
+        // A.  Flush memcache to all the HStores.
+        // Keep running vector of all store files that includes both old and the
+        // just-made new flush store file.
+        for (HStore hstore: stores.values()) {
+          hstore.flushCache(retval.memcacheSnapshot, retval.sequenceId);
+        }
+      } catch (IOException e) {
+        // An exception here means that the snapshot was not persisted.
+        // The hlog needs to be replayed so its content is restored to memcache.
+        // Currently, only a server restart will do this.
+        this.log.abortCacheFlush();
+        throw new DroppedSnapshotException(e.getMessage());
       }
 
+      // If we get to here, the HStores have been written. If we get an
+      // error in completeCacheFlush it will release the lock it is holding
+
       // B.  Write a FLUSHCACHE-COMPLETE message to the log.
       //     This tells future readers that the HStores were emitted correctly,
      //     and that all updates to the log for this regionName that have lower
       //     log-sequence-ids can be safely ignored.
       this.log.completeCacheFlush(this.regionInfo.regionName,
-        regionInfo.tableDesc.getName(), logCacheFlushId);
-    } catch (IOException e) {
-      // An exception here means that the snapshot was not persisted.
-      // The hlog needs to be replayed so its content is restored to memcache.
-      // Currently, only a server restart will do this.
-      this.log.abortCacheFlush();
-      throw new DroppedSnapshotException(e.getMessage());
+          regionInfo.tableDesc.getName(), logCacheFlushId);
+
     } finally {
      // C. Delete the now-irrelevant memcache snapshot; its contents have been
       //    dumped to disk-based HStores or, if error, clear aborted snapshot.
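
The restructuring above narrows the catch block so that only a failure while persisting the snapshot aborts the flush; completeCacheFlush now releases the flush lock itself, so wrapping it in the same catch would have unlocked twice. A simplified sketch of the resulting control flow (the real code throws DroppedSnapshotException; a plain IOException stands in here, and the interface names are hypothetical):

import java.io.IOException;

class FlushFlowSketch {
  interface FlushLog {
    long startCacheFlush();                    // acquires the flush lock
    void completeCacheFlush(long seqId) throws IOException; // releases it
    void abortCacheFlush();                    // releases it after a failure
  }

  void flushcache(FlushLog log) throws IOException {
    long seqId = log.startCacheFlush();        // snapshot the memcache here
    try {
      persistSnapshot();                       // flush each HStore to disk
    } catch (IOException e) {
      // The snapshot was not persisted; only replaying the log (currently a
      // regionserver restart) can restore it, so abort and re-throw.
      log.abortCacheFlush();
      throw new IOException("dropped snapshot: " + e.getMessage());
    }
    // The HStores are safely on disk. completeCacheFlush releases the flush
    // lock itself, so a failure here must not trigger a second abort.
    log.completeCacheFlush(seqId);
  }

  private void persistSnapshot() throws IOException {
    // ... write the memcache snapshot into HStore files ...
  }
}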

Added: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestLogRolling.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestLogRolling.java?rev=580811&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestLogRolling.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestLogRolling.java Sun Sep 30 16:46:04 2007
@@ -0,0 +1,200 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.dfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Test log deletion as logs are rolled.
+ */
+public class TestLogRolling extends HBaseTestCase {
+  private static final Log LOG = LogFactory.getLog(TestLogRolling.class);
+  private MiniDFSCluster dfs;
+  private MiniHBaseCluster cluster;
+  private Path logdir;
+  private String tableName;
+  private byte[] value;
+  
+  /**
+   * constructor
+   * @throws Exception
+   */
+  public TestLogRolling() throws Exception {
+    super();
+    try {
+      this.dfs = null;
+      this.cluster = null;
+      this.logdir = null;
+      this.tableName = null;
+      this.value = null;
+
+      // We roll the log after every 256 writes
+      conf.setInt("hbase.regionserver.maxlogentries", 256);
+
+      // For less frequently updated regions flush after every 2 flushes
+      conf.setInt("hbase.hregion.memcache.optionalflushcount", 2);
+
+      // We flush the cache after every 8192 bytes
+      conf.setInt("hbase.hregion.memcache.flush.size", 8192);
+
+      // Make lease timeout longer, lease checks less frequent
+      conf.setInt("hbase.master.lease.period", 10 * 1000);
+      conf.setInt("hbase.master.lease.thread.wakefrequency", 5 * 1000);
+
+      // Increase the amount of time between client retries
+      conf.setLong("hbase.client.pause", 15 * 1000);
+
+      String className = this.getClass().getName();
+      StringBuilder v = new StringBuilder(className);
+      while (v.length() < 1000) {
+        v.append(className);
+      }
+      value = v.toString().getBytes(HConstants.UTF8_ENCODING);
+      
+    } catch (Exception e) {
+      LOG.fatal("error in constructor", e);
+      throw e;
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void setUp() throws Exception {
+    try {
+      super.setUp();
+      dfs = new MiniDFSCluster(conf, 2, true, (String[]) null);
+    } catch (Exception e) {
+      LOG.fatal("error during setUp: ", e);
+      throw e;
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void tearDown() throws Exception {
+    try {
+      super.tearDown();
+
+      if (cluster != null) {                      // shutdown mini HBase cluster
+        cluster.shutdown();
+      }
+
+      if (dfs != null) {
+        FileSystem fs = dfs.getFileSystem();
+        try {
+          dfs.shutdown();
+        } finally {
+          if (fs != null) {
+            fs.close();
+          }
+        }
+      }
+    } catch (Exception e) {
+      LOG.fatal("error in tearDown", e);
+      throw e;
+    }
+  }
+  
+  private void startAndWriteData() throws Exception {
+    cluster = new MiniHBaseCluster(conf, 1, dfs);
+    try {
+      Thread.sleep(10 * 1000);                  // Wait for region server to start
+    } catch (InterruptedException e) {
+    }
+
+    logdir = cluster.regionThreads.get(0).getRegionServer().getLog().dir;
+    
+    // When the META table can be opened, the region servers are running
+    @SuppressWarnings("unused")
+    HTable meta = new HTable(conf, HConstants.META_TABLE_NAME);
+    
+    // Create the test table and open it
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    desc.addFamily(new HColumnDescriptor(HConstants.COLUMN_FAMILY.toString()));
+    HBaseAdmin admin = new HBaseAdmin(conf);
+    admin.createTable(desc);
+    HTable table = new HTable(conf, new Text(tableName));
+
+    for (int i = 1; i <= 2048; i++) {    // 2048 writes should cause 8 log rolls
+      long lockid =
+        table.startUpdate(new Text("row" + String.format("%1$04d", i)));
+      table.put(lockid, HConstants.COLUMN_FAMILY, value);
+      table.commit(lockid);
+      
+      if (i % 256 == 0) {
+        // After every 256 writes sleep to let the log roller run
+        
+        try {
+          Thread.sleep(2000);
+        } catch (InterruptedException e) {
+        }
+      }
+    }
+  }
+  
+  private int countLogFiles(boolean print) throws IOException {
+    Path[] logfiles = dfs.getFileSystem().listPaths(new Path[] {logdir});
+    if (print) {
+      for (int i = 0; i < logfiles.length; i++) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("logfile: " + logfiles[i].toString());
+        }
+      }
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("number of log files: " + logfiles.length);
+    }
+    return logfiles.length;
+  }
+  
+  /**
+   * Tests that logs are deleted
+   * 
+   * @throws Exception
+   */
+  public void testLogRolling() throws Exception {
+    tableName = getName();
+    // Force a region split after every 768KB
+    conf.setLong("hbase.hregion.max.filesize", 768L * 1024L);
+    try {
+      startAndWriteData();
+      LOG.info("Finished writing. Sleeping to let cache flusher and log roller run");
+      try {
+        // Wait for log roller and cache flusher to run a few times...
+        Thread.sleep(30L * 1000L);
+      } catch (InterruptedException e) {
+        LOG.info("Sleep interrupted", e);
+      }
+      LOG.info("Wake from sleep");
+      assertTrue(countLogFiles(true) <= 2);
+    } catch (Exception e) {
+      LOG.fatal("unexpected exception", e);
+      throw e;
+    }
+  }
+
+}

