This is an automated email from the ASF dual-hosted git repository.
stack pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 727ebbd HBASE-22527 [hbck2] Add a master web ui to show the problematic regions
727ebbd is described below
commit 727ebbddac811076b336322f4e01f16d20a05641
Author: Guanghao Zhang <[email protected]>
AuthorDate: Thu Jul 18 09:41:11 2019 +0800
HBASE-22527 [hbck2] Add a master web ui to show the problematic regions
---
.../tmpl/master/AssignmentManagerStatusTmpl.jamon | 103 +++-
.../hbase/master/assignment/AssignmentManager.java | 57 ++
.../assignment/TestAMProblematicRegions.java | 127 +++++
.../master/assignment/TestAssignmentManager.java | 599 +-------------------
.../assignment/TestAssignmentManagerBase.java | 629 +++++++++++++++++++++
5 files changed, 911 insertions(+), 604 deletions(-)
diff --git a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/AssignmentManagerStatusTmpl.jamon b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/AssignmentManagerStatusTmpl.jamon
index 9f31483..1d8fa70 100644
--- a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/AssignmentManagerStatusTmpl.jamon
+++ b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/AssignmentManagerStatusTmpl.jamon
@@ -17,27 +17,108 @@ See the License for the specific language governing permissions and
limitations under the License.
</%doc>
<%import>
-org.apache.hadoop.hbase.master.assignment.AssignmentManager;
-org.apache.hadoop.hbase.master.assignment.AssignmentManager.RegionInTransitionStat;
-org.apache.hadoop.hbase.master.assignment.RegionStates.RegionFailedOpen;
-org.apache.hadoop.hbase.master.RegionState;
+java.util.Map;
+java.util.Set;
+java.util.SortedSet;
+java.util.concurrent.atomic.AtomicInteger;
+java.util.stream.Collectors;
org.apache.hadoop.conf.Configuration;
org.apache.hadoop.hbase.HBaseConfiguration;
org.apache.hadoop.hbase.HConstants;
+org.apache.hadoop.hbase.ServerName;
+org.apache.hadoop.hbase.client.RegionInfo;
org.apache.hadoop.hbase.client.RegionInfoDisplay;
-java.util.HashSet;
-java.util.SortedSet;
-java.util.Map;
-java.util.concurrent.atomic.AtomicInteger;
+org.apache.hadoop.hbase.master.RegionState;
+org.apache.hadoop.hbase.master.assignment.AssignmentManager;
+org.apache.hadoop.hbase.master.assignment.AssignmentManager.RegionInTransitionStat;
+org.apache.hadoop.hbase.master.assignment.RegionStates.RegionFailedOpen;
+org.apache.hadoop.hbase.util.Pair;
</%import>
<%args>
AssignmentManager assignmentManager;
int limit = 100;
</%args>
-<%java SortedSet<RegionState> rit = assignmentManager
- .getRegionStates().getRegionsInTransitionOrderedByTimestamp();
-%>
+<%java>
+SortedSet<RegionState> rit = assignmentManager.getRegionStates()
+    .getRegionsInTransitionOrderedByTimestamp();
+// Opened regions whose location in meta disagrees with what the regionservers reported.
+// Key is the region name; value is (location in meta, regionservers that reported it online).
+Map<String, Pair<ServerName, Set<ServerName>>> problematicRegions = assignmentManager
+    .getProblematicRegions();
+</%java>
+
+<%if !problematicRegions.isEmpty() %>
+<%java>
+// Paginate the table into tabs of at most 10 rows each.
+int totalSize = problematicRegions.size();
+int sizePerPage = Math.min(10, totalSize);
+int numOfPages = (int) Math.ceil(totalSize * 1.0 / sizePerPage);
+</%java>
+  <section>
+    <h2><a name="problem-regions">Problematic Regions</a></h2>
+    <p>
+      <span>
+        <% totalSize %> problematic region(s). There are three cases: 1. Master thought this
+        region was opened, but no regionserver reported it. 2. Master thought this region was
+        opened on Server1, but a regionserver reported Server2. 3. More than one regionserver
+        reported this region as opened. Notice: the reported online regionservers may not be
+        accurate when there are regions in transition. Please check them in the regionserver's
+        web UI.
+      </span>
+    </p>
+    <div class="tabbable">
+      <div class="tab-content">
+        <%java int recordItr = 0; %>
+        <%for Map.Entry<String, Pair<ServerName, Set<ServerName>>> entry : problematicRegions.entrySet() %>
+          <%if (recordItr % sizePerPage) == 0 %>
+            <%if recordItr == 0 %>
+              <div class="tab-pane active" id="tab_prs<% (recordItr / sizePerPage) + 1 %>">
+            <%else>
+              <div class="tab-pane" id="tab_prs<% (recordItr / sizePerPage) + 1 %>">
+            </%if>
+            <table class="table table-striped" style="margin-bottom:0px;">
+              <tr>
+                <th>Region</th>
+                <th>Location in META</th>
+                <th>Reported Online Region Servers</th>
+              </tr>
+          </%if>
+
+          <tr>
+            <td><% entry.getKey() %></td>
+            <td><% entry.getValue().getFirst() %></td>
+            <td><% entry.getValue().getSecond().stream().map(ServerName::getServerName)
+                .collect(Collectors.joining(", ")) %></td>
+          </tr>
+          <%java recordItr++; %>
+          <%if (recordItr % sizePerPage) == 0 %>
+            </table>
+            </div>
+          </%if>
+        </%for>
+
+        <%if (recordItr % sizePerPage) != 0 %>
+          <%for ; (recordItr % sizePerPage) != 0 ; recordItr++ %>
+            <tr><td colspan="3" style="height:61px"></td></tr>
+          </%for>
+            </table>
+          </div>
+        </%if>
+
+      </div>
+      <nav>
+        <ul class="nav nav-pills pagination">
+          <%for int i = 1 ; i <= numOfPages; i++ %>
+            <%if i == 1 %>
+              <li class="active">
+            <%else>
+              <li>
+            </%if>
+              <a href="#tab_prs<% i %>"><% i %></a></li>
+          </%for>
+        </ul>
+      </nav>
+    </div>
+  </section>
+</%if>
<%if !rit.isEmpty() %>
<%java>
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
index 69b552e..e680454 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -156,6 +157,8 @@ public class AssignmentManager implements ServerListener {
private final RegionStates regionStates = new RegionStates();
private final RegionStateStore regionStateStore;
+  // Latest set of online region names reported by each regionserver, keyed by the reporting
+  // server; a new report from the same server replaces the previous entry. Read by
+  // getProblematicRegions(). All access is guarded by synchronizing on this map.
+  // NOTE(review): no removal from this map is visible in this patch, so the last report of a
+  // dead/crashed server may linger and be treated as a live report — confirm crash handling.
+ private final Map<ServerName, Set<byte[]>> rsReports = new HashMap<>();
+
private final boolean shouldAssignRegionsWithFavoredNodes;
private final int assignDispatchWaitQueueMaxSize;
private final int assignDispatchWaitMillis;
@@ -962,6 +965,11 @@ public class AssignmentManager implements ServerListener {
}
}
+ // Track the regionserver reported online regions in memory.
+ synchronized (rsReports) {
+ rsReports.put(serverName, regionNames);
+ }
+
if (regionNames.isEmpty()) {
// nothing to do if we don't have regions
LOG.trace("no online region found on " + serverName);
@@ -1882,4 +1890,53 @@ public class AssignmentManager implements ServerListener {
MasterServices getMaster() {
return master;
}
+
+  /**
+   * Find the potentially problematic opened regions. There are three cases:
+   * <ol>
+   * <li>Master thought this region opened, but no regionserver reported it.</li>
+   * <li>Master thought this region opened on Server1, but a regionserver reported Server2.</li>
+   * <li>More than one regionserver reported this region as opened.</li>
+   * </ol>
+   * Only regions in OPEN state are checked; regions currently in transition are skipped.
+   *
+   * @return the map of potentially problematic opened regions. Key is the region name. Value is
+   *         a pair of location in meta and the regionservers which reported opened this region.
+   */
+  public Map<String, Pair<ServerName, Set<ServerName>>> getProblematicRegions() {
+    Map<String, Set<ServerName>> reportedOnlineRegions = collectReportedOnlineRegions();
+
+    // Match in-transition regions by name: a hash lookup instead of a linear List.contains per
+    // region, and independent of whether the two RegionState snapshots carry the same state.
+    Set<String> regionsInTransition = new HashSet<>();
+    for (RegionState rit : regionStates.getRegionsStateInTransition()) {
+      regionsInTransition.add(rit.getRegion().getRegionNameAsString());
+    }
+
+    Map<String, Pair<ServerName, Set<ServerName>>> problematicRegions = new HashMap<>();
+    for (RegionState regionState : regionStates.getRegionStates()) {
+      // Only consider the opened regions which are not in transition
+      if (!regionState.isOpened()) {
+        continue;
+      }
+      String regionName = regionState.getRegion().getRegionNameAsString();
+      if (regionsInTransition.contains(regionName)) {
+        continue;
+      }
+      ServerName serverName = regionState.getServerName();
+      Set<ServerName> reportedServers = reportedOnlineRegions.get(regionName);
+      if (reportedServers == null) {
+        // Case 1: Master thought this region opened, but no regionserver reported it.
+        problematicRegions.put(regionName, new Pair<>(serverName, new HashSet<>()));
+      } else if (!reportedServers.contains(serverName)) {
+        // Case 2: Master thought this region opened on Server1, but regionserver reported Server2
+        problematicRegions.put(regionName, new Pair<>(serverName, reportedServers));
+      } else if (reportedServers.size() > 1) {
+        // Case 3: More than one regionserver reported opened this region
+        problematicRegions.put(regionName, new Pair<>(serverName, reportedServers));
+      }
+    }
+    return problematicRegions;
+  }
+
+  /**
+   * Invert the per-server reports into a map from region name to the set of regionservers whose
+   * latest report listed that region as online.
+   */
+  private Map<String, Set<ServerName>> collectReportedOnlineRegions() {
+    Map<String, Set<ServerName>> reportedOnlineRegions = new HashMap<>();
+    synchronized (rsReports) {
+      for (Map.Entry<ServerName, Set<byte[]>> entry : rsReports.entrySet()) {
+        for (byte[] regionName : entry.getValue()) {
+          reportedOnlineRegions
+              .computeIfAbsent(RegionInfo.getRegionNameAsString(regionName), r -> new HashSet<>())
+              .add(entry.getKey());
+        }
+      }
+    }
+    return reportedOnlineRegions;
+  }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAMProblematicRegions.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAMProblematicRegions.java
new file mode 100644
index 0000000..d07e129
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAMProblematicRegions.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.assignment;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests for {@link AssignmentManager#getProblematicRegions()}, which reports opened regions whose
+ * location in meta disagrees with the regionserver reports: (1) no regionserver reported the
+ * region, (2) a different regionserver reported it, (3) more than one regionserver reported it.
+ */
+@Category({ MasterTests.class, MediumTests.class })
+public class TestAMProblematicRegions extends TestAssignmentManagerBase {
+  private static final Logger LOG = LoggerFactory.getLogger(TestAMProblematicRegions.class);
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestAMProblematicRegions.class);
+
+  @Test
+  public void testForMeta() throws Exception {
+    byte[] metaRegionNameAsBytes = RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName();
+    String metaRegionName = RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionNameAsString();
+    List<ServerName> serverNames = master.getServerManager().getOnlineServersList();
+    assertEquals(NSERVERS, serverNames.size());
+
+    Map<String, Pair<ServerName, Set<ServerName>>> problematicRegions =
+        am.getProblematicRegions();
+
+    // Test for case1: Master thought this region opened, but no regionserver reported it.
+    assertTrue(problematicRegions.containsKey(metaRegionName));
+    Pair<ServerName, Set<ServerName>> pair = problematicRegions.get(metaRegionName);
+    ServerName locationInMeta = pair.getFirst();
+    Set<ServerName> reportedRegionServers = pair.getSecond();
+    assertTrue(serverNames.contains(locationInMeta));
+    assertEquals(0, reportedRegionServers.size());
+
+    // Reported right region location. Then not in problematic regions.
+    am.reportOnlineRegions(locationInMeta, Collections.singleton(metaRegionNameAsBytes));
+    problematicRegions = am.getProblematicRegions();
+    assertFalse(problematicRegions.containsKey(metaRegionName));
+  }
+
+  @Test
+  public void testForUserTable() throws Exception {
+    TableName tableName = TableName.valueOf("testForUserTable");
+    RegionInfo hri = createRegionInfo(tableName, 1);
+    String regionName = hri.getRegionNameAsString();
+    rsDispatcher.setMockRsExecutor(new GoodRsExecutor());
+    Future<byte[]> future = submitProcedure(am.createAssignProcedure(hri));
+    waitOnFuture(future);
+
+    List<ServerName> serverNames = master.getServerManager().getOnlineServersList();
+    assertEquals(NSERVERS, serverNames.size());
+
+    // Test for case1: Master thought this region opened, but no regionserver reported it.
+    Map<String, Pair<ServerName, Set<ServerName>>> problematicRegions =
+        am.getProblematicRegions();
+    assertTrue(problematicRegions.containsKey(regionName));
+    Pair<ServerName, Set<ServerName>> pair = problematicRegions.get(regionName);
+    // Effectively final so it can be captured by the lambda below; the location in meta must
+    // stay the same for the rest of the test since only reports change.
+    final ServerName locationInMeta = pair.getFirst();
+    Set<ServerName> reportedRegionServers = pair.getSecond();
+    assertTrue(serverNames.contains(locationInMeta));
+    assertEquals(0, reportedRegionServers.size());
+
+    // Test for case2: Master thought this region opened on Server1, but regionserver reported
+    // Server2
+    final ServerName anotherServer =
+        serverNames.stream().filter(s -> !s.equals(locationInMeta)).findFirst().get();
+    am.reportOnlineRegions(anotherServer, Collections.singleton(hri.getRegionName()));
+    problematicRegions = am.getProblematicRegions();
+    assertTrue(problematicRegions.containsKey(regionName));
+    pair = problematicRegions.get(regionName);
+    assertEquals(locationInMeta, pair.getFirst());
+    reportedRegionServers = pair.getSecond();
+    assertEquals(1, reportedRegionServers.size());
+    assertFalse(reportedRegionServers.contains(locationInMeta));
+    assertTrue(reportedRegionServers.contains(anotherServer));
+
+    // Test for case3: More than one regionservers reported opened this region.
+    am.reportOnlineRegions(locationInMeta, Collections.singleton(hri.getRegionName()));
+    problematicRegions = am.getProblematicRegions();
+    assertTrue(problematicRegions.containsKey(regionName));
+    pair = problematicRegions.get(regionName);
+    assertEquals(locationInMeta, pair.getFirst());
+    reportedRegionServers = pair.getSecond();
+    assertEquals(2, reportedRegionServers.size());
+    assertTrue(reportedRegionServers.contains(locationInMeta));
+    assertTrue(reportedRegionServers.contains(anotherServer));
+
+    // Reported right region location. Then not in problematic regions.
+    // Typed empty set instead of the raw Collections.EMPTY_SET (unchecked conversion).
+    am.reportOnlineRegions(anotherServer, Collections.emptySet());
+    problematicRegions = am.getProblematicRegions();
+    assertFalse(problematicRegions.containsKey(regionName));
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java
index 86186e3..33aba1b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java
@@ -18,154 +18,47 @@
package org.apache.hadoop.hbase.master.assignment;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.net.SocketTimeoutException;
-import java.util.List;
-import java.util.NavigableMap;
-import java.util.Random;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
-import org.apache.hadoop.hbase.ipc.CallTimeoutException;
-import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
-import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionState.State;
-import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
-import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
-import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
-import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher;
-import org.apache.hadoop.hbase.procedure2.Procedure;
-import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
-import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
-import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
-import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.ipc.RemoteException;
-import org.junit.After;
-import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Ignore;
-import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.junit.rules.ExpectedException;
-import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
-import
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
-import
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
-import
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
-import
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
-import
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
-import
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
-import
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
-import
org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
-import
org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
-import
org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
-
@Category({MasterTests.class, LargeTests.class})
-public class TestAssignmentManager {
+public class TestAssignmentManager extends TestAssignmentManagerBase {
+ private static final Logger LOG =
LoggerFactory.getLogger(TestAssignmentManager.class);
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestAssignmentManager.class);
- private static final Logger LOG =
LoggerFactory.getLogger(TestAssignmentManager.class);
-
- @Rule public TestName name = new TestName();
- @Rule public final ExpectedException exception = ExpectedException.none();
-
- private static final int PROC_NTHREADS = 64;
- private static final int NREGIONS = 1 * 1000;
- private static final int NSERVERS = Math.max(1, NREGIONS / 100);
-
- private HBaseTestingUtility UTIL;
- private MockRSProcedureDispatcher rsDispatcher;
- private MockMasterServices master;
- private AssignmentManager am;
- private NavigableMap<ServerName, SortedSet<byte []>> regionsToRegionServers =
- new ConcurrentSkipListMap<ServerName, SortedSet<byte []>>();
- // Simple executor to run some simple tasks.
- private ScheduledExecutorService executor;
-
- private ProcedureMetrics assignProcMetrics;
- private ProcedureMetrics unassignProcMetrics;
-
- private long assignSubmittedCount = 0;
- private long assignFailedCount = 0;
- private long unassignSubmittedCount = 0;
- private long unassignFailedCount = 0;
-
- private void setupConfiguration(Configuration conf) throws Exception {
- FSUtils.setRootDir(conf, UTIL.getDataTestDir());
- conf.setBoolean(WALProcedureStore.USE_HSYNC_CONF_KEY, false);
- conf.setInt(WALProcedureStore.SYNC_WAIT_MSEC_CONF_KEY, 10);
- conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS,
PROC_NTHREADS);
- conf.setInt(RSProcedureDispatcher.RS_RPC_STARTUP_WAIT_TIME_CONF_KEY, 1000);
- conf.setInt(AssignmentManager.ASSIGN_MAX_ATTEMPTS, 100); // Have many so
we succeed eventually.
- }
-
- @Before
- public void setUp() throws Exception {
- UTIL = new HBaseTestingUtility();
- this.executor = Executors.newSingleThreadScheduledExecutor();
- setupConfiguration(UTIL.getConfiguration());
- master = new MockMasterServices(UTIL.getConfiguration(),
this.regionsToRegionServers);
- rsDispatcher = new MockRSProcedureDispatcher(master);
- master.start(NSERVERS, rsDispatcher);
- am = master.getAssignmentManager();
- assignProcMetrics =
am.getAssignmentManagerMetrics().getAssignProcMetrics();
- unassignProcMetrics =
am.getAssignmentManagerMetrics().getUnassignProcMetrics();
- setUpMeta();
- }
-
- private void setUpMeta() throws Exception {
- rsDispatcher.setMockRsExecutor(new GoodRsExecutor());
- am.assign(RegionInfoBuilder.FIRST_META_REGIONINFO);
- am.wakeMetaLoadedEvent();
- }
-
- @After
- public void tearDown() throws Exception {
- master.stop("tearDown");
- this.executor.shutdownNow();
- }
-
- @Test (expected=NullPointerException.class)
+ @Test(expected = NullPointerException.class)
public void testWaitServerReportEventWithNullServer() throws
UnexpectedStateException {
// Test what happens if we pass in null server. I'd expect it throws NPE.
- if (this.am.waitServerReportEvent(null, null)) throw new
UnexpectedStateException();
+ if (this.am.waitServerReportEvent(null, null)) {
+ throw new UnexpectedStateException();
+ }
}
@Test
@@ -471,484 +364,4 @@ public class TestAssignmentManager {
assertEquals(unassignSubmittedCount + 1,
unassignProcMetrics.getSubmittedCounter().getCount());
assertEquals(unassignFailedCount,
unassignProcMetrics.getFailedCounter().getCount());
}
-
-
- private Future<byte[]> submitProcedure(final Procedure<MasterProcedureEnv>
proc) {
- return
ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
- }
-
- private byte[] waitOnFuture(final Future<byte[]> future) throws Exception {
- try {
- return future.get(5, TimeUnit.SECONDS);
- } catch (ExecutionException e) {
- LOG.info("ExecutionException", e);
- Exception ee = (Exception)e.getCause();
- if (ee instanceof InterruptedIOException) {
- for (Procedure<?> p:
this.master.getMasterProcedureExecutor().getProcedures()) {
- LOG.info(p.toStringDetails());
- }
- }
- throw (Exception)e.getCause();
- }
- }
-
- //
============================================================================================
- // Helpers
- //
============================================================================================
- private void bulkSubmit(final AssignProcedure[] procs) throws Exception {
- final Thread[] threads = new Thread[PROC_NTHREADS];
- for (int i = 0; i < threads.length; ++i) {
- final int threadId = i;
- threads[i] = new Thread() {
- @Override
- public void run() {
- TableName tableName = TableName.valueOf("table-" + threadId);
- int n = (procs.length / threads.length);
- int start = threadId * n;
- int stop = start + n;
- for (int j = start; j < stop; ++j) {
- procs[j] = createAndSubmitAssign(tableName, j);
- }
- }
- };
- threads[i].start();
- }
- for (int i = 0; i < threads.length; ++i) {
- threads[i].join();
- }
- for (int i = procs.length - 1; i >= 0 && procs[i] == null; --i) {
- procs[i] = createAndSubmitAssign(TableName.valueOf("table-sync"), i);
- }
- }
-
- private AssignProcedure createAndSubmitAssign(TableName tableName, int
regionId) {
- RegionInfo hri = createRegionInfo(tableName, regionId);
- AssignProcedure proc = am.createAssignProcedure(hri);
- master.getMasterProcedureExecutor().submitProcedure(proc);
- return proc;
- }
-
- private RegionInfo createRegionInfo(final TableName tableName, final long
regionId) {
- return RegionInfoBuilder.newBuilder(tableName)
- .setStartKey(Bytes.toBytes(regionId))
- .setEndKey(Bytes.toBytes(regionId + 1))
- .setSplit(false)
- .setRegionId(0)
- .build();
- }
-
- private void sendTransitionReport(final ServerName serverName,
- final
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo
regionInfo,
- final TransitionCode state) throws IOException {
- ReportRegionStateTransitionRequest.Builder req =
- ReportRegionStateTransitionRequest.newBuilder();
- req.setServer(ProtobufUtil.toServerName(serverName));
- req.addTransition(RegionStateTransition.newBuilder()
- .addRegionInfo(regionInfo)
- .setTransitionCode(state)
- .setOpenSeqNum(1)
- .build());
- am.reportRegionStateTransition(req.build());
- }
-
- private void doCrash(final ServerName serverName) {
- this.am.submitServerCrash(serverName, false/*No WALs here*/);
- }
-
- private void doRestart(final ServerName serverName) {
- try {
- this.master.restartRegionServer(serverName);
- } catch (IOException e) {
- LOG.warn("Can not restart RS with new startcode");
- }
- }
-
- private class NoopRsExecutor implements MockRSExecutor {
- @Override
- public ExecuteProceduresResponse sendRequest(ServerName server,
- ExecuteProceduresRequest request) throws IOException {
- if (request.getOpenRegionCount() > 0) {
- for (OpenRegionRequest req : request.getOpenRegionList()) {
- for (RegionOpenInfo openReq : req.getOpenInfoList()) {
- execOpenRegion(server, openReq);
- }
- }
- }
- if (request.getCloseRegionCount() > 0) {
- for (CloseRegionRequest req : request.getCloseRegionList()) {
- execCloseRegion(server, req.getRegion().getValue().toByteArray());
- }
- }
- return ExecuteProceduresResponse.newBuilder().build();
- }
-
- protected RegionOpeningState execOpenRegion(ServerName server,
RegionOpenInfo regionInfo)
- throws IOException {
- return null;
- }
-
- protected CloseRegionResponse execCloseRegion(ServerName server, byte[]
regionName)
- throws IOException {
- return null;
- }
- }
-
- private class GoodRsExecutor extends NoopRsExecutor {
- @Override
- protected RegionOpeningState execOpenRegion(ServerName server,
RegionOpenInfo openReq)
- throws IOException {
- sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED);
- // Concurrency?
- // Now update the state of our cluster in regionsToRegionServers.
- SortedSet<byte []> regions = regionsToRegionServers.get(server);
- if (regions == null) {
- regions = new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
- regionsToRegionServers.put(server, regions);
- }
- RegionInfo hri = ProtobufUtil.toRegionInfo(openReq.getRegion());
- if (regions.contains(hri.getRegionName())) {
- throw new UnsupportedOperationException(hri.getRegionNameAsString());
- }
- regions.add(hri.getRegionName());
- return RegionOpeningState.OPENED;
- }
-
- @Override
- protected CloseRegionResponse execCloseRegion(ServerName server, byte[]
regionName)
- throws IOException {
- RegionInfo hri = am.getRegionInfo(regionName);
- sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri),
TransitionCode.CLOSED);
- return CloseRegionResponse.newBuilder().setClosed(true).build();
- }
- }
-
- private static class ServerNotYetRunningRsExecutor implements MockRSExecutor
{
- @Override
- public ExecuteProceduresResponse sendRequest(ServerName server,
ExecuteProceduresRequest req)
- throws IOException {
- throw new ServerNotRunningYetException("wait on server startup");
- }
- }
-
- private static class FaultyRsExecutor implements MockRSExecutor {
- private final IOException exception;
-
- public FaultyRsExecutor(final IOException exception) {
- this.exception = exception;
- }
-
- @Override
- public ExecuteProceduresResponse sendRequest(ServerName server,
ExecuteProceduresRequest req)
- throws IOException {
- throw exception;
- }
- }
-
- private class SocketTimeoutRsExecutor extends GoodRsExecutor {
- private final int maxSocketTimeoutRetries;
- private final int maxServerRetries;
-
- private ServerName lastServer;
- private int sockTimeoutRetries;
- private int serverRetries;
-
- public SocketTimeoutRsExecutor(int maxSocketTimeoutRetries, int
maxServerRetries) {
- this.maxServerRetries = maxServerRetries;
- this.maxSocketTimeoutRetries = maxSocketTimeoutRetries;
- }
-
- @Override
- public ExecuteProceduresResponse sendRequest(ServerName server,
ExecuteProceduresRequest req)
- throws IOException {
- // SocketTimeoutException should be a temporary problem
- // unless the server will be declared dead.
- if (sockTimeoutRetries++ < maxSocketTimeoutRetries) {
- if (sockTimeoutRetries == 1) assertNotEquals(lastServer, server);
- lastServer = server;
- LOG.debug("Socket timeout for server=" + server + " retries=" +
sockTimeoutRetries);
- throw new SocketTimeoutException("simulate socket timeout");
- } else if (serverRetries++ < maxServerRetries) {
- LOG.info("Mark server=" + server + " as dead. serverRetries=" +
serverRetries);
- master.getServerManager().moveFromOnlineToDeadServers(server);
- sockTimeoutRetries = 0;
- throw new SocketTimeoutException("simulate socket timeout");
- } else {
- return super.sendRequest(server, req);
- }
- }
- }
-
- /**
- * Takes open request and then returns nothing so acts like a RS that went
zombie.
- * No response (so proc is stuck/suspended on the Master and won't wake
up.). We
- * then send in a crash for this server after a few seconds; crash is
supposed to
- * take care of the suspended procedures.
- */
- private class HangThenRSCrashExecutor extends GoodRsExecutor {
- private int invocations;
-
- @Override
- protected RegionOpeningState execOpenRegion(final ServerName server,
RegionOpenInfo openReq)
- throws IOException {
- if (this.invocations++ > 0) {
- // Return w/o problem the second time through here.
- return super.execOpenRegion(server, openReq);
- }
- // The procedure on master will just hang forever because nothing comes
back
- // from the RS in this case.
- LOG.info("Return null response from serverName=" + server + "; means
STUCK...TODO timeout");
- executor.schedule(new Runnable() {
- @Override
- public void run() {
- LOG.info("Sending in CRASH of " + server);
- doCrash(server);
- }
- }, 1, TimeUnit.SECONDS);
- return null;
- }
- }
-
- /**
- * Takes open request and then returns nothing so acts like a RS that went
zombie.
- * No response (so proc is stuck/suspended on the Master and won't wake up.).
- * Different with HangThenRSCrashExecutor, HangThenRSCrashExecutor will
create
- * ServerCrashProcedure to handle the server crash. However, this
HangThenRSRestartExecutor
- * will restart RS directly, situation for RS crashed when SCP is not
enabled.
- */
- private class HangThenRSRestartExecutor extends GoodRsExecutor {
- private int invocations;
-
- @Override
- protected RegionOpeningState execOpenRegion(final ServerName server,
RegionOpenInfo openReq)
- throws IOException {
- if (this.invocations++ > 0) {
- // Return w/o problem the second time through here.
- return super.execOpenRegion(server, openReq);
- }
- // The procedure on master will just hang forever because nothing comes
back
- // from the RS in this case.
- LOG.info("Return null response from serverName=" + server + "; means
STUCK...TODO timeout");
- executor.schedule(new Runnable() {
- @Override
- public void run() {
- LOG.info("Restarting RS of " + server);
- doRestart(server);
- }
- }, 1, TimeUnit.SECONDS);
- return null;
- }
- }
-
- private class HangOnCloseThenRSCrashExecutor extends GoodRsExecutor {
- public static final int TYPES_OF_FAILURE = 6;
- private int invocations;
-
- @Override
- protected CloseRegionResponse execCloseRegion(ServerName server, byte[]
regionName)
- throws IOException {
- switch (this.invocations++) {
- case 0: throw new NotServingRegionException("Fake");
- case 1:
- executor.schedule(new Runnable() {
- @Override
- public void run() {
- LOG.info("Sending in CRASH of " + server);
- doCrash(server);
- }
- }, 1, TimeUnit.SECONDS);
- throw new RegionServerAbortedException("Fake!");
- case 2:
- executor.schedule(new Runnable() {
- @Override
- public void run() {
- LOG.info("Sending in CRASH of " + server);
- doCrash(server);
- }
- }, 1, TimeUnit.SECONDS);
- throw new RegionServerStoppedException("Fake!");
- case 3: throw new ServerNotRunningYetException("Fake!");
- case 4:
- LOG.info("Returned null from serverName={}; means STUCK...TODO
timeout", server);
- executor.schedule(new Runnable() {
- @Override
- public void run() {
- LOG.info("Sending in CRASH of " + server);
- doCrash(server);
- }
- }, 1, TimeUnit.SECONDS);
- return null;
- default:
- return super.execCloseRegion(server, regionName);
- }
- }
- }
-
- private class RandRsExecutor extends NoopRsExecutor {
- private final Random rand = new Random();
-
- @Override
- public ExecuteProceduresResponse sendRequest(ServerName server,
ExecuteProceduresRequest req)
- throws IOException {
- switch (rand.nextInt(5)) {
- case 0: throw new ServerNotRunningYetException("wait on server
startup");
- case 1: throw new SocketTimeoutException("simulate socket timeout");
- case 2: throw new RemoteException("java.io.IOException", "unexpected
exception");
- default:
- // fall out
- }
- return super.sendRequest(server, req);
- }
-
- @Override
- protected RegionOpeningState execOpenRegion(final ServerName server,
RegionOpenInfo openReq)
- throws IOException {
- switch (rand.nextInt(6)) {
- case 0:
- LOG.info("Return OPENED response");
- sendTransitionReport(server, openReq.getRegion(),
TransitionCode.OPENED);
- return OpenRegionResponse.RegionOpeningState.OPENED;
- case 1:
- LOG.info("Return transition report that OPENED/ALREADY_OPENED
response");
- sendTransitionReport(server, openReq.getRegion(),
TransitionCode.OPENED);
- return OpenRegionResponse.RegionOpeningState.ALREADY_OPENED;
- case 2:
- LOG.info("Return transition report that FAILED_OPEN/FAILED_OPENING
response");
- sendTransitionReport(server, openReq.getRegion(),
TransitionCode.FAILED_OPEN);
- return OpenRegionResponse.RegionOpeningState.FAILED_OPENING;
- default:
- // fall out
- }
- // The procedure on master will just hang forever because nothing comes
back
- // from the RS in this case.
- LOG.info("Return null as response; means proc stuck so we send in a
crash report after a few seconds...");
- executor.schedule(new Runnable() {
- @Override
- public void run() {
- LOG.info("Delayed CRASHING of " + server);
- doCrash(server);
- }
- }, 5, TimeUnit.SECONDS);
- return null;
- }
-
- @Override
- protected CloseRegionResponse execCloseRegion(ServerName server, byte[]
regionName)
- throws IOException {
- CloseRegionResponse.Builder resp = CloseRegionResponse.newBuilder();
- boolean closed = rand.nextBoolean();
- if (closed) {
- RegionInfo hri = am.getRegionInfo(regionName);
- sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri),
TransitionCode.CLOSED);
- }
- resp.setClosed(closed);
- return resp.build();
- }
- }
-
- protected class CallQueueTooBigOnceRsExecutor extends GoodRsExecutor {
-
- private boolean invoked = false;
-
- private ServerName lastServer;
-
- @Override
- public ExecuteProceduresResponse sendRequest(ServerName server,
ExecuteProceduresRequest req)
- throws IOException {
- if (!invoked) {
- lastServer = server;
- invoked = true;
- throw new CallQueueTooBigException("simulate queue full");
- }
- // better select another server since the server is over loaded, but
anyway, it is fine to
- // still select the same server since it is not dead yet...
- if (lastServer.equals(server)) {
- LOG.warn("We still select the same server, which is not good.");
- }
- return super.sendRequest(server, req);
- }
- }
-
- protected class TimeoutThenCallQueueTooBigRsExecutor extends GoodRsExecutor {
-
- private final int queueFullTimes;
-
- private int retries;
-
- private ServerName lastServer;
-
- public TimeoutThenCallQueueTooBigRsExecutor(int queueFullTimes) {
- this.queueFullTimes = queueFullTimes;
- }
-
- @Override
- public ExecuteProceduresResponse sendRequest(ServerName server,
ExecuteProceduresRequest req)
- throws IOException {
- retries++;
- if (retries == 1) {
- lastServer = server;
- throw new CallTimeoutException("simulate call timeout");
- }
- // should always retry on the same server
- assertEquals(lastServer, server);
- if (retries < queueFullTimes) {
- throw new CallQueueTooBigException("simulate queue full");
- }
- return super.sendRequest(server, req);
- }
- }
-
- private interface MockRSExecutor {
- ExecuteProceduresResponse sendRequest(ServerName server,
ExecuteProceduresRequest req)
- throws IOException;
- }
-
- private class MockRSProcedureDispatcher extends RSProcedureDispatcher {
- private MockRSExecutor mockRsExec;
-
- public MockRSProcedureDispatcher(final MasterServices master) {
- super(master);
- }
-
- public void setMockRsExecutor(final MockRSExecutor mockRsExec) {
- this.mockRsExec = mockRsExec;
- }
-
- @Override
- protected void remoteDispatch(ServerName serverName, Set<RemoteProcedure>
remoteProcedures) {
- submitTask(new MockRemoteCall(serverName, remoteProcedures));
- }
-
- private class MockRemoteCall extends ExecuteProceduresRemoteCall {
- public MockRemoteCall(final ServerName serverName, final
Set<RemoteProcedure> operations) {
- super(serverName, operations);
- }
-
- @Override
- public void dispatchOpenRequests(MasterProcedureEnv env,
- List<RegionOpenOperation> operations) {
- request.addOpenRegion(buildOpenRegionRequest(env, getServerName(),
operations));
- }
-
- @Override
- public void dispatchCloseRequests(MasterProcedureEnv env,
- List<RegionCloseOperation> operations) {
- for (RegionCloseOperation op : operations) {
- request.addCloseRegion(op.buildCloseRegionRequest(getServerName()));
- }
- }
-
- @Override
- protected ExecuteProceduresResponse sendRequest(final ServerName
serverName,
- final ExecuteProceduresRequest request) throws IOException {
- return mockRsExec.sendRequest(serverName, request);
- }
- }
- }
-
- private void collectAssignmentManagerMetrics() {
- assignSubmittedCount = assignProcMetrics.getSubmittedCounter().getCount();
- assignFailedCount = assignProcMetrics.getFailedCounter().getCount();
- unassignSubmittedCount =
unassignProcMetrics.getSubmittedCounter().getCount();
- unassignFailedCount = unassignProcMetrics.getFailedCounter().getCount();
- }
}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerBase.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerBase.java
new file mode 100644
index 0000000..17ea05a
--- /dev/null
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerBase.java
@@ -0,0 +1,629 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.assignment;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.SocketTimeoutException;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.Random;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CallQueueTooBigException;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.ipc.CallTimeoutException;
+import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
+import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
+import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
+import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
+import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.ipc.RemoteException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
+
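+/**
+ * Base class for AssignmentManager (AM) tests: builds a mocked master wired to a mock
+ * RegionServer procedure dispatcher, and provides a catalogue of mock RegionServer behaviours
+ * plus helpers shared by the concrete AM test classes.
+ */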
+public class TestAssignmentManagerBase {
+  private static final Logger LOG = LoggerFactory.getLogger(TestAssignmentManagerBase.class);
+
+  // JUnit rules available to every subclass.
+  @Rule
+  public TestName name = new TestName();
+  @Rule
+  public final ExpectedException exception = ExpectedException.none();
+
+  // Sizing of the mocked cluster: procedure worker threads, regions, and one server per
+  // hundred regions (never fewer than one).
+  private static final int PROC_NTHREADS = 64;
+  protected static final int NREGIONS = 1000;
+  protected static final int NSERVERS = Math.max(1, NREGIONS / 100);
+
+  // Fixtures assigned in setUp().
+  protected HBaseTestingUtility UTIL;
+  protected MockRSProcedureDispatcher rsDispatcher;
+  protected MockMasterServices master;
+  protected AssignmentManager am;
+  // Regions recorded per mocked RegionServer; GoodRsExecutor adds to it on every open.
+  protected NavigableMap<ServerName, SortedSet<byte[]>> regionsToRegionServers =
+      new ConcurrentSkipListMap<>();
+  // Simple executor used to schedule delayed side effects (crash/restart of a mocked RS).
+  protected ScheduledExecutorService executor;
+
+  // AM procedure metrics, plus the snapshot taken by collectAssignmentManagerMetrics().
+  protected ProcedureMetrics assignProcMetrics;
+  protected ProcedureMetrics unassignProcMetrics;
+
+  protected long assignSubmittedCount;
+  protected long assignFailedCount;
+  protected long unassignSubmittedCount;
+  protected long unassignFailedCount;
+
+  /**
+   * Populates {@code conf} before the mocked master is built. Subclasses may override this;
+   * call super to keep the defaults below.
+   */
+  protected void setupConfiguration(Configuration conf) throws Exception {
+    FSUtils.setRootDir(conf, UTIL.getDataTestDir());
+    // Procedure WAL: no hsync and a short sync wait.
+    conf.setBoolean(WALProcedureStore.USE_HSYNC_CONF_KEY, false);
+    conf.setInt(WALProcedureStore.SYNC_WAIT_MSEC_CONF_KEY, 10);
+    conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, PROC_NTHREADS);
+    conf.setInt(RSProcedureDispatcher.RS_RPC_STARTUP_WAIT_TIME_CONF_KEY, 1000);
+    // Have many so we succeed eventually.
+    conf.setInt(AssignmentManager.ASSIGN_MAX_ATTEMPTS, 100);
+  }
+
+  /**
+   * Creates the per-test fixtures: a fresh testing utility and scheduler, the configured mocked
+   * master started with NSERVERS and the mock dispatcher, the AM with its procedure metrics, and
+   * finally an assigned hbase:meta.
+   */
+  @Before
+  public void setUp() throws Exception {
+    UTIL = new HBaseTestingUtility();
+    this.executor = Executors.newSingleThreadScheduledExecutor();
+    final Configuration conf = UTIL.getConfiguration();
+    setupConfiguration(conf);
+
+    master = new MockMasterServices(conf, this.regionsToRegionServers);
+    rsDispatcher = new MockRSProcedureDispatcher(master);
+    master.start(NSERVERS, rsDispatcher);
+
+    am = master.getAssignmentManager();
+    assignProcMetrics = am.getAssignmentManagerMetrics().getAssignProcMetrics();
+    unassignProcMetrics = am.getAssignmentManagerMetrics().getUnassignProcMetrics();
+    setUpMeta();
+  }
+
+ private void setUpMeta() throws Exception {
+ rsDispatcher.setMockRsExecutor(new GoodRsExecutor());
+ am.assign(RegionInfoBuilder.FIRST_META_REGIONINFO);
+ am.wakeMetaLoadedEvent();
+ }
+
+  /**
+   * Stops the mocked master and shuts down the helper scheduler. The scheduler is released even
+   * if stopping the master throws, and the method tolerates a setUp() that failed before the
+   * fixtures were created.
+   */
+  @After
+  public void tearDown() throws Exception {
+    try {
+      // setUp() may have failed before the master existed; an NPE here would mask that failure.
+      if (master != null) {
+        master.stop("tearDown");
+      }
+    } finally {
+      // Never leak the scheduler thread, whatever happened above.
+      if (this.executor != null) {
+        this.executor.shutdownNow();
+      }
+    }
+  }
+
+  /**
+   * Hands {@code proc} to the mocked master's procedure executor via ProcedureSyncWait.
+   *
+   * @return the future returned by {@code ProcedureSyncWait.submitProcedure}
+   */
+  protected Future<byte[]> submitProcedure(final Procedure<MasterProcedureEnv> proc) {
+    return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
+  }
+
+ protected byte[] waitOnFuture(final Future<byte[]> future) throws Exception {
+ try {
+ return future.get(5, TimeUnit.SECONDS);
+ } catch (ExecutionException e) {
+ LOG.info("ExecutionException", e);
+ Exception ee = (Exception) e.getCause();
+ if (ee instanceof InterruptedIOException) {
+ for (Procedure<?> p :
this.master.getMasterProcedureExecutor().getProcedures()) {
+ LOG.info(p.toStringDetails());
+ }
+ }
+ throw (Exception) e.getCause();
+ }
+ }
+
+  // ============================================================================================
+  //  Helpers
+  // ============================================================================================
+
+  /**
+   * Fills {@code procs} with newly created and submitted AssignProcedures.
+   * <p>
+   * The array is cut into PROC_NTHREADS equal slices. Each slice is filled concurrently by its
+   * own thread, using the table "table-" plus the thread index. Trailing slots still null after
+   * all threads finish (the remainder of the integer division) are then filled on the calling
+   * thread, using the table "table-sync".
+   */
+  protected void bulkSubmit(final AssignProcedure[] procs) throws Exception {
+    final Thread[] workers = new Thread[PROC_NTHREADS];
+    for (int w = 0; w < workers.length; w++) {
+      final int workerId = w;
+      workers[w] = new Thread(() -> {
+        final TableName tableName = TableName.valueOf("table-" + workerId);
+        final int sliceSize = procs.length / workers.length;
+        final int from = workerId * sliceSize;
+        final int to = from + sliceSize;
+        for (int slot = from; slot < to; slot++) {
+          procs[slot] = createAndSubmitAssign(tableName, slot);
+        }
+      });
+      workers[w].start();
+    }
+    for (Thread worker : workers) {
+      worker.join();
+    }
+    int slot = procs.length - 1;
+    while (slot >= 0 && procs[slot] == null) {
+      procs[slot] = createAndSubmitAssign(TableName.valueOf("table-sync"), slot);
+      slot--;
+    }
+  }
+
+ private AssignProcedure createAndSubmitAssign(TableName tableName, int
regionId) {
+ RegionInfo hri = createRegionInfo(tableName, regionId);
+ AssignProcedure proc = am.createAssignProcedure(hri);
+ master.getMasterProcedureExecutor().submitProcedure(proc);
+ return proc;
+ }
+
+ protected RegionInfo createRegionInfo(final TableName tableName, final long
regionId) {
+ return
RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes(regionId))
+ .setEndKey(Bytes.toBytes(regionId +
1)).setSplit(false).setRegionId(0).build();
+ }
+
+ private void sendTransitionReport(final ServerName serverName,
+ final
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo
regionInfo,
+ final RegionServerStatusProtos.RegionStateTransition.TransitionCode
state)
+ throws IOException {
+ RegionServerStatusProtos.ReportRegionStateTransitionRequest.Builder req =
+
RegionServerStatusProtos.ReportRegionStateTransitionRequest.newBuilder();
+ req.setServer(ProtobufUtil.toServerName(serverName));
+ req.addTransition(
+
RegionServerStatusProtos.RegionStateTransition.newBuilder().addRegionInfo(regionInfo)
+ .setTransitionCode(state).setOpenSeqNum(1).build());
+ am.reportRegionStateTransition(req.build());
+ }
+
+  /** Asks the AM to handle a crash of {@code serverName}; the mocked servers have no WALs. */
+  private void doCrash(final ServerName serverName) {
+    am.submitServerCrash(serverName, false /* No WALs here */);
+  }
+
+  /**
+   * Restarts the mocked RegionServer {@code serverName} with a new start code. Failures are
+   * logged rather than propagated; this is called from a scheduled task.
+   */
+  private void doRestart(final ServerName serverName) {
+    try {
+      this.master.restartRegionServer(serverName);
+    } catch (IOException e) {
+      // Best effort, but keep the server and the cause so a failed restart is diagnosable.
+      LOG.warn("Can not restart RS {} with new startcode", serverName, e);
+    }
+  }
+
+ private class NoopRsExecutor implements MockRSExecutor {
+ @Override
+ public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
+ AdminProtos.ExecuteProceduresRequest request) throws IOException {
+ if (request.getOpenRegionCount() > 0) {
+ for (AdminProtos.OpenRegionRequest req : request.getOpenRegionList()) {
+ for (AdminProtos.OpenRegionRequest.RegionOpenInfo openReq :
req.getOpenInfoList()) {
+ execOpenRegion(server, openReq);
+ }
+ }
+ }
+ if (request.getCloseRegionCount() > 0) {
+ for (AdminProtos.CloseRegionRequest req :
request.getCloseRegionList()) {
+ execCloseRegion(server, req.getRegion().getValue().toByteArray());
+ }
+ }
+ return AdminProtos.ExecuteProceduresResponse.newBuilder().build();
+ }
+
+ protected AdminProtos.OpenRegionResponse.RegionOpeningState
execOpenRegion(ServerName server,
+ AdminProtos.OpenRegionRequest.RegionOpenInfo regionInfo) throws
IOException {
+ return null;
+ }
+
+ protected AdminProtos.CloseRegionResponse execCloseRegion(ServerName
server, byte[] regionName)
+ throws IOException {
+ return null;
+ }
+ }
+
+ protected class GoodRsExecutor extends NoopRsExecutor {
+ @Override
+ protected AdminProtos.OpenRegionResponse.RegionOpeningState
execOpenRegion(ServerName server,
+ AdminProtos.OpenRegionRequest.RegionOpenInfo openReq) throws
IOException {
+ sendTransitionReport(server, openReq.getRegion(),
+
RegionServerStatusProtos.RegionStateTransition.TransitionCode.OPENED);
+ // Concurrency?
+ // Now update the state of our cluster in regionsToRegionServers.
+ SortedSet<byte[]> regions = regionsToRegionServers.get(server);
+ if (regions == null) {
+ regions = new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
+ regionsToRegionServers.put(server, regions);
+ }
+ RegionInfo hri = ProtobufUtil.toRegionInfo(openReq.getRegion());
+ if (regions.contains(hri.getRegionName())) {
+ throw new UnsupportedOperationException(hri.getRegionNameAsString());
+ }
+ regions.add(hri.getRegionName());
+ return AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED;
+ }
+
+ @Override
+ protected AdminProtos.CloseRegionResponse execCloseRegion(ServerName
server, byte[] regionName)
+ throws IOException {
+ RegionInfo hri = am.getRegionInfo(regionName);
+ sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri),
+
RegionServerStatusProtos.RegionStateTransition.TransitionCode.CLOSED);
+ return
AdminProtos.CloseRegionResponse.newBuilder().setClosed(true).build();
+ }
+ }
+
+  /** Mock RS that rejects every request with ServerNotRunningYetException. */
+  protected static class ServerNotYetRunningRsExecutor implements MockRSExecutor {
+    @Override
+    public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
+        AdminProtos.ExecuteProceduresRequest req) throws IOException {
+      throw new ServerNotRunningYetException("wait on server startup");
+    }
+  }
+
+  /** Mock RS that fails every request by throwing the exception it was constructed with. */
+  protected static class FaultyRsExecutor implements MockRSExecutor {
+    private final IOException exception;
+
+    public FaultyRsExecutor(final IOException exception) {
+      this.exception = exception;
+    }
+
+    @Override
+    public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
+        AdminProtos.ExecuteProceduresRequest req) throws IOException {
+      throw this.exception;
+    }
+  }
+
+  /**
+   * Mock RS that fails with socket timeouts up to maxSocketTimeoutRetries times in a row.
+   * It then marks the target server dead and starts a new round, up to maxServerRetries times.
+   * After that, requests are served normally.
+   */
+  protected class SocketTimeoutRsExecutor extends GoodRsExecutor {
+    private final int maxSocketTimeoutRetries;
+    private final int maxServerRetries;
+
+    private ServerName lastServer;
+    private int sockTimeoutRetries;
+    private int serverRetries;
+
+    public SocketTimeoutRsExecutor(int maxSocketTimeoutRetries, int maxServerRetries) {
+      this.maxSocketTimeoutRetries = maxSocketTimeoutRetries;
+      this.maxServerRetries = maxServerRetries;
+    }
+
+    @Override
+    public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
+        AdminProtos.ExecuteProceduresRequest req) throws IOException {
+      // SocketTimeoutException should be a temporary problem
+      // unless the server will be declared dead.
+      if (sockTimeoutRetries++ < maxSocketTimeoutRetries) {
+        if (sockTimeoutRetries == 1) {
+          // The first timeout of a round must not target the previously used (dead) server.
+          assertNotEquals(lastServer, server);
+        }
+        lastServer = server;
+        LOG.debug("Socket timeout for server=" + server + " retries=" + sockTimeoutRetries);
+        throw new SocketTimeoutException("simulate socket timeout");
+      }
+      if (serverRetries++ < maxServerRetries) {
+        LOG.info("Mark server=" + server + " as dead. serverRetries=" + serverRetries);
+        master.getServerManager().moveFromOnlineToDeadServers(server);
+        sockTimeoutRetries = 0;
+        throw new SocketTimeoutException("simulate socket timeout");
+      }
+      return super.sendRequest(server, req);
+    }
+  }
+
+  /**
+   * Mock RS that swallows its first open request and returns nothing, acting like a zombie RS.
+   * The procedure on the master stays suspended because no response arrives. One second later a
+   * crash of that server is submitted; the crash handling is expected to release the suspended
+   * procedure. Later opens succeed normally.
+   */
+  protected class HangThenRSCrashExecutor extends GoodRsExecutor {
+    private int invocations;
+
+    @Override
+    protected AdminProtos.OpenRegionResponse.RegionOpeningState execOpenRegion(
+        final ServerName server, AdminProtos.OpenRegionRequest.RegionOpenInfo openReq)
+        throws IOException {
+      if (this.invocations++ > 0) {
+        // Only the first open hangs; later ones behave like a healthy RS.
+        return super.execOpenRegion(server, openReq);
+      }
+      LOG.info("Return null response from serverName=" + server + "; means STUCK...TODO timeout");
+      executor.schedule(() -> {
+        LOG.info("Sending in CRASH of " + server);
+        doCrash(server);
+      }, 1, TimeUnit.SECONDS);
+      return null;
+    }
+  }
+
+  /**
+   * Mock RS that swallows its first open request and returns nothing, acting like a zombie RS.
+   * The procedure on the master stays suspended. Unlike HangThenRSCrashExecutor, it does not
+   * submit a server crash: one second later it restarts the RS directly. This models an RS crash
+   * when no ServerCrashProcedure is scheduled. Later opens succeed normally.
+   */
+  protected class HangThenRSRestartExecutor extends GoodRsExecutor {
+    private int invocations;
+
+    @Override
+    protected AdminProtos.OpenRegionResponse.RegionOpeningState execOpenRegion(
+        final ServerName server, AdminProtos.OpenRegionRequest.RegionOpenInfo openReq)
+        throws IOException {
+      if (this.invocations++ > 0) {
+        // Only the first open hangs; later ones behave like a healthy RS.
+        return super.execOpenRegion(server, openReq);
+      }
+      LOG.info("Return null response from serverName=" + server + "; means STUCK...TODO timeout");
+      executor.schedule(() -> {
+        LOG.info("Restarting RS of " + server);
+        doRestart(server);
+      }, 1, TimeUnit.SECONDS);
+      return null;
+    }
+  }
+
+ protected class HangOnCloseThenRSCrashExecutor extends GoodRsExecutor {
+ public static final int TYPES_OF_FAILURE = 6;
+ private int invocations;
+
+ @Override
+ protected AdminProtos.CloseRegionResponse execCloseRegion(ServerName
server, byte[] regionName)
+ throws IOException {
+ switch (this.invocations++) {
+ case 0:
+ throw new NotServingRegionException("Fake");
+ case 1:
+ executor.schedule(new Runnable() {
+ @Override
+ public void run() {
+ LOG.info("Sending in CRASH of " + server);
+ doCrash(server);
+ }
+ }, 1, TimeUnit.SECONDS);
+ throw new RegionServerAbortedException("Fake!");
+ case 2:
+ executor.schedule(new Runnable() {
+ @Override
+ public void run() {
+ LOG.info("Sending in CRASH of " + server);
+ doCrash(server);
+ }
+ }, 1, TimeUnit.SECONDS);
+ throw new RegionServerStoppedException("Fake!");
+ case 3:
+ throw new ServerNotRunningYetException("Fake!");
+ case 4:
+ LOG.info("Returned null from serverName={}; means STUCK...TODO
timeout", server);
+ executor.schedule(new Runnable() {
+ @Override
+ public void run() {
+ LOG.info("Sending in CRASH of " + server);
+ doCrash(server);
+ }
+ }, 1, TimeUnit.SECONDS);
+ return null;
+ default:
+ return super.execCloseRegion(server, regionName);
+ }
+ }
+ }
+
+  /**
+   * Mock RS with randomized behaviour.
+   * <ul>
+   * <li>Each request fails 3 times out of 5, with a startup, socket-timeout or remote
+   * exception.</li>
+   * <li>Each open reports OPENED, ALREADY_OPENED or FAILED_OPEN with probability 1/6 each.
+   * Otherwise it hangs and the server is crashed five seconds later.</li>
+   * <li>Each close succeeds and reports CLOSED half of the time.</li>
+   * </ul>
+   */
+  protected class RandRsExecutor extends NoopRsExecutor {
+    private final Random rand = new Random();
+
+    @Override
+    public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
+        AdminProtos.ExecuteProceduresRequest req) throws IOException {
+      final int roll = rand.nextInt(5);
+      if (roll == 0) {
+        throw new ServerNotRunningYetException("wait on server startup");
+      }
+      if (roll == 1) {
+        throw new SocketTimeoutException("simulate socket timeout");
+      }
+      if (roll == 2) {
+        throw new RemoteException("java.io.IOException", "unexpected exception");
+      }
+      return super.sendRequest(server, req);
+    }
+
+    @Override
+    protected AdminProtos.OpenRegionResponse.RegionOpeningState execOpenRegion(
+        final ServerName server, AdminProtos.OpenRegionRequest.RegionOpenInfo openReq)
+        throws IOException {
+      final int roll = rand.nextInt(6);
+      if (roll == 0) {
+        LOG.info("Return OPENED response");
+        sendTransitionReport(server, openReq.getRegion(),
+            RegionServerStatusProtos.RegionStateTransition.TransitionCode.OPENED);
+        return AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED;
+      }
+      if (roll == 1) {
+        LOG.info("Return transition report that OPENED/ALREADY_OPENED response");
+        sendTransitionReport(server, openReq.getRegion(),
+            RegionServerStatusProtos.RegionStateTransition.TransitionCode.OPENED);
+        return AdminProtos.OpenRegionResponse.RegionOpeningState.ALREADY_OPENED;
+      }
+      if (roll == 2) {
+        LOG.info("Return transition report that FAILED_OPEN/FAILED_OPENING response");
+        sendTransitionReport(server, openReq.getRegion(),
+            RegionServerStatusProtos.RegionStateTransition.TransitionCode.FAILED_OPEN);
+        return AdminProtos.OpenRegionResponse.RegionOpeningState.FAILED_OPENING;
+      }
+      // The procedure on master will just hang forever because nothing comes back
+      // from the RS in this case.
+      LOG.info("Return null as response; means proc stuck so we send in a crash report"
+          + " after a few seconds...");
+      executor.schedule(() -> {
+        LOG.info("Delayed CRASHING of " + server);
+        doCrash(server);
+      }, 5, TimeUnit.SECONDS);
+      return null;
+    }
+
+    @Override
+    protected AdminProtos.CloseRegionResponse execCloseRegion(ServerName server,
+        byte[] regionName) throws IOException {
+      final boolean closed = rand.nextBoolean();
+      if (closed) {
+        final RegionInfo hri = am.getRegionInfo(regionName);
+        sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri),
+            RegionServerStatusProtos.RegionStateTransition.TransitionCode.CLOSED);
+      }
+      return AdminProtos.CloseRegionResponse.newBuilder().setClosed(closed).build();
+    }
+  }
+
+  /**
+   * Mock RS that rejects the very first request with CallQueueTooBigException and serves every
+   * later one. It warns if the retry lands on the same server again.
+   */
+  protected class CallQueueTooBigOnceRsExecutor extends GoodRsExecutor {
+
+    private boolean invoked;
+
+    private ServerName lastServer;
+
+    @Override
+    public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
+        AdminProtos.ExecuteProceduresRequest req) throws IOException {
+      if (invoked) {
+        // better select another server since the server is over loaded, but anyway, it is fine
+        // to still select the same server since it is not dead yet...
+        if (lastServer.equals(server)) {
+          LOG.warn("We still select the same server, which is not good.");
+        }
+        return super.sendRequest(server, req);
+      }
+      lastServer = server;
+      invoked = true;
+      throw new CallQueueTooBigException("simulate queue full");
+    }
+  }
+
+  /**
+   * Mock RS sequence:
+   * <ol>
+   * <li>The first request times out.</li>
+   * <li>Later requests, which must target the same server, are rejected as queue-full until the
+   * attempt count reaches queueFullTimes.</li>
+   * <li>From then on, requests are served.</li>
+   * </ol>
+   */
+  protected class TimeoutThenCallQueueTooBigRsExecutor extends GoodRsExecutor {
+
+    private final int queueFullTimes;
+
+    private int retries;
+
+    private ServerName lastServer;
+
+    public TimeoutThenCallQueueTooBigRsExecutor(int queueFullTimes) {
+      this.queueFullTimes = queueFullTimes;
+    }
+
+    @Override
+    public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
+        AdminProtos.ExecuteProceduresRequest req) throws IOException {
+      if (++retries == 1) {
+        lastServer = server;
+        throw new CallTimeoutException("simulate call timeout");
+      }
+      // should always retry on the same server
+      assertEquals(lastServer, server);
+      if (retries < queueFullTimes) {
+        throw new CallQueueTooBigException("simulate queue full");
+      }
+      return super.sendRequest(server, req);
+    }
+  }
+
+  /** Pluggable behaviour of a mocked RegionServer: answers one ExecuteProcedures request. */
+  @FunctionalInterface
+  protected interface MockRSExecutor {
+    AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
+        AdminProtos.ExecuteProceduresRequest req) throws IOException;
+  }
+
+  /**
+   * RSProcedureDispatcher whose remote calls build the usual ExecuteProcedures request but hand
+   * it to the currently installed {@link MockRSExecutor} instead of a real RegionServer.
+   */
+  protected class MockRSProcedureDispatcher extends RSProcedureDispatcher {
+    private MockRSExecutor mockRsExec;
+
+    public MockRSProcedureDispatcher(final MasterServices master) {
+      super(master);
+    }
+
+    /** Installs the mock RS behaviour used by subsequent dispatches. */
+    public void setMockRsExecutor(final MockRSExecutor mockRsExec) {
+      this.mockRsExec = mockRsExec;
+    }
+
+    @Override
+    protected void remoteDispatch(ServerName serverName, Set<RemoteProcedure> remoteProcedures) {
+      submitTask(new MockRemoteCall(serverName, remoteProcedures));
+    }
+
+    /** Remote call whose request is answered by the installed mock executor. */
+    private class MockRemoteCall extends ExecuteProceduresRemoteCall {
+      public MockRemoteCall(final ServerName serverName, final Set<RemoteProcedure> operations) {
+        super(serverName, operations);
+      }
+
+      @Override
+      public void dispatchOpenRequests(MasterProcedureEnv env,
+          List<RegionOpenOperation> operations) {
+        request.addOpenRegion(buildOpenRegionRequest(env, getServerName(), operations));
+      }
+
+      @Override
+      public void dispatchCloseRequests(MasterProcedureEnv env,
+          List<RegionCloseOperation> operations) {
+        operations.forEach(op -> request.addCloseRegion(op.buildCloseRegionRequest(getServerName())));
+      }
+
+      @Override
+      protected AdminProtos.ExecuteProceduresResponse sendRequest(final ServerName serverName,
+          final AdminProtos.ExecuteProceduresRequest request) throws IOException {
+        return mockRsExec.sendRequest(serverName, request);
+      }
+    }
+  }
+
+  /**
+   * Snapshots the current submitted/failed counters of the assign and unassign procedure metrics
+   * into the corresponding count fields, so tests can compare against them later.
+   */
+  protected void collectAssignmentManagerMetrics() {
+    assignSubmittedCount = assignProcMetrics.getSubmittedCounter().getCount();
+    assignFailedCount = assignProcMetrics.getFailedCounter().getCount();
+    unassignSubmittedCount = unassignProcMetrics.getSubmittedCounter().getCount();
+    unassignFailedCount = unassignProcMetrics.getFailedCounter().getCount();
+  }
+}