This is an automated email from the ASF dual-hosted git repository.

bbeaudreault pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.6 by this push:
     new fdf27d16cf2 HBASE-28215: region reopen procedure batching/throttling (#5534)
fdf27d16cf2 is described below

commit fdf27d16cf2d69e11309d1e3381590136f735efb
Author: Ray Mattingly <[email protected]>
AuthorDate: Mon Dec 4 09:25:09 2023 -0500

    HBASE-28215: region reopen procedure batching/throttling (#5534)
    
    Signed-off-by: Bryan Beaudreault <[email protected]>
---
 .../master/procedure/ModifyTableProcedure.java     |  14 +-
 .../procedure/ReopenTableRegionsProcedure.java     | 151 +++++++++++---
 ...estReopenTableRegionsProcedureBatchBackoff.java | 103 ++++++++++
 .../TestReopenTableRegionsProcedureBatching.java   | 216 +++++++++++++++++++++
 4 files changed, 458 insertions(+), 26 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
index 6b86b4fbf02..bc907154e7e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
@@ -17,6 +17,11 @@
  */
 package org.apache.hadoop.hbase.master.procedure;
 
+import static org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure.PROGRESSIVE_BATCH_BACKOFF_MILLIS_DEFAULT;
+import static org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure.PROGRESSIVE_BATCH_BACKOFF_MILLIS_KEY;
+import static org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure.PROGRESSIVE_BATCH_SIZE_MAX_DISABLED;
+import static org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure.PROGRESSIVE_BATCH_SIZE_MAX_KEY;
+
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
@@ -25,6 +30,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ConcurrentTableModificationException;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseIOException;
@@ -220,7 +226,13 @@ public class ModifyTableProcedure extends AbstractStateMachineTableProcedure<Mod
           break;
         case MODIFY_TABLE_REOPEN_ALL_REGIONS:
           if (isTableEnabled(env)) {
-            addChildProcedure(new ReopenTableRegionsProcedure(getTableName()));
+            Configuration conf = env.getMasterConfiguration();
+            long backoffMillis = conf.getLong(PROGRESSIVE_BATCH_BACKOFF_MILLIS_KEY,
+              PROGRESSIVE_BATCH_BACKOFF_MILLIS_DEFAULT);
+            int batchSizeMax =
+              conf.getInt(PROGRESSIVE_BATCH_SIZE_MAX_KEY, PROGRESSIVE_BATCH_SIZE_MAX_DISABLED);
+            addChildProcedure(
+              new ReopenTableRegionsProcedure(getTableName(), backoffMillis, batchSizeMax));
           }
           setNextState(ModifyTableState.MODIFY_TABLE_ASSIGN_NEW_REPLICAS);
           break;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ReopenTableRegionsProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ReopenTableRegionsProcedure.java
index 4efb1768b0c..353636e6ddd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ReopenTableRegionsProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ReopenTableRegionsProcedure.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.master.procedure;
 
+import com.google.errorprone.annotations.RestrictedApi;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -53,6 +54,17 @@ public class ReopenTableRegionsProcedure
 
   private static final Logger LOG = LoggerFactory.getLogger(ReopenTableRegionsProcedure.class);
 
+  public static final String PROGRESSIVE_BATCH_BACKOFF_MILLIS_KEY =
+    "hbase.reopen.table.regions.progressive.batch.backoff.ms";
+  public static final long PROGRESSIVE_BATCH_BACKOFF_MILLIS_DEFAULT = 0L;
+  public static final String PROGRESSIVE_BATCH_SIZE_MAX_KEY =
+    "hbase.reopen.table.regions.progressive.batch.size.max";
+  public static final int PROGRESSIVE_BATCH_SIZE_MAX_DISABLED = -1;
+  private static final int PROGRESSIVE_BATCH_SIZE_MAX_DEFAULT_VALUE = Integer.MAX_VALUE;
+
+  // this minimum prevents a max which would break this procedure
+  private static final int MINIMUM_BATCH_SIZE_MAX = 1;
+
   private TableName tableName;
 
   // Specify specific regions of a table to reopen.
@@ -61,20 +73,46 @@ public class ReopenTableRegionsProcedure
 
   private List<HRegionLocation> regions = Collections.emptyList();
 
+  private List<HRegionLocation> currentRegionBatch = Collections.emptyList();
+
   private RetryCounter retryCounter;
 
+  private long reopenBatchBackoffMillis;
+  private int reopenBatchSize;
+  private int reopenBatchSizeMax;
+  private long regionsReopened = 0;
+  private long batchesProcessed = 0;
+
   public ReopenTableRegionsProcedure() {
-    regionNames = Collections.emptyList();
+    this(null);
   }
 
   public ReopenTableRegionsProcedure(TableName tableName) {
-    this.tableName = tableName;
-    this.regionNames = Collections.emptyList();
+    this(tableName, Collections.emptyList());
   }
 
   public ReopenTableRegionsProcedure(final TableName tableName, final List<byte[]> regionNames) {
+    this(tableName, regionNames, PROGRESSIVE_BATCH_BACKOFF_MILLIS_DEFAULT,
+      PROGRESSIVE_BATCH_SIZE_MAX_DISABLED);
+  }
+
+  public ReopenTableRegionsProcedure(final TableName tableName, long reopenBatchBackoffMillis,
+    int reopenBatchSizeMax) {
+    this(tableName, Collections.emptyList(), reopenBatchBackoffMillis, reopenBatchSizeMax);
+  }
+
+  public ReopenTableRegionsProcedure(final TableName tableName, final List<byte[]> regionNames,
+    long reopenBatchBackoffMillis, int reopenBatchSizeMax) {
     this.tableName = tableName;
     this.regionNames = regionNames;
+    this.reopenBatchBackoffMillis = reopenBatchBackoffMillis;
+    if (reopenBatchSizeMax == PROGRESSIVE_BATCH_SIZE_MAX_DISABLED) {
+      this.reopenBatchSize = Integer.MAX_VALUE;
+      this.reopenBatchSizeMax = Integer.MAX_VALUE;
+    } else {
+      this.reopenBatchSize = 1;
+      this.reopenBatchSizeMax = Math.max(reopenBatchSizeMax, MINIMUM_BATCH_SIZE_MAX);
+    }
   }
 
   @Override
@@ -87,6 +125,30 @@ public class ReopenTableRegionsProcedure
     return TableOperationType.REGION_EDIT;
   }
 
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+      allowedOnPath = ".*/src/test/.*")
+  public long getRegionsReopened() {
+    return regionsReopened;
+  }
+
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+      allowedOnPath = ".*/src/test/.*")
+  public long getBatchesProcessed() {
+    return batchesProcessed;
+  }
+
+  @RestrictedApi(explanation = "Should only be called internally or in tests", link = "",
+      allowedOnPath = ".*(/src/test/.*|ReopenTableRegionsProcedure).java")
+  protected int progressBatchSize() {
+    int previousBatchSize = reopenBatchSize;
+    reopenBatchSize = Math.min(reopenBatchSizeMax, 2 * reopenBatchSize);
+    if (reopenBatchSize < previousBatchSize) {
+      // the batch size should never decrease. this must be overflow, so just use max
+      reopenBatchSize = reopenBatchSizeMax;
+    }
+    return reopenBatchSize;
+  }
+
   private boolean canSchedule(MasterProcedureEnv env, HRegionLocation loc) {
     if (loc.getSeqNum() < 0) {
       return false;
@@ -114,7 +176,13 @@ public class ReopenTableRegionsProcedure
         setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_REOPEN_REGIONS);
         return Flow.HAS_MORE_STATE;
       case REOPEN_TABLE_REGIONS_REOPEN_REGIONS:
-        for (HRegionLocation loc : regions) {
+        // if we didn't finish reopening the last batch yet, let's keep trying until we do.
+        // at that point, the batch will be empty and we can generate a new batch
+        if (!regions.isEmpty() && currentRegionBatch.isEmpty()) {
+          currentRegionBatch = regions.stream().limit(reopenBatchSize).collect(Collectors.toList());
+          batchesProcessed++;
+        }
+        for (HRegionLocation loc : currentRegionBatch) {
           RegionStateNode regionNode =
             env.getAssignmentManager().getRegionStates().getRegionStateNode(loc.getRegion());
           // this possible, maybe the region has already been merged or split, see HBASE-20921
@@ -133,39 +201,72 @@ public class ReopenTableRegionsProcedure
             regionNode.unlock();
           }
           addChildProcedure(proc);
+          regionsReopened++;
         }
         setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_CONFIRM_REOPENED);
         return Flow.HAS_MORE_STATE;
       case REOPEN_TABLE_REGIONS_CONFIRM_REOPENED:
