Author: jimk
Date: Fri Oct  5 02:39:32 2007
New Revision: 582165

URL: http://svn.apache.org/viewvc?rev=582165&view=rev
Log:
HADOOP-1937 When the master times out a region server's lease, it is too 
aggressive in reclaiming the server's log.

Modified:
    lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
    
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
    
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java

Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?rev=582165&r1=582164&r2=582165&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Fri Oct  5 02:39:32 2007
@@ -66,6 +66,8 @@
     HADOOP-1975 HBase tests failing with java.lang.NumberFormatException
     HADOOP-1990 Regression test instability affects nightly and patch builds
     HADOOP-1996 TestHStoreFile fails on windows if run multiple times
+    HADOOP-1937 When the master times out a region server's lease, it is too 
+                aggressive in reclaiming the server's log.
 
   IMPROVEMENTS
     HADOOP-1737 Make HColumnDescriptor data publically members settable

Modified: 
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java?rev=582165&r1=582164&r2=582165&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java Fri Oct  5 02:39:32 2007
@@ -35,6 +35,8 @@
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.DelayQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -99,8 +101,10 @@
   int numRetries;
   long maxRegionOpenTime;
 
+  DelayQueue<PendingServerShutdown> shutdownQueue;
   BlockingQueue<PendingOperation> msgQueue;
 
+  int leaseTimeout;
   private Leases serverLeases;
   private Server server;
   private HServerAddress address;
@@ -860,11 +864,12 @@
     this.numRetries =  conf.getInt("hbase.client.retries.number", 2);
     this.maxRegionOpenTime =
       conf.getLong("hbase.hbasemaster.maxregionopen", 30 * 1000);
-    
+
+    this.shutdownQueue = new DelayQueue<PendingServerShutdown>();
     this.msgQueue = new LinkedBlockingQueue<PendingOperation>();
