Author: jimk
Date: Thu Nov 29 13:10:03 2007
New Revision: 599578

URL: http://svn.apache.org/viewvc?rev=599578&view=rev
Log:
HADOOP-2295 Fix assigning a region to multiple servers

Modified:
    lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/Leases.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/DFSAbort.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestLogRolling.java

Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?rev=599578&r1=599577&r2=599578&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Thu Nov 29 13:10:03 2007
@@ -39,6 +39,7 @@
                may not restart)
    HADOOP-2253 getRow can return HBASE::DELETEVAL cells
                (Bryan Duxbury via Stack)
+   HADOOP-2295 Fix assigning a region to multiple servers
 
   IMPROVEMENTS
    HADOOP-2401 Add convenience put method that takes writable

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java?rev=599578&r1=599577&r2=599578&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java Thu Nov 29 13:10:03 2007
@@ -1136,12 +1136,16 @@
 
     // Join up with all threads
     try {
-      rootScannerThread.join();         // Wait for the root scanner to finish.
+      if (rootScannerThread.isAlive()) {
+        rootScannerThread.join();       // Wait for the root scanner to finish.
+      }
     } catch (Exception iex) {
       LOG.warn("root scanner", iex);
     }
     try {
-      metaScannerThread.join();         // Wait for meta scanner to finish.
+      if (metaScannerThread.isAlive()) {
+        metaScannerThread.join();       // Wait for meta scanner to finish.
+      }
     } catch(Exception iex) {
       LOG.warn("meta scanner", iex);
     }
@@ -1460,10 +1464,25 @@
     // Get reports on what the RegionServer did.
 
     for (int i = 0; i < incomingMsgs.length; i++) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Received " + incomingMsgs[i].toString() + "from " +
+            serverName);
+      }
       HRegionInfo region = incomingMsgs[i].getRegionInfo();
 
       switch (incomingMsgs[i].getMsg()) {
 
+      case HMsg.MSG_REPORT_PROCESS_OPEN:
+        synchronized (this.assignAttempts) {
+          // Region server has acknowledged request to open region.
+          // Extend region open time by 1/2 max region open time.
+          assignAttempts.put(region.getRegionName(), 
+              Long.valueOf(assignAttempts.get(
+                  region.getRegionName()).longValue() +
+                  (this.maxRegionOpenTime / 2)));
+        }
+        break;
+        
       case HMsg.MSG_REPORT_OPEN:
         HRegionInfo regionInfo = unassignedRegions.get(region.getRegionName());
 
@@ -1484,9 +1503,7 @@
         } else {
           LOG.info(info.getServerAddress().toString() + " serving " +
               region.getRegionName());
-          // Remove from unassigned list so we don't assign it to someone else
-          this.unassignedRegions.remove(region.getRegionName());
-          this.assignAttempts.remove(region.getRegionName());
+
           if (region.getRegionName().compareTo(
               HRegionInfo.rootRegionInfo.getRegionName()) == 0) {
             // Store the Root Region location (in memory)
@@ -1495,21 +1512,23 @@
                   new HServerAddress(info.getServerAddress()));
               this.rootRegionLocation.notifyAll();
             }
-            break;
-          }
-
-          // Note that the table has been assigned and is waiting for the meta
-          // table to be updated.
-
-          pendingRegions.add(region.getRegionName());
-
-          // Queue up an update to note the region location.
-
-          try {
-            msgQueue.put(new ProcessRegionOpen(info, region));
-          } catch (InterruptedException e) {
-          throw new RuntimeException("Putting into msgQueue was interrupted.", e);
-          }
+          } else {
+            // Note that the table has been assigned and is waiting for the meta
+            // table to be updated.
+
+            pendingRegions.add(region.getRegionName());
+
+            // Queue up an update to note the region location.
+
+            try {
+              msgQueue.put(new ProcessRegionOpen(info, region));
+            } catch (InterruptedException e) {
+              throw new RuntimeException("Putting into msgQueue was interrupted.", e);
+            }
+          } 
+          // Remove from unassigned list so we don't assign it to someone else
+          this.unassignedRegions.remove(region.getRegionName());
+          this.assignAttempts.remove(region.getRegionName());
         }
         break;
 

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java?rev=599578&r1=599577&r2=599578&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java Thu Nov 29 13:10:03 2007
@@ -53,6 +53,9 @@
   
   /** region server is no longer serving the specified region */
   public static final byte MSG_REPORT_CLOSE = 101;