-        regions = regions.stream().map(env.getAssignmentManager().getRegionStates()::checkReopened)
-          .filter(l -> l != null).collect(Collectors.toList());
+        // update region lists based on what's been reopened
+        regions = filterReopened(env, regions);
+        currentRegionBatch = filterReopened(env, currentRegionBatch);
+
+        // existing batch didn't fully reopen, so try to resolve that first.
+        // since this is a retry, don't do the batch backoff
+        if (!currentRegionBatch.isEmpty()) {
+          return reopenIfSchedulable(env, currentRegionBatch, false);
+        }
+
         if (regions.isEmpty()) {
           return Flow.NO_MORE_STATE;
         }
-        if (regions.stream().anyMatch(loc -> canSchedule(env, loc))) {
-          retryCounter = null;
-          setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_REOPEN_REGIONS);
-          return Flow.HAS_MORE_STATE;
-        }
-        // We can not schedule TRSP for all the regions need to reopen, wait for a while and retry
-        // again.
-        if (retryCounter == null) {
-          retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
-        }
-        long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
-        LOG.info(
-          "There are still {} region(s) which need to be reopened for table {} are in "
-            + "OPENING state, suspend {}secs and try again later",
-          regions.size(), tableName, backoff / 1000);
-        setTimeout(Math.toIntExact(backoff));
-        setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
-        skipPersistence();
-        throw new ProcedureSuspendedException();
+
+        // current batch is finished, schedule more regions
+        return reopenIfSchedulable(env, regions, true);
       default:
         throw new UnsupportedOperationException("unhandled state=" + state);
     }
   }
 
