Author: stack
Date: Tue Sep 21 06:36:22 2010
New Revision: 999237

URL: http://svn.apache.org/viewvc?rev=999237&view=rev
Log:
HBASE-3018 Bulk assignment on startup runs serially through the cluster servers 
assigning in bulk to one at a time

M src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
  Removed the reentrant lock that spanned moving a region into
  regionsInTransition and acquiring the lock on its state object.  It
  was a bad idea: I found it actually deadlocked.  It shouldn't be needed,
  because the call to assign logs and returns without assigning if the
  state instance is not in the right 'state' (closed or offline).
  Run a thread per server parcelling out region assignments so we assign
  across the cluster concurrently rather than in series as we were doing.

Modified:
    hbase/trunk/CHANGES.txt
    
hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
    
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java

Modified: hbase/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=999237&r1=999236&r2=999237&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Tue Sep 21 06:36:22 2010
@@ -530,6 +530,8 @@ Release 0.21.0 - Unreleased
    HBASE-3015  recovered.edits files not deleted if it only contain edits that
                have already been flushed; hurts perf for all future opens of
                the region
+   HBASE-3018  Bulk assignment on startup runs serially through the cluster
+               servers assigning in bulk to one at a time
 
   IMPROVEMENTS
    HBASE-1760  Cleanup TODOs in HTable

Modified: 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java?rev=999237&r1=999236&r2=999237&view=diff
==============================================================================
--- 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java 
(original)
+++ 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java 
Tue Sep 21 06:36:22 2010
@@ -118,8 +118,6 @@ public class AssignmentManager extends Z
   private final SortedMap<HRegionInfo,HServerInfo> regions =
     new TreeMap<HRegionInfo,HServerInfo>();
 
