HBASE-13606 AssignmentManager.assign() is not sync in both path

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5d553ada
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5d553ada
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5d553ada

Branch: refs/heads/branch-1
Commit: 5d553adacdd2e1ad06abd9ebea9b954604683b01
Parents: 33fe79c
Author: Matteo Bertozzi <matteo.berto...@cloudera.com>
Authored: Mon May 11 23:49:08 2015 +0100
Committer: Matteo Bertozzi <matteo.berto...@cloudera.com>
Committed: Tue May 12 00:14:04 2015 +0100

----------------------------------------------------------------------
 .../hadoop/hbase/master/AssignmentManager.java  | 99 ++++++++++++++++----
 .../hbase/master/GeneralBulkAssigner.java       | 29 +-----
 2 files changed, 85 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/5d553ada/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
index bf9b207..34c4963 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
@@ -214,6 +214,7 @@ public class AssignmentManager extends ZooKeeperListener {
   // bulk assigning may be not as efficient.
   private final int bulkAssignThresholdRegions;
   private final int bulkAssignThresholdServers;
+  private final int bulkPerRegionOpenTimeGuesstimate;
 
   // Should bulk assignment wait till all regions are assigned,
   // or it is timed out?  This is useful to measure bulk assignment
@@ -255,7 +256,7 @@ public class AssignmentManager extends ZooKeeperListener {
 
   /** Listeners that are called on assignment events. */
   private List<AssignmentListener> listeners = new CopyOnWriteArrayList<AssignmentListener>();
-  
+
   private RegionStateListener regionStateListener;
 
   /**
@@ -312,6 +313,8 @@ public class AssignmentManager extends ZooKeeperListener {
       conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false);
     this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7);
     this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3);
+    this.bulkPerRegionOpenTimeGuesstimate =
+      conf.getInt("hbase.bulk.assignment.perregion.open.time", 10000);
 
     int workers = conf.getInt("hbase.assignment.zkevent.workers", 20);
     ThreadFactory threadFactory = Threads.newDaemonThreadFactory("AM.ZK.Worker");
@@ -1096,7 +1099,7 @@ public class AssignmentManager extends ZooKeeperListener {
             return;
           }
           // Handle OPENED by removing from transition and deleted zk node
-          regionState = 
+          regionState =
               regionStates.transitionOpenFromPendingOpenOrOpeningOnServer(rt,regionState, sn);
           if (regionState != null) {
             failedOpenTracker.remove(encodedName); // reset the count, if any
@@ -1788,6 +1791,18 @@ public class AssignmentManager extends ZooKeeperListener {
           }
         }
       }
+
+      // wait for assignment completion
+      ArrayList<HRegionInfo> userRegionSet = new ArrayList<HRegionInfo>(regions.size());
+      for (HRegionInfo region: regions) {
+        if (!region.getTable().isSystemTable()) {
+          userRegionSet.add(region);
+        }
+      }
+      if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
+            System.currentTimeMillis())) {
+        LOG.debug("some user regions are still in transition: " + userRegionSet);
+      }
       LOG.debug("Bulk assigning done for " + destination);
       return true;
     } finally {
@@ -2617,22 +2632,62 @@ public class AssignmentManager extends ZooKeeperListener {
    * If the region is already assigned, returns immediately.  Otherwise, method
    * blocks until the region is assigned.
    * @param regionInfo region to wait on assignment for
+   * @return true if the region is assigned false otherwise.
    * @throws InterruptedException
    */
   public boolean waitForAssignment(HRegionInfo regionInfo)
       throws InterruptedException {
-    while (!regionStates.isRegionOnline(regionInfo)) {
-      if (regionStates.isRegionInState(regionInfo, State.FAILED_OPEN)
-          || this.server.isStopped()) {
-        return false;
-      }
+    ArrayList<HRegionInfo> regionSet = new ArrayList<HRegionInfo>(1);
+    regionSet.add(regionInfo);
+    return waitForAssignment(regionSet, true, Long.MAX_VALUE);
+  }
 
-      // We should receive a notification, but it's
-      //  better to have a timeout to recheck the condition here:
-      //  it lowers the impact of a race condition if any
-      regionStates.waitForUpdate(100);
+  /**
+   * Waits until the specified region has completed assignment, or the deadline is reached.
+   */
+  protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
+      final boolean waitTillAllAssigned, final int reassigningRegions,
+      final long minEndTime) throws InterruptedException {
+    long deadline = minEndTime + bulkPerRegionOpenTimeGuesstimate * (reassigningRegions + 1);
+    return waitForAssignment(regionSet, waitTillAllAssigned, deadline);
+  }
+
+  /**
+   * Waits until the specified region has completed assignment, or the deadline is reached.
+   * @param regionSet set of region to wait on. the set is modified and the assigned regions removed
+   * @param waitTillAllAssigned true if we should wait all the regions to be assigned
+   * @param deadline the timestamp after which the wait is aborted
+   * @return true if all the regions are assigned false otherwise.
+   * @throws InterruptedException
+   */
+  protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
+      final boolean waitTillAllAssigned, final long deadline) throws InterruptedException {
+    // We're not synchronizing on regionsInTransition now because we don't use any iterator.
+    while (!regionSet.isEmpty() && !server.isStopped() && deadline > System.currentTimeMillis()) {
+      int failedOpenCount = 0;
+      Iterator<HRegionInfo> regionInfoIterator = regionSet.iterator();
+      while (regionInfoIterator.hasNext()) {
+        HRegionInfo hri = regionInfoIterator.next();
+        if (regionStates.isRegionOnline(hri) || regionStates.isRegionInState(hri,
+            State.SPLITTING, State.SPLIT, State.MERGING, State.MERGED)) {
+          regionInfoIterator.remove();
+        } else if (regionStates.isRegionInState(hri, State.FAILED_OPEN)) {
+          failedOpenCount++;
+        }
+      }
+      if (!waitTillAllAssigned) {
+        // No need to wait, let assignment going on asynchronously
+        break;
+      }
+      if (!regionSet.isEmpty()) {
+        if (failedOpenCount == regionSet.size()) {
+          // all the regions we are waiting had an error on open.
+          break;
+        }
+        regionStates.waitForUpdate(100);
+      }
     }
-    return true;
+    return regionSet.isEmpty();
   }
 
   /**
@@ -2725,15 +2780,27 @@ public class AssignmentManager extends ZooKeeperListener {
         LOG.trace("Not using bulk assignment since we are assigning only " + regions +
           " region(s) to " + servers + " server(s)");
       }
+
+      // invoke assignment (async)
+      ArrayList<HRegionInfo> userRegionSet = new ArrayList<HRegionInfo>(regions);
       for (Map.Entry<ServerName, List<HRegionInfo>> plan: bulkPlan.entrySet()) {
         if (!assign(plan.getKey(), plan.getValue())) {
           for (HRegionInfo region: plan.getValue()) {
             if (!regionStates.isRegionOnline(region)) {
               invokeAssign(region);
+              if (!region.getTable().isSystemTable()) {
+                userRegionSet.add(region);
+              }
             }
           }
         }
       }
+
+      // wait for assignment completion
+      if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
+            System.currentTimeMillis())) {
+        LOG.debug("some user regions are still in transition: " + userRegionSet);
+      }
     } else {
       LOG.info("Bulk assigning " + regions + " region(s) across "
         + totalServers + " server(s), " + message);
@@ -3044,11 +3111,11 @@ public class AssignmentManager extends ZooKeeperListener {
       if (serverName != null
           && !serverManager.getOnlineServers().containsKey(serverName)) {
         LOG.info("Server " + serverName + " isn't online. SSH will handle this");
-        continue; 
+        continue;
       }
       HRegionInfo regionInfo = regionState.getRegion();
       State state = regionState.getState();
-      
+
       switch (state) {
       case CLOSED:
         invokeAssign(regionInfo);
@@ -3060,7 +3127,7 @@ public class AssignmentManager extends ZooKeeperListener {
         retrySendRegionClose(regionState);
         break;
       case FAILED_CLOSE:
-      case FAILED_OPEN:  
+      case FAILED_OPEN:
         invokeUnAssign(regionInfo);
         break;
       default:
@@ -4270,7 +4337,7 @@ public class AssignmentManager extends ZooKeeperListener {
     getSnapShotOfAssignment(Collection<HRegionInfo> infos) {
     return getRegionStates().getRegionAssignments(infos);
   }
-  
+
   void setRegionStateListener(RegionStateListener listener) {
     this.regionStateListener = listener;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/5d553ada/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java
index 356f4af..43ea523 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.master;
 import java.lang.Thread.UncaughtExceptionHandler;
 import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -35,7 +34,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.master.RegionState.State;
 
 /**
  * Run bulk assign.  Does one RCP per regionserver passing a
@@ -118,31 +116,8 @@ public class GeneralBulkAssigner extends BulkAssigner {
     if (!failedPlans.isEmpty() && !server.isStopped()) {
       reassigningRegions = reassignFailedPlans();
     }
-
-    Configuration conf = server.getConfiguration();
-    long perRegionOpenTimeGuesstimate =
-      conf.getLong("hbase.bulk.assignment.perregion.open.time", 1000);
-    long endTime = Math.max(System.currentTimeMillis(), rpcWaitTime)
-      + perRegionOpenTimeGuesstimate * (reassigningRegions + 1);
-    RegionStates regionStates = assignmentManager.getRegionStates();
-    // We're not synchronizing on regionsInTransition now because we don't use any iterator.
-    while (!regionSet.isEmpty() && !server.isStopped() && endTime > System.currentTimeMillis()) {
-      Iterator<HRegionInfo> regionInfoIterator = regionSet.iterator();
-      while (regionInfoIterator.hasNext()) {
-        HRegionInfo hri = regionInfoIterator.next();
-        if (regionStates.isRegionOnline(hri) || regionStates.isRegionInState(hri,
-            State.SPLITTING, State.SPLIT, State.MERGING, State.MERGED)) {
-          regionInfoIterator.remove();
-        }
-      }
-      if (!waitTillAllAssigned) {
-        // No need to wait, let assignment going on asynchronously
-        break;
-      }
-      if (!regionSet.isEmpty()) {
-        regionStates.waitForUpdate(100);
-      }
-    }
+    assignmentManager.waitForAssignment(regionSet, waitTillAllAssigned,
+      reassigningRegions, Math.max(System.currentTimeMillis(), rpcWaitTime));
 
     if (LOG.isDebugEnabled()) {
       long elapsedTime = System.currentTimeMillis() - startTime;

Reply via email to