Repository: hbase
Updated Branches:
  refs/heads/master fdbaa4c3f -> f9d51b67e


HBASE-21245 Add exponential backoff when retrying for sync replication related 
procedures


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f9d51b67
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f9d51b67
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f9d51b67

Branch: refs/heads/master
Commit: f9d51b67e45f1f213c37b14bfe96e442bc71a995
Parents: fdbaa4c
Author: Guanghao Zhang <zg...@apache.org>
Authored: Sat Sep 29 09:51:57 2018 +0800
Committer: Guanghao Zhang <zg...@apache.org>
Committed: Sun Sep 30 20:05:34 2018 +0800

----------------------------------------------------------------------
 .../replication/AbstractPeerProcedure.java      |  30 ++++
 .../master/replication/ModifyPeerProcedure.java |  46 ++---
 .../SyncReplicationReplayWALProcedure.java      |  52 +++---
 ...ransitPeerSyncReplicationStateProcedure.java | 111 ++++++++----
 ...eerSyncReplicationStateProcedureBackoff.java | 172 +++++++++++++++++++
 5 files changed, 314 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/f9d51b67/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java
index e133a65..d3a4eb8 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java
@@ -18,14 +18,20 @@
 package org.apache.hadoop.hbase.master.replication;
 
 import java.io.IOException;
+
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
 import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
 import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
+import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.yetus.audience.InterfaceAudience;
 