+  private List<HRegionLocation> filterReopened(MasterProcedureEnv env,
+    List<HRegionLocation> regionsToCheck) {
+    return regionsToCheck.stream().map(env.getAssignmentManager().getRegionStates()::checkReopened)
+      .filter(l -> l != null).collect(Collectors.toList());
+  }
+
+  private Flow reopenIfSchedulable(MasterProcedureEnv env, List<HRegionLocation> regionsToReopen,
+    boolean shouldBatchBackoff) throws ProcedureSuspendedException {
+    if (regionsToReopen.stream().anyMatch(loc -> canSchedule(env, loc))) {
+      retryCounter = null;
+      setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_REOPEN_REGIONS);
+      if (shouldBatchBackoff) {
+        progressBatchSize(); // batches must grow even when no inter-batch backoff is configured
+        if (reopenBatchBackoffMillis > 0) {
+          setBackoffState(reopenBatchBackoffMillis);
+          throw new ProcedureSuspendedException();
+        }
+      }
+      return Flow.HAS_MORE_STATE;
+    }
+
+    // No TRSP can be scheduled for any of these regions yet, so wait for a while and retry.
+    if (retryCounter == null) {
+      retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
+    }
+    long backoffMillis = retryCounter.getBackoffTimeAndIncrementAttempts();
+    LOG.info(
+      "There are still {} region(s) which need to be reopened for table {}. {} are in "
+        + "OPENING state, suspend {}secs and try again later",
+      regions.size(), tableName, regionsToReopen.size(), backoffMillis / 1000);
+    setBackoffState(backoffMillis);
+    throw new ProcedureSuspendedException();
+  }
+
+  private void setBackoffState(long millis) {
+    setTimeout(Math.toIntExact(millis));
+    setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
+    skipPersistence();
+  }
+
   private List<HRegionLocation>
     getRegionLocationsForReopen(List<HRegionLocation> tableRegionsForReopen) {
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestReopenTableRegionsProcedureBatchBackoff.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestReopenTableRegionsProcedureBatchBackoff.java
new file mode 100644
index 00000000000..410f56022d0
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestReopenTableRegionsProcedureBatchBackoff.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.procedure;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Confirm that we will rate limit reopen batches when reopening all table regions. This can avoid
+ * the pain associated with reopening too many regions at once.
+ */
+@Category({ MasterTests.class, MediumTests.class })
+public class TestReopenTableRegionsProcedureBatchBackoff {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestReopenTableRegionsProcedureBatchBackoff.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static final TableName TABLE_NAME = TableName.valueOf("BatchBackoff");
+  private static final int BACKOFF_MILLIS_PER_RS = 3_000;
+  private static final int REOPEN_BATCH_SIZE = 1;
+  private static final byte[] CF = Bytes.toBytes("cf");
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    Configuration conf = UTIL.getConfiguration();
+    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
+    UTIL.startMiniCluster(1);
+    UTIL.createMultiRegionTable(TABLE_NAME, CF, 10);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testRegionBatchBackoff() throws IOException {
+    ProcedureExecutor<MasterProcedureEnv> procExec =
+      UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
+    List<RegionInfo> regions = UTIL.getAdmin().getRegions(TABLE_NAME);
+    assertTrue(10 <= regions.size());
+    ReopenTableRegionsProcedure proc =
+      new ReopenTableRegionsProcedure(TABLE_NAME, BACKOFF_MILLIS_PER_RS, REOPEN_BATCH_SIZE);
+    Instant startedAt = Instant.now();
+    procExec.submitProcedure(proc);
+    ProcedureSyncWait.waitForProcedureToComplete(procExec, proc, 60_000);
+    Instant stoppedAt = Instant.now();
+    // n single-region batches are separated by n - 1 backoffs; none follows the last batch
+    assertTrue(Duration.between(startedAt, stoppedAt).toMillis()
+        >= (long) (regions.size() - 1) * BACKOFF_MILLIS_PER_RS);
+  }
+
+  @Test
+  public void testRegionBatchNoBackoff() throws IOException {
+    ProcedureExecutor<MasterProcedureEnv> procExec =
+      UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
+    List<RegionInfo> regions = UTIL.getAdmin().getRegions(TABLE_NAME);
+    assertTrue(10 <= regions.size());
+    int noBackoffMillis = 0;
+    ReopenTableRegionsProcedure proc =
+      new ReopenTableRegionsProcedure(TABLE_NAME, noBackoffMillis, REOPEN_BATCH_SIZE);
+    procExec.submitProcedure(proc);
+    ProcedureSyncWait.waitForProcedureToComplete(procExec, proc,
+      (long) regions.size() * BACKOFF_MILLIS_PER_RS);
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestReopenTableRegionsProcedureBatching.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestReopenTableRegionsProcedureBatching.java
new file mode 100644
index 00000000000..32b1d38b280
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestReopenTableRegionsProcedureBatching.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.procedure;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.master.RegionState.State;
+import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
+import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
+import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
+
+/**
+ * Confirm that we will batch region reopens when reopening all table regions. This can avoid the
+ * pain associated with reopening too many regions at once.
+ */
+@Category({ MasterTests.class, MediumTests.class })
+public class TestReopenTableRegionsProcedureBatching {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestReopenTableRegionsProcedureBatching.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+  private static final int BACKOFF_MILLIS_PER_RS = 0;
+  private static final int REOPEN_BATCH_SIZE_MAX = 1;
+
+  private static final TableName TABLE_NAME = TableName.valueOf("Batching");
+
+  private static final byte[] CF = Bytes.toBytes("cf");
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    Configuration conf = UTIL.getConfiguration();
+    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
+    UTIL.startMiniCluster(1);
+    UTIL.createMultiRegionTable(TABLE_NAME, CF);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testSmallMaxBatchSize() throws IOException {
+    AssignmentManager am = UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager();
+    ProcedureExecutor<MasterProcedureEnv> procExec =
+      UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
+    List<RegionInfo> regions = UTIL.getAdmin().getRegions(TABLE_NAME);
+    assertTrue(2 <= regions.size());
+    Set<StuckRegion> stuckRegions =
+      regions.stream().map(r -> stickRegion(am, procExec, r)).collect(Collectors.toSet());
+    ReopenTableRegionsProcedure proc =
+      new ReopenTableRegionsProcedure(TABLE_NAME, BACKOFF_MILLIS_PER_RS, REOPEN_BATCH_SIZE_MAX);
+    procExec.submitProcedure(proc);
+    UTIL.waitFor(10000, () -> proc.getState() == ProcedureState.WAITING_TIMEOUT);
+
+    // the first batch should be small
+    confirmBatchSize(1, stuckRegions, proc);
+    ProcedureSyncWait.waitForProcedureToComplete(procExec, proc, 60_000);
+
+    // other batches should also be small
+    assertTrue(proc.getBatchesProcessed() >= regions.size());
+
+    // all regions should only be opened once
+    assertEquals(regions.size(), proc.getRegionsReopened());
+  }
+
+  @Test
+  public void testDefaultMaxBatchSize() throws IOException {
+    AssignmentManager am = UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager();
+    ProcedureExecutor<MasterProcedureEnv> procExec =
+      UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
+    List<RegionInfo> regions = UTIL.getAdmin().getRegions(TABLE_NAME);
+    assertTrue(2 <= regions.size());
+    Set<StuckRegion> stuckRegions =
+      regions.stream().map(r -> stickRegion(am, procExec, r)).collect(Collectors.toSet());
+    ReopenTableRegionsProcedure proc = new ReopenTableRegionsProcedure(TABLE_NAME);
+    procExec.submitProcedure(proc);
+    UTIL.waitFor(10000, () -> proc.getState() == ProcedureState.WAITING_TIMEOUT);
+
+    // the first batch should be large
+    confirmBatchSize(regions.size(), stuckRegions, proc);
+    ProcedureSyncWait.waitForProcedureToComplete(procExec, proc, 60_000);
+
+    // all regions should only be opened once
+    assertEquals(regions.size(), proc.getRegionsReopened());
+  }
+
+  @Test
+  public void testNegativeBatchSizeDoesNotBreak() throws IOException {
+    AssignmentManager am = UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager();
+    ProcedureExecutor<MasterProcedureEnv> procExec =
+      UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
+    List<RegionInfo> regions = UTIL.getAdmin().getRegions(TABLE_NAME);
+    assertTrue(2 <= regions.size());
+    Set<StuckRegion> stuckRegions =
+      regions.stream().map(r -> stickRegion(am, procExec, r)).collect(Collectors.toSet());
+    ReopenTableRegionsProcedure proc =
+      new ReopenTableRegionsProcedure(TABLE_NAME, BACKOFF_MILLIS_PER_RS, -100);
+    procExec.submitProcedure(proc);
+    UTIL.waitFor(10000, () -> proc.getState() == ProcedureState.WAITING_TIMEOUT);
+
+    // the first batch should be small
+    confirmBatchSize(1, stuckRegions, proc);
+    ProcedureSyncWait.waitForProcedureToComplete(procExec, proc, 60_000);
+
+    // other batches should also be small
+    assertTrue(proc.getBatchesProcessed() >= regions.size());
+
+    // all regions should only be opened once
+    assertEquals(regions.size(), proc.getRegionsReopened());
+  }
+
+  @Test
+  public void testBatchSizeDoesNotOverflow() {
+    ReopenTableRegionsProcedure proc =
+      new ReopenTableRegionsProcedure(TABLE_NAME, BACKOFF_MILLIS_PER_RS, Integer.MAX_VALUE);
+    int currentBatchSize = 1;
+    while (currentBatchSize < Integer.MAX_VALUE) {
+      currentBatchSize = proc.progressBatchSize();
+      assertTrue(currentBatchSize > 0);
+    }
+  }
+
+  /**
+   * Waits for the procedure to form its first batch, releases every stuck region, and then waits
+   * up to five seconds for exactly {@code expectedBatchSize} regions to have been reopened.
+   */
+  private void confirmBatchSize(int expectedBatchSize, Set<StuckRegion> stuckRegions,
+    ReopenTableRegionsProcedure proc) {
+    // bounded, sleeping wait instead of an unbounded busy spin on a non-volatile field
+    UTIL.waitFor(10000, () -> proc.getBatchesProcessed() > 0);
+    stuckRegions.forEach(this::unstickRegion);
+    UTIL.waitFor(5000, () -> expectedBatchSize == proc.getRegionsReopened());
+  }
+
+  static class StuckRegion {
+    final TransitRegionStateProcedure trsp;
+    final RegionStateNode regionNode;
+    final long openSeqNum;
+
+    public StuckRegion(TransitRegionStateProcedure trsp, RegionStateNode regionNode,
+      long openSeqNum) {
+      this.trsp = trsp;
+      this.regionNode = regionNode;
+      this.openSeqNum = openSeqNum;
+    }
+  }
+
+  private StuckRegion stickRegion(AssignmentManager am,
+    ProcedureExecutor<MasterProcedureEnv> procExec, RegionInfo regionInfo) {
+    RegionStateNode regionNode = am.getRegionStates().getRegionStateNode(regionInfo);
+    TransitRegionStateProcedure trsp =
+      TransitRegionStateProcedure.unassign(procExec.getEnvironment(), regionInfo);
+    regionNode.lock();
+    long openSeqNum;
+    try {
+      openSeqNum = regionNode.getOpenSeqNum();
+      regionNode.setState(State.OPENING);
+      regionNode.setOpenSeqNum(-1L);
+      regionNode.setProcedure(trsp);
+    } finally {
+      regionNode.unlock();
+    }
+    return new StuckRegion(trsp, regionNode, openSeqNum);
+  }
+
+  private void unstickRegion(StuckRegion stuckRegion) {
+    stuckRegion.regionNode.lock();
+    try {
+      stuckRegion.regionNode.setState(State.OPEN);
+      stuckRegion.regionNode.setOpenSeqNum(stuckRegion.openSeqNum);
+      stuckRegion.regionNode.unsetProcedure(stuckRegion.trsp);
+    } finally {
+      stuckRegion.regionNode.unlock();
+    }
+  }
+}

Reply via email to