This is an automated email from the ASF dual-hosted git repository.
bbeaudreault pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.6 by this push:
new fdf27d16cf2 HBASE-28215: region reopen procedure batching/throttling
(#5534)
fdf27d16cf2 is described below
commit fdf27d16cf2d69e11309d1e3381590136f735efb
Author: Ray Mattingly <[email protected]>
AuthorDate: Mon Dec 4 09:25:09 2023 -0500
HBASE-28215: region reopen procedure batching/throttling (#5534)
Signed-off-by: Bryan Beaudreault <[email protected]>
---
.../master/procedure/ModifyTableProcedure.java | 14 +-
.../procedure/ReopenTableRegionsProcedure.java | 151 +++++++++++---
...estReopenTableRegionsProcedureBatchBackoff.java | 103 ++++++++++
.../TestReopenTableRegionsProcedureBatching.java | 216 +++++++++++++++++++++
4 files changed, 458 insertions(+), 26 deletions(-)
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
index 6b86b4fbf02..bc907154e7e 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
@@ -17,6 +17,11 @@
*/
package org.apache.hadoop.hbase.master.procedure;
+import static
org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure.PROGRESSIVE_BATCH_BACKOFF_MILLIS_DEFAULT;
+import static
org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure.PROGRESSIVE_BATCH_BACKOFF_MILLIS_KEY;
+import static
org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure.PROGRESSIVE_BATCH_SIZE_MAX_DISABLED;
+import static
org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure.PROGRESSIVE_BATCH_SIZE_MAX_KEY;
+
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
@@ -25,6 +30,7 @@ import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ConcurrentTableModificationException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
@@ -220,7 +226,13 @@ public class ModifyTableProcedure extends
AbstractStateMachineTableProcedure<Mod
break;
case MODIFY_TABLE_REOPEN_ALL_REGIONS:
if (isTableEnabled(env)) {
- addChildProcedure(new ReopenTableRegionsProcedure(getTableName()));
+ Configuration conf = env.getMasterConfiguration();
+ long backoffMillis =
conf.getLong(PROGRESSIVE_BATCH_BACKOFF_MILLIS_KEY,
+ PROGRESSIVE_BATCH_BACKOFF_MILLIS_DEFAULT);
+ int batchSizeMax =
+ conf.getInt(PROGRESSIVE_BATCH_SIZE_MAX_KEY,
PROGRESSIVE_BATCH_SIZE_MAX_DISABLED);
+ addChildProcedure(
+ new ReopenTableRegionsProcedure(getTableName(), backoffMillis,
batchSizeMax));
}
setNextState(ModifyTableState.MODIFY_TABLE_ASSIGN_NEW_REPLICAS);
break;
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ReopenTableRegionsProcedure.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ReopenTableRegionsProcedure.java
index 4efb1768b0c..353636e6ddd 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ReopenTableRegionsProcedure.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ReopenTableRegionsProcedure.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.master.procedure;
+import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -53,6 +54,17 @@ public class ReopenTableRegionsProcedure
private static final Logger LOG =
LoggerFactory.getLogger(ReopenTableRegionsProcedure.class);
+ public static final String PROGRESSIVE_BATCH_BACKOFF_MILLIS_KEY =
+ "hbase.reopen.table.regions.progressive.batch.backoff.ms";
+ public static final long PROGRESSIVE_BATCH_BACKOFF_MILLIS_DEFAULT = 0L;
+ public static final String PROGRESSIVE_BATCH_SIZE_MAX_KEY =
+ "hbase.reopen.table.regions.progressive.batch.size.max";
+ public static final int PROGRESSIVE_BATCH_SIZE_MAX_DISABLED = -1;
+ private static final int PROGRESSIVE_BATCH_SIZE_MAX_DEFAULT_VALUE =
Integer.MAX_VALUE;
+
+ // this minimum prevents a max which would break this procedure
+ private static final int MINIMUM_BATCH_SIZE_MAX = 1;
+
private TableName tableName;
// Specify specific regions of a table to reopen.
@@ -61,20 +73,46 @@ public class ReopenTableRegionsProcedure
private List<HRegionLocation> regions = Collections.emptyList();
+ private List<HRegionLocation> currentRegionBatch = Collections.emptyList();
+
private RetryCounter retryCounter;
+ private long reopenBatchBackoffMillis;
+ private int reopenBatchSize;
+ private int reopenBatchSizeMax;
+ private long regionsReopened = 0;
+ private long batchesProcessed = 0;
+
public ReopenTableRegionsProcedure() {
- regionNames = Collections.emptyList();
+ this(null);
}
public ReopenTableRegionsProcedure(TableName tableName) {
- this.tableName = tableName;
- this.regionNames = Collections.emptyList();
+ this(tableName, Collections.emptyList());
}
public ReopenTableRegionsProcedure(final TableName tableName, final
List<byte[]> regionNames) {
+ this(tableName, regionNames, PROGRESSIVE_BATCH_BACKOFF_MILLIS_DEFAULT,
+ PROGRESSIVE_BATCH_SIZE_MAX_DISABLED);
+ }
+
+ public ReopenTableRegionsProcedure(final TableName tableName, long
reopenBatchBackoffMillis,
+ int reopenBatchSizeMax) {
+ this(tableName, Collections.emptyList(), reopenBatchBackoffMillis,
reopenBatchSizeMax);
+ }
+
+ public ReopenTableRegionsProcedure(final TableName tableName, final
List<byte[]> regionNames,
+ long reopenBatchBackoffMillis, int reopenBatchSizeMax) {
this.tableName = tableName;
this.regionNames = regionNames;
+ this.reopenBatchBackoffMillis = reopenBatchBackoffMillis;
+ if (reopenBatchSizeMax == PROGRESSIVE_BATCH_SIZE_MAX_DISABLED) {
+ this.reopenBatchSize = Integer.MAX_VALUE;
+ this.reopenBatchSizeMax = Integer.MAX_VALUE;
+ } else {
+ this.reopenBatchSize = 1;
+ this.reopenBatchSizeMax = Math.max(reopenBatchSizeMax,
MINIMUM_BATCH_SIZE_MAX);
+ }
}
@Override
@@ -87,6 +125,30 @@ public class ReopenTableRegionsProcedure
return TableOperationType.REGION_EDIT;
}
+ /**
+ * Returns the {@code regionsReopened} counter, which is incremented once for every child
+ * reopen procedure this procedure adds. Restricted to test code by the annotation below.
+ */
+ @RestrictedApi(explanation = "Should only be called in tests", link = "",
+ allowedOnPath = ".*/src/test/.*")
+ public long getRegionsReopened() {
+ return regionsReopened;
+ }
+
+ /**
+ * Returns the {@code batchesProcessed} counter, which is incremented each time a new batch of
+ * regions is selected for reopening. Restricted to test code by the annotation below.
+ */
+ @RestrictedApi(explanation = "Should only be called in tests", link = "",
+ allowedOnPath = ".*/src/test/.*")
+ public long getBatchesProcessed() {
+ return batchesProcessed;
+ }
+
+ /**
+  * Doubles the batch size used for the next batch, capped at {@code reopenBatchSizeMax}.
+  * @return the updated batch size
+  */
+ @RestrictedApi(explanation = "Should only be called internally or in tests", link = "",
+   allowedOnPath = ".*(/src/test/.*|ReopenTableRegionsProcedure).java")
+ protected int progressBatchSize() {
+   final int sizeBefore = reopenBatchSize;
+   final int doubledAndCapped = Math.min(reopenBatchSizeMax, 2 * sizeBefore);
+   // The batch size should never decrease; a smaller result means the doubling overflowed,
+   // so fall back to the max.
+   reopenBatchSize = doubledAndCapped < sizeBefore ? reopenBatchSizeMax : doubledAndCapped;
+   return reopenBatchSize;
+ }
+
private boolean canSchedule(MasterProcedureEnv env, HRegionLocation loc) {
if (loc.getSeqNum() < 0) {
return false;
@@ -114,7 +176,13 @@ public class ReopenTableRegionsProcedure
setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_REOPEN_REGIONS);
return Flow.HAS_MORE_STATE;
case REOPEN_TABLE_REGIONS_REOPEN_REGIONS:
- for (HRegionLocation loc : regions) {
+ // if we didn't finish reopening the last batch yet, let's keep trying
until we do.
+ // at that point, the batch will be empty and we can generate a new
batch
+ if (!regions.isEmpty() && currentRegionBatch.isEmpty()) {
+ currentRegionBatch =
regions.stream().limit(reopenBatchSize).collect(Collectors.toList());
+ batchesProcessed++;
+ }
+ for (HRegionLocation loc : currentRegionBatch) {
RegionStateNode regionNode =
env.getAssignmentManager().getRegionStates().getRegionStateNode(loc.getRegion());
// this possible, maybe the region has already been merged or split,
see HBASE-20921
@@ -133,39 +201,72 @@ public class ReopenTableRegionsProcedure
regionNode.unlock();
}
addChildProcedure(proc);
+ regionsReopened++;
}
setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_CONFIRM_REOPENED);
return Flow.HAS_MORE_STATE;
case REOPEN_TABLE_REGIONS_CONFIRM_REOPENED:
- regions =
regions.stream().map(env.getAssignmentManager().getRegionStates()::checkReopened)
- .filter(l -> l != null).collect(Collectors.toList());
+ // update region lists based on what's been reopened
+ regions = filterReopened(env, regions);
+ currentRegionBatch = filterReopened(env, currentRegionBatch);
+
+ // existing batch didn't fully reopen, so try to resolve that first.
+ // since this is a retry, don't do the batch backoff
+ if (!currentRegionBatch.isEmpty()) {
+ return reopenIfSchedulable(env, currentRegionBatch, false);
+ }
+
if (regions.isEmpty()) {
return Flow.NO_MORE_STATE;
}
- if (regions.stream().anyMatch(loc -> canSchedule(env, loc))) {
- retryCounter = null;
-
setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_REOPEN_REGIONS);
- return Flow.HAS_MORE_STATE;
- }
- // We can not schedule TRSP for all the regions need to reopen, wait
for a while and retry
- // again.
- if (retryCounter == null) {
- retryCounter =
ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
- }
- long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
- LOG.info(
- "There are still {} region(s) which need to be reopened for table {}
are in "
- + "OPENING state, suspend {}secs and try again later",
- regions.size(), tableName, backoff / 1000);
- setTimeout(Math.toIntExact(backoff));
- setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
- skipPersistence();
- throw new ProcedureSuspendedException();
+
+ // current batch is finished, schedule more regions
+ return reopenIfSchedulable(env, regions, true);
default:
throw new UnsupportedOperationException("unhandled state=" + state);
}
}
+ /**
+  * Runs every location through {@code RegionStates.checkReopened} and keeps the non-null
+  * results, i.e. the locations that are still reported as needing attention.
+  * @param env            the master procedure environment
+  * @param regionsToCheck the locations to re-evaluate
+  * @return a new list holding the non-null results, in the original order
+  */
+ private List<HRegionLocation> filterReopened(MasterProcedureEnv env,
+   List<HRegionLocation> regionsToCheck) {
+   List<HRegionLocation> stillPending = new ArrayList<>();
+   for (HRegionLocation candidate : regionsToCheck) {
+     HRegionLocation refreshed =
+       env.getAssignmentManager().getRegionStates().checkReopened(candidate);
+     if (refreshed != null) {
+       stillPending.add(refreshed);
+     }
+   }
+   return stillPending;
+ }
+
+ /**
+  * Returns the procedure to the reopen state when at least one of the given regions can be
+  * scheduled; otherwise suspends the procedure using the retry counter's backoff.
+  * @param env                the master procedure environment
+  * @param regionsToReopen    the regions that were checked (the current batch, or all remaining)
+  * @param shouldBatchBackoff when true and a batch backoff is configured, grow the batch size
+  *                           and wait {@code reopenBatchBackoffMillis} before the next batch
+  * @return {@link Flow#HAS_MORE_STATE} when the reopen state may run immediately
+  * @throws ProcedureSuspendedException when the procedure has to wait for a backoff to elapse
+  */
+ private Flow reopenIfSchedulable(MasterProcedureEnv env, List<HRegionLocation> regionsToReopen,
+   boolean shouldBatchBackoff) throws ProcedureSuspendedException {
+   if (regionsToReopen.stream().anyMatch(loc -> canSchedule(env, loc))) {
+     retryCounter = null;
+     setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_REOPEN_REGIONS);
+     if (shouldBatchBackoff && reopenBatchBackoffMillis > 0) {
+       progressBatchSize();
+       setBackoffState(reopenBatchBackoffMillis);
+       throw new ProcedureSuspendedException();
+     } else {
+       return Flow.HAS_MORE_STATE;
+     }
+   }
+
+   // We can not schedule TRSP for all the regions need to reopen, wait for a while and retry
+   // again.
+   if (retryCounter == null) {
+     retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
+   }
+   long backoffMillis = retryCounter.getBackoffTimeAndIncrementAttempts();
+   // Report the size of the list that was actually checked: when this is called with the full
+   // region list, currentRegionBatch is empty and would always be logged as 0.
+   LOG.info(
+     "There are still {} region(s) which need to be reopened for table {}. {} are in "
+       + "OPENING state, suspend {}secs and try again later",
+     regions.size(), tableName, regionsToReopen.size(), backoffMillis / 1000);
+   setBackoffState(backoffMillis);
+   throw new ProcedureSuspendedException();
+ }
+
+ /**
+ * Arms the procedure timeout, marks the procedure WAITING_TIMEOUT and skips persistence for
+ * this step. Both callers in this class throw ProcedureSuspendedException right afterwards.
+ * @param millis how long to wait, in milliseconds; Math.toIntExact throws ArithmeticException
+ * if the value does not fit in an int
+ */
+ private void setBackoffState(long millis) {
+ setTimeout(Math.toIntExact(millis));
+ setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
+ skipPersistence();
+ }
+
private List<HRegionLocation>
getRegionLocationsForReopen(List<HRegionLocation> tableRegionsForReopen) {
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestReopenTableRegionsProcedureBatchBackoff.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestReopenTableRegionsProcedureBatchBackoff.java
new file mode 100644
index 00000000000..410f56022d0
--- /dev/null
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestReopenTableRegionsProcedureBatchBackoff.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.procedure;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Confirm that we will rate limit reopen batches when reopening all table
regions. This can avoid
+ * the pain associated with reopening too many regions at once.
+ */
+@Category({ MasterTests.class, MediumTests.class })
+public class TestReopenTableRegionsProcedureBatchBackoff {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+
HBaseClassTestRule.forClass(TestReopenTableRegionsProcedureBatchBackoff.class);
+
+ private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+ private static TableName TABLE_NAME = TableName.valueOf("BatchBackoff");
+ private static final int BACKOFF_MILLIS_PER_RS = 3_000;
+ private static final int REOPEN_BATCH_SIZE = 1;
+
+ private static byte[] CF = Bytes.toBytes("cf");
+
+ /**
+ * Starts the mini cluster via startMiniCluster(1) and creates the multi-region test table.
+ * NOTE(review): the trailing 10 is presumably the requested region count (the tests assert at
+ * least 10 regions) - confirm against HBaseTestingUtility.
+ */
+ @BeforeClass
+ public static void setUp() throws Exception {
+ Configuration conf = UTIL.getConfiguration();
+ conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
+ UTIL.startMiniCluster(1);
+ UTIL.createMultiRegionTable(TABLE_NAME, CF, 10);
+ }
+
+ /** Shuts down the mini cluster started in setUp once all tests in this class have run. */
+ @AfterClass
+ public static void tearDown() throws Exception {
+ UTIL.shutdownMiniCluster();
+ }
+
+ /**
+  * With a max batch size of one, every batch after the first is preceded by one batch backoff,
+  * so reopening N regions cannot complete in less than {@code (N - 1) * backoff} milliseconds.
+  */
+ @Test
+ public void testRegionBatchBackoff() throws IOException {
+   ProcedureExecutor<MasterProcedureEnv> procExec =
+     UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
+   List<RegionInfo> regions = UTIL.getAdmin().getRegions(TABLE_NAME);
+   assertTrue(10 <= regions.size());
+   ReopenTableRegionsProcedure proc =
+     new ReopenTableRegionsProcedure(TABLE_NAME, BACKOFF_MILLIS_PER_RS, REOPEN_BATCH_SIZE);
+   // start the clock before submitting so the whole execution is measured
+   Instant startedAt = Instant.now();
+   procExec.submitProcedure(proc);
+   ProcedureSyncWait.waitForProcedureToComplete(procExec, proc, 60_000);
+   Instant stoppedAt = Instant.now();
+   // the backoff is only applied between batches, so N batches guarantee N - 1 backoffs
+   long minExpectedMillis = (long) (regions.size() - 1) * BACKOFF_MILLIS_PER_RS;
+   assertTrue(Duration.between(startedAt, stoppedAt).toMillis() >= minExpectedMillis);
+ }
+
+ /**
+ * With a zero batch backoff the procedure must complete within
+ * regions.size() * BACKOFF_MILLIS_PER_RS milliseconds, which is the timeout handed to
+ * waitForProcedureToComplete below.
+ */
+ @Test
+ public void testRegionBatchNoBackoff() throws IOException {
+ ProcedureExecutor<MasterProcedureEnv> procExec =
+ UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
+ List<RegionInfo> regions = UTIL.getAdmin().getRegions(TABLE_NAME);
+ assertTrue(10 <= regions.size());
+ int noBackoffMillis = 0;
+ ReopenTableRegionsProcedure proc =
+ new ReopenTableRegionsProcedure(TABLE_NAME, noBackoffMillis,
REOPEN_BATCH_SIZE);
+ procExec.submitProcedure(proc);
+ ProcedureSyncWait.waitForProcedureToComplete(procExec, proc,
+ (long) regions.size() * BACKOFF_MILLIS_PER_RS);
+ }
+}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestReopenTableRegionsProcedureBatching.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestReopenTableRegionsProcedureBatching.java
new file mode 100644
index 00000000000..32b1d38b280
--- /dev/null
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestReopenTableRegionsProcedureBatching.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.procedure;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.master.RegionState.State;
+import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
+import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
+import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
+
+/**
+ * Confirm that we will batch region reopens when reopening all table regions.
This can avoid the
+ * pain associated with reopening too many regions at once.
+ */
+@Category({ MasterTests.class, MediumTests.class })
+public class TestReopenTableRegionsProcedureBatching {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestReopenTableRegionsProcedureBatching.class);
+
+ private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+ private static final int BACKOFF_MILLIS_PER_RS = 0;
+ private static final int REOPEN_BATCH_SIZE_MAX = 1;
+
+ private static TableName TABLE_NAME = TableName.valueOf("Batching");
+
+ private static byte[] CF = Bytes.toBytes("cf");
+
+ /**
+ * Starts the mini cluster via startMiniCluster(1) and creates the multi-region test table with
+ * the utility's default split; the tests only require at least two regions.
+ */
+ @BeforeClass
+ public static void setUp() throws Exception {
+ Configuration conf = UTIL.getConfiguration();
+ conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
+ UTIL.startMiniCluster(1);
+ UTIL.createMultiRegionTable(TABLE_NAME, CF);
+ }
+
+ /** Shuts down the mini cluster started in setUp once all tests in this class have run. */
+ @AfterClass
+ public static void tearDown() throws Exception {
+ UTIL.shutdownMiniCluster();
+ }
+
+ /**
+  * Reopens with a max batch size of one: the first batch must contain exactly one region and
+  * the procedure must need at least one batch per region.
+  */
+ @Test
+ public void testSmallMaxBatchSize() throws IOException {
+   AssignmentManager am = UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager();
+   ProcedureExecutor<MasterProcedureEnv> procExec =
+     UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
+   List<RegionInfo> regions = UTIL.getAdmin().getRegions(TABLE_NAME);
+   assertTrue(2 <= regions.size());
+   Set<StuckRegion> stuckRegions =
+     regions.stream().map(r -> stickRegion(am, procExec, r)).collect(Collectors.toSet());
+   ReopenTableRegionsProcedure proc =
+     new ReopenTableRegionsProcedure(TABLE_NAME, BACKOFF_MILLIS_PER_RS, REOPEN_BATCH_SIZE_MAX);
+   procExec.submitProcedure(proc);
+   UTIL.waitFor(10000, () -> proc.getState() == ProcedureState.WAITING_TIMEOUT);
+
+   // the first batch should be small
+   confirmBatchSize(1, stuckRegions, proc);
+   ProcedureSyncWait.waitForProcedureToComplete(procExec, proc, 60_000);
+
+   // other batches should also be small
+   assertTrue(proc.getBatchesProcessed() >= regions.size());
+
+   // all regions should only be opened once (JUnit order: expected, then actual)
+   assertEquals(regions.size(), proc.getRegionsReopened());
+ }
+
+ /**
+  * Reopens with batching disabled (single-argument constructor): the first batch must contain
+  * every region of the table.
+  */
+ @Test
+ public void testDefaultMaxBatchSize() throws IOException {
+   AssignmentManager am = UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager();
+   ProcedureExecutor<MasterProcedureEnv> procExec =
+     UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
+   List<RegionInfo> regions = UTIL.getAdmin().getRegions(TABLE_NAME);
+   assertTrue(2 <= regions.size());
+   Set<StuckRegion> stuckRegions =
+     regions.stream().map(r -> stickRegion(am, procExec, r)).collect(Collectors.toSet());
+   ReopenTableRegionsProcedure proc = new ReopenTableRegionsProcedure(TABLE_NAME);
+   procExec.submitProcedure(proc);
+   UTIL.waitFor(10000, () -> proc.getState() == ProcedureState.WAITING_TIMEOUT);
+
+   // the first batch should be large
+   confirmBatchSize(regions.size(), stuckRegions, proc);
+   ProcedureSyncWait.waitForProcedureToComplete(procExec, proc, 60_000);
+
+   // all regions should only be opened once (JUnit order: expected, then actual)
+   assertEquals(regions.size(), proc.getRegionsReopened());
+ }
+
+ /**
+  * A negative max batch size other than the "disabled" sentinel is raised to the minimum of
+  * one by the constructor, so the procedure must behave like the small-batch case.
+  */
+ @Test
+ public void testNegativeBatchSizeDoesNotBreak() throws IOException {
+   AssignmentManager am = UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager();
+   ProcedureExecutor<MasterProcedureEnv> procExec =
+     UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
+   List<RegionInfo> regions = UTIL.getAdmin().getRegions(TABLE_NAME);
+   assertTrue(2 <= regions.size());
+   Set<StuckRegion> stuckRegions =
+     regions.stream().map(r -> stickRegion(am, procExec, r)).collect(Collectors.toSet());
+   ReopenTableRegionsProcedure proc =
+     new ReopenTableRegionsProcedure(TABLE_NAME, BACKOFF_MILLIS_PER_RS, -100);
+   procExec.submitProcedure(proc);
+   UTIL.waitFor(10000, () -> proc.getState() == ProcedureState.WAITING_TIMEOUT);
+
+   // the first batch should be small
+   confirmBatchSize(1, stuckRegions, proc);
+   ProcedureSyncWait.waitForProcedureToComplete(procExec, proc, 60_000);
+
+   // other batches should also be small
+   assertTrue(proc.getBatchesProcessed() >= regions.size());
+
+   // all regions should only be opened once (JUnit order: expected, then actual)
+   assertEquals(regions.size(), proc.getRegionsReopened());
+ }
+
+ /**
+  * Grows the batch size of a procedure capped at Integer.MAX_VALUE until the cap is reached,
+  * checking that no step yields a non-positive size.
+  */
+ @Test
+ public void testBatchSizeDoesNotOverflow() {
+   ReopenTableRegionsProcedure proc =
+     new ReopenTableRegionsProcedure(TABLE_NAME, BACKOFF_MILLIS_PER_RS, Integer.MAX_VALUE);
+   int batchSize;
+   do {
+     batchSize = proc.progressBatchSize();
+     assertTrue(batchSize > 0);
+   } while (batchSize < Integer.MAX_VALUE);
+ }
+
+ /**
+  * Waits for the first batch to be selected, releases the stuck regions and then waits until
+  * the procedure reports the expected number of reopened regions.
+  */
+ private void confirmBatchSize(int expectedBatchSize, Set<StuckRegion> stuckRegions,
+   ReopenTableRegionsProcedure proc) {
+   // bounded poll instead of an unbounded busy-spin on a counter written by another thread
+   UTIL.waitFor(10000, () -> proc.getBatchesProcessed() > 0);
+   stuckRegions.forEach(this::unstickRegion);
+   UTIL.waitFor(5000, () -> expectedBatchSize == proc.getRegionsReopened());
+ }
+
+ /**
+ * Immutable holder for what stickRegion changed on a region node, so unstickRegion can undo
+ * it: the procedure that was attached, the node itself and its previous open sequence number.
+ */
+ static class StuckRegion {
+ final TransitRegionStateProcedure trsp;
+ final RegionStateNode regionNode;
+ final long openSeqNum;
+
+ public StuckRegion(TransitRegionStateProcedure trsp, RegionStateNode
regionNode,
+ long openSeqNum) {
+ this.trsp = trsp;
+ this.regionNode = regionNode;
+ this.openSeqNum = openSeqNum;
+ }
+ }
+
+ /**
+ * Makes a region look stuck in transition: under the region node lock it remembers the current
+ * open sequence number, then sets the state to OPENING, the open sequence number to -1 and
+ * attaches an unassign procedure that is created here but not submitted by this method.
+ * @return the values unstickRegion needs to restore the node
+ */
+ private StuckRegion stickRegion(AssignmentManager am,
+ ProcedureExecutor<MasterProcedureEnv> procExec, RegionInfo regionInfo) {
+ RegionStateNode regionNode =
am.getRegionStates().getRegionStateNode(regionInfo);
+ TransitRegionStateProcedure trsp =
+ TransitRegionStateProcedure.unassign(procExec.getEnvironment(),
regionInfo);
+ regionNode.lock();
+ long openSeqNum;
+ try {
+ openSeqNum = regionNode.getOpenSeqNum();
+ regionNode.setState(State.OPENING);
+ regionNode.setOpenSeqNum(-1L);
+ regionNode.setProcedure(trsp);
+ } finally {
+ regionNode.unlock();
+ }
+ return new StuckRegion(trsp, regionNode, openSeqNum);
+ }
+
+ /**
+ * Reverts stickRegion: under the region node lock it sets the state back to OPEN, restores the
+ * saved open sequence number and detaches the procedure that was attached.
+ */
+ private void unstickRegion(StuckRegion stuckRegion) {
+ stuckRegion.regionNode.lock();
+ try {
+ stuckRegion.regionNode.setState(State.OPEN);
+ stuckRegion.regionNode.setOpenSeqNum(stuckRegion.openSeqNum);
+ stuckRegion.regionNode.unsetProcedure(stuckRegion.trsp);
+ } finally {
+ stuckRegion.regionNode.unlock();
+ }
+ }
+}