This is an automated email from the ASF dual-hosted git repository.

tianjy pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 4e74afe  HBASE-21934 RemoteProcedureDispatcher should track the 
ongoing dispatched calls
4e74afe is described below

commit 4e74afe12f83d1438d861a4f16194481c035da41
Author: Jingyun Tian <[email protected]>
AuthorDate: Mon Mar 4 14:11:27 2019 +0800

    HBASE-21934 RemoteProcedureDispatcher should track the ongoing dispatched 
calls
---
 .../procedure2/RemoteProcedureDispatcher.java      |  29 +++
 .../assignment/RegionTransitionProcedure.java      |   5 +
 .../master/procedure/ServerRemoteProcedure.java    | 132 +++++++++++
 .../master/replication/RefreshPeerProcedure.java   |  72 +-----
 .../procedure/TestServerRemoteProcedure.java       | 246 +++++++++++++++++++++
 5 files changed, 416 insertions(+), 68 deletions(-)

diff --git 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java
 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java
index 4c54c57..fe34d80 100644
--- 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java
+++ 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java
@@ -172,6 +172,16 @@ public abstract class RemoteProcedureDispatcher<TEnv, 
TRemote extends Comparable
     }
   }
 
+  public void removeCompletedOperation(final TRemote key, RemoteProcedure rp) {
+    BufferNode node = nodeMap.get(key);
+    if (node == null) {
+      LOG.warn("since no node for this key {}, we can't removed the finished 
remote procedure",
+        key);
+      return;
+    }
+    node.operationCompleted(rp);
+  }
+
   /**
    * Remove a remote node
    * @param key the node identifier
@@ -237,6 +247,16 @@ public abstract class RemoteProcedureDispatcher<TEnv, 
TRemote extends Comparable
      * method.
      */
     void remoteOperationFailed(TEnv env, RemoteProcedureException error);