+  
+  /** region server is processing open request */
+  public static final byte MSG_REPORT_PROCESS_OPEN = 102;
 
   /**
    * region server split the region associated with this message.
@@ -140,6 +143,10 @@
       
     case MSG_REGION_CLOSE_WITHOUT_REPORT:
       message.append("MSG_REGION_CLOSE_WITHOUT_REPORT : ");
+      break;
+      
+    case MSG_REPORT_PROCESS_OPEN:
+      message.append("MSG_REPORT_PROCESS_OPEN : ");
       break;
       
     case MSG_REPORT_OPEN:

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java?rev=599578&r1=599577&r2=599578&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java Thu Nov 29 13:10:03 2007
@@ -742,6 +742,10 @@
                       throw new RuntimeException("Putting into msgQueue was " +
                         "interrupted.", e);
                     }
+                    if (msgs[i].getMsg() == HMsg.MSG_REGION_OPEN) {
+                      outboundMsgs.add(new HMsg(HMsg.MSG_REPORT_PROCESS_OPEN,
+                          msgs[i].getRegionInfo()));
+                    }
                   }
                 }
               }
@@ -982,11 +986,11 @@
    * Presumption is that all closes and stops have already been called.
    */
   void join() {
-    join(this.workerThread);
     join(this.logRoller);
     join(this.cacheFlusher);
     join(this.compactor);
     join(this.splitter);
+    join(this.workerThread);
   }
 
   private void join(final Thread t) {
@@ -1161,8 +1165,8 @@
       } finally {
         this.lock.writeLock().unlock();
       }
+      reportOpen(region); 
     }
