This is an automated email from the ASF dual-hosted git repository.

stack pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 727ebbd  HBASE-22527 [hbck2] Add a master web ui to show the 
problematic regions
727ebbd is described below

commit 727ebbddac811076b336322f4e01f16d20a05641
Author: Guanghao Zhang <[email protected]>
AuthorDate: Thu Jul 18 09:41:11 2019 +0800

    HBASE-22527 [hbck2] Add a master web ui to show the problematic regions
---
 .../tmpl/master/AssignmentManagerStatusTmpl.jamon  | 103 +++-
 .../hbase/master/assignment/AssignmentManager.java |  57 ++
 .../assignment/TestAMProblematicRegions.java       | 127 +++++
 .../master/assignment/TestAssignmentManager.java   | 599 +-------------------
 .../assignment/TestAssignmentManagerBase.java      | 629 +++++++++++++++++++++
 5 files changed, 911 insertions(+), 604 deletions(-)

diff --git 
a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/AssignmentManagerStatusTmpl.jamon
 
b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/AssignmentManagerStatusTmpl.jamon
index 9f31483..1d8fa70 100644
--- 
a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/AssignmentManagerStatusTmpl.jamon
+++ 
b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/AssignmentManagerStatusTmpl.jamon
@@ -17,27 +17,108 @@ See the License for the specific language governing 
permissions and
 limitations under the License.
 </%doc>
 <%import>
-org.apache.hadoop.hbase.master.assignment.AssignmentManager;
-org.apache.hadoop.hbase.master.assignment.AssignmentManager.RegionInTransitionStat;
-org.apache.hadoop.hbase.master.assignment.RegionStates.RegionFailedOpen;
-org.apache.hadoop.hbase.master.RegionState;
+java.util.Map;
+java.util.Set;
+java.util.SortedSet;
+java.util.concurrent.atomic.AtomicInteger;
+java.util.stream.Collectors;
 org.apache.hadoop.conf.Configuration;
 org.apache.hadoop.hbase.HBaseConfiguration;
 org.apache.hadoop.hbase.HConstants;
+org.apache.hadoop.hbase.ServerName;
+org.apache.hadoop.hbase.client.RegionInfo;
 org.apache.hadoop.hbase.client.RegionInfoDisplay;
-java.util.HashSet;
-java.util.SortedSet;
-java.util.Map;
-java.util.concurrent.atomic.AtomicInteger;
+org.apache.hadoop.hbase.master.RegionState;
+org.apache.hadoop.hbase.master.assignment.AssignmentManager;
+org.apache.hadoop.hbase.master.assignment.AssignmentManager.RegionInTransitionStat;
+org.apache.hadoop.hbase.master.assignment.RegionStates.RegionFailedOpen;
+org.apache.hadoop.hbase.util.Pair;
 </%import>
 <%args>
 AssignmentManager assignmentManager;
 int limit = 100;
 </%args>
 
-<%java SortedSet<RegionState> rit = assignmentManager
-  .getRegionStates().getRegionsInTransitionOrderedByTimestamp();
-%>
+<%java>
+SortedSet<RegionState> rit = assignmentManager.getRegionStates()
+    .getRegionsInTransitionOrderedByTimestamp();
+Map<String, Pair<ServerName, Set<ServerName>>> problematicRegions = 
assignmentManager
+    .getProblematicRegions();
+</%java>
+
+<%if !problematicRegions.isEmpty() %>
+<%java>
+int totalSize = problematicRegions.size();
+int sizePerPage = Math.min(10, totalSize);
+int numOfPages = (int) Math.ceil(totalSize * 1.0 / sizePerPage);
+</%java>
+    <section>
+    <h2><a name="problem-regions">Problematic Regions</a></h2>
+    <p>
+        <span>
+            <% problematicRegions.size() %> problematic region(s). There are 
three case: 1. Master
+             thought this region opened, but no regionserver reported it. 2. 
Master thought this
+              region opened on Server1, but regionserver reported Server2. 3. 
More than one
+               regionservers reported opened this region. Notice: the reported 
online regionservers
+                may be not right when there are regions in transition. Please 
check them in
+                 regionserver's web UI.
+        </span>
+    </p>
+    <div class="tabbable">
+        <div class="tab-content">
+        <%java int recordItr = 0; %>
+        <%for Map.Entry<String, Pair<ServerName, Set<ServerName>>> entry : 
problematicRegions.entrySet() %>
+            <%if (recordItr % sizePerPage) == 0 %>
+                <%if recordItr == 0 %>
+                    <div class="tab-pane active" id="tab_prs<% (recordItr / 
sizePerPage) + 1 %>">
+                <%else>
+                    <div class="tab-pane" id="tab_prs<% (recordItr / 
sizePerPage) + 1 %>">
+                </%if>
+                <table class="table table-striped" style="margin-bottom:0px;">
+                    <tr>
+                        <th>Region</th>
+                        <th>Location in META</th>
+                        <th>Reported Online Region Servers</th>
+                    </tr>
+            </%if>
+
+            <tr>
+                <td><% entry.getKey() %></td>
+                <td><% entry.getValue().getFirst() %></td>
+                <td><% 
entry.getValue().getSecond().stream().map(ServerName::getServerName)
+                    .collect(Collectors.joining(", ")) %></td>
+            </tr>
+            <%java recordItr++; %>
+            <%if (recordItr % sizePerPage) == 0 %>
+                </table>
+                </div>
+            </%if>
+        </%for>
+
+        <%if (recordItr % sizePerPage) != 0 %>
+         <%for ; (recordItr % sizePerPage) != 0 ; recordItr++ %>
+            <tr><td colspan="3" style="height:61px"></td></tr>
+         </%for>
+         </table>
+         </div>
+        </%if>
+
+        </div>
+        <nav>
+         <ul class="nav nav-pills pagination">
+         <%for int i = 1 ; i <= numOfPages; i++ %>
+             <%if i == 1 %>
+             <li class="active">
+             <%else>
+             <li>
+             </%if>
+             <a href="#tab_prs<% i %>"><% i %></a></li>
+         </%for>
+         </ul>
+        </nav>
+    </div>
+    </section>
+</%if>
 
 <%if !rit.isEmpty() %>
 <%java>
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
index 69b552e..e680454 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -156,6 +157,8 @@ public class AssignmentManager implements ServerListener {
   private final RegionStates regionStates = new RegionStates();
   private final RegionStateStore regionStateStore;
 
+  private final Map<ServerName, Set<byte[]>> rsReports = new HashMap<>();
+
   private final boolean shouldAssignRegionsWithFavoredNodes;
   private final int assignDispatchWaitQueueMaxSize;
   private final int assignDispatchWaitMillis;
@@ -962,6 +965,11 @@ public class AssignmentManager implements ServerListener {
       }
     }
 