+
+    /**
+     * Whether store this remote procedure in dispatched queue
+     * only OpenRegionProcedure and CloseRegionProcedure return false since 
they are
+     * not fully controlled by dispatcher
+     */
+    default boolean storeInDispatchedQueue() {
+      return true;
+    }
+
   }
 
   /**
@@ -330,6 +350,7 @@ public abstract class RemoteProcedureDispatcher<TEnv, 
TRemote extends Comparable
   protected final class BufferNode extends 
DelayedContainerWithTimestamp<TRemote>
       implements RemoteNode<TEnv, TRemote> {
     private Set<RemoteProcedure> operations;
+    private final Set<RemoteProcedure> dispatchedOperations = new HashSet<>();
 
     protected BufferNode(final TRemote key) {
       super(key, 0);
@@ -358,6 +379,8 @@ public abstract class RemoteProcedureDispatcher<TEnv, 
TRemote extends Comparable
     public synchronized void dispatch() {
       if (operations != null) {
         remoteDispatch(getKey(), operations);
+        operations.stream().filter(operation -> 
operation.storeInDispatchedQueue())
+            .forEach(operation -> dispatchedOperations.add(operation));
         this.operations = null;
       }
     }
@@ -367,6 +390,12 @@ public abstract class RemoteProcedureDispatcher<TEnv, 
TRemote extends Comparable
         abortPendingOperations(getKey(), operations);
         this.operations = null;
       }
+      abortPendingOperations(getKey(), dispatchedOperations);
+      this.dispatchedOperations.clear();
+    }
+
+    public synchronized void operationCompleted(final RemoteProcedure 
remoteProcedure){
+      this.dispatchedOperations.remove(remoteProcedure);
     }
 
     @Override
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java
index a14cf47..2980adf 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java
@@ -283,6 +283,11 @@ public abstract class RegionTransitionProcedure
     return true;
   }
 
+  @Override
+  public boolean storeInDispatchedQueue() {
+    return false;
+  }
+
   protected void reportTransition(final MasterProcedureEnv env, final 
ServerName serverName,
       final TransitionCode code, final long seqId) throws 
UnexpectedStateException {
     final RegionStateNode regionNode = getRegionState(env);
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java
new file mode 100644
index 0000000..6c00380
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
[email protected]
+/**
+ * This extract the common used methods of procedures which are send to remote 
servers. Developers
+ * who extends this class only need to override remoteCallBuild() and 
complete(). This procedure
+ * will help add the operation to {@link RSProcedureDispatcher}
+ *
+ * If adding the operation to dispatcher failed, addOperationToNode will throw
+ * FailedRemoteDispatchException, and this procedure will return null which 
procedure Executor will
+ * mark this procedure as complete. Thus the upper layer of this procedure 
must have a way to
+ * check if this procedure really succeed and how to deal with it.
+ *
+ * If sending the operation to remote RS failed, dispatcher will call 
remoteCallFailed() to
+ * handle this, which actually call remoteOperationDone with the exception.
+ * If the targetServer crashed but this procedure has no response, than 
dispatcher will call
+ * remoteOperationFailed() to handle this, which also calls 
remoteOperationDone with the exception.
+ * If the operation is successful, then remoteOperationCompleted will be 
called and actually calls
+ * the remoteOperationDone without exception.
+ *
+ * In remoteOperationDone, we'll check if the procedure is already get wake up 
by others. Then
+ * developer could implement complete() based on their own purpose.
+ *
+ * But basic logic is that if operation succeed, set succ to true and do the 
clean work.
+ *
+ * If operation failed and require to resend it to the same server, leave the 
succ as false.
+ *
+ * If operation failed and require to resend it to another server, set succ to 
true and upper layer
+ * should be able to find out this operation not work and send a operation to 
another server.
+ */
+public abstract class ServerRemoteProcedure extends 
Procedure<MasterProcedureEnv>
+    implements RemoteProcedureDispatcher.RemoteProcedure<MasterProcedureEnv, 
ServerName> {
+  protected static final Logger LOG = 
LoggerFactory.getLogger(ServerRemoteProcedure.class);
+  protected ProcedureEvent<?> event;
+  protected ServerName targetServer;
+  protected boolean dispatched;
+  protected boolean succ;
+
+  protected abstract void complete(MasterProcedureEnv env, Throwable error);
+
+  @Override
+  protected synchronized Procedure<MasterProcedureEnv>[] 
execute(MasterProcedureEnv env)
+      throws ProcedureYieldException, ProcedureSuspendedException, 
InterruptedException {
+    if (dispatched) {
+      if (succ) {
+        return null;
+      }
+      dispatched = false;
+    }
+    try {
+      env.getRemoteDispatcher().addOperationToNode(targetServer, this);
+    } catch (FailedRemoteDispatchException frde) {
+      LOG.warn("Can not send remote operation {} to {}, this operation will "
+          + "be retried to send to another server",
+        this.getProcId(), targetServer);
+      return null;
+    }
+    dispatched = true;
+    event = new ProcedureEvent<>(this);
+    event.suspendIfNotReady(this);
+    throw new ProcedureSuspendedException();
+  }
+
+  @Override
+  protected synchronized void completionCleanup(MasterProcedureEnv env) {
+    env.getRemoteDispatcher().removeCompletedOperation(targetServer, this);
+  }
+
+  @Override
+  public synchronized boolean remoteCallFailed(MasterProcedureEnv env, 
ServerName serverName,
+                                               IOException exception) {
+    remoteOperationDone(env, exception);
+    return false;
+  }
+
+  @Override
+  public synchronized void remoteOperationCompleted(MasterProcedureEnv env) {
+    remoteOperationDone(env, null);
+  }
+
+  @Override
+  public synchronized void remoteOperationFailed(MasterProcedureEnv env,
+      RemoteProcedureException error) {
+    remoteOperationDone(env, error);
+  }
+
+  synchronized void remoteOperationDone(MasterProcedureEnv env, Throwable 
error) {
+    if (this.isFinished()) {
+      LOG.info("This procedure {} is already finished, skip the rest 
processes", this.getProcId());
+      return;
+    }
+    if (event == null) {
+      LOG.warn("procedure event for {} is null, maybe the procedure is created 
when recovery",
+          getProcId());
+      return;
+    }
+    complete(env, error);
+    event.wake(env.getProcedureScheduler());
+    event = null;
+  }
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java
index 6fe7662..db0b3ea 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java
@@ -22,15 +22,10 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
 import 
org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.ServerOperation;
-import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException;
-import org.apache.hadoop.hbase.procedure2.Procedure;
-import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
+import org.apache.hadoop.hbase.master.procedure.ServerRemoteProcedure;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
-import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
-import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
 import 
org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
 import 
org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
-import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
 import org.apache.hadoop.hbase.replication.regionserver.RefreshPeerCallable;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -42,7 +37,7 @@ import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.R
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RefreshPeerStateData;
 
 @InterfaceAudience.Private
-public class RefreshPeerProcedure extends Procedure<MasterProcedureEnv>
+public class RefreshPeerProcedure extends ServerRemoteProcedure
     implements PeerProcedureInterface, RemoteProcedure<MasterProcedureEnv, 
ServerName> {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(RefreshPeerProcedure.class);
@@ -51,16 +46,6 @@ public class RefreshPeerProcedure extends 
Procedure<MasterProcedureEnv>
 
   private PeerOperationType type;
 
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = 
"IS2_INCONSISTENT_SYNC",
-      justification = "Will never change after construction")
-  private ServerName targetServer;
-
-  private boolean dispatched;
-
-  private ProcedureEvent<?> event;
-
-  private boolean succ;
-
   public RefreshPeerProcedure() {
   }
 
@@ -122,12 +107,8 @@ public class RefreshPeerProcedure extends 
Procedure<MasterProcedureEnv>
             
.setTargetServer(ProtobufUtil.toServerName(remote)).build().toByteArray());
   }
 