+import 
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerProcedureStateData;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
 
 /**
  * The base class for all replication peer related procedure.
@@ -39,6 +45,8 @@ public abstract class AbstractPeerProcedure<TState>
   // used to keep compatible with old client where we can only returns after 
updateStorage.
   protected ProcedurePrepareLatch latch;
 
+  protected int attempts;
+
   protected AbstractPeerProcedure() {
   }
 
@@ -106,4 +114,26 @@ public abstract class AbstractPeerProcedure<TState>
     
addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream()
       .map(sn -> new RefreshPeerProcedure(peerId, type, 
sn)).toArray(RefreshPeerProcedure[]::new));
   }
+
+  @Override // presumably called once the suspend() backoff timeout expires
+  protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
+    setState(ProcedureProtos.ProcedureState.RUNNABLE); // leave WAITING_TIMEOUT
+    env.getProcedureScheduler().addFront(this); // re-queue at the front of the scheduler
+    return false; // NOTE(review): false appears to mean "do not fail the procedure"
+  }
+
+  protected final ProcedureSuspendedException suspend(long backoff) // backoff in ms; never returns
+      throws ProcedureSuspendedException { // return type lets callers write "throw suspend(...)"
+    attempts++; // next ProcedureUtil.getBackoffTimeMs(attempts) presumably grows
+    setTimeout(Math.toIntExact(backoff)); // toIntExact throws if backoff overflows int
+    setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
+    skipPersistence(); // do not persist this retry wait (TODO confirm framework semantics)
+    throw new ProcedureSuspendedException();
+  }
+
+  // will be overridden in tests to simulate errors
+  @VisibleForTesting
+  protected void enablePeer(MasterProcedureEnv env) throws 
ReplicationException {
+    env.getReplicationPeerManager().enablePeer(peerId);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f9d51b67/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
index 7690c96..1aa86ed 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
@@ -45,7 +45,6 @@ import org.slf4j.LoggerFactory;
 import 
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
 
 /**
  * The base class for all replication peer related procedure except sync 
replication state
@@ -61,8 +60,6 @@ public abstract class ModifyPeerProcedure extends 
AbstractPeerProcedure<PeerModi
   // The sleep interval when waiting table to be enabled or disabled.
   protected static final int SLEEP_INTERVAL_MS = 1000;
 
-  private int attemps;
-
   protected ModifyPeerProcedure() {
   }
 
@@ -172,12 +169,6 @@ public abstract class ModifyPeerProcedure extends 
AbstractPeerProcedure<PeerModi
     }
   }
 
-  // will be override in test to simulate error
-  @VisibleForTesting
-  protected void enablePeer(MasterProcedureEnv env) throws 
ReplicationException {
-    env.getReplicationPeerManager().enablePeer(peerId);
-  }
-
   private void addToMap(Map<String, Long> lastSeqIds, String 
encodedRegionName, long barrier,
       ReplicationQueueStorage queueStorage) throws ReplicationException {
     if (barrier >= 0) {
@@ -249,21 +240,6 @@ public abstract class ModifyPeerProcedure extends 
AbstractPeerProcedure<PeerModi
   }
 
   @Override
-  protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
-    setState(ProcedureProtos.ProcedureState.RUNNABLE);
-    env.getProcedureScheduler().addFront(this);
-    return false;
-  }
-
-  private ProcedureSuspendedException suspend(long backoff) throws 
ProcedureSuspendedException {
-    attemps++;
-    setTimeout(Math.toIntExact(backoff));
-    setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
-    skipPersistence();
-    throw new ProcedureSuspendedException();
-  }
-
-  @Override
   protected Flow executeFromState(MasterProcedureEnv env, 
PeerModificationState state)
       throws ProcedureSuspendedException {
     switch (state) {
@@ -277,24 +253,24 @@ public abstract class ModifyPeerProcedure extends 
AbstractPeerProcedure<PeerModi
           releaseLatch();
           return Flow.NO_MORE_STATE;
         } catch (ReplicationException e) {
-          long backoff = ProcedureUtil.getBackoffTimeMs(attemps);
+          long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
           LOG.warn("{} failed to call prePeerModification for peer {}, sleep 
{} secs",
             getClass().getName(), peerId, backoff / 1000, e);
           throw suspend(backoff);
         }
-        attemps = 0;
+        attempts = 0;
         setNextState(PeerModificationState.UPDATE_PEER_STORAGE);
         return Flow.HAS_MORE_STATE;
       case UPDATE_PEER_STORAGE:
         try {
           updatePeerStorage(env);
         } catch (ReplicationException e) {
-          long backoff = ProcedureUtil.getBackoffTimeMs(attemps);
+          long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
           LOG.warn("{} update peer storage for peer {} failed, sleep {} secs", 
getClass().getName(),
             peerId, backoff / 1000, e);
           throw suspend(backoff);
         }
-        attemps = 0;
+        attempts = 0;
         setNextState(PeerModificationState.REFRESH_PEER_ON_RS);
         return Flow.HAS_MORE_STATE;
       case REFRESH_PEER_ON_RS:
@@ -305,24 +281,24 @@ public abstract class ModifyPeerProcedure extends 
AbstractPeerProcedure<PeerModi
         try {
           reopenRegions(env);
         } catch (Exception e) {
-          long backoff = ProcedureUtil.getBackoffTimeMs(attemps);
+          long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
           LOG.warn("{} reopen regions for peer {} failed,  sleep {} secs", 
getClass().getName(),
             peerId, backoff / 1000, e);
           throw suspend(backoff);
         }
-        attemps = 0;
+        attempts = 0;
         
setNextState(PeerModificationState.SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID);
         return Flow.HAS_MORE_STATE;
       case SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID:
         try {
           updateLastPushedSequenceIdForSerialPeer(env);
         } catch (Exception e) {
-          long backoff = ProcedureUtil.getBackoffTimeMs(attemps);
+          long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
           LOG.warn("{} set last sequence id for peer {} failed,  sleep {} 
secs",
             getClass().getName(), peerId, backoff / 1000, e);
           throw suspend(backoff);
         }
-        attemps = 0;
+        attempts = 0;
         setNextState(enablePeerBeforeFinish() ? 
PeerModificationState.SERIAL_PEER_SET_PEER_ENABLED
           : PeerModificationState.POST_PEER_MODIFICATION);
         return Flow.HAS_MORE_STATE;
@@ -330,12 +306,12 @@ public abstract class ModifyPeerProcedure extends 
AbstractPeerProcedure<PeerModi
         try {
           enablePeer(env);
         } catch (ReplicationException e) {
-          long backoff = ProcedureUtil.getBackoffTimeMs(attemps);
+          long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
           LOG.warn("{} enable peer before finish for peer {} failed,  sleep {} 
secs",
             getClass().getName(), peerId, backoff / 1000, e);
           throw suspend(backoff);
         }
-        attemps = 0;
+        attempts = 0;
         
setNextState(PeerModificationState.SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS);
         return Flow.HAS_MORE_STATE;
       case SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS:
@@ -346,7 +322,7 @@ public abstract class ModifyPeerProcedure extends 
AbstractPeerProcedure<PeerModi
         try {
           postPeerModification(env);
         } catch (ReplicationException e) {
-          long backoff = ProcedureUtil.getBackoffTimeMs(attemps);
+          long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
           LOG.warn("{} failed to call postPeerModification for peer {},  sleep 
{} secs",
             getClass().getName(), peerId, backoff / 1000, e);
           throw suspend(backoff);

http://git-wip-us.apache.org/repos/asf/hbase/blob/f9d51b67/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALProcedure.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALProcedure.java
index 26d6a3f..21a8c81 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALProcedure.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALProcedure.java
@@ -23,11 +23,9 @@ import java.util.List;
 
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
-import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
 import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
-import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
-import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -38,14 +36,11 @@ import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.S
 
 @InterfaceAudience.Private
 public class SyncReplicationReplayWALProcedure
-    extends StateMachineProcedure<MasterProcedureEnv, 
SyncReplicationReplayWALState>
-    implements PeerProcedureInterface {
+    extends AbstractPeerProcedure<SyncReplicationReplayWALState> {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(SyncReplicationReplayWALProcedure.class);
 
-  private String peerId;
-
   private ServerName worker = null;
 
   private List<String> wals;
@@ -58,9 +53,9 @@ public class SyncReplicationReplayWALProcedure
     this.wals = wals;
   }
 
-  @Override protected Flow executeFromState(MasterProcedureEnv env,
-      SyncReplicationReplayWALState state)
-      throws ProcedureSuspendedException, ProcedureYieldException, 
InterruptedException {
+  @Override
+  protected Flow executeFromState(MasterProcedureEnv env, 
SyncReplicationReplayWALState state)
+      throws ProcedureSuspendedException {
     SyncReplicationReplayWALManager syncReplicationReplayWALManager =
         env.getMasterServices().getSyncReplicationReplayWALManager();
     switch (state) {
@@ -68,15 +63,19 @@ public class SyncReplicationReplayWALProcedure
         try {
           worker = syncReplicationReplayWALManager.getPeerWorker(peerId);
         } catch (ReplicationException e) {
-          LOG.info("Failed to get worker to replay wals {} for peer id={}, 
retry", wals, peerId);
-          throw new ProcedureYieldException();
+          long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
+          LOG.warn("Failed to get worker to replay wals {} for peer id={}, 
sleep {} secs and retry",
+            wals, peerId, backoff / 1000, e);
+          throw suspend(backoff);
         }
         if (worker == null) {
-          LOG.info("No worker to replay wals {} for peer id={}, retry", wals, 
peerId);
-          setNextState(SyncReplicationReplayWALState.ASSIGN_WORKER);
-        } else {
-          setNextState(SyncReplicationReplayWALState.DISPATCH_WALS_TO_WORKER);
+          long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
+          LOG.info("No worker to replay wals {} for peer id={}, sleep {} secs 
and retry", wals,
+            peerId, backoff / 1000);
+          throw suspend(backoff);
         }
+        attempts = 0;
+        setNextState(SyncReplicationReplayWALState.DISPATCH_WALS_TO_WORKER);
         return Flow.HAS_MORE_STATE;
       case DISPATCH_WALS_TO_WORKER:
         addChildProcedure(new SyncReplicationReplayWALRemoteProcedure(peerId, 
wals, worker));
@@ -87,17 +86,23 @@ public class SyncReplicationReplayWALProcedure
         try {
           finished = 
syncReplicationReplayWALManager.isReplayWALFinished(wals.get(0));
         } catch (IOException e) {
-          LOG.info("Failed to check whether replay wals {} finished for peer 
id={}", wals, peerId);
-          throw new ProcedureYieldException();
+          long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
+          LOG.warn("Failed to check whether replay wals {} finished for peer 
id={}" +
+              ", sleep {} secs and retry",
+            wals, peerId, backoff / 1000, e);
+          throw suspend(backoff);
         }
         try {
           syncReplicationReplayWALManager.removePeerWorker(peerId, worker);
         } catch (ReplicationException e) {
-          LOG.info("Failed to remove worker for peer id={}, retry", peerId);
-          throw new ProcedureYieldException();
+          long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
+          LOG.warn("Failed to remove worker {} for peer id={}, sleep {} secs 
and retry", worker,
+            peerId, backoff / 1000, e);
+          throw suspend(backoff);
         }
+        attempts = 0;
         if (!finished) {
-          LOG.info("Failed to replay wals {} for peer id={}, retry", wals, 
peerId);
+          LOG.warn("Failed to replay wals {} for peer id={}, retry", wals, 
peerId);
           setNextState(SyncReplicationReplayWALState.ASSIGN_WORKER);
           return Flow.HAS_MORE_STATE;
         }
@@ -153,11 +158,6 @@ public class SyncReplicationReplayWALProcedure
   }
 
   @Override
-  public String getPeerId() {
-    return peerId;
-  }
-
-  @Override
   public PeerOperationType getPeerOperationType() {
     return PeerOperationType.SYNC_REPLICATION_REPLAY_WAL;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f9d51b67/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
index c650974..8c6232f 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
@@ -28,7 +28,7 @@ import 
org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
 import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
-import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
@@ -37,6 +37,8 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import 
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerSyncReplicationStateTransitionState;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.TransitPeerSyncReplicationStateStateData;
 
@@ -50,7 +52,7 @@ public class TransitPeerSyncReplicationStateProcedure
   private static final Logger LOG =
       LoggerFactory.getLogger(TransitPeerSyncReplicationStateProcedure.class);
 
-  private SyncReplicationState fromState;
+  protected SyncReplicationState fromState;
 
   private SyncReplicationState toState;
 
@@ -107,7 +109,8 @@ public class TransitPeerSyncReplicationStateProcedure
     return 
PeerSyncReplicationStateTransitionState.PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION;
   }
 
-  private void preTransit(MasterProcedureEnv env) throws IOException {
+  @VisibleForTesting
+  protected void preTransit(MasterProcedureEnv env) throws IOException {
     MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
     if (cpHost != null) {
       cpHost.preTransitReplicationPeerSyncReplicationState(peerId, toState);
@@ -139,14 +142,15 @@ public class TransitPeerSyncReplicationStateProcedure
     }
   }
 
-  private void reopenRegions(MasterProcedureEnv env) {
+  @VisibleForTesting // overridden as a no-op in tests
+  protected void reopenRegions(MasterProcedureEnv env) {
     addChildProcedure(
       
env.getReplicationPeerManager().getPeerConfig(peerId).get().getTableCFsMap().keySet().stream()
         
.map(ReopenTableRegionsProcedure::new).toArray(ReopenTableRegionsProcedure[]::new));
   }
 
-  private void createDirForRemoteWAL(MasterProcedureEnv env)
-      throws ProcedureYieldException, IOException {
+  @VisibleForTesting
+  protected void createDirForRemoteWAL(MasterProcedureEnv env) throws 
IOException {
     MasterFileSystem mfs = env.getMasterFileSystem();
     Path remoteWALDir = new Path(mfs.getWALRootDir(), 
ReplicationUtils.REMOTE_WAL_DIR_NAME);
     Path remoteWALDirForPeer = 
ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId);
@@ -155,8 +159,7 @@ public class TransitPeerSyncReplicationStateProcedure
       LOG.warn("Wal dir {} already exists, usually this should not happen, 
continue anyway",
         remoteWALDirForPeer);
     } else if (!walFs.mkdirs(remoteWALDirForPeer)) {
-      LOG.warn("Can not create remote wal dir {}", remoteWALDirForPeer);
-      throw new ProcedureYieldException();
+      throw new IOException("Failed to create remote wal dir " + 
remoteWALDirForPeer);
     }
   }
 
@@ -190,10 +193,33 @@ public class TransitPeerSyncReplicationStateProcedure
     addChildProcedure(new RecoverStandbyProcedure(peerId, serial));
   }
 
+  @VisibleForTesting // overridden in tests to inject ReplicationException
+  protected void setPeerNewSyncReplicationState(MasterProcedureEnv env)
+      throws ReplicationException {
+    env.getReplicationPeerManager().setPeerNewSyncReplicationState(peerId, 
toState);
+    if (toState.equals(SyncReplicationState.STANDBY) && enabled) {
+      // disable the peer if we are going to transit to STANDBY state, as we 
need to remove
+      // all the pending replication files. If we do not disable the peer and 
delete the wal
+      // queues on zk directly, RS will get NoNode exception when updating the 
wal position
+      // and crash.
+      env.getReplicationPeerManager().disablePeer(peerId);
+    }
+  }
+
+  @VisibleForTesting // overridden in tests to inject ReplicationException
+  protected void removeAllReplicationQueues(MasterProcedureEnv env) throws 
ReplicationException {
+    env.getReplicationPeerManager().removeAllQueues(peerId);
+  }
+
+  @VisibleForTesting // overridden in tests to inject ReplicationException
+  protected void transitPeerSyncReplicationState(MasterProcedureEnv env)
+      throws ReplicationException {
+    env.getReplicationPeerManager().transitPeerSyncReplicationState(peerId, 
toState);
+  }
+
   @Override
   protected Flow executeFromState(MasterProcedureEnv env,
-      PeerSyncReplicationStateTransitionState state)
-      throws ProcedureSuspendedException, ProcedureYieldException, 
InterruptedException {
+      PeerSyncReplicationStateTransitionState state) throws 
ProcedureSuspendedException {
     switch (state) {
       case PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION:
         try {
@@ -209,19 +235,16 @@ public class TransitPeerSyncReplicationStateProcedure
         return Flow.HAS_MORE_STATE;
       case SET_PEER_NEW_SYNC_REPLICATION_STATE:
         try {
-          
env.getReplicationPeerManager().setPeerNewSyncReplicationState(peerId, toState);
-          if (toState.equals(SyncReplicationState.STANDBY) && enabled) {
-            // disable the peer if we are going to transit to STANDBY state, 
as we need to remove
-            // all the pending replication files. If we do not disable the 
peer and delete the wal
-            // queues on zk directly, RS will get NoNode exception when 
updating the wal position
-            // and crash.
-            env.getReplicationPeerManager().disablePeer(peerId);
-          }
+          setPeerNewSyncReplicationState(env);
         } catch (ReplicationException e) {
-          LOG.warn("Failed to update peer storage for peer {} when starting 
transiting sync " +
-            "replication peer state from {} to {}, retry", peerId, fromState, 
toState, e);
-          throw new ProcedureYieldException();
+          long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
+          LOG.warn(
+            "Failed to update peer storage for peer {} when starting 
transiting sync " +
+                "replication peer state from {} to {}, sleep {} secs and 
retry",
+            peerId, fromState, toState, backoff / 1000, e);
+          throw suspend(backoff);
         }
+        attempts = 0;
         setNextState(
           
PeerSyncReplicationStateTransitionState.REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN);
         return Flow.HAS_MORE_STATE;
@@ -238,12 +261,16 @@ public class TransitPeerSyncReplicationStateProcedure
         return Flow.HAS_MORE_STATE;
       case REMOVE_ALL_REPLICATION_QUEUES_IN_PEER:
         try {
-          env.getReplicationPeerManager().removeAllQueues(peerId);
+          removeAllReplicationQueues(env);
         } catch (ReplicationException e) {
-          LOG.warn("Failed to remove all replication queues peer {} when 
starting transiting" +
-            " sync replication peer state from {} to {}, retry", peerId, 
fromState, toState, e);
-          throw new ProcedureYieldException();
+          long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
+          LOG.warn(
+            "Failed to remove all replication queues peer {} when starting 
transiting" +
+                " sync replication peer state from {} to {}, sleep {} secs and 
retry",
+            peerId, fromState, toState, backoff / 1000, e);
+          throw suspend(backoff);
         }
+        attempts = 0;
         setNextState(fromState.equals(SyncReplicationState.ACTIVE)
           ? PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER
           : 
PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE);
@@ -255,12 +282,16 @@ public class TransitPeerSyncReplicationStateProcedure
         return Flow.HAS_MORE_STATE;
       case TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE:
         try {
-          
env.getReplicationPeerManager().transitPeerSyncReplicationState(peerId, 
toState);
+          transitPeerSyncReplicationState(env);
         } catch (ReplicationException e) {
-          LOG.warn("Failed to update peer storage for peer {} when ending 
transiting sync " +
-            "replication peer state from {} to {}, retry", peerId, fromState, 
toState, e);
-          throw new ProcedureYieldException();
+          long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
+          LOG.warn(
+            "Failed to update peer storage for peer {} when ending transiting 
sync " +
+                "replication peer state from {} to {}, sleep {} secs and 
retry",
+            peerId, fromState, toState, backoff / 1000, e);
+          throw suspend(backoff);
         }
+        attempts = 0;
         setNextState(
           
PeerSyncReplicationStateTransitionState.REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END);
         return Flow.HAS_MORE_STATE;
@@ -272,12 +303,16 @@ public class TransitPeerSyncReplicationStateProcedure
         return Flow.HAS_MORE_STATE;
       case SYNC_REPLICATION_SET_PEER_ENABLED:
         try {
-          env.getReplicationPeerManager().enablePeer(peerId);
+          enablePeer(env);
         } catch (ReplicationException e) {
-          LOG.warn("Failed to set peer enabled for peer {} when transiting 
sync replication peer " +
-            "state from {} to {}, retry", peerId, fromState, toState, e);
-          throw new ProcedureYieldException();
+          long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
+          LOG.warn(
+            "Failed to set peer enabled for peer {} when transiting sync 
replication peer " +
+                "state from {} to {}, sleep {} secs and retry",
+            peerId, fromState, toState, backoff / 1000, e);
+          throw suspend(backoff);
         }
+        attempts = 0;
         setNextState(
           
PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_ENABLE_PEER_REFRESH_PEER_ON_RS);
         return Flow.HAS_MORE_STATE;
@@ -289,10 +324,14 @@ public class TransitPeerSyncReplicationStateProcedure
         try {
           createDirForRemoteWAL(env);
         } catch (IOException e) {
-          LOG.warn("Failed to create remote wal dir for peer {} when 
transiting sync replication " +
-            "peer state from {} to {}, retry", peerId, fromState, toState, e);
-          throw new ProcedureYieldException();
+          long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
+          LOG.warn(
+            "Failed to create remote wal dir for peer {} when transiting sync 
replication " +
+                "peer state from {} to {}, sleep {} secs and retry",
+            peerId, fromState, toState, backoff / 1000, e);
+          throw suspend(backoff);
         }
+        attempts = 0;
         setNextState(
           
PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION);
         return Flow.HAS_MORE_STATE;

http://git-wip-us.apache.org/repos/asf/hbase/blob/f9d51b67/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestTransitPeerSyncReplicationStateProcedureBackoff.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestTransitPeerSyncReplicationStateProcedureBackoff.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestTransitPeerSyncReplicationStateProcedureBackoff.java
new file mode 100644
index 0000000..0526a8e
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestTransitPeerSyncReplicationStateProcedureBackoff.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.replication;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.ProcedureTestUtil;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.SyncReplicationState;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MasterTests.class, LargeTests.class })
+public class TestTransitPeerSyncReplicationStateProcedureBackoff {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    
HBaseClassTestRule.forClass(TestTransitPeerSyncReplicationStateProcedureBackoff.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static volatile boolean FAIL = true; // written under the outer class monitor
+
+  public static class TestTransitPeerSyncReplicationStateProcedure
+      extends TransitPeerSyncReplicationStateProcedure {
+
+    public TestTransitPeerSyncReplicationStateProcedure() {
+    }
+
+    public TestTransitPeerSyncReplicationStateProcedure(String peerId, 
SyncReplicationState state) {
+      super(peerId, state);
+    }
+
+    private void tryFail() throws ReplicationException { // one success per cleared FAIL
+      synchronized (TestTransitPeerSyncReplicationStateProcedureBackoff.class) 
{
+        if (FAIL) {
+          throw new ReplicationException("Inject error");
+        }
+        FAIL = true;
+      }
+    }
+
+    @Override
+    protected <T extends Procedure<MasterProcedureEnv>> void addChildProcedure(
+        @SuppressWarnings("unchecked") T... subProcedure) {
+      // Make it a no-op so no child procedures (e.g. RS refresh) are scheduled
+    }
+
+    @Override
+    protected void preTransit(MasterProcedureEnv env) throws IOException {
+      fromState = SyncReplicationState.DOWNGRADE_ACTIVE; // skip real checks, fix start state
+    }
+
+    @Override
+    protected void setPeerNewSyncReplicationState(MasterProcedureEnv env)
+        throws ReplicationException {
+      tryFail();
+    }
+
+    @Override
+    protected void removeAllReplicationQueues(MasterProcedureEnv env) throws 
ReplicationException {
+      tryFail();
+    }
+
+    @Override
+    protected void reopenRegions(MasterProcedureEnv env) {
+      // do nothing
+    }
+
+    @Override
+    protected void transitPeerSyncReplicationState(MasterProcedureEnv env)
+        throws ReplicationException {
+      tryFail();
+    }
+
+    @Override
+    protected void createDirForRemoteWAL(MasterProcedureEnv env) throws 
IOException {
+      try {
+        tryFail();
+      } catch (ReplicationException e) {
+        throw new IOException(e);
+      }
+    }
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    UTIL.startMiniCluster(1);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  private void assertBackoffIncrease() throws IOException, 
InterruptedException {
+    ProcedureTestUtil.waitUntilProcedureWaitingTimeout(UTIL,
+      TestTransitPeerSyncReplicationStateProcedure.class, 30000);
+    ProcedureTestUtil.waitUntilProcedureTimeoutIncrease(UTIL,
+      TestTransitPeerSyncReplicationStateProcedure.class, 2);
+    synchronized (TestTransitPeerSyncReplicationStateProcedureBackoff.class) {
+      FAIL = false; // same monitor as tryFail: let the next retry of this step succeed
+    }
+    UTIL.waitFor(30000, () -> FAIL); // tryFail re-arms FAIL once the step has succeeded
+  }
+
+  @Test
+  public void testDowngradeActiveToActive() throws IOException, 
InterruptedException {
+    ProcedureExecutor<MasterProcedureEnv> procExec =
+        UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
+    // Test procedure: DOWNGRADE_ACTIVE ==> ACTIVE
+    long procId = procExec.submitProcedure(
+      new TestTransitPeerSyncReplicationStateProcedure("1", 
SyncReplicationState.ACTIVE));
+    // No retry for PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION
+    // SET_PEER_NEW_SYNC_REPLICATION_STATE
+    assertBackoffIncrease();
+    // No retry for REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN
+    // No retry for REOPEN_ALL_REGIONS_IN_PEER
+    // TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE
+    assertBackoffIncrease();
+    // No retry for REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END
+    // No retry for POST_PEER_SYNC_REPLICATION_STATE_TRANSITION
+    UTIL.waitFor(30000, () -> procExec.isFinished(procId));
+  }
+
+  @Test
+  public void testDowngradeActiveToStandby() throws IOException, 
InterruptedException {
+    ProcedureExecutor<MasterProcedureEnv> procExec =
+        UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
+    // Test procedure: DOWNGRADE_ACTIVE ==> STANDBY
+    long procId = procExec.submitProcedure(
+      new TestTransitPeerSyncReplicationStateProcedure("2", 
SyncReplicationState.STANDBY));
+    // No retry for PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION
+    // SET_PEER_NEW_SYNC_REPLICATION_STATE
+    assertBackoffIncrease();
+    // No retry for REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN
+    // REMOVE_ALL_REPLICATION_QUEUES_IN_PEER
+    assertBackoffIncrease();
+    // TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE
+    assertBackoffIncrease();
+    // No retry for REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END
+    // CREATE_DIR_FOR_REMOTE_WAL
+    assertBackoffIncrease();
+    // No retry for POST_PEER_SYNC_REPLICATION_STATE_TRANSITION
+    UTIL.waitFor(30000, () -> procExec.isFinished(procId));
+  }
+}
\ No newline at end of file

Reply via email to