-  private final ReentrantLock assignLock = new ReentrantLock();
-
   private final ExecutorService executorService;
 
   /**
@@ -493,24 +491,18 @@ public class AssignmentManager extends Z
     // Grab the state of this region and synchronize on it
     String encodedName = region.getEncodedName();
     RegionState state;
-    // This assignLock is used bridging the two synchronization blocks.  Once
-    // we've made it into the 'state' synchronization block, then we can let
-    // go of this lock.  There must be a better construct that this -- St.Ack 20100811
-    this.assignLock.lock();
-    try {
-      synchronized (regionsInTransition) {
-        state = regionsInTransition.get(encodedName);
-        if(state == null) {
-          state = new RegionState(region, RegionState.State.OFFLINE);
-          regionsInTransition.put(encodedName, state);
-        }
-      }
-      synchronized(state) {
-        this.assignLock.unlock();
-        assign(state);
+    synchronized (regionsInTransition) {
+      state = regionsInTransition.get(encodedName);
+      if (state == null) {
+        state = new RegionState(region, RegionState.State.OFFLINE);
+        regionsInTransition.put(encodedName, state);
       }
-    } finally {
-      if (this.assignLock.isHeldByCurrentThread()) this.assignLock.unlock();
+    }
+    // This here gap between synchronizations looks like a hole but it should
+    // be ok because the assign below would protect against being called with
+    // a state instance that is not in the right 'state' -- St.Ack 20100920.
+    synchronized (state) {
+      assign(state);
     }
   }
 
@@ -519,9 +511,9 @@ public class AssignmentManager extends Z
    * @param state 
    */
   private void assign(final RegionState state) {
-    if(!state.isClosed() && !state.isOffline()) {
+    if (!state.isClosed() && !state.isOffline()) {
       LOG.info("Attempting to assign region but it is in transition and in " +
-          "an unexpected state:" + state);
+        "an unexpected state:" + state);
       return;
     } else {
       state.update(RegionState.State.OFFLINE);
@@ -675,32 +667,21 @@ public class AssignmentManager extends Z
     // Scan META for all user regions
     List<HRegionInfo> allRegions =
       MetaScanner.listAllRegions(master.getConfiguration());
-    if (allRegions == null || allRegions.isEmpty()) {
-      return;
-    }
+    if (allRegions == null || allRegions.isEmpty()) return;
 
     // Get all available servers
     List<HServerInfo> servers = serverManager.getOnlineServersList();
-
-    LOG.info("Assigning " + allRegions.size() + " regions across " + servers.size() +
-      " servers");
+    LOG.info("Assigning " + allRegions.size() + " region(s) across " +
+      servers.size() + " server(s)");
 
     // Generate a cluster startup region placement plan
-    Map<HServerInfo,List<HRegionInfo>> bulkPlan =
+    Map<HServerInfo, List<HRegionInfo>> bulkPlan =
       LoadBalancer.bulkAssignment(allRegions, servers);
 
-    // For each server, create OFFLINE nodes and send OPEN RPCs
-    for (Map.Entry<HServerInfo,List<HRegionInfo>> entry : bulkPlan.entrySet()) {
-      HServerInfo server = entry.getKey();
-      List<HRegionInfo> regions = entry.getValue();
-      LOG.debug("Assigning " + regions.size() + " regions to " + server);
-      for (HRegionInfo region : regions) {
-        LOG.debug("Assigning " + region.getRegionNameAsString() + " to " + server);
-        String regionName = region.getEncodedName();
-        RegionPlan plan = new RegionPlan(region, null,server);
-        regionPlans.put(regionName, plan);
-        assign(region);
-      }
+    // Now start a thread per server to run assignment.
+    for (Map.Entry<HServerInfo,List<HRegionInfo>> entry: bulkPlan.entrySet()) {
+      Thread t = new BulkAssignServer(entry.getKey(), entry.getValue(), this.master);
+      t.start();
     }
 
     // Wait for no regions to be in transition
@@ -714,6 +695,46 @@ public class AssignmentManager extends Z
     LOG.info("All user regions have been assigned");
   }
 
+  /**
+   * Class to run bulk assign to a single server.
+   */
+  class BulkAssignServer extends Thread {
+    private final List<HRegionInfo> regions;
+    private final HServerInfo server;
+    private final Stoppable stopper;
+
+    BulkAssignServer(final HServerInfo server,
+        final List<HRegionInfo> regions, final Stoppable stopper) {
+      super("serverassign-" + server.getServerName());
+      setDaemon(true);
+      this.server = server;
+      this.regions = regions;
+      this.stopper = stopper;
+    }
+
+    @Override
+    public void run() {
+      // Insert a plan for each region with 'server' as the target regionserver.
+      // Below, we run through regions one at a time.  The call to assign will
+      // move the region into the regionsInTransition which starts up a timer.
+      // if the region is not out of the regionsInTransition by a certain time,
+      // it will be reassigned.  We don't want that to happen.  So, do it this
+      // way a region at a time for now.  Presumably the regionserver will put
+      // up a back pressure if opening a region takes time which is good since
+      // this will block our adding new regions to regionsInTransition.  Later
+      // make it so we can send over a lump of regions in one rpc with the
+      // regionserver on remote side tickling zk on a period to prevent our
+      // regionsInTransition timing out.  Currently its not possible given the
+      // Executor architecture on the regionserver side.  St.Ack 20100920.
+      for (HRegionInfo region : regions) {
+        LOG.debug("Assigning " + region.getRegionNameAsString() + " to " + this.server);
+        regionPlans.put(region.getEncodedName(), new RegionPlan(region, null, server));
+        assign(region);
+        if (this.stopper.isStopped()) break;
+      }
+    }
+  }
+
   private void rebuildUserRegions() throws IOException {
     Map<HRegionInfo,HServerAddress> allRegions =
       MetaReader.fullScan(catalogTracker);

Modified: 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=999237&r1=999236&r2=999237&view=diff
==============================================================================
--- 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
 (original)
+++ 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
 Tue Sep 21 06:36:22 2010
@@ -1908,7 +1908,7 @@ public class HRegionServer implements HR
   public void openRegion(HRegionInfo region) {
     LOG.info("Received request to open region: " +
       region.getRegionNameAsString());
-    if(region.isRootRegion()) {
+    if (region.isRootRegion()) {
       this.service.submit(new OpenRootHandler(this, this, region));
     } else if(region.isMetaRegion()) {
       this.service.submit(new OpenMetaHandler(this, this, region));


Reply via email to