Repository: hbase
Updated Branches:
  refs/heads/branch-2.1 ea4194039 -> 2da6dbe56


HBASE-21172 Reimplement the retry backoff logic for ReopenTableRegionsProcedure


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/2da6dbe5
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/2da6dbe5
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/2da6dbe5

Branch: refs/heads/branch-2.1
Commit: 2da6dbe563fd67c34f2a489f1f742d31b00fa159
Parents: ea41940
Author: Duo Zhang <zhang...@apache.org>
Authored: Tue Sep 11 17:09:27 2018 +0800
Committer: Duo Zhang <zhang...@apache.org>
Committed: Wed Sep 12 16:01:55 2018 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/procedure2/Procedure.java      |  11 +-
 .../hadoop/hbase/procedure2/ProcedureUtil.java  |  57 +++++----
 .../hbase/procedure2/TestProcedureUtil.java     |  15 ++-
 .../assignment/RegionTransitionProcedure.java   |   9 +-
 .../procedure/ReopenTableRegionsProcedure.java  |  31 +++--
 .../TestReopenTableRegionsProcedureBackoff.java | 118 +++++++++++++++++++
 6 files changed, 202 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/2da6dbe5/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
----------------------------------------------------------------------
diff --git 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
index b2685f6..a832c78 100644
--- 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
+++ 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
@@ -769,14 +769,21 @@ public abstract class Procedure<TEnvironment> implements 
Comparable<Procedure<TE
 
   /**
    * Called by the ProcedureExecutor when the timeout set by setTimeout() is 
expired.
+   * <p/>
+   * Another usage for this method is to implement retrying. A procedure can 
set the state to
+   * {@code WAITING_TIMEOUT} by calling {@code setState} method, and throw a
+   * {@link ProcedureSuspendedException} to halt the execution of the 
procedure, and do not forget to
+   * call the {@link #setTimeout(int)} method to set the timeout. You should 
also override this
+   * method to wake up the procedure, and also return false to tell the 
ProcedureExecutor that the
+   * timeout event has been handled.
    * @return true to let the framework handle the timeout as abort, false in 
case the procedure
    *         handled the timeout itself.
    */
   protected synchronized boolean setTimeoutFailure(TEnvironment env) {
     if (state == ProcedureState.WAITING_TIMEOUT) {
       long timeDiff = EnvironmentEdgeManager.currentTime() - lastUpdate;
-      setFailure("ProcedureExecutor", new TimeoutIOException(
-        "Operation timed out after " + StringUtils.humanTimeDiff(timeDiff)));
+      setFailure("ProcedureExecutor",
+        new TimeoutIOException("Operation timed out after " + 
StringUtils.humanTimeDiff(timeDiff)));
       return true;
     }
     return false;

http://git-wip-us.apache.org/repos/asf/hbase/blob/2da6dbe5/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java
----------------------------------------------------------------------
diff --git 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java
 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java
index 8a438d4..8c8746e 100644
--- 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java
+++ 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java
@@ -44,40 +44,42 @@ public final class ProcedureUtil {
   // ==========================================================================
   //  Reflection helpers to create/validate a Procedure object
   // ==========================================================================
-  public static Procedure newProcedure(final String className) throws 
BadProcedureException {
+  private static Procedure<?> newProcedure(String className) throws 
BadProcedureException {
     try {
-      final Class<?> clazz = Class.forName(className);
+      Class<?> clazz = Class.forName(className);
       if (!Modifier.isPublic(clazz.getModifiers())) {
         throw new Exception("the " + clazz + " class is not public");
       }
 
-      final Constructor<?> ctor = clazz.getConstructor();
+      @SuppressWarnings("rawtypes")
+      Constructor<? extends Procedure> ctor = 
clazz.asSubclass(Procedure.class).getConstructor();
       assert ctor != null : "no constructor found";
       if (!Modifier.isPublic(ctor.getModifiers())) {
         throw new Exception("the " + clazz + " constructor is not public");
       }
-      return (Procedure)ctor.newInstance();
+      return ctor.newInstance();
     } catch (Exception e) {
-      throw new BadProcedureException("The procedure class " + className +
-          " must be accessible and have an empty constructor", e);
+      throw new BadProcedureException(
+        "The procedure class " + className + " must be accessible and have an 
empty constructor",
+        e);
     }
   }
 
-  public static void validateClass(final Procedure proc) throws 
BadProcedureException {
+  static void validateClass(Procedure<?> proc) throws BadProcedureException {
     try {
-      final Class<?> clazz = proc.getClass();
+      Class<?> clazz = proc.getClass();
       if (!Modifier.isPublic(clazz.getModifiers())) {
         throw new Exception("the " + clazz + " class is not public");
       }
 
-      final Constructor<?> ctor = clazz.getConstructor();
+      Constructor<?> ctor = clazz.getConstructor();
       assert ctor != null;
       if (!Modifier.isPublic(ctor.getModifiers())) {
         throw new Exception("the " + clazz + " constructor is not public");
       }
     } catch (Exception e) {
       throw new BadProcedureException("The procedure class " + 
proc.getClass().getName() +
-          " must be accessible and have an empty constructor", e);
+        " must be accessible and have an empty constructor", e);
     }
   }
 
@@ -150,9 +152,10 @@ public final class ProcedureUtil {
 
   /**
    * Helper to convert the procedure to protobuf.
+   * <p/>
    * Used by ProcedureStore implementations.
    */
-  public static ProcedureProtos.Procedure convertToProtoProcedure(final 
Procedure proc)
+  public static ProcedureProtos.Procedure convertToProtoProcedure(Procedure<?> 
proc)
       throws IOException {
     Preconditions.checkArgument(proc != null);
     validateClass(proc);
@@ -214,16 +217,17 @@ public final class ProcedureUtil {
 
   /**
    * Helper to convert the protobuf procedure.
+   * <p/>
    * Used by ProcedureStore implementations.
-   *
-   * TODO: OPTIMIZATION: some of the field never change during the execution
-   *                     (e.g. className, procId, parentId, ...).
-   *                     We can split in 'data' and 'state', and the store
-   *                     may take advantage of it by storing the data only on 
insert().
+   * <p/>
+   * TODO: OPTIMIZATION: some of the fields never change during the execution 
(e.g. className,
+   * procId, parentId, ...). We can split in 'data' and 'state', and the store 
may take advantage of
+   * it by storing the data only on insert().
    */
-  public static Procedure convertToProcedure(final ProcedureProtos.Procedure 
proto) throws IOException {
+  public static Procedure<?> convertToProcedure(ProcedureProtos.Procedure 
proto)
+      throws IOException {
     // Procedure from class name
-    final Procedure proc = newProcedure(proto.getClassName());
+    Procedure<?> proc = newProcedure(proto.getClassName());
 
     // set fields
     proc.setProcId(proto.getProcId());
@@ -300,8 +304,7 @@ public final class ProcedureUtil {
   }
 
   public static LockServiceProtos.LockedResource convertToProtoLockedResource(
-      LockedResource lockedResource) throws IOException
-  {
+      LockedResource lockedResource) throws IOException {
     LockServiceProtos.LockedResource.Builder builder =
         LockServiceProtos.LockedResource.newBuilder();
 
@@ -328,4 +331,18 @@ public final class ProcedureUtil {
 
     return builder.build();
   }
+
+  /**
+   * Get an exponential backoff time, in milliseconds. The base unit is 1 
second, and the max
+   * backoff time is 10 minutes. This is the general backoff policy for most 
procedure
+   * implementations.
+   */
+  public static long getBackoffTimeMs(int attempts) {
+    long maxBackoffTime = 10L * 60 * 1000; // Ten minutes, hard coded for now.
+    // avoid overflow
+    if (attempts >= 30) {
+      return maxBackoffTime;
+    }
+    return Math.min((long) (1000 * Math.pow(2, attempts)), maxBackoffTime);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/2da6dbe5/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureUtil.java
----------------------------------------------------------------------
diff --git 
a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureUtil.java
 
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureUtil.java
index 0fcb4f4..6342bec 100644
--- 
a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureUtil.java
+++ 
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureUtil.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.procedure2;
 
 import static org.junit.Assert.assertEquals;
 
+import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import 
org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
@@ -29,12 +30,12 @@ import org.junit.experimental.categories.Category;
 
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
 
-@Category({MasterTests.class, SmallTests.class})
+@Category({ MasterTests.class, SmallTests.class })
 public class TestProcedureUtil {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestProcedureUtil.class);
+    HBaseClassTestRule.forClass(TestProcedureUtil.class);
 
   @Test
   public void testValidation() throws Exception {
@@ -57,6 +58,16 @@ public class TestProcedureUtil {
     assertEquals("Procedure protobuf does not match", proto1, proto2);
   }
 
+  @Test
+  public void testGetBackoffTimeMs() {
+    for (int i = 30; i < 1000; i++) {
+      assertEquals(TimeUnit.MINUTES.toMillis(10), 
ProcedureUtil.getBackoffTimeMs(30));
+    }
+    assertEquals(1000, ProcedureUtil.getBackoffTimeMs(0));
+    assertEquals(2000, ProcedureUtil.getBackoffTimeMs(1));
+    assertEquals(32000, ProcedureUtil.getBackoffTimeMs(5));
+  }
+
   public static class TestProcedureNoDefaultConstructor extends TestProcedure {
     public TestProcedureNoDefaultConstructor(int x) {}
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/2da6dbe5/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java
index c10bf2d..3b0735e 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java
@@ -31,6 +31,7 @@ import 
org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException;
 import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
 import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
 import 
org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
 import 
org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
 import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
@@ -360,7 +361,7 @@ public abstract class RegionTransitionProcedure
       // If here, success so clear out the attempt counter so we start fresh 
each time we get stuck.
       this.attempt = 0;
     } catch (IOException e) {
-      long backoff = getBackoffTime(this.attempt++);
+      long backoff = ProcedureUtil.getBackoffTimeMs(this.attempt++);
       LOG.warn("Failed transition, suspend {}secs {}; {}; waiting on rectified 
condition fixed " +
               "by other Procedure or operator intervention", backoff / 1000, 
this,
           regionNode.toShortString(), e);
@@ -372,12 +373,6 @@ public abstract class RegionTransitionProcedure
     return new Procedure[] {this};
   }
 
-  private long getBackoffTime(int attempts) {
-    long backoffTime = (long)(1000 * Math.pow(2, attempts));
-    long maxBackoffTime = 60 * 60 * 1000; // An hour. Hard-coded for for now.
-    return backoffTime < maxBackoffTime? backoffTime: maxBackoffTime;
-  }
-
   /**
    * At end of timeout, wake ourselves up so we run again.
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/2da6dbe5/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ReopenTableRegionsProcedure.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ReopenTableRegionsProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ReopenTableRegionsProcedure.java
index 8f3aa22..8a31953 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ReopenTableRegionsProcedure.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ReopenTableRegionsProcedure.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.master.RegionPlan;
 import org.apache.hadoop.hbase.master.assignment.MoveRegionProcedure;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
 import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
 import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -36,6 +37,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReopenTableRegionsState;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReopenTableRegionsStateData;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
 
 /**
  * Used for reopening the regions for a table.
@@ -52,6 +54,8 @@ public class ReopenTableRegionsProcedure
 
   private List<HRegionLocation> regions = Collections.emptyList();
 
+  private int attempt;
+
   public ReopenTableRegionsProcedure() {
   }
 
@@ -104,23 +108,34 @@ public class ReopenTableRegionsProcedure
           return Flow.NO_MORE_STATE;
         }
         if (regions.stream().anyMatch(l -> l.getSeqNum() >= 0)) {
+          attempt = 0;
           
setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_REOPEN_REGIONS);
           return Flow.HAS_MORE_STATE;
         }
-        LOG.info("There are still {} region(s) which need to be reopened for 
table {} are in " +
-          "OPENING state, try again later", regions.size(), tableName);
         // All the regions need to reopen are in OPENING state which means we 
can not schedule any
-        // MRPs. Then sleep for one second, and yield the procedure to let 
other procedures run
-        // first and hope next time we can get some regions in other state to 
make progress.
-        // TODO: add a delay for ProcedureYieldException so that we do not 
need to sleep here which
-        // blocks a procedure worker.
-        Thread.sleep(1000);
-        throw new ProcedureYieldException();
+        // MRPs.
+        long backoff = ProcedureUtil.getBackoffTimeMs(this.attempt++);
+        LOG.info(
+          "There are still {} region(s) which need to be reopened for table {} 
are in " +
+            "OPENING state, suspend {}secs and try again later",
+          regions.size(), tableName, backoff / 1000);
+        setTimeout(Math.toIntExact(backoff));
+        setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
+        throw new ProcedureSuspendedException();
       default:
         throw new UnsupportedOperationException("unhandled state=" + state);
     }
   }
 
+  /**
+   * At end of timeout, wake ourselves up so we run again.
+   */
+  @Override
+  protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
+    setState(ProcedureProtos.ProcedureState.RUNNABLE);
+    env.getProcedureScheduler().addFront(this);
+    return false; // 'false' means that this procedure handled the timeout
+  }
   @Override
   protected void rollbackState(MasterProcedureEnv env, ReopenTableRegionsState 
state)
       throws IOException, InterruptedException {

http://git-wip-us.apache.org/repos/asf/hbase/blob/2da6dbe5/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestReopenTableRegionsProcedureBackoff.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestReopenTableRegionsProcedureBackoff.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestReopenTableRegionsProcedureBackoff.java
new file mode 100644
index 0000000..0154191
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestReopenTableRegionsProcedureBackoff.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.procedure;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.master.RegionState.State;
+import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
+import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
+
+/**
+ * Confirm that we will do backoff when retrying on reopening table regions, 
to avoid consuming all
+ * the CPUs.
+ */
+@Category({ MasterTests.class, MediumTests.class })
+public class TestReopenTableRegionsProcedureBackoff {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestReopenTableRegionsProcedureBackoff.class);
+
+  private static final Logger LOG =
+    LoggerFactory.getLogger(TestReopenTableRegionsProcedureBackoff.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static TableName TABLE_NAME = TableName.valueOf("Backoff");
+
+  private static byte[] CF = Bytes.toBytes("cf");
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    
UTIL.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 
1);
+    UTIL.startMiniCluster(1);
+    UTIL.createTable(TABLE_NAME, CF);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testRetryBackoff() throws IOException, InterruptedException {
+    AssignmentManager am = 
UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager();
+    ProcedureExecutor<MasterProcedureEnv> procExec =
+      UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
+    RegionInfo regionInfo = UTIL.getAdmin().getRegions(TABLE_NAME).get(0);
+    RegionStateNode regionNode = 
am.getRegionStates().getRegionStateNode(regionInfo);
+    long openSeqNum;
+    synchronized (regionNode) {
+      openSeqNum = regionNode.getOpenSeqNum();
+      // make a fake state to let the procedure wait.
+      regionNode.setState(State.OPENING);
+      regionNode.setOpenSeqNum(-1L);
+    }
+    ReopenTableRegionsProcedure proc = new 
ReopenTableRegionsProcedure(TABLE_NAME);
+    procExec.submitProcedure(proc);
+    UTIL.waitFor(10000, () -> proc.getState() == 
ProcedureState.WAITING_TIMEOUT);
+    long oldTimeout = 0;
+    int timeoutIncrements = 0;
+    for (;;) {
+      long timeout = proc.getTimeout();
+      if (timeout > oldTimeout) {
+        LOG.info("Timeout incremented, was {}, now is {}, increments={}", 
timeout, oldTimeout,
+          timeoutIncrements);
+        oldTimeout = timeout;
+        timeoutIncrements++;
+        if (timeoutIncrements > 3) {
+          // If we incremented more than three times, break; the backoff is working.
+          break;
+        }
+      }
+      Thread.sleep(1000);
+    }
+    synchronized (regionNode) {
+      // reset to the correct state
+      regionNode.setState(State.OPEN);
+      regionNode.setOpenSeqNum(openSeqNum);
+    }
+    ProcedureSyncWait.waitForProcedureToComplete(procExec, proc, 60000);
+    assertTrue(regionNode.getOpenSeqNum() > openSeqNum);
+  }
+}

Reply via email to