+    // Track the regionserver reported online regions in memory.
+    synchronized (rsReports) {
+      rsReports.put(serverName, regionNames);
+    }
+
     if (regionNames.isEmpty()) {
       // nothing to do if we don't have regions
       LOG.trace("no online region found on " + serverName);
@@ -1882,4 +1890,53 @@ public class AssignmentManager implements ServerListener 
{
   MasterServices getMaster() {
     return master;
   }
+
+  /**
+   * Found the potentially problematic opened regions. There are three case:
+   * case 1. Master thought this region opened, but no regionserver reported 
it.
+   * case 2. Master thought this region opened on Server1, but regionserver 
reported Server2
+   * case 3. More than one regionservers reported opened this region
+   *
+   * @return the map of potentially problematic opened regions. Key is the 
region name. Value is
+   *         a pair of location in meta and the regionservers which reported 
opened this region.
+   */
+  public Map<String, Pair<ServerName, Set<ServerName>>> 
getProblematicRegions() {
+    Map<String, Set<ServerName>> reportedOnlineRegions = new HashMap<>();
+    synchronized (rsReports) {
+      for (Map.Entry<ServerName, Set<byte[]>> entry : rsReports.entrySet()) {
+        for (byte[] regionName : entry.getValue()) {
+          reportedOnlineRegions
+              .computeIfAbsent(RegionInfo.getRegionNameAsString(regionName), r 
-> new HashSet<>())
+              .add(entry.getKey());
+        }
+      }
+    }
+
+    Map<String, Pair<ServerName, Set<ServerName>>> problematicRegions = new 
HashMap<>();
+    List<RegionState> rits = regionStates.getRegionsStateInTransition();
+    for (RegionState regionState : regionStates.getRegionStates()) {
+      // Only consider the opened region and not in transition
+      if (!rits.contains(regionState) && regionState.isOpened()) {
+        String regionName = regionState.getRegion().getRegionNameAsString();
+        ServerName serverName = regionState.getServerName();
+        if (reportedOnlineRegions.containsKey(regionName)) {
+          Set<ServerName> reportedServers = 
reportedOnlineRegions.get(regionName);
+          if (reportedServers.contains(serverName)) {
+            if (reportedServers.size() > 1) {
+              // More than one regionserver reported opened this region
+              problematicRegions.put(regionName, new Pair<>(serverName, 
reportedServers));
+            }
+          } else {
+            // Master thought this region opened on Server1, but regionserver 
reported Server2
+            problematicRegions.put(regionName, new Pair<>(serverName, 
reportedServers));
+          }
+        } else {
+          // Master thought this region opened, but no regionserver reported 
it.
+          problematicRegions.put(regionName, new Pair<>(serverName, new 
HashSet<>()));
+        }
+      }
+    }
+
+    return problematicRegions;
+  }
 }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAMProblematicRegions.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAMProblematicRegions.java
new file mode 100644
index 0000000..d07e129
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAMProblematicRegions.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.assignment;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ MasterTests.class, MediumTests.class })
+public class TestAMProblematicRegions extends TestAssignmentManagerBase {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestAMProblematicRegions.class);
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestAMProblematicRegions.class);
+
+  @Test
+  public void testForMeta() throws Exception {
+    byte[] metaRegionNameAsBytes = 
RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName();
+    String metaRegionName = 
RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionNameAsString();
+    List<ServerName> serverNames = 
master.getServerManager().getOnlineServersList();
+    assertEquals(NSERVERS, serverNames.size());
+
+    Map<String, Pair<ServerName, Set<ServerName>>> problematicRegions = 
am.getProblematicRegions();
+
+    // Test for case1: Master thought this region opened, but no regionserver 
reported it.
+    assertTrue(problematicRegions.containsKey(metaRegionName));
+    Pair<ServerName, Set<ServerName>> pair = 
problematicRegions.get(metaRegionName);
+    ServerName locationInMeta = pair.getFirst();
+    Set<ServerName> reportedRegionServers = pair.getSecond();
+    assertTrue(serverNames.contains(locationInMeta));
+    assertEquals(0, reportedRegionServers.size());
+
+    // Reported right region location. Then not in problematic regions.
+    am.reportOnlineRegions(locationInMeta, 
Collections.singleton(metaRegionNameAsBytes));
+    problematicRegions = am.getProblematicRegions();
+    assertFalse(problematicRegions.containsKey(metaRegionName));
+  }
+
+  @Test
+  public void testForUserTable() throws Exception {
+    TableName tableName = TableName.valueOf("testForUserTable");
+    RegionInfo hri = createRegionInfo(tableName, 1);
+    String regionName = hri.getRegionNameAsString();
+    rsDispatcher.setMockRsExecutor(new GoodRsExecutor());
+    Future<byte[]> future = submitProcedure(am.createAssignProcedure(hri));
+    waitOnFuture(future);
+
+    List<ServerName> serverNames = 
master.getServerManager().getOnlineServersList();
+    assertEquals(NSERVERS, serverNames.size());
+
+    // Test for case1: Master thought this region opened, but no regionserver 
reported it.
+    Map<String, Pair<ServerName, Set<ServerName>>> problematicRegions = 
am.getProblematicRegions();
+    assertTrue(problematicRegions.containsKey(regionName));
+    Pair<ServerName, Set<ServerName>> pair = 
problematicRegions.get(regionName);
+    ServerName locationInMeta = pair.getFirst();
+    Set<ServerName> reportedRegionServers = pair.getSecond();
+    assertTrue(serverNames.contains(locationInMeta));
+    assertEquals(0, reportedRegionServers.size());
+
+    // Test for case2: Master thought this region opened on Server1, but 
regionserver reported
+    // Server2
+    final ServerName tempLocationInMeta = locationInMeta;
+    final ServerName anotherServer =
+        serverNames.stream().filter(s -> 
!s.equals(tempLocationInMeta)).findFirst().get();
+    am.reportOnlineRegions(anotherServer, 
Collections.singleton(hri.getRegionName()));
+    problematicRegions = am.getProblematicRegions();
+    assertTrue(problematicRegions.containsKey(regionName));
+    pair = problematicRegions.get(regionName);
+    locationInMeta = pair.getFirst();
+    reportedRegionServers = pair.getSecond();
+    assertEquals(1, reportedRegionServers.size());
+    assertFalse(reportedRegionServers.contains(locationInMeta));
+    assertTrue(reportedRegionServers.contains(anotherServer));
+
+    // Test for case3: More than one regionservers reported opened this region.
+    am.reportOnlineRegions(locationInMeta, 
Collections.singleton(hri.getRegionName()));
+    problematicRegions = am.getProblematicRegions();
+    assertTrue(problematicRegions.containsKey(regionName));
+    pair = problematicRegions.get(regionName);
+    locationInMeta = pair.getFirst();
+    reportedRegionServers = pair.getSecond();
+    assertEquals(2, reportedRegionServers.size());
+    assertTrue(reportedRegionServers.contains(locationInMeta));
+    assertTrue(reportedRegionServers.contains(anotherServer));
+
+    // Reported right region location. Then not in problematic regions.
+    am.reportOnlineRegions(anotherServer, Collections.EMPTY_SET);
+    problematicRegions = am.getProblematicRegions();
+    assertFalse(problematicRegions.containsKey(regionName));
+  }
+}
\ No newline at end of file
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java
index 86186e3..33aba1b 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java
@@ -18,154 +18,47 @@
 package org.apache.hadoop.hbase.master.assignment;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.net.SocketTimeoutException;
-import java.util.List;
-import java.util.NavigableMap;
-import java.util.Random;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CallQueueTooBigException;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
 import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
-import org.apache.hadoop.hbase.ipc.CallTimeoutException;
-import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
-import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.RegionState.State;
-import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
-import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
-import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
-import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher;
-import org.apache.hadoop.hbase.procedure2.Procedure;
-import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
-import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
 import org.apache.hadoop.hbase.procedure2.util.StringUtils;
-import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
-import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.ipc.RemoteException;
-import org.junit.After;
-import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Ignore;
-import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.junit.rules.ExpectedException;
-import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
-import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
-import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
-import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
-import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
-import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
-import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
-import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
-import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
-import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
-import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
-
 @Category({MasterTests.class, LargeTests.class})