-  private void complete(MasterProcedureEnv env, Throwable error) {
-    if (event == null) {
-      LOG.warn("procedure event for {} is null, maybe the procedure is created 
when recovery",
-        getProcId());
-      return;
-    }
+  @Override
+  protected void complete(MasterProcedureEnv env, Throwable error) {
     if (error != null) {
       LOG.warn("Refresh peer {} for {} on {} failed", peerId, type, 
targetServer, error);
       this.succ = false;
@@ -135,51 +116,6 @@ public class RefreshPeerProcedure extends 
Procedure<MasterProcedureEnv>
       LOG.info("Refresh peer {} for {} on {} suceeded", peerId, type, 
targetServer);
       this.succ = true;
     }
-
-    event.wake(env.getProcedureScheduler());
-    event = null;
-  }
-
-  @Override
-  public synchronized boolean remoteCallFailed(MasterProcedureEnv env, 
ServerName remote,
-      IOException exception) {
-    complete(env, exception);
-    return true;
-  }
-
-  @Override
-  public synchronized void remoteOperationCompleted(MasterProcedureEnv env) {
-    complete(env, null);
-  }
-
-  @Override
-  public synchronized void remoteOperationFailed(MasterProcedureEnv env,
-      RemoteProcedureException error) {
-    complete(env, error);
-  }
-
-  @Override
-  protected synchronized Procedure<MasterProcedureEnv>[] 
execute(MasterProcedureEnv env)
-      throws ProcedureYieldException, ProcedureSuspendedException, 
InterruptedException {
-    if (dispatched) {
-      if (succ) {
-        return null;
-      }
-      // retry
-      dispatched = false;
-    }
-    try {
-      env.getRemoteDispatcher().addOperationToNode(targetServer, this);
-    } catch (FailedRemoteDispatchException frde) {
-      LOG.info("Can not add remote operation for refreshing peer {} for {} to 
{}, " +
-        "this is usually because the server is already dead, " +
-        "give up and mark the procedure as complete", peerId, type, 
targetServer, frde);
-      return null;
-    }
-    dispatched = true;
-    event = new ProcedureEvent<>(this);
-    event.suspendIfNotReady(this);
-    throw new ProcedureSuspendedException();
   }
 
   @Override
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerRemoteProcedure.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerRemoteProcedure.java
new file mode 100644
index 0000000..c0311d3
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerRemoteProcedure.java
@@ -0,0 +1,246 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
+import org.apache.hadoop.hbase.master.assignment.MockMasterServices;
+import org.apache.hadoop.hbase.master.replication.RefreshPeerProcedure;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import 
org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
+
+@Category({ MasterTests.class, MediumTests.class })
+public class TestServerRemoteProcedure {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestServerRemoteProcedure.class);
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestServerRemoteProcedure.class);
+  @Rule
+  public TestName name = new TestName();
+  @Rule
+  public final ExpectedException exception = ExpectedException.none();
+  protected HBaseTestingUtility util;
+  protected MockRSProcedureDispatcher rsDispatcher;
+  protected MockMasterServices master;
+  protected AssignmentManager am;
+  protected NavigableMap<ServerName, SortedSet<byte[]>> regionsToRegionServers 
=
+      new ConcurrentSkipListMap<>();
+  // Simple executor to run some simple tasks.
+  protected ScheduledExecutorService executor;
+
+  @Before
+  public void setUp() throws Exception {
+    util = new HBaseTestingUtility();
+    this.executor = Executors.newSingleThreadScheduledExecutor(new 
ThreadFactoryBuilder()
+        .setUncaughtExceptionHandler((t, e) -> LOG.warn("Uncaught: ", 
e)).build());
+    master = new MockMasterServices(util.getConfiguration(), 
this.regionsToRegionServers);
+    rsDispatcher = new MockRSProcedureDispatcher(master);
+    rsDispatcher.setMockRsExecutor(new NoopRSExecutor());
+    master.start(2, rsDispatcher);
+    am = master.getAssignmentManager();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    master.stop("tearDown");
+    this.executor.shutdownNow();
+  }
+
+  @Test
+  public void testRemoteProcedureAndCrashBeforeResponse() throws Exception {
+    ServerName worker = 
master.getServerManager().getOnlineServersList().get(0);
+    ServerRemoteProcedure refreshPeerProcedure =
+        new RefreshPeerProcedure("test", 
PeerProcedureInterface.PeerOperationType.ADD, worker);
+    Future<byte[]> future = submitProcedure(refreshPeerProcedure);
+    Thread.sleep(2000);
+    master.getServerManager().expireServer(worker);
+    // if remoteCallFailed is called for this procedure, this procedure should 
be finished.
+    future.get(5000, TimeUnit.MILLISECONDS);
+    Assert.assertTrue(refreshPeerProcedure.isSuccess());
+  }
+
+  @Test
+  public void testRemoteCompleteAndFailedAtTheSameTime() throws Exception {
+    ServerName worker = 
master.getServerManager().getOnlineServersList().get(0);
+    ServerRemoteProcedure noopServerRemoteProcedure = new 
NoopServerRemoteProcedure(worker);
+    Future<byte[]> future = submitProcedure(noopServerRemoteProcedure);
+    Thread.sleep(2000);
+    // complete the process and fail the process at the same time
+    ExecutorService threadPool = Executors.newFixedThreadPool(2);
+    threadPool.execute(() -> noopServerRemoteProcedure
+        
.remoteOperationDone(master.getMasterProcedureExecutor().getEnvironment(), 
null));
+    threadPool.execute(() -> noopServerRemoteProcedure.remoteCallFailed(
+      master.getMasterProcedureExecutor().getEnvironment(), worker, new 
IOException()));
+    future.get(2000, TimeUnit.MILLISECONDS);
+    Assert.assertTrue(noopServerRemoteProcedure.isSuccess());
+  }
+
+  private Future<byte[]> submitProcedure(final Procedure<MasterProcedureEnv> 
proc) {
+    return 
ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
+  }
+
+  private static class NoopServerRemoteProcedure extends ServerRemoteProcedure
+      implements PeerProcedureInterface {
+
+    public NoopServerRemoteProcedure(ServerName targetServer) {
+      this.targetServer = targetServer;
+    }
+
+    @Override
+    protected void rollback(MasterProcedureEnv env) throws IOException, 
InterruptedException {
+      return;
+    }
+
+    @Override
+    protected boolean abort(MasterProcedureEnv env) {
+      return false;
+    }
+
+    @Override
+    protected void serializeStateData(ProcedureStateSerializer serializer) 
throws IOException {
+      return;
+    }
+
+    @Override
+    protected void deserializeStateData(ProcedureStateSerializer serializer) 
throws IOException {
+      return;
+    }
+
+    @Override
+    public RemoteProcedureDispatcher.RemoteOperation 
remoteCallBuild(MasterProcedureEnv env,
+        ServerName serverName) {
+      return new RSProcedureDispatcher.ServerOperation(null, 0L, 
this.getClass(), new byte[0]);
+    }
+
+    @Override
+    public synchronized void remoteOperationCompleted(MasterProcedureEnv env) {
+      complete(env, null);
+    }
+
+    @Override
+    public synchronized void remoteOperationFailed(MasterProcedureEnv env,
+        RemoteProcedureException error) {
+      complete(env, error);
+    }
+
+    @Override
+    public void complete(MasterProcedureEnv env, Throwable error) {
+      this.succ = true;
+      return;
+    }
+
+    @Override
+    public String getPeerId() {
+      return "test";
+    }
+
+    @Override
+    public PeerOperationType getPeerOperationType() {
+      return PeerOperationType.ADD;
+    }
+  }
+
+  protected interface MockRSExecutor {
+    AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
+        AdminProtos.ExecuteProceduresRequest req) throws IOException;
+  }
+
+  protected static class NoopRSExecutor implements MockRSExecutor {
+    @Override
+    public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
+        AdminProtos.ExecuteProceduresRequest req) throws IOException {
+      if (req.getOpenRegionCount() > 0) {
+        for (AdminProtos.OpenRegionRequest request : req.getOpenRegionList()) {
+          for (AdminProtos.OpenRegionRequest.RegionOpenInfo openReq : 
request.getOpenInfoList()) {
+            execOpenRegion(server, openReq);
+          }
+        }
+      }
+      return AdminProtos.ExecuteProceduresResponse.getDefaultInstance();
+    }
+
+    protected AdminProtos.OpenRegionResponse.RegionOpeningState 
execOpenRegion(ServerName server,
+        AdminProtos.OpenRegionRequest.RegionOpenInfo regionInfo) throws 
IOException {
+      return null;
+    }
+  }
+
+  protected static class MockRSProcedureDispatcher extends 
RSProcedureDispatcher {
+    private MockRSExecutor mockRsExec;
+
+    public MockRSProcedureDispatcher(final MasterServices master) {
+      super(master);
+    }
+
+    public void setMockRsExecutor(final MockRSExecutor mockRsExec) {
+      this.mockRsExec = mockRsExec;
+    }
+
+    @Override
+    protected void remoteDispatch(ServerName serverName,
+        @SuppressWarnings("rawtypes") Set<RemoteProcedure> remoteProcedures) {
+      submitTask(new MockRSProcedureDispatcher.MockRemoteCall(serverName, 
remoteProcedures));
+    }
+
+    private class MockRemoteCall extends ExecuteProceduresRemoteCall {
+      public MockRemoteCall(final ServerName serverName,
+          @SuppressWarnings("rawtypes") final Set<RemoteProcedure> operations) 
{
+        super(serverName, operations);
+      }
+
+      @Override
+      protected AdminProtos.ExecuteProceduresResponse sendRequest(final 
ServerName serverName,
+          final AdminProtos.ExecuteProceduresRequest request) throws 
IOException {
+        return mockRsExec.sendRequest(serverName, request);
+      }
+    }
+  }
+}

Reply via email to