-    reportOpen(region); 
   }
 
   void closeRegion(final HRegionInfo hri, final boolean reportWhenCompleted)

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/Leases.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/Leases.java?rev=599578&r1=599577&r2=599578&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/Leases.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/Leases.java Thu Nov 29 13:10:03 2007
@@ -108,11 +108,13 @@
   public void close() {
     LOG.info(Thread.currentThread().getName() + " closing leases");
     this.stop.set(true);
-    try {
-      this.leaseMonitorThread.interrupt();
-      this.leaseMonitorThread.join();
-    } catch (InterruptedException iex) {
-      // Ignore
+    while (this.leaseMonitorThread.isAlive()) {
+      try {
+        this.leaseMonitorThread.interrupt();
+        this.leaseMonitorThread.join();
+      } catch (InterruptedException iex) {
+        // Ignore
+      }
     }
     synchronized(leases) {
       synchronized(sortedLeases) {
@@ -211,10 +213,16 @@
    * Its a daemon thread.
    */
   class LeaseMonitor extends Chore {
+    /**
+     * @param p
+     * @param s
+     */
     public LeaseMonitor(int p, AtomicBoolean s) {
       super(p, s);
     }
 
+    /** {@inheritDoc} */
+    @Override
     protected void chore() {
       synchronized(leases) {
         synchronized(sortedLeases) {

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java?rev=599578&r1=599577&r2=599578&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java Thu Nov 29 13:10:03 2007
@@ -53,7 +53,9 @@
   private final HMaster master;
   private final List<RegionServerThread> regionThreads;
   private final static int DEFAULT_NO = 1;
+  /** local mode */
   public static final String LOCAL = "local";
+  /** 'local:' */
   public static final String LOCAL_COLON = LOCAL + ":";
   private final HBaseConfiguration conf;
 
@@ -146,12 +148,14 @@
   public String waitOnRegionServer(int serverNumber) {
     RegionServerThread regionServerThread =
       this.regionThreads.remove(serverNumber);
-    try {
-      LOG.info("Waiting on " +
-        regionServerThread.getRegionServer().serverInfo.toString());
-      regionServerThread.join();
-    } catch (InterruptedException e) {
-      e.printStackTrace();
+    while (regionServerThread.isAlive()) {
+      try {
+        LOG.info("Waiting on " +
+            regionServerThread.getRegionServer().serverInfo.toString());
+        regionServerThread.join();
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
     }
     return regionServerThread.getName();
   }
@@ -217,10 +221,12 @@
       }
     }
     if (this.master != null) {
-      try {
-        this.master.join();
-      } catch(InterruptedException e) {
-        // continue
+      while (this.master.isAlive()) {
+        try {
+          this.master.join();
+        } catch(InterruptedException e) {
+          // continue
+        }
       }
     }
     LOG.info("Shutdown " +

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/DFSAbort.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/DFSAbort.java?rev=599578&r1=599577&r2=599578&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/DFSAbort.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/DFSAbort.java Thu Nov 29 13:10:03 2007
@@ -22,19 +22,10 @@
 import junit.framework.TestSuite;
 import junit.textui.TestRunner;
 
-import java.io.PrintWriter;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
 /**
  * Test ability of HBase to handle DFS failure
  */
 public class DFSAbort extends HBaseClusterTestCase {
-  private static final Log LOG =
-    LogFactory.getLog(DFSAbort.class.getName());
-
   /** constructor */
   public DFSAbort() {
     super();
@@ -66,8 +57,6 @@
       // By now the Mini DFS is running, Mini HBase is running and we have
       // created a table. Now let's yank the rug out from HBase
       cluster.getDFSCluster().shutdown();
-      // Now wait for Mini HBase Cluster to shut down
-//      cluster.join();
       threadDumpingJoin();
     } catch (Exception e) {
       e.printStackTrace();

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestLogRolling.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestLogRolling.java?rev=599578&r1=599577&r2=599578&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestLogRolling.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestLogRolling.java Thu Nov 29 13:10:03 2007
@@ -127,33 +127,41 @@
 
     this.server = cluster.getRegionThreads().get(0).getRegionServer();
     this.log = server.getLog();
-    
+
     // When the META table can be opened, the region servers are running
-    @SuppressWarnings("unused")
     HTable meta = new HTable(conf, HConstants.META_TABLE_NAME);
     
-    // Create the test table and open it
-    HTableDescriptor desc = new HTableDescriptor(tableName);
-    desc.addFamily(new HColumnDescriptor(HConstants.COLUMN_FAMILY.toString()));
-    HBaseAdmin admin = new HBaseAdmin(conf);
-    admin.createTable(desc);
-    HTable table = new HTable(conf, new Text(tableName));
+    try {
+
+      // Create the test table and open it
+      HTableDescriptor desc = new HTableDescriptor(tableName);
+      desc.addFamily(new HColumnDescriptor(HConstants.COLUMN_FAMILY.toString()));
+      HBaseAdmin admin = new HBaseAdmin(conf);
+      admin.createTable(desc);
+      HTable table = new HTable(conf, new Text(tableName));
+
+      try {
+        for (int i = 1; i <= 2048; i++) {    // 2048 writes should cause 8 log rolls
+          long lockid =
+            table.startUpdate(new Text("row" + String.format("%1$04d", i)));
+          table.put(lockid, HConstants.COLUMN_FAMILY, value);
+          table.commit(lockid);
+
+          if (i % 256 == 0) {
+            // After every 256 writes sleep to let the log roller run
 
-    for (int i = 1; i <= 2048; i++) {    // 2048 writes should cause 8 log rolls
-      long lockid =
-        table.startUpdate(new Text("row" + String.format("%1$04d", i)));
-      table.put(lockid, HConstants.COLUMN_FAMILY, value);
-      table.commit(lockid);
-      
-      if (i % 256 == 0) {
-        // After every 256 writes sleep to let the log roller run
-        
-        try {
-          Thread.sleep(2000);
-        } catch (InterruptedException e) {
-          // continue
+            try {
+              Thread.sleep(2000);
+            } catch (InterruptedException e) {
+              // continue
+            }
+          }
         }
+      } finally {
+        table.close();
       }
+    } finally {
+      meta.close();
     }
   }
   


Reply via email to