-public class TestAssignmentManager {
+public class TestAssignmentManager extends TestAssignmentManagerBase {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestAssignmentManager.class);
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
       HBaseClassTestRule.forClass(TestAssignmentManager.class);
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(TestAssignmentManager.class);
-
-  @Rule public TestName name = new TestName();
-  @Rule public final ExpectedException exception = ExpectedException.none();
-
-  private static final int PROC_NTHREADS = 64;
-  private static final int NREGIONS = 1 * 1000;
-  private static final int NSERVERS = Math.max(1, NREGIONS / 100);
-
-  private HBaseTestingUtility UTIL;
-  private MockRSProcedureDispatcher rsDispatcher;
-  private MockMasterServices master;
-  private AssignmentManager am;
-  private NavigableMap<ServerName, SortedSet<byte []>> regionsToRegionServers =
-      new ConcurrentSkipListMap<ServerName, SortedSet<byte []>>();
-  // Simple executor to run some simple tasks.
-  private ScheduledExecutorService executor;
-
-  private ProcedureMetrics assignProcMetrics;
-  private ProcedureMetrics unassignProcMetrics;
-
-  private long assignSubmittedCount = 0;
-  private long assignFailedCount = 0;
-  private long unassignSubmittedCount = 0;
-  private long unassignFailedCount = 0;
-
-  private void setupConfiguration(Configuration conf) throws Exception {
-    FSUtils.setRootDir(conf, UTIL.getDataTestDir());
-    conf.setBoolean(WALProcedureStore.USE_HSYNC_CONF_KEY, false);
-    conf.setInt(WALProcedureStore.SYNC_WAIT_MSEC_CONF_KEY, 10);
-    conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 
PROC_NTHREADS);
-    conf.setInt(RSProcedureDispatcher.RS_RPC_STARTUP_WAIT_TIME_CONF_KEY, 1000);
-    conf.setInt(AssignmentManager.ASSIGN_MAX_ATTEMPTS, 100); // Have many so 
we succeed eventually.
-  }
-
-  @Before
-  public void setUp() throws Exception {
-    UTIL = new HBaseTestingUtility();
-    this.executor = Executors.newSingleThreadScheduledExecutor();
-    setupConfiguration(UTIL.getConfiguration());
-    master = new MockMasterServices(UTIL.getConfiguration(), 
this.regionsToRegionServers);
-    rsDispatcher = new MockRSProcedureDispatcher(master);
-    master.start(NSERVERS, rsDispatcher);
-    am = master.getAssignmentManager();
-    assignProcMetrics = 
am.getAssignmentManagerMetrics().getAssignProcMetrics();
-    unassignProcMetrics = 
am.getAssignmentManagerMetrics().getUnassignProcMetrics();
-    setUpMeta();
-  }
-
-  private void setUpMeta() throws Exception {
-    rsDispatcher.setMockRsExecutor(new GoodRsExecutor());
-    am.assign(RegionInfoBuilder.FIRST_META_REGIONINFO);
-    am.wakeMetaLoadedEvent();
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    master.stop("tearDown");
-    this.executor.shutdownNow();
-  }
-
-  @Test (expected=NullPointerException.class)
+  @Test(expected = NullPointerException.class)
   public void testWaitServerReportEventWithNullServer() throws 
UnexpectedStateException {
     // Test what happens if we pass in null server. I'd expect it throws NPE.
-    if (this.am.waitServerReportEvent(null, null)) throw new 
UnexpectedStateException();
+    if (this.am.waitServerReportEvent(null, null)) {
+      throw new UnexpectedStateException();
+    }
   }
 
   @Test
@@ -471,484 +364,4 @@ public class TestAssignmentManager {
     assertEquals(unassignSubmittedCount + 1, 
unassignProcMetrics.getSubmittedCounter().getCount());
     assertEquals(unassignFailedCount, 
unassignProcMetrics.getFailedCounter().getCount());
   }
