Apache9 commented on a change in pull request #1593:
URL: https://github.com/apache/hbase/pull/1593#discussion_r419840354



##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
##########
@@ -117,105 +147,97 @@ public static String getMasterAddr(Configuration conf) 
throws UnknownHostExcepti
     return String.format("%s:%d", hostname, port);
   }
 
-  /**
-   * @return Stub needed to make RPC using a hedged channel to the master end 
points.
-   */
-  private ClientMetaService.Interface getMasterStub() throws IOException {
-    return ClientMetaService.newStub(
-        rpcClient.createHedgedRpcChannel(masterServers, User.getCurrent(), 
rpcTimeoutMs));
+  @FunctionalInterface
+  private interface Callable<T> {
+    void call(HBaseRpcController controller, ClientMetaService.Interface stub, 
RpcCallback<T> done);
   }
 
-  /**
-   * Parses the list of master addresses from the provided configuration. 
Supported format is
-   * comma separated host[:port] values. If no port number if specified, 
default master port is
-   * assumed.
-   * @param conf Configuration to parse from.
-   */
-  private void parseMasterAddrs(Configuration conf) throws 
UnknownHostException {
-    String configuredMasters = getMasterAddr(conf);
-    for (String masterAddr: 
configuredMasters.split(MASTER_ADDRS_CONF_SEPARATOR)) {
-      HostAndPort masterHostPort =
-          
HostAndPort.fromString(masterAddr.trim()).withDefaultPort(HConstants.DEFAULT_MASTER_PORT);
-      masterServers.add(ServerName.valueOf(masterHostPort.toString(), 
ServerName.NON_STARTCODE));
-    }
-    Preconditions.checkArgument(!masterServers.isEmpty(), "At least one master 
address is needed");
+  private <T extends Message> CompletableFuture<T> 
call(ClientMetaService.Interface stub,
+    Callable<T> callable) {
+    HBaseRpcController controller = rpcControllerFactory.newController();
+    CompletableFuture<T> future = new CompletableFuture<>();
+    callable.call(controller, stub, resp -> {
+      if (controller.failed()) {
+        future.completeExceptionally(controller.getFailed());
+      } else {
+        future.complete(resp);
+      }
+    });
+    return future;
   }
 
-  @VisibleForTesting
-  public Set<ServerName> getParsedMasterServers() {
-    return Collections.unmodifiableSet(masterServers);
+  private IOException badResponse(String debug) {
+    return new IOException(String.format("Invalid result for request %s. Will 
be retried", debug));
   }
 
-  /**
-   * Returns a call back that can be passed along to the non-blocking rpc 
call. It is invoked once
-   * the rpc finishes and the response is propagated to the passed future.
-   * @param future Result future to which the rpc response is propagated.
-   * @param isValidResp Checks if the rpc response has a valid result.
-   * @param transformResult Transforms the result to a different form as 
expected by callers.
-   * @param hrc RpcController instance for this rpc.
-   * @param debug Debug message passed along to the caller in case of 
exceptions.
-   * @param <T> RPC result type.
-   * @param <R> Transformed type of the result.
-   * @return A call back that can be embedded in the non-blocking rpc call.
-   */
-  private <T, R> RpcCallback<T> getRpcCallBack(CompletableFuture<R> future,
-      Predicate<T> isValidResp, Function<T, R> transformResult, 
HBaseRpcController hrc,
-      final String debug) {
-    return rpcResult -> {
-      if (rpcResult == null) {
-        future.completeExceptionally(
-            new MasterRegistryFetchException(masterServers, hrc.getFailed()));
-        return;
-      }
-      if (!isValidResp.test(rpcResult)) {
-        // Rpc returned ok, but result was malformed.
-        future.completeExceptionally(new IOException(
-            String.format("Invalid result for request %s. Will be retried", 
debug)));
-        return;
-      }
-      future.complete(transformResult.apply(rpcResult));
-    };
+  // send requests concurrently to hedgedReadsFanout masters
+  private <T extends Message> void groupCall(CompletableFuture<T> future, int 
startIndexInclusive,
+    Callable<T> callable, Predicate<T> isValidResp, String debug,
+    ConcurrentLinkedQueue<Throwable> errors) {
+    int endIndexExclusive = Math.min(startIndexInclusive + hedgedReadFanOut, 
masterStubs.size());
+    AtomicInteger remaining = new AtomicInteger(endIndexExclusive - 
startIndexInclusive);
+    for (int i = startIndexInclusive; i < endIndexExclusive; i++) {
+      addListener(call(masterStubs.get(i), callable), (r, e) -> {
+        // a simple check to skip all the later operations earlier
+        if (future.isDone()) {
+          return;
+        }
+        if (e == null && !isValidResp.test(r)) {
+          e = badResponse(debug);
+        }
+        if (e != null) {
+          // make sure when remaining reaches 0 we have all exceptions in the 
errors queue
+          errors.add(e);
+          if (remaining.decrementAndGet() == 0) {
+            if (endIndexExclusive == masterStubs.size()) {
+              // we are done, complete the future with exception
+              RetriesExhaustedException ex = new 
RetriesExhaustedException("masters",

Review comment:
       The RetriesExhaustedException is used to wrap all of the collected
exceptions. It is then itself wrapped in a MasterRegistryFetchException so
that the final error also includes all of the master addresses.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to