This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-21512
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit c0bc79ca54d84474c240c7956a53c9819652d0d1
Author: zhangduo <zhang...@apache.org>
AuthorDate: Thu Dec 6 21:25:34 2018 +0800

    HBASE-21526 Use AsyncClusterConnection in ServerManager for getRsAdmin
---
 .../hbase/client/AsyncClusterConnection.java       |   6 +
 .../hadoop/hbase/client/AsyncConnectionImpl.java   |   4 +
 .../hbase/client/AsyncRegionServerAdmin.java       | 210 +++++++++++++++++++++
 .../org/apache/hadoop/hbase/util/FutureUtils.java  |   2 +-
 .../org/apache/hadoop/hbase/master/HMaster.java    |  15 +-
 .../apache/hadoop/hbase/master/ServerManager.java  |  67 -------
 .../master/procedure/RSProcedureDispatcher.java    |  44 +++--
 7 files changed, 262 insertions(+), 86 deletions(-)
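
In short, ServerManager no longer caches blocking AdminService stubs; callers obtain a lightweight, per-call AsyncRegionServerAdmin from the shared AsyncClusterConnection instead. A rough sketch of the change at a call site, not part of the patch (the accessor names are taken from the hunks below; the surrounding class and method are illustrative only):

    // Illustrative only -- how a master-side caller obtains the region server
    // admin interface before and after this change.
    import org.apache.hadoop.hbase.ServerName;
    import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
    import org.apache.hadoop.hbase.master.MasterServices;

    class GetAdminSketch {
      AsyncRegionServerAdmin getAdmin(MasterServices master, ServerName serverName) {
        // Before: a blocking stub, created lazily and cached per server:
        //   AdminService.BlockingInterface admin = master.getServerManager().getRsAdmin(serverName);
        // After: a cheap wrapper over the shared async connection, created per call:
        return master.getAsyncClusterConnection().getRegionServerAdmin(serverName);
      }
    }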

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
index c7dea25..1327fd7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.ipc.RpcClient;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -27,6 +28,11 @@ import org.apache.yetus.audience.InterfaceAudience;
 public interface AsyncClusterConnection extends AsyncConnection {
 
   /**
+   * Get the admin service for the given region server.
+   */
+  AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName);
+
+  /**
    * Get the nonce generator for this connection.
    */
   NonceGenerator getNonceGenerator();
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 81dafaf..8cb19a6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -359,4 +359,8 @@ class AsyncConnectionImpl implements AsyncClusterConnection {
     return new HBaseHbck(MasterProtos.HbckService.newBlockingStub(
       rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout)), rpcControllerFactory);
   }
+
+  public AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName) {
+    return new AsyncRegionServerAdmin(serverName, this);
+  }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
new file mode 100644
index 0000000..9accd89
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
+
+/**
+ * A simple wrapper of the {@link AdminService} for a region server, which returns a
+ * {@link CompletableFuture}. This is easier to use, as if you use the raw protobuf interface, you
+ * need to get the result from the {@link RpcCallback}, and if there is an exception, you need to
+ * get it from the {@link RpcController} passed in.
+ * <p/>
+ * Notice that there is no retry, and this is intentional. We have different retry for different
+ * usage for now, if later we want to unify them, we can move the retry logic into this class.
+ */
+@InterfaceAudience.Private
+public class AsyncRegionServerAdmin {
+
+  private final ServerName server;
+
+  private final AsyncConnectionImpl conn;
+
+  AsyncRegionServerAdmin(ServerName server, AsyncConnectionImpl conn) {
+    this.server = server;
+    this.conn = conn;
+  }
+
+  @FunctionalInterface
+  private interface RpcCall<RESP> {
+    void call(AdminService.Interface stub, HBaseRpcController controller, RpcCallback<RESP> done);
+  }
+
+  private <RESP> CompletableFuture<RESP> call(RpcCall<RESP> rpcCall) {
+    CompletableFuture<RESP> future = new CompletableFuture<>();
+    HBaseRpcController controller = conn.rpcControllerFactory.newController();
+    try {
+      rpcCall.call(conn.getAdminStub(server), controller, new RpcCallback<RESP>() {
+
+        @Override
+        public void run(RESP resp) {
+          if (controller.failed()) {
+            future.completeExceptionally(controller.getFailed());
+          } else {
+            future.complete(resp);
+          }
+        }
+      });
+    } catch (IOException e) {
+      future.completeExceptionally(e);
+    }
+    return future;
+  }
+
+  public CompletableFuture<GetRegionInfoResponse> getRegionInfo(GetRegionInfoRequest request) {
+    return call((stub, controller, done) -> stub.getRegionInfo(controller, request, done));
+  }
+
+  public CompletableFuture<GetStoreFileResponse> getStoreFile(GetStoreFileRequest request) {
+    return call((stub, controller, done) -> stub.getStoreFile(controller, request, done));
+  }
+
+  public CompletableFuture<GetOnlineRegionResponse> getOnlineRegion(
+      GetOnlineRegionRequest request) {
+    return call((stub, controller, done) -> stub.getOnlineRegion(controller, request, done));
+  }
+
+  public CompletableFuture<OpenRegionResponse> openRegion(OpenRegionRequest request) {
+    return call((stub, controller, done) -> stub.openRegion(controller, request, done));
+  }
+
+  public CompletableFuture<WarmupRegionResponse> warmupRegion(WarmupRegionRequest request) {
+    return call((stub, controller, done) -> stub.warmupRegion(controller, request, done));
+  }
+
+  public CompletableFuture<CloseRegionResponse> closeRegion(CloseRegionRequest request) {
+    return call((stub, controller, done) -> stub.closeRegion(controller, request, done));
+  }
+
+  public CompletableFuture<FlushRegionResponse> flushRegion(FlushRegionRequest request) {
+    return call((stub, controller, done) -> stub.flushRegion(controller, request, done));
+  }
+
+  public CompletableFuture<CompactionSwitchResponse> compactionSwitch(
+      CompactionSwitchRequest request) {
+    return call((stub, controller, done) -> stub.compactionSwitch(controller, request, done));
+  }
+
+  public CompletableFuture<CompactRegionResponse> compactRegion(CompactRegionRequest request) {
+    return call((stub, controller, done) -> stub.compactRegion(controller, request, done));
+  }
+
+  public CompletableFuture<ReplicateWALEntryResponse> replicateWALEntry(
+      ReplicateWALEntryRequest request) {
+    return call((stub, controller, done) -> stub.replicateWALEntry(controller, request, done));
+  }
+
+  public CompletableFuture<ReplicateWALEntryResponse> replay(ReplicateWALEntryRequest request) {
+    return call((stub, controller, done) -> stub.replay(controller, request, done));
+  }
+
+  public CompletableFuture<RollWALWriterResponse> rollWALWriter(RollWALWriterRequest request) {
+    return call((stub, controller, done) -> stub.rollWALWriter(controller, request, done));
+  }
+
+  public CompletableFuture<GetServerInfoResponse> getServerInfo(GetServerInfoRequest request) {
+    return call((stub, controller, done) -> stub.getServerInfo(controller, request, done));
+  }
+
+  public CompletableFuture<StopServerResponse> stopServer(StopServerRequest request) {
+    return call((stub, controller, done) -> stub.stopServer(controller, request, done));
+  }
+
+  public CompletableFuture<UpdateFavoredNodesResponse> updateFavoredNodes(
+      UpdateFavoredNodesRequest request) {
+    return call((stub, controller, done) -> stub.updateFavoredNodes(controller, request, done));
+  }
+
+  public CompletableFuture<UpdateConfigurationResponse> updateConfiguration(
+      UpdateConfigurationRequest request) {
+    return call((stub, controller, done) -> stub.updateConfiguration(controller, request, done));
+  }
+
+  public CompletableFuture<GetRegionLoadResponse> getRegionLoad(GetRegionLoadRequest request) {
+    return call((stub, controller, done) -> stub.getRegionLoad(controller, request, done));
+  }
+
+  public CompletableFuture<ClearCompactionQueuesResponse> clearCompactionQueues(
+      ClearCompactionQueuesRequest request) {
+    return call((stub, controller, done) -> stub.clearCompactionQueues(controller, request, done));
+  }
+
+  public CompletableFuture<ClearRegionBlockCacheResponse> clearRegionBlockCache(
+      ClearRegionBlockCacheRequest request) {
+    return call((stub, controller, done) -> stub.clearRegionBlockCache(controller, request, done));
+  }
+
+  public CompletableFuture<GetSpaceQuotaSnapshotsResponse> getSpaceQuotaSnapshots(
+      GetSpaceQuotaSnapshotsRequest request) {
+    return call((stub, controller, done) -> stub.getSpaceQuotaSnapshots(controller, request, done));
+  }
+
+  public CompletableFuture<ExecuteProceduresResponse> executeProcedures(
+      ExecuteProceduresRequest request) {
+    return call((stub, controller, done) -> stub.executeProcedures(controller, request, done));
+  }
+}
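
For illustration, a minimal usage sketch of the new wrapper, not part of the patch: it mirrors the fire-and-forget warmup call added to HMaster further down (FutureUtils.addListener and RequestConverter.buildWarmupRegionRequest appear in that hunk; the surrounding class and the logging here are made up). Since AsyncRegionServerAdmin does no retrying, the caller decides how to react to a failed future.

    import org.apache.hadoop.hbase.ServerName;
    import org.apache.hadoop.hbase.client.AsyncClusterConnection;
    import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
    import org.apache.hadoop.hbase.client.RegionInfo;
    import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
    import org.apache.hadoop.hbase.util.FutureUtils;

    class WarmupSketch {
      // Ask a region server to warm up a region without blocking the caller.
      void warmUp(AsyncClusterConnection conn, ServerName server, RegionInfo region) {
        AsyncRegionServerAdmin admin = conn.getRegionServerAdmin(server);
        FutureUtils.addListener(
          admin.warmupRegion(RequestConverter.buildWarmupRegionRequest(region)),
          (resp, err) -> {
            if (err != null) {
              // Fire-and-forget: just record the failure, as HMaster.warmUpRegion does.
              System.err.println("Warmup of " + region + " on " + server + " failed: " + err);
            }
          });
      }
    }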
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
index 861dacb..38cd952 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
@@ -134,4 +134,4 @@ public final class FutureUtils {
       throw new IOException(cause);
     }
   }
-}
\ No newline at end of file
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 7579fd5..cf56c4f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -195,6 +195,7 @@ import org.apache.hadoop.hbase.util.BloomFilterUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CompressionTest;
 import org.apache.hadoop.hbase.util.EncryptionTest;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.util.HBaseFsck;
 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
 import org.apache.hadoop.hbase.util.HasThread;
@@ -227,6 +228,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
 
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceViolationPolicy;
@@ -1955,6 +1957,15 @@ public class HMaster extends HRegionServer implements MasterServices {
     });
   }
 
+  private void warmUpRegion(ServerName server, RegionInfo region) {
+    FutureUtils.addListener(asyncClusterConnection.getRegionServerAdmin(server)
+      .warmupRegion(RequestConverter.buildWarmupRegionRequest(region)), (r, e) -> {
+        if (e != null) {
+          LOG.warn("Failed to warm up region {} on server {}", region, server, e);
+        }
+      });
+  }
+
   // Public so can be accessed by tests. Blocks until move is done.
   // Replace with an async implementation from which you can get
   // a success/failure result.
@@ -2026,7 +2037,9 @@ public class HMaster extends HRegionServer implements MasterServices {
       // Warmup the region on the destination before initiating the move. this call
       // is synchronous and takes some time. doing it before the source region gets
       // closed
-      serverManager.sendRegionWarmup(rp.getDestination(), hri);
+      // A region server could reject the close request because it either does not
+      // have the specified region or the region is being split.
+      warmUpRegion(rp.getDestination(), hri);
 
       LOG.info(getClientIdAuditPrefix() + " move " + rp + ", running balancer");
       Future<byte []> future = this.assignmentManager.moveAsync(rp);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
index 0fb1551..a8d5e21 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
@@ -24,7 +24,6 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -51,12 +50,9 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.YouAreDeadException;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RetriesExhaustedException;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -159,25 +155,16 @@ public class ServerManager {
   private final ConcurrentNavigableMap<ServerName, ServerMetrics> onlineServers =
     new ConcurrentSkipListMap<>();
 
-  /**
-   * Map of admin interfaces per registered regionserver; these interfaces we use to control
-   * regionservers out on the cluster
-   */
-  private final Map<ServerName, AdminService.BlockingInterface> rsAdmins = new HashMap<>();
-
   /** List of region servers that should not get any more new regions. */
   private final ArrayList<ServerName> drainingServers = new ArrayList<>();
 
   private final MasterServices master;
-  private final ClusterConnection connection;
 
   private final DeadServer deadservers = new DeadServer();
 
   private final long maxSkew;
   private final long warningSkew;
 
-  private final RpcControllerFactory rpcControllerFactory;
-
   /** Listeners that are called on server events. */
   private List<ServerListener> listeners = new CopyOnWriteArrayList<>();
 
@@ -189,8 +176,6 @@ public class ServerManager {
     Configuration c = master.getConfiguration();
     maxSkew = c.getLong("hbase.master.maxclockskew", 30000);
     warningSkew = c.getLong("hbase.master.warningclockskew", 10000);
-    this.connection = master.getClusterConnection();
-    this.rpcControllerFactory = this.connection == null? null: connection.getRpcControllerFactory();
     persistFlushedSequenceId = c.getBoolean(PERSIST_FLUSHEDSEQUENCEID,
         PERSIST_FLUSHEDSEQUENCEID_DEFAULT);
   }
@@ -438,7 +423,6 @@ public class ServerManager {
   void recordNewServerWithLock(final ServerName serverName, final ServerMetrics sl) {
     LOG.info("Registering regionserver=" + serverName);
     this.onlineServers.put(serverName, sl);
-    this.rsAdmins.remove(serverName);
   }
 
   @VisibleForTesting
@@ -633,7 +617,6 @@ public class ServerManager {
       this.onlineServers.remove(sn);
       onlineServers.notifyAll();
     }
-    this.rsAdmins.remove(sn);
   }
 
   /*
@@ -676,34 +659,6 @@ public class ServerManager {
     return this.drainingServers.add(sn);
   }
 
-  // RPC methods to region servers
-
-  private HBaseRpcController newRpcController() {
-    return rpcControllerFactory == null ? null : rpcControllerFactory.newController();
-  }
-
-  /**
-   * Sends a WARMUP RPC to the specified server to warmup the specified region.
-   * <p>
-   * A region server could reject the close request because it either does not
-   * have the specified region or the region is being split.
-   * @param server server to warmup a region
-   * @param region region to  warmup
-   */
-  public void sendRegionWarmup(ServerName server,
-      RegionInfo region) {
-    if (server == null) return;
-    try {
-      AdminService.BlockingInterface admin = getRsAdmin(server);
-      HBaseRpcController controller = newRpcController();
-      ProtobufUtil.warmupRegion(controller, admin, region);
-    } catch (IOException e) {
-      LOG.error("Received exception in RPC for warmup server:" +
-        server + "region: " + region +
-        "exception: " + e);
-    }
-  }
-
   /**
    * Contacts a region server and waits up to timeout ms
    * to close the region.  This bypasses the active hmaster.
@@ -737,28 +692,6 @@ public class ServerManager {
   }
 
   /**
-   * @param sn
-   * @return Admin interface for the remote regionserver named <code>sn</code>
-   * @throws IOException
-   * @throws RetriesExhaustedException wrapping a ConnectException if failed
-   */
-  public AdminService.BlockingInterface getRsAdmin(final ServerName sn)
-  throws IOException {
-    AdminService.BlockingInterface admin = this.rsAdmins.get(sn);
-    if (admin == null) {
-      LOG.debug("New admin connection to " + sn.toString());
-      if (sn.equals(master.getServerName()) && master instanceof HRegionServer) {
-        // A master is also a region server now, see HBASE-10569 for details
-        admin = ((HRegionServer)master).getRSRpcServices();
-      } else {
-        admin = this.connection.getAdmin(sn);
-      }
-      this.rsAdmins.put(sn, admin);
-    }
-    return admin;
-  }
-
-  /**
    * Calculate min necessary to start. This is not an absolute. It is just
    * a friction that will cause us hang around a bit longer waiting on
    * RegionServers to check-in.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
index 638f9d3..f3ab4b3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
@@ -18,12 +18,15 @@
 package org.apache.hadoop.hbase.master.procedure;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
@@ -37,11 +40,11 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
 import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
 import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
 import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
 import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
-import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
 
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
@@ -159,13 +162,8 @@ public class RSProcedureDispatcher
       this.serverName = serverName;
     }
 
-    protected AdminService.BlockingInterface getRsAdmin() throws IOException {
-      final AdminService.BlockingInterface admin = master.getServerManager().getRsAdmin(serverName);
-      if (admin == null) {
-        throw new IOException("Attempting to send OPEN RPC to server " + getServerName() +
-          " failed because no RPC connection found to this server");
-      }
-      return admin;
+    protected AsyncRegionServerAdmin getRsAdmin() throws IOException {
+      return master.getAsyncClusterConnection().getRegionServerAdmin(serverName);
     }
 
     protected ServerName getServerName() {
@@ -344,9 +342,13 @@ public class RSProcedureDispatcher
     protected ExecuteProceduresResponse sendRequest(final ServerName serverName,
         final ExecuteProceduresRequest request) throws IOException {
       try {
-        return getRsAdmin().executeProcedures(null, request);
-      } catch (ServiceException se) {
-        throw ProtobufUtil.getRemoteException(se);
+        return getRsAdmin().executeProcedures(request).get();
+      } catch (InterruptedException e) {
+        throw (IOException) new InterruptedIOException().initCause(e);
+      } catch (ExecutionException e) {
+        Throwable cause = e.getCause();
+        Throwables.propagateIfPossible(cause, IOException.class);
+        throw new IOException(cause);
       }
     }
 
@@ -407,9 +409,13 @@ public class RSProcedureDispatcher
     private OpenRegionResponse sendRequest(final ServerName serverName,
         final OpenRegionRequest request) throws IOException {
       try {
-        return getRsAdmin().openRegion(null, request);
-      } catch (ServiceException se) {
-        throw ProtobufUtil.getRemoteException(se);
+        return getRsAdmin().openRegion(request).get();
+      } catch (InterruptedException e) {
+        throw (IOException) new InterruptedIOException().initCause(e);
+      } catch (ExecutionException e) {
+        Throwable cause = e.getCause();
+        Throwables.propagateIfPossible(cause, IOException.class);
+        throw new IOException(cause);
       }
     }
 
@@ -453,9 +459,13 @@ public class RSProcedureDispatcher
     private CloseRegionResponse sendRequest(final ServerName serverName,
         final CloseRegionRequest request) throws IOException {
       try {
-        return getRsAdmin().closeRegion(null, request);
-      } catch (ServiceException se) {
-        throw ProtobufUtil.getRemoteException(se);
+        return getRsAdmin().closeRegion(request).get();
+      } catch (InterruptedException e) {
+        throw (IOException) new InterruptedIOException().initCause(e);
+      } catch (ExecutionException e) {
+        Throwable cause = e.getCause();
+        Throwables.propagateIfPossible(cause, IOException.class);
+        throw new IOException(cause);
       }
     }
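
Side note: the three sendRequest hunks above repeat the same get-and-unwrap boilerplate. Purely as a sketch (this helper is not in the patch), the pattern could be captured once, using the same Throwables-based unwrapping:

    import java.io.IOException;
    import java.io.InterruptedIOException;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;

    import org.apache.hbase.thirdparty.com.google.common.base.Throwables;

    final class BlockingGetSketch {
      private BlockingGetSketch() {
      }

      // Wait on an admin RPC future and rethrow any failure as IOException,
      // mirroring the catch blocks added to RSProcedureDispatcher above.
      static <T> T get(CompletableFuture<T> future) throws IOException {
        try {
          return future.get();
        } catch (InterruptedException e) {
          throw (IOException) new InterruptedIOException().initCause(e);
        } catch (ExecutionException e) {
          Throwable cause = e.getCause();
          Throwables.propagateIfPossible(cause, IOException.class);
          throw new IOException(cause);
        }
      }
    }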
 