-    
-    this.serverLeases = new Leases(
-        conf.getInt("hbase.master.lease.period", 30 * 1000), 
+
+    this.leaseTimeout = conf.getInt("hbase.master.lease.period", 30 * 1000); 
+    this.serverLeases = new Leases(this.leaseTimeout, 
         conf.getInt("hbase.master.lease.thread.wakefrequency", 15 * 1000));
     
     this.server = RPC.getServer(this, address.getBindAddress(),
@@ -966,10 +971,13 @@
      */
     try {
       for (PendingOperation op = null; !closed.get(); ) {
-        try {
-          op = msgQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
-        } catch (InterruptedException e) {
-          // continue
+        op = shutdownQueue.poll();
+        if (op == null ) {
+          try {
+            op = msgQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
+          } catch (InterruptedException e) {
+            // continue
+          }
         }
         if (op == null || closed.get()) {
           continue;
@@ -1117,6 +1125,7 @@
    * HMasterRegionInterface
    */
 
+  /** {@inheritDoc} */
   @SuppressWarnings("unused")
   public MapWritable regionServerStartup(HServerInfo serverInfo)
   throws IOException {
@@ -1140,11 +1149,7 @@
       serversToServerInfo.notifyAll();
     }
     if (storedInfo != null && !closed.get()) {
-      try {
-        msgQueue.put(new PendingServerShutdown(storedInfo));
-      } catch (InterruptedException e) {
-        throw new RuntimeException("Putting into msgQueue was interrupted.", e);
-      }
+      shutdownQueue.put(new PendingServerShutdown(storedInfo));
     }
 
     // Either way, record the new server
@@ -1683,9 +1688,12 @@
    * The region server's log file needs to be split up for each region it was
    * serving, and the regions need to get reassigned.
    */
-  private class PendingServerShutdown extends PendingOperation {
+  private class PendingServerShutdown extends PendingOperation
+    implements Delayed {
+    private long delay;
     private HServerAddress deadServer;
     private String deadServerName;
+    private Path oldLogDir;
     private transient boolean logSplit;
     private transient boolean rootChecked;
     private transient boolean rootRescanned;
@@ -1706,14 +1714,33 @@
 
     PendingServerShutdown(HServerInfo serverInfo) {
       super();
+      this.delay = leaseTimeout / 2;
       this.deadServer = serverInfo.getServerAddress();
       this.deadServerName = this.deadServer.toString();
       this.logSplit = false;
       this.rootChecked = false;
       this.rootRescanned = false;
+      StringBuilder dirName = new StringBuilder("log_");
+      dirName.append(deadServer.getBindAddress());
+      dirName.append("_");
+      dirName.append(serverInfo.getStartCode());
+      dirName.append("_");
+      dirName.append(deadServer.getPort());
+      this.oldLogDir = new Path(dir, dirName.toString());
     }
 
     /** {@inheritDoc} */
+    public long getDelay(TimeUnit unit) {
+      return unit.convert(delay, TimeUnit.MILLISECONDS);
+    }
+    
+    /** {@inheritDoc} */
+    public int compareTo(Delayed o) {
+      return Long.valueOf(getDelay(TimeUnit.MILLISECONDS)
+          - o.getDelay(TimeUnit.MILLISECONDS)).intValue();
+    }
+    
+    /** {@inheritDoc} */
     @Override
     public String toString() {
       return "PendingServerShutdown of " + this.deadServer.toString();
@@ -1875,17 +1902,12 @@
 
       if (!logSplit) {
         // Process the old log file
-        StringBuilder dirName = new StringBuilder("log_");
-        dirName.append(deadServer.getBindAddress());
-        dirName.append("_");
-        dirName.append(deadServer.getPort());
-        Path logdir = new Path(dir, dirName.toString());
-        if (fs.exists(logdir)) {
+        if (fs.exists(oldLogDir)) {
           if (!splitLogLock.tryLock()) {
             return false;
           }
           try {
-            HLog.splitLog(dir, logdir, fs, conf);
+            HLog.splitLog(dir, oldLogDir, fs, conf);
           } finally {
             splitLogLock.unlock();
           }
@@ -2901,16 +2923,8 @@
       // NOTE: If the server was serving the root region, we cannot reassign it
       // here because the new server will start serving the root region before
       // the PendingServerShutdown operation has a chance to split the log file.
-      try {
-        if (info != null) {
-          msgQueue.put(new PendingServerShutdown(info));
-        }
-      } catch (InterruptedException e) {
-        // continue.  We used to throw a RuntimeException here but on exit
-        // this put is often interrupted.  For now, just log these iterrupts
-        // rather than throw an exception
-        LOG.debug("MsgQueue.put was interrupted (If we are exiting, this " +
-          "msg can be ignored)");
+      if (info != null) {
+        shutdownQueue.put(new PendingServerShutdown(info));
       }
     }
   }

Modified: 
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java?rev=582165&r1=582164&r2=582165&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java Fri Oct  5 02:39:32 2007
@@ -96,7 +96,7 @@
   protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
   private final Vector<HMsg> outboundMsgs = new Vector<HMsg>();
 
-  int numRetries;
+  final int numRetries;
   protected final int threadWakeFrequency;
   private final int msgInterval;
   private final int serverLeaseTimeout;
@@ -314,7 +314,7 @@
       }
     }
   }
-  
+
   // HLog and HLog roller.  log is protected rather than private to avoid
   // eclipse warning when accessed by inner classes
   protected HLog log;
@@ -472,19 +472,27 @@
                   // get it when the master is panicing because for instance
                   // the HDFS has been yanked out from under it.  Be wary of
                   // this message.
-                  try {
-                    if (checkFileSystem()) {
-                      closeAllRegions();
-                      restart = true;
+                  if (checkFileSystem()) {
+                    closeAllRegions();
+                    synchronized (logRollerLock) {
+                      try {
+                        log.closeAndDelete();
+                        serverInfo.setStartCode(rand.nextLong());
+                        log = setupHLog();
+                      } catch (IOException e) {
+                        this.abortRequested = true;
+                        this.stopRequested.set(true);
+                        e = RemoteExceptionHandler.checkIOException(e); 
+                        LOG.fatal("error restarting server", e);
+                        break;
+                      }
                     }
-                  } catch (Exception e) {
+                    reportForDuty();
+                    restart = true;
+                  } else {
                     LOG.fatal("file system available check failed. " +
-                        "Shutting down server.", e);
-                    this.stopRequested.set(true);
-                    this.fsOk = false;
-                    this.abortRequested = true;
+                        "Shutting down server.");
                   }
-                  
                   break;
 
                 case HMsg.MSG_REGIONSERVER_STOP:
@@ -604,7 +612,7 @@
    * Run init. Sets up hlog and starts up all server threads.
    * @param c Extra configuration.
    */
-  private void init(final MapWritable c) {
+  private void init(final MapWritable c) throws IOException {
     try {
       for (Map.Entry<Writable, Writable> e: c.entrySet()) {
         String key = e.getKey().toString();
@@ -618,18 +626,22 @@
       startServiceThreads();
     } catch (IOException e) {
       this.stopRequested.set(true);
-      LOG.fatal("Failed init",
-        RemoteExceptionHandler.checkIOException(e));
+      e = RemoteExceptionHandler.checkIOException(e); 
+      LOG.fatal("Failed init", e);
+      IOException ex = new IOException("region server startup failed");
+      ex.initCause(e);
+      throw ex;
     }
   }
   
-  private HLog setupHLog()
-  throws RegionServerRunningException, IOException {
+  private HLog setupHLog() throws RegionServerRunningException,
+    IOException {
+    
     String rootDir = this.conf.get(HConstants.HBASE_DIR);
     LOG.info("Root dir: " + rootDir);
-    Path logdir = new Path(new Path(rootDir),
-      "log" + "_" + getThisIP() + "_" +
-      this.serverInfo.getServerAddress().getPort());
+    Path logdir = new Path(new Path(rootDir), "log" + "_" + getThisIP() + "_" +
+        this.serverInfo.getStartCode() + "_" + 
+        this.serverInfo.getServerAddress().getPort());
     if (LOG.isDebugEnabled()) {
       LOG.debug("Log dir " + logdir);
     }
@@ -762,6 +774,7 @@
         }
         break;
       } catch(IOException e) {
+        LOG.warn("error telling master we are up", e);
         this.sleeper.sleep(lastMsg);
         continue;
       }
@@ -1429,4 +1442,4 @@
   public static void main(String [] args) {
     doMain(args, HRegionServer.class);
   }
-}
\ No newline at end of file
+}


Reply via email to