-
-
-  private Future<byte[]> submitProcedure(final Procedure<MasterProcedureEnv> 
proc) {
-    return 
ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
-  }
-
-  private byte[] waitOnFuture(final Future<byte[]> future) throws Exception {
-    try {
-      return future.get(5, TimeUnit.SECONDS);
-    } catch (ExecutionException e) {
-      LOG.info("ExecutionException", e);
-      Exception ee = (Exception)e.getCause();
-      if (ee instanceof InterruptedIOException) {
-        for (Procedure<?> p: 
this.master.getMasterProcedureExecutor().getProcedures()) {
-          LOG.info(p.toStringDetails());
-        }
-      }
-      throw (Exception)e.getCause();
-    }
-  }
-
-  // 
============================================================================================
-  //  Helpers
-  // 
============================================================================================
-  private void bulkSubmit(final AssignProcedure[] procs) throws Exception {
-    final Thread[] threads = new Thread[PROC_NTHREADS];
-    for (int i = 0; i < threads.length; ++i) {
-      final int threadId = i;
-      threads[i] = new Thread() {
-        @Override
-        public void run() {
-          TableName tableName = TableName.valueOf("table-" + threadId);
-          int n = (procs.length / threads.length);
-          int start = threadId * n;
-          int stop = start + n;
-          for (int j = start; j < stop; ++j) {
-            procs[j] = createAndSubmitAssign(tableName, j);
-          }
-        }
-      };
-      threads[i].start();
-    }
-    for (int i = 0; i < threads.length; ++i) {
-      threads[i].join();
-    }
-    for (int i = procs.length - 1; i >= 0 && procs[i] == null; --i) {
-      procs[i] = createAndSubmitAssign(TableName.valueOf("table-sync"), i);
-    }
-  }
-
-  private AssignProcedure createAndSubmitAssign(TableName tableName, int 
regionId) {
-    RegionInfo hri = createRegionInfo(tableName, regionId);
-    AssignProcedure proc = am.createAssignProcedure(hri);
-    master.getMasterProcedureExecutor().submitProcedure(proc);
-    return proc;
-  }
-
-  private RegionInfo createRegionInfo(final TableName tableName, final long 
regionId) {
-    return RegionInfoBuilder.newBuilder(tableName)
-        .setStartKey(Bytes.toBytes(regionId))
-        .setEndKey(Bytes.toBytes(regionId + 1))
-        .setSplit(false)
-        .setRegionId(0)
-        .build();
-  }
-
-  private void sendTransitionReport(final ServerName serverName,
-      final 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo 
regionInfo,
-      final TransitionCode state) throws IOException {
-    ReportRegionStateTransitionRequest.Builder req =
-      ReportRegionStateTransitionRequest.newBuilder();
-    req.setServer(ProtobufUtil.toServerName(serverName));
-    req.addTransition(RegionStateTransition.newBuilder()
-      .addRegionInfo(regionInfo)
-      .setTransitionCode(state)
-      .setOpenSeqNum(1)
-      .build());
-    am.reportRegionStateTransition(req.build());
-  }
-
-  private void doCrash(final ServerName serverName) {
-    this.am.submitServerCrash(serverName, false/*No WALs here*/);
-  }
-
-  private void doRestart(final ServerName serverName) {
-    try {
-      this.master.restartRegionServer(serverName);
-    } catch (IOException e) {
-      LOG.warn("Can not restart RS with new startcode");
-    }
-  }
-
-  private class NoopRsExecutor implements MockRSExecutor {
-    @Override
-    public ExecuteProceduresResponse sendRequest(ServerName server,
-        ExecuteProceduresRequest request) throws IOException {
-      if (request.getOpenRegionCount() > 0) {
-        for (OpenRegionRequest req : request.getOpenRegionList()) {
-          for (RegionOpenInfo openReq : req.getOpenInfoList()) {
-            execOpenRegion(server, openReq);
-          }
-        }
-      }
-      if (request.getCloseRegionCount() > 0) {
-        for (CloseRegionRequest req : request.getCloseRegionList()) {
-          execCloseRegion(server, req.getRegion().getValue().toByteArray());
-        }
-      }
-      return ExecuteProceduresResponse.newBuilder().build();
-    }
-
-    protected RegionOpeningState execOpenRegion(ServerName server, 
RegionOpenInfo regionInfo)
-        throws IOException {
-      return null;
-    }
-
-    protected CloseRegionResponse execCloseRegion(ServerName server, byte[] 
regionName)
-        throws IOException {
-      return null;
-    }
-  }
-
-  private class GoodRsExecutor extends NoopRsExecutor {
-    @Override
-    protected RegionOpeningState execOpenRegion(ServerName server, 
RegionOpenInfo openReq)
-        throws IOException {
-      sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED);
-      // Concurrency?
-      // Now update the state of our cluster in regionsToRegionServers.
-      SortedSet<byte []> regions = regionsToRegionServers.get(server);
-      if (regions == null) {
-        regions = new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
-        regionsToRegionServers.put(server, regions);
-      }
-      RegionInfo hri = ProtobufUtil.toRegionInfo(openReq.getRegion());
-      if (regions.contains(hri.getRegionName())) {
-        throw new UnsupportedOperationException(hri.getRegionNameAsString());
-      }
-      regions.add(hri.getRegionName());
-      return RegionOpeningState.OPENED;
-    }
-
-    @Override
-    protected CloseRegionResponse execCloseRegion(ServerName server, byte[] 
regionName)
-        throws IOException {
-      RegionInfo hri = am.getRegionInfo(regionName);
-      sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri), 
TransitionCode.CLOSED);
-      return CloseRegionResponse.newBuilder().setClosed(true).build();
-    }
-  }
-
-  private static class ServerNotYetRunningRsExecutor implements MockRSExecutor 
{
-    @Override
-    public ExecuteProceduresResponse sendRequest(ServerName server, 
ExecuteProceduresRequest req)
-        throws IOException {
-      throw new ServerNotRunningYetException("wait on server startup");
-    }
-  }
-
-  private static class FaultyRsExecutor implements MockRSExecutor {
-    private final IOException exception;
-
-    public FaultyRsExecutor(final IOException exception) {
-      this.exception = exception;
-    }
-
-    @Override
-    public ExecuteProceduresResponse sendRequest(ServerName server, 
ExecuteProceduresRequest req)
-        throws IOException {
-      throw exception;
-    }
-  }
-
-  private class SocketTimeoutRsExecutor extends GoodRsExecutor {
-    private final int maxSocketTimeoutRetries;
-    private final int maxServerRetries;
-
-    private ServerName lastServer;
-    private int sockTimeoutRetries;
-    private int serverRetries;
-
-    public SocketTimeoutRsExecutor(int maxSocketTimeoutRetries, int 
maxServerRetries) {
-      this.maxServerRetries = maxServerRetries;
-      this.maxSocketTimeoutRetries = maxSocketTimeoutRetries;
-    }
-
-    @Override
-    public ExecuteProceduresResponse sendRequest(ServerName server, 
ExecuteProceduresRequest req)
-        throws IOException {
-      // SocketTimeoutException should be a temporary problem
-      // unless the server will be declared dead.
-      if (sockTimeoutRetries++ < maxSocketTimeoutRetries) {
-        if (sockTimeoutRetries == 1) assertNotEquals(lastServer, server);
-        lastServer = server;
-        LOG.debug("Socket timeout for server=" + server + " retries=" + 
sockTimeoutRetries);
-        throw new SocketTimeoutException("simulate socket timeout");
-      } else if (serverRetries++ < maxServerRetries) {
-        LOG.info("Mark server=" + server + " as dead. serverRetries=" + 
serverRetries);
-        master.getServerManager().moveFromOnlineToDeadServers(server);
-        sockTimeoutRetries = 0;
-        throw new SocketTimeoutException("simulate socket timeout");
-      } else {
-        return super.sendRequest(server, req);
-      }
-    }
-  }
-
-  /**
-   * Takes open request and then returns nothing so acts like a RS that went 
zombie.
-   * No response (so proc is stuck/suspended on the Master and won't wake 
up.). We
-   * then send in a crash for this server after a few seconds; crash is 
supposed to
-   * take care of the suspended procedures.
-   */
-  private class HangThenRSCrashExecutor extends GoodRsExecutor {
-    private int invocations;
-
-    @Override
-    protected RegionOpeningState execOpenRegion(final ServerName server, 
RegionOpenInfo openReq)
-    throws IOException {
-      if (this.invocations++ > 0) {
-        // Return w/o problem the second time through here.
-        return super.execOpenRegion(server, openReq);
-      }
-      // The procedure on master will just hang forever because nothing comes 
back
-      // from the RS in this case.
-      LOG.info("Return null response from serverName=" + server + "; means 
STUCK...TODO timeout");
-      executor.schedule(new Runnable() {
-        @Override
-        public void run() {
-          LOG.info("Sending in CRASH of " + server);
-          doCrash(server);
-        }
-      }, 1, TimeUnit.SECONDS);
-      return null;
-    }
-  }
-
-  /**
-   * Takes open request and then returns nothing so acts like a RS that went 
zombie.
-   * No response (so proc is stuck/suspended on the Master and won't wake up.).
-   * Different with HangThenRSCrashExecutor,  HangThenRSCrashExecutor will 
create
-   * ServerCrashProcedure to handle the server crash. However, this 
HangThenRSRestartExecutor
-   * will restart RS directly, situation for RS crashed when SCP is not 
enabled.
-   */
-  private class HangThenRSRestartExecutor extends GoodRsExecutor {
-    private int invocations;
-
-    @Override
-    protected RegionOpeningState execOpenRegion(final ServerName server, 
RegionOpenInfo openReq)
-        throws IOException {
-      if (this.invocations++ > 0) {
-        // Return w/o problem the second time through here.
-        return super.execOpenRegion(server, openReq);
-      }
-      // The procedure on master will just hang forever because nothing comes 
back
-      // from the RS in this case.
-      LOG.info("Return null response from serverName=" + server + "; means 
STUCK...TODO timeout");
-      executor.schedule(new Runnable() {
-        @Override
-        public void run() {
-          LOG.info("Restarting RS of " + server);
-          doRestart(server);
-        }
-      }, 1, TimeUnit.SECONDS);
-      return null;
-    }
-  }
-
-  private class HangOnCloseThenRSCrashExecutor extends GoodRsExecutor {
-    public static final int TYPES_OF_FAILURE = 6;
-    private int invocations;
-
-    @Override
-    protected CloseRegionResponse execCloseRegion(ServerName server, byte[] 
regionName)
-        throws IOException {
-      switch (this.invocations++) {
-        case 0: throw new NotServingRegionException("Fake");
-        case 1:
-          executor.schedule(new Runnable() {
-            @Override
-            public void run() {
-              LOG.info("Sending in CRASH of " + server);
-              doCrash(server);
-            }
-          }, 1, TimeUnit.SECONDS);
-          throw new RegionServerAbortedException("Fake!");
-        case 2:
-          executor.schedule(new Runnable() {
-            @Override
-            public void run() {
-              LOG.info("Sending in CRASH of " + server);
-              doCrash(server);
-            }
-          }, 1, TimeUnit.SECONDS);
-          throw new RegionServerStoppedException("Fake!");
-        case 3: throw new ServerNotRunningYetException("Fake!");
-        case 4:
-          LOG.info("Returned null from serverName={}; means STUCK...TODO 
timeout", server);
-          executor.schedule(new Runnable() {
-            @Override
-            public void run() {
-              LOG.info("Sending in CRASH of " + server);
-              doCrash(server);
-            }
-          }, 1, TimeUnit.SECONDS);
-          return null;
-        default:
-          return super.execCloseRegion(server, regionName);
-      }
-    }
-  }
-
-  private class RandRsExecutor extends NoopRsExecutor {
-    private final Random rand = new Random();
-
-    @Override
-    public ExecuteProceduresResponse sendRequest(ServerName server, 
ExecuteProceduresRequest req)
-        throws IOException {
-      switch (rand.nextInt(5)) {
-        case 0: throw new ServerNotRunningYetException("wait on server 
startup");
-        case 1: throw new SocketTimeoutException("simulate socket timeout");
-        case 2: throw new RemoteException("java.io.IOException", "unexpected 
exception");
-        default:
-          // fall out
-      }
-      return super.sendRequest(server, req);
-    }
-
-    @Override
-    protected RegionOpeningState execOpenRegion(final ServerName server, 
RegionOpenInfo openReq)
-        throws IOException {
-      switch (rand.nextInt(6)) {
-        case 0:
-          LOG.info("Return OPENED response");
-          sendTransitionReport(server, openReq.getRegion(), 
TransitionCode.OPENED);
-          return OpenRegionResponse.RegionOpeningState.OPENED;
-        case 1:
-          LOG.info("Return transition report that OPENED/ALREADY_OPENED 
response");
-          sendTransitionReport(server, openReq.getRegion(), 
TransitionCode.OPENED);
-          return OpenRegionResponse.RegionOpeningState.ALREADY_OPENED;
-        case 2:
-          LOG.info("Return transition report that FAILED_OPEN/FAILED_OPENING 
response");
-          sendTransitionReport(server, openReq.getRegion(), 
TransitionCode.FAILED_OPEN);
-          return OpenRegionResponse.RegionOpeningState.FAILED_OPENING;
-        default:
-          // fall out
-      }
-      // The procedure on master will just hang forever because nothing comes 
back
-      // from the RS in this case.
-      LOG.info("Return null as response; means proc stuck so we send in a 
crash report after a few seconds...");
-      executor.schedule(new Runnable() {
-        @Override
-        public void run() {
-          LOG.info("Delayed CRASHING of " + server);
-          doCrash(server);
-        }
-      }, 5, TimeUnit.SECONDS);
-      return null;
-    }
-
-    @Override
-    protected CloseRegionResponse execCloseRegion(ServerName server, byte[] 
regionName)
-        throws IOException {
-      CloseRegionResponse.Builder resp = CloseRegionResponse.newBuilder();
-      boolean closed = rand.nextBoolean();
-      if (closed) {
-        RegionInfo hri = am.getRegionInfo(regionName);
-        sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri), 
TransitionCode.CLOSED);
-      }
-      resp.setClosed(closed);
-      return resp.build();
-    }
-  }
-
-  protected class CallQueueTooBigOnceRsExecutor extends GoodRsExecutor {
-
-    private boolean invoked = false;
-
-    private ServerName lastServer;
-
-    @Override
-    public ExecuteProceduresResponse sendRequest(ServerName server, 
ExecuteProceduresRequest req)
-        throws IOException {
-      if (!invoked) {
-        lastServer = server;
-        invoked = true;
-        throw new CallQueueTooBigException("simulate queue full");
-      }
-      // better select another server since the server is over loaded, but 
anyway, it is fine to
-      // still select the same server since it is not dead yet...
-      if (lastServer.equals(server)) {
-        LOG.warn("We still select the same server, which is not good.");
-      }
-      return super.sendRequest(server, req);
-    }
-  }
-
-  protected class TimeoutThenCallQueueTooBigRsExecutor extends GoodRsExecutor {
-
-    private final int queueFullTimes;
-
-    private int retries;
-
-    private ServerName lastServer;
-
-    public TimeoutThenCallQueueTooBigRsExecutor(int queueFullTimes) {
-      this.queueFullTimes = queueFullTimes;
-    }
-
-    @Override
-    public ExecuteProceduresResponse sendRequest(ServerName server, 
ExecuteProceduresRequest req)
-        throws IOException {
-      retries++;
-      if (retries == 1) {
-        lastServer = server;
-        throw new CallTimeoutException("simulate call timeout");
-      }
-      // should always retry on the same server
-      assertEquals(lastServer, server);
-      if (retries < queueFullTimes) {
-        throw new CallQueueTooBigException("simulate queue full");
-      }
-      return super.sendRequest(server, req);
-    }
-  }
-
-  private interface MockRSExecutor {
-    ExecuteProceduresResponse sendRequest(ServerName server, 
ExecuteProceduresRequest req)
-        throws IOException;
-  }
-
-  private class MockRSProcedureDispatcher extends RSProcedureDispatcher {
-    private MockRSExecutor mockRsExec;
-
-    public MockRSProcedureDispatcher(final MasterServices master) {
-      super(master);
-    }
-
-    public void setMockRsExecutor(final MockRSExecutor mockRsExec) {
-      this.mockRsExec = mockRsExec;
-    }
-
-    @Override
-    protected void remoteDispatch(ServerName serverName, Set<RemoteProcedure> 
remoteProcedures) {
-      submitTask(new MockRemoteCall(serverName, remoteProcedures));
-    }
-
-    private class MockRemoteCall extends ExecuteProceduresRemoteCall {
-      public MockRemoteCall(final ServerName serverName, final 
Set<RemoteProcedure> operations) {
-        super(serverName, operations);
-      }
-
-      @Override
-      public void dispatchOpenRequests(MasterProcedureEnv env,
-          List<RegionOpenOperation> operations) {
-        request.addOpenRegion(buildOpenRegionRequest(env, getServerName(), 
operations));
-      }
-
-      @Override
-      public void dispatchCloseRequests(MasterProcedureEnv env,
-          List<RegionCloseOperation> operations) {
-        for (RegionCloseOperation op : operations) {
-          request.addCloseRegion(op.buildCloseRegionRequest(getServerName()));
-        }
-      }
-
-      @Override
-      protected ExecuteProceduresResponse sendRequest(final ServerName 
serverName,
-          final ExecuteProceduresRequest request) throws IOException {
-        return mockRsExec.sendRequest(serverName, request);
-      }
-    }
-  }
-
-  private void collectAssignmentManagerMetrics() {
-    assignSubmittedCount = assignProcMetrics.getSubmittedCounter().getCount();
-    assignFailedCount = assignProcMetrics.getFailedCounter().getCount();
-    unassignSubmittedCount = 
unassignProcMetrics.getSubmittedCounter().getCount();
-    unassignFailedCount = unassignProcMetrics.getFailedCounter().getCount();
-  }
 }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerBase.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerBase.java
new file mode 100644
index 0000000..17ea05a
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerBase.java
@@ -0,0 +1,629 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.assignment;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.SocketTimeoutException;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.Random;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CallQueueTooBigException;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.ipc.CallTimeoutException;
+import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
+import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
+import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
+import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
+import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.ipc.RemoteException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
+
+/**
+ * Base class for AM test.
+ */
+public class TestAssignmentManagerBase {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestAssignmentManagerBase.class);
+
+  @Rule
+  public TestName name = new TestName();
+  @Rule
+  public final ExpectedException exception = ExpectedException.none();
+
+  private static final int PROC_NTHREADS = 64;
+  protected static final int NREGIONS = 1 * 1000;
+  protected static final int NSERVERS = Math.max(1, NREGIONS / 100);
+
+  protected HBaseTestingUtility UTIL;
+  protected MockRSProcedureDispatcher rsDispatcher;
+  protected MockMasterServices master;
+  protected AssignmentManager am;
+  protected NavigableMap<ServerName, SortedSet<byte[]>> regionsToRegionServers 
=
+      new ConcurrentSkipListMap<>();
+  // Simple executor to run some simple tasks.
+  protected ScheduledExecutorService executor;
+
+  protected ProcedureMetrics assignProcMetrics;
+  protected ProcedureMetrics unassignProcMetrics;
+
+  protected long assignSubmittedCount = 0;
+  protected long assignFailedCount = 0;
+  protected long unassignSubmittedCount = 0;
+  protected long unassignFailedCount = 0;
+
+  protected void setupConfiguration(Configuration conf) throws Exception {
+    FSUtils.setRootDir(conf, UTIL.getDataTestDir());
+    conf.setBoolean(WALProcedureStore.USE_HSYNC_CONF_KEY, false);
+    conf.setInt(WALProcedureStore.SYNC_WAIT_MSEC_CONF_KEY, 10);
+    conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 
PROC_NTHREADS);
+    conf.setInt(RSProcedureDispatcher.RS_RPC_STARTUP_WAIT_TIME_CONF_KEY, 1000);
+    conf.setInt(AssignmentManager.ASSIGN_MAX_ATTEMPTS, 100); // Have many so 
we succeed eventually.
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    UTIL = new HBaseTestingUtility();
+    this.executor = Executors.newSingleThreadScheduledExecutor();
+    setupConfiguration(UTIL.getConfiguration());
+    master = new MockMasterServices(UTIL.getConfiguration(), 
this.regionsToRegionServers);
+    rsDispatcher = new MockRSProcedureDispatcher(master);
+    master.start(NSERVERS, rsDispatcher);
+    am = master.getAssignmentManager();
+    assignProcMetrics = 
am.getAssignmentManagerMetrics().getAssignProcMetrics();
+    unassignProcMetrics = 
am.getAssignmentManagerMetrics().getUnassignProcMetrics();
+    setUpMeta();
+  }
+
+  private void setUpMeta() throws Exception {
+    rsDispatcher.setMockRsExecutor(new GoodRsExecutor());
+    am.assign(RegionInfoBuilder.FIRST_META_REGIONINFO);
+    am.wakeMetaLoadedEvent();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    master.stop("tearDown");
+    this.executor.shutdownNow();
+  }
+
+  protected Future<byte[]> submitProcedure(final Procedure<MasterProcedureEnv> 
proc) {
+    return 
ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
+  }
+
+  protected byte[] waitOnFuture(final Future<byte[]> future) throws Exception {
+    try {
+      return future.get(5, TimeUnit.SECONDS);
+    } catch (ExecutionException e) {
+      LOG.info("ExecutionException", e);
+      Exception ee = (Exception) e.getCause();
+      if (ee instanceof InterruptedIOException) {
+        for (Procedure<?> p : 
this.master.getMasterProcedureExecutor().getProcedures()) {
+          LOG.info(p.toStringDetails());
+        }
+      }
+      throw (Exception) e.getCause();
+    }
+  }
+
+  // 
============================================================================================
+  //  Helpers
+  // 
============================================================================================
+  protected void bulkSubmit(final AssignProcedure[] procs) throws Exception {
+    final Thread[] threads = new Thread[PROC_NTHREADS];
+    for (int i = 0; i < threads.length; ++i) {
+      final int threadId = i;
+      threads[i] = new Thread() {
+        @Override
+        public void run() {
+          TableName tableName = TableName.valueOf("table-" + threadId);
+          int n = (procs.length / threads.length);
+          int start = threadId * n;
+          int stop = start + n;
+          for (int j = start; j < stop; ++j) {
+            procs[j] = createAndSubmitAssign(tableName, j);
+          }
+        }
+      };
+      threads[i].start();
+    }
+    for (int i = 0; i < threads.length; ++i) {
+      threads[i].join();
+    }
+    for (int i = procs.length - 1; i >= 0 && procs[i] == null; --i) {
+      procs[i] = createAndSubmitAssign(TableName.valueOf("table-sync"), i);
+    }
+  }
+
+  private AssignProcedure createAndSubmitAssign(TableName tableName, int 
regionId) {
+    RegionInfo hri = createRegionInfo(tableName, regionId);
+    AssignProcedure proc = am.createAssignProcedure(hri);
+    master.getMasterProcedureExecutor().submitProcedure(proc);
+    return proc;
+  }
+
+  protected RegionInfo createRegionInfo(final TableName tableName, final long 
regionId) {
+    return 
RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes(regionId))
+        .setEndKey(Bytes.toBytes(regionId + 
1)).setSplit(false).setRegionId(0).build();
+  }
+
+  private void sendTransitionReport(final ServerName serverName,
+      final 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo 
regionInfo,
+      final RegionServerStatusProtos.RegionStateTransition.TransitionCode 
state)
+      throws IOException {
+    RegionServerStatusProtos.ReportRegionStateTransitionRequest.Builder req =
+        
RegionServerStatusProtos.ReportRegionStateTransitionRequest.newBuilder();
+    req.setServer(ProtobufUtil.toServerName(serverName));
+    req.addTransition(
+        
RegionServerStatusProtos.RegionStateTransition.newBuilder().addRegionInfo(regionInfo)
+            .setTransitionCode(state).setOpenSeqNum(1).build());
+    am.reportRegionStateTransition(req.build());
+  }
+
+  private void doCrash(final ServerName serverName) {
+    this.am.submitServerCrash(serverName, false/*No WALs here*/);
+  }
+
+  private void doRestart(final ServerName serverName) {
+    try {
+      this.master.restartRegionServer(serverName);
+    } catch (IOException e) {
+      LOG.warn("Can not restart RS with new startcode");
+    }
+  }
+
+  private class NoopRsExecutor implements MockRSExecutor {
+    @Override
+    public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
+        AdminProtos.ExecuteProceduresRequest request) throws IOException {
+      if (request.getOpenRegionCount() > 0) {
+        for (AdminProtos.OpenRegionRequest req : request.getOpenRegionList()) {
+          for (AdminProtos.OpenRegionRequest.RegionOpenInfo openReq : 
req.getOpenInfoList()) {
+            execOpenRegion(server, openReq);
+          }
+        }
+      }
+      if (request.getCloseRegionCount() > 0) {
+        for (AdminProtos.CloseRegionRequest req : 
request.getCloseRegionList()) {
+          execCloseRegion(server, req.getRegion().getValue().toByteArray());
+        }
+      }
+      return AdminProtos.ExecuteProceduresResponse.newBuilder().build();
+    }
+
+    protected AdminProtos.OpenRegionResponse.RegionOpeningState 
execOpenRegion(ServerName server,
+        AdminProtos.OpenRegionRequest.RegionOpenInfo regionInfo) throws 
IOException {
+      return null;
+    }
+
+    protected AdminProtos.CloseRegionResponse execCloseRegion(ServerName 
server, byte[] regionName)
+        throws IOException {
+      return null;
+    }
+  }
+
+  protected class GoodRsExecutor extends NoopRsExecutor {
+    @Override
+    protected AdminProtos.OpenRegionResponse.RegionOpeningState 
execOpenRegion(ServerName server,
+        AdminProtos.OpenRegionRequest.RegionOpenInfo openReq) throws 
IOException {
+      sendTransitionReport(server, openReq.getRegion(),
+          
RegionServerStatusProtos.RegionStateTransition.TransitionCode.OPENED);
+      // Concurrency?
+      // Now update the state of our cluster in regionsToRegionServers.
+      SortedSet<byte[]> regions = regionsToRegionServers.get(server);
+      if (regions == null) {
+        regions = new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
+        regionsToRegionServers.put(server, regions);
+      }
+      RegionInfo hri = ProtobufUtil.toRegionInfo(openReq.getRegion());
+      if (regions.contains(hri.getRegionName())) {
+        throw new UnsupportedOperationException(hri.getRegionNameAsString());
+      }
+      regions.add(hri.getRegionName());
+      return AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED;
+    }
+
+    @Override
+    protected AdminProtos.CloseRegionResponse execCloseRegion(ServerName 
server, byte[] regionName)
+        throws IOException {
+      RegionInfo hri = am.getRegionInfo(regionName);
+      sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri),
+          
RegionServerStatusProtos.RegionStateTransition.TransitionCode.CLOSED);
+      return 
AdminProtos.CloseRegionResponse.newBuilder().setClosed(true).build();
+    }
+  }
+
+  protected static class ServerNotYetRunningRsExecutor implements 
MockRSExecutor {
+    @Override
+    public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
+        AdminProtos.ExecuteProceduresRequest req) throws IOException {
+      throw new ServerNotRunningYetException("wait on server startup");
+    }
+  }
+
+  protected static class FaultyRsExecutor implements MockRSExecutor {
+    private final IOException exception;
+
+    public FaultyRsExecutor(final IOException exception) {
+      this.exception = exception;
+    }
+
+    @Override
+    public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
+        AdminProtos.ExecuteProceduresRequest req) throws IOException {
+      throw exception;
+    }
+  }
+
+  protected class SocketTimeoutRsExecutor extends GoodRsExecutor {
+    private final int maxSocketTimeoutRetries;
+    private final int maxServerRetries;
+
+    private ServerName lastServer;
+    private int sockTimeoutRetries;
+    private int serverRetries;
+
+    public SocketTimeoutRsExecutor(int maxSocketTimeoutRetries, int 
maxServerRetries) {
+      this.maxServerRetries = maxServerRetries;
+      this.maxSocketTimeoutRetries = maxSocketTimeoutRetries;
+    }
+
+    @Override
+    public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
+        AdminProtos.ExecuteProceduresRequest req) throws IOException {
+      // SocketTimeoutException should be a temporary problem
+      // unless the server will be declared dead.
+      if (sockTimeoutRetries++ < maxSocketTimeoutRetries) {
+        if (sockTimeoutRetries == 1) assertNotEquals(lastServer, server);
+        lastServer = server;
+        LOG.debug("Socket timeout for server=" + server + " retries=" + 
sockTimeoutRetries);
+        throw new SocketTimeoutException("simulate socket timeout");
+      } else if (serverRetries++ < maxServerRetries) {
+        LOG.info("Mark server=" + server + " as dead. serverRetries=" + 
serverRetries);
+        master.getServerManager().moveFromOnlineToDeadServers(server);
+        sockTimeoutRetries = 0;
+        throw new SocketTimeoutException("simulate socket timeout");
+      } else {
+        return super.sendRequest(server, req);
+      }
+    }
+  }
+
+  /**
+   * Takes open request and then returns nothing so acts like a RS that went 
zombie.
+   * No response (so proc is stuck/suspended on the Master and won't wake 
up.). We
+   * then send in a crash for this server after a few seconds; crash is 
supposed to
+   * take care of the suspended procedures.
+   */
+  protected class HangThenRSCrashExecutor extends GoodRsExecutor {
+    private int invocations;
+
+    @Override
+    protected AdminProtos.OpenRegionResponse.RegionOpeningState execOpenRegion(
+        final ServerName server, AdminProtos.OpenRegionRequest.RegionOpenInfo 
openReq)
+        throws IOException {
+      if (this.invocations++ > 0) {
+        // Return w/o problem the second time through here.
+        return super.execOpenRegion(server, openReq);
+      }
+      // The procedure on master will just hang forever because nothing comes 
back
+      // from the RS in this case.
+      LOG.info("Return null response from serverName=" + server + "; means 
STUCK...TODO timeout");
+      executor.schedule(new Runnable() {
+        @Override
+        public void run() {
+          LOG.info("Sending in CRASH of " + server);
+          doCrash(server);
+        }
+      }, 1, TimeUnit.SECONDS);
+      return null;
+    }
+  }
+
+  /**
+   * Takes open request and then returns nothing so acts like a RS that went 
zombie.
+   * No response (so proc is stuck/suspended on the Master and won't wake up.).
+   * Different from HangThenRSCrashExecutor: HangThenRSCrashExecutor will
create
+   * ServerCrashProcedure to handle the server crash. However, this 
HangThenRSRestartExecutor
+   * will restart the RS directly, simulating the situation where the RS
crashed while SCP is not enabled.
+   */
+  protected class HangThenRSRestartExecutor extends GoodRsExecutor {
+    private int invocations;
+
+    @Override
+    protected AdminProtos.OpenRegionResponse.RegionOpeningState execOpenRegion(
+        final ServerName server, AdminProtos.OpenRegionRequest.RegionOpenInfo 
openReq)
+        throws IOException {
+      if (this.invocations++ > 0) {
+        // Return w/o problem the second time through here.
+        return super.execOpenRegion(server, openReq);
+      }
+      // The procedure on master will just hang forever because nothing comes 
back
+      // from the RS in this case.
+      LOG.info("Return null response from serverName=" + server + "; means 
STUCK...TODO timeout");
+      executor.schedule(new Runnable() {
+        @Override
+        public void run() {
+          LOG.info("Restarting RS of " + server);
+          doRestart(server);
+        }
+      }, 1, TimeUnit.SECONDS);
+      return null;
+    }
+  }
+
+  protected class HangOnCloseThenRSCrashExecutor extends GoodRsExecutor {
+    public static final int TYPES_OF_FAILURE = 6;
+    private int invocations;
+
+    @Override
+    protected AdminProtos.CloseRegionResponse execCloseRegion(ServerName 
server, byte[] regionName)
+        throws IOException {
+      switch (this.invocations++) {
+        case 0:
+          throw new NotServingRegionException("Fake");
+        case 1:
+          executor.schedule(new Runnable() {
+            @Override
+            public void run() {
+              LOG.info("Sending in CRASH of " + server);
+              doCrash(server);
+            }
+          }, 1, TimeUnit.SECONDS);
+          throw new RegionServerAbortedException("Fake!");
+        case 2:
+          executor.schedule(new Runnable() {
+            @Override
+            public void run() {
+              LOG.info("Sending in CRASH of " + server);
+              doCrash(server);
+            }
+          }, 1, TimeUnit.SECONDS);
+          throw new RegionServerStoppedException("Fake!");
+        case 3:
+          throw new ServerNotRunningYetException("Fake!");
+        case 4:
+          LOG.info("Returned null from serverName={}; means STUCK...TODO 
timeout", server);
+          executor.schedule(new Runnable() {
+            @Override
+            public void run() {
+              LOG.info("Sending in CRASH of " + server);
+              doCrash(server);
+            }
+          }, 1, TimeUnit.SECONDS);
+          return null;
+        default:
+          return super.execCloseRegion(server, regionName);
+      }
+    }
+  }
+
+  protected class RandRsExecutor extends NoopRsExecutor {
+    private final Random rand = new Random();
+
+    @Override
+    public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
+        AdminProtos.ExecuteProceduresRequest req) throws IOException {
+      switch (rand.nextInt(5)) {
+        case 0:
+          throw new ServerNotRunningYetException("wait on server startup");
+        case 1:
+          throw new SocketTimeoutException("simulate socket timeout");
+        case 2:
+          throw new RemoteException("java.io.IOException", "unexpected 
exception");
+        default:
+          // fall out
+      }
+      return super.sendRequest(server, req);
+    }
+
+    @Override
+    protected AdminProtos.OpenRegionResponse.RegionOpeningState execOpenRegion(
+        final ServerName server, AdminProtos.OpenRegionRequest.RegionOpenInfo 
openReq)
+        throws IOException {
+      switch (rand.nextInt(6)) {
+        case 0:
+          LOG.info("Return OPENED response");
+          sendTransitionReport(server, openReq.getRegion(),
+              
RegionServerStatusProtos.RegionStateTransition.TransitionCode.OPENED);
+          return AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED;
+        case 1:
+          LOG.info("Return transition report that OPENED/ALREADY_OPENED 
response");
+          sendTransitionReport(server, openReq.getRegion(),
+              
RegionServerStatusProtos.RegionStateTransition.TransitionCode.OPENED);
+          return 
AdminProtos.OpenRegionResponse.RegionOpeningState.ALREADY_OPENED;
+        case 2:
+          LOG.info("Return transition report that FAILED_OPEN/FAILED_OPENING 
response");
+          sendTransitionReport(server, openReq.getRegion(),
+              
RegionServerStatusProtos.RegionStateTransition.TransitionCode.FAILED_OPEN);
+          return 
AdminProtos.OpenRegionResponse.RegionOpeningState.FAILED_OPENING;
+        default:
+          // fall out
+      }
+      // The procedure on master will just hang forever because nothing comes 
back
+      // from the RS in this case.
+      LOG.info(
+          "Return null as response; means proc stuck so we send in a crash 
report after a few seconds...");
+      executor.schedule(new Runnable() {
+        @Override
+        public void run() {
+          LOG.info("Delayed CRASHING of " + server);
+          doCrash(server);
+        }
+      }, 5, TimeUnit.SECONDS);
+      return null;
+    }
+
+    @Override
+    protected AdminProtos.CloseRegionResponse execCloseRegion(ServerName 
server, byte[] regionName)
+        throws IOException {
+      AdminProtos.CloseRegionResponse.Builder resp = 
AdminProtos.CloseRegionResponse.newBuilder();
+      boolean closed = rand.nextBoolean();
+      if (closed) {
+        RegionInfo hri = am.getRegionInfo(regionName);
+        sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri),
+            
RegionServerStatusProtos.RegionStateTransition.TransitionCode.CLOSED);
+      }
+      resp.setClosed(closed);
+      return resp.build();
+    }
+  }
+
+  protected class CallQueueTooBigOnceRsExecutor extends GoodRsExecutor {
+
+    private boolean invoked = false;
+
+    private ServerName lastServer;
+
+    @Override
+    public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
+        AdminProtos.ExecuteProceduresRequest req) throws IOException {
+      if (!invoked) {
+        lastServer = server;
+        invoked = true;
+        throw new CallQueueTooBigException("simulate queue full");
+      }
+      // better select another server since the server is over loaded, but 
anyway, it is fine to
+      // still select the same server since it is not dead yet...
+      if (lastServer.equals(server)) {
+        LOG.warn("We still select the same server, which is not good.");
+      }
+      return super.sendRequest(server, req);
+    }
+  }
+
+  protected class TimeoutThenCallQueueTooBigRsExecutor extends GoodRsExecutor {
+
+    private final int queueFullTimes;
+
+    private int retries;
+
+    private ServerName lastServer;
+
+    public TimeoutThenCallQueueTooBigRsExecutor(int queueFullTimes) {
+      this.queueFullTimes = queueFullTimes;
+    }
+
+    @Override
+    public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
+        AdminProtos.ExecuteProceduresRequest req) throws IOException {
+      retries++;
+      if (retries == 1) {
+        lastServer = server;
+        throw new CallTimeoutException("simulate call timeout");
+      }
+      // should always retry on the same server
+      assertEquals(lastServer, server);
+      if (retries < queueFullTimes) {
+        throw new CallQueueTooBigException("simulate queue full");
+      }
+      return super.sendRequest(server, req);
+    }
+  }
+
+  protected interface MockRSExecutor {
+    AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
+        AdminProtos.ExecuteProceduresRequest req) throws IOException;
+  }
+
+  protected class MockRSProcedureDispatcher extends RSProcedureDispatcher {
+    private MockRSExecutor mockRsExec;
+
+    public MockRSProcedureDispatcher(final MasterServices master) {
+      super(master);
+    }
+
+    public void setMockRsExecutor(final MockRSExecutor mockRsExec) {
+      this.mockRsExec = mockRsExec;
+    }
+
+    @Override
+    protected void remoteDispatch(ServerName serverName, Set<RemoteProcedure> 
remoteProcedures) {
+      submitTask(new MockRemoteCall(serverName, remoteProcedures));
+    }
+
+    private class MockRemoteCall extends ExecuteProceduresRemoteCall {
+      public MockRemoteCall(final ServerName serverName, final 
Set<RemoteProcedure> operations) {
+        super(serverName, operations);
+      }
+
+      @Override
+      public void dispatchOpenRequests(MasterProcedureEnv env,
+          List<RegionOpenOperation> operations) {
+        request.addOpenRegion(buildOpenRegionRequest(env, getServerName(), 
operations));
+      }
+
+      @Override
+      public void dispatchCloseRequests(MasterProcedureEnv env,
+          List<RegionCloseOperation> operations) {
+        for (RegionCloseOperation op : operations) {
+          request.addCloseRegion(op.buildCloseRegionRequest(getServerName()));
+        }
+      }
+
+      @Override
+      protected AdminProtos.ExecuteProceduresResponse sendRequest(final 
ServerName serverName,
+          final AdminProtos.ExecuteProceduresRequest request) throws 
IOException {
+        return mockRsExec.sendRequest(serverName, request);
+      }
+    }
+  }
+
+  protected void collectAssignmentManagerMetrics() {
+    assignSubmittedCount = assignProcMetrics.getSubmittedCounter().getCount();
+    assignFailedCount = assignProcMetrics.getFailedCounter().getCount();
+    unassignSubmittedCount = 
unassignProcMetrics.getSubmittedCounter().getCount();
+    unassignFailedCount = unassignProcMetrics.getFailedCounter().getCount();
+  }
+}

Reply via email to