bbeaudreault commented on code in PR #5631:
URL: https://github.com/apache/hbase/pull/5631#discussion_r1476392904


##########
hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryRpcStubHolder.java:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.RpcClientFactory;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.IOExceptionSupplier;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
+
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ConnectionRegistryService;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetConnectionRegistryRequest;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetConnectionRegistryResponse;
+
+/**
+ * A class for creating {@link RpcClient} and related stubs used by
+ * {@link AbstractRpcBasedConnectionRegistry}. We need to connect to bootstrap 
nodes to get the
+ * cluster id first, before creating the final {@link RpcClient} and related 
stubs.
+ * <p>
+ * See HBASE-25051 for more details.
+ */
[email protected]
+class ConnectionRegistryRpcStubHolder implements Closeable {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ConnectionRegistryRpcStubHolder.class);
+
+  private final Configuration conf;
+
+  // used for getting cluster id
+  private final Configuration noAuthConf;
+
+  private final User user;
+
+  private final RpcControllerFactory rpcControllerFactory;
+
+  private final Set<ServerName> bootstrapNodes;
+
+  private final int rpcTimeoutMs;
+
+  private volatile ImmutableMap<ServerName, ClientMetaService.Interface> 
addr2Stub;
+
+  private volatile RpcClient rpcClient;
+
+  private CompletableFuture<ImmutableMap<ServerName, 
ClientMetaService.Interface>> addr2StubFuture;
+
+  ConnectionRegistryRpcStubHolder(Configuration conf, User user,
+    RpcControllerFactory rpcControllerFactory, Set<ServerName> bootstrapNodes) 
{
+    this.conf = conf;
+    if (User.isHBaseSecurityEnabled(conf)) {
+      this.noAuthConf = new Configuration(conf);
+      this.noAuthConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
+    } else {
+      this.noAuthConf = conf;
+    }
+    this.user = user;
+    this.rpcControllerFactory = rpcControllerFactory;
+    this.bootstrapNodes = Collections.unmodifiableSet(bootstrapNodes);
+    this.rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
+      conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, 
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
+  }
+
+  private ImmutableMap<ServerName, ClientMetaService.Interface> 
createStubs(RpcClient rpcClient,
+    Collection<ServerName> addrs) {
+    LOG.debug("Going to use new servers to create stubs: {}", addrs);
+    Preconditions.checkNotNull(addrs);
+    ImmutableMap.Builder<ServerName, ClientMetaService.Interface> builder =
+      ImmutableMap.builderWithExpectedSize(addrs.size());
+    for (ServerName masterAddr : addrs) {
+      builder.put(masterAddr,
+        ClientMetaService.newStub(rpcClient.createRpcChannel(masterAddr, user, 
rpcTimeoutMs)));
+    }
+    return builder.build();
+  }
+
+  private void createStubsAndComplete(String clusterId,
+    CompletableFuture<ImmutableMap<ServerName, ClientMetaService.Interface>> 
future) {
+    RpcClient c = RpcClientFactory.createClient(conf, clusterId);
+    ImmutableMap<ServerName, ClientMetaService.Interface> m = createStubs(c, 
bootstrapNodes);
+    rpcClient = c;
+    addr2Stub = m;
+    future.complete(m);
+  }
+
+  private void createStubs(List<ServerName> bootstrapServers, int index,
+    CompletableFuture<ImmutableMap<ServerName, ClientMetaService.Interface>> 
future) {
+    // use null cluster id here as we do not know the cluster id yet, we will 
fetch it through this
+    // rpc client and use it to create the final rpc client
+    RpcClient getConnectionRegistryRpcClient = 
RpcClientFactory.createClient(noAuthConf, null);
+    // user and rpcTimeout are both not important here, as we will not 
actually send any rpc calls
+    // out, only a preamble connection header, but if we pass null as user, 
there will be NPE in
+    // some code paths...
+    RpcChannel channel =
+      
getConnectionRegistryRpcClient.createRpcChannel(bootstrapServers.get(index), 
user, 0);
+    ConnectionRegistryService.Interface stub = 
ConnectionRegistryService.newStub(channel);
+    HBaseRpcController controller = rpcControllerFactory.newController();
+    stub.getConnectionRegistry(controller, 
GetConnectionRegistryRequest.getDefaultInstance(),
+      new RpcCallback<GetConnectionRegistryResponse>() {
+
+        @Override
+        public void run(GetConnectionRegistryResponse resp) {
+          synchronized (ConnectionRegistryRpcStubHolder.this) {
+            addr2StubFuture = null;
+            if (controller.failed()) {
+              if 
(ConnectionUtils.isUnexpectedPreambleHeaderException(controller.getFailed())) {
+                // this means we have connected to an old server where it does 
not support passing
+                // cluster id through preamble connnection header, so we 
fallback to use null
+                // cluster id, which is the old behavior
+                LOG.debug("Failed to get connection registry info, should be 
an old server,"
+                  + " fallback to use null cluster id", 
controller.getFailed());
+                createStubsAndComplete(null, future);
+              } else {
+                LOG.debug("Failed to get connection registry info", 
controller.getFailed());
+                if (index == bootstrapServers.size() - 1) {
+                  future.completeExceptionally(controller.getFailed());
+                } else {
+                  // try next bootstrap server
+                  createStubs(bootstrapServers, index + 1, future);
+                }
+              }
+            } else {
+              LOG.debug("Got connection registry info: {}", resp);
+              String clusterId = resp.getClusterId();
+              createStubsAndComplete(clusterId, future);
+            }
+          }
+          getConnectionRegistryRpcClient.close();
+        }
+      });
+  }
+
+  private CompletableFuture<ImmutableMap<ServerName, 
ClientMetaService.Interface>> createStubs() {
+    CompletableFuture<ImmutableMap<ServerName, ClientMetaService.Interface>> 
future =
+      new CompletableFuture<>();
+    List<ServerName> bootstrapServers = new 
ArrayList<ServerName>(bootstrapNodes);
+    Collections.shuffle(bootstrapServers);
+    createStubs(bootstrapServers, 0, future);
+    return future;
+  }
+
+  CompletableFuture<ImmutableMap<ServerName, ClientMetaService.Interface>> 
getStubs() {
+    ImmutableMap<ServerName, ClientMetaService.Interface> s = this.addr2Stub;
+    if (s != null) {
+      return CompletableFuture.completedFuture(s);
+    }
+    synchronized (this) {
+      s = this.addr2Stub;
+      if (s != null) {
+        return CompletableFuture.completedFuture(s);
+      }
+      if (addr2StubFuture != null) {
+        return addr2StubFuture;
+      }
+      addr2StubFuture = createStubs();
+      return addr2StubFuture;
+    }
+  }
+
+  void refreshStubs(IOExceptionSupplier<Collection<ServerName>> 
fetchEndpoints) throws IOException {
+    // There is no actual call yet so we have not initialize the rpc client 
and related stubs yet,
+    // give up refreshing
+    if (addr2Stub == null) {
+      LOG.debug("Skip refreshing stubs as we have not initialized rpc client 
yet");
+      return;
+    }
+    LOG.debug("Going to refresh stubs");
+    assert rpcClient != null;
+    addr2Stub = createStubs(rpcClient, fetchEndpoints.get());

Review Comment:
   do we not need to synchronize the setting of addr2Stub? Looks like we are 
doing it in 2 places, here and above, and we're also reading it in a 
synchronized block above.



##########
hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryRpcStubHolder.java:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.RpcClientFactory;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.IOExceptionSupplier;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
+
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ConnectionRegistryService;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetConnectionRegistryRequest;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetConnectionRegistryResponse;
+
+/**
+ * A class for creating {@link RpcClient} and related stubs used by
+ * {@link AbstractRpcBasedConnectionRegistry}. We need to connect to bootstrap 
nodes to get the
+ * cluster id first, before creating the final {@link RpcClient} and related 
stubs.
+ * <p>
+ * See HBASE-25051 for more details.
+ */
[email protected]
+class ConnectionRegistryRpcStubHolder implements Closeable {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ConnectionRegistryRpcStubHolder.class);
+
+  private final Configuration conf;
+
+  // used for getting cluster id
+  private final Configuration noAuthConf;
+
+  private final User user;
+
+  private final RpcControllerFactory rpcControllerFactory;
+
+  private final Set<ServerName> bootstrapNodes;
+
+  private final int rpcTimeoutMs;
+
+  private volatile ImmutableMap<ServerName, ClientMetaService.Interface> 
addr2Stub;
+
+  private volatile RpcClient rpcClient;
+
+  private CompletableFuture<ImmutableMap<ServerName, 
ClientMetaService.Interface>> addr2StubFuture;
+
+  ConnectionRegistryRpcStubHolder(Configuration conf, User user,

Review Comment:
   This class is called a StubHolder, but it also makes some rpc requests and it 
also gets passed in a future. I wonder if we should move the rpc code from 
createStubs out to a separate class. Let this class just handle synchronization 
around updating the volatiles?
   
   Right now the class is a bit complicated with lots of createStubs overloads 
and a long rpc method in the middle



##########
hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryRpcStubHolder.java:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.RpcClientFactory;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.IOExceptionSupplier;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
+
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ConnectionRegistryService;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetConnectionRegistryRequest;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetConnectionRegistryResponse;
+
+/**
+ * A class for creating {@link RpcClient} and related stubs used by
+ * {@link AbstractRpcBasedConnectionRegistry}. We need to connect to bootstrap 
nodes to get the
+ * cluster id first, before creating the final {@link RpcClient} and related 
stubs.
+ * <p>
+ * See HBASE-25051 for more details.
+ */
[email protected]
+class ConnectionRegistryRpcStubHolder implements Closeable {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ConnectionRegistryRpcStubHolder.class);
+
+  private final Configuration conf;
+
+  // used for getting cluster id
+  private final Configuration noAuthConf;
+
+  private final User user;
+
+  private final RpcControllerFactory rpcControllerFactory;
+
+  private final Set<ServerName> bootstrapNodes;
+
+  private final int rpcTimeoutMs;
+
+  private volatile ImmutableMap<ServerName, ClientMetaService.Interface> 
addr2Stub;
+
+  private volatile RpcClient rpcClient;
+
+  private CompletableFuture<ImmutableMap<ServerName, 
ClientMetaService.Interface>> addr2StubFuture;
+
+  ConnectionRegistryRpcStubHolder(Configuration conf, User user,
+    RpcControllerFactory rpcControllerFactory, Set<ServerName> bootstrapNodes) 
{
+    this.conf = conf;
+    if (User.isHBaseSecurityEnabled(conf)) {
+      this.noAuthConf = new Configuration(conf);
+      this.noAuthConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
+    } else {
+      this.noAuthConf = conf;
+    }
+    this.user = user;
+    this.rpcControllerFactory = rpcControllerFactory;
+    this.bootstrapNodes = Collections.unmodifiableSet(bootstrapNodes);
+    this.rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
+      conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, 
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
+  }
+
+  private ImmutableMap<ServerName, ClientMetaService.Interface> 
createStubs(RpcClient rpcClient,
+    Collection<ServerName> addrs) {
+    LOG.debug("Going to use new servers to create stubs: {}", addrs);
+    Preconditions.checkNotNull(addrs);
+    ImmutableMap.Builder<ServerName, ClientMetaService.Interface> builder =
+      ImmutableMap.builderWithExpectedSize(addrs.size());
+    for (ServerName masterAddr : addrs) {
+      builder.put(masterAddr,
+        ClientMetaService.newStub(rpcClient.createRpcChannel(masterAddr, user, 
rpcTimeoutMs)));
+    }
+    return builder.build();
+  }
+
+  private void createStubsAndComplete(String clusterId,
+    CompletableFuture<ImmutableMap<ServerName, ClientMetaService.Interface>> 
future) {
+    RpcClient c = RpcClientFactory.createClient(conf, clusterId);
+    ImmutableMap<ServerName, ClientMetaService.Interface> m = createStubs(c, 
bootstrapNodes);
+    rpcClient = c;
+    addr2Stub = m;
+    future.complete(m);
+  }
+
+  private void createStubs(List<ServerName> bootstrapServers, int index,
+    CompletableFuture<ImmutableMap<ServerName, ClientMetaService.Interface>> 
future) {
+    // use null cluster id here as we do not know the cluster id yet, we will 
fetch it through this
+    // rpc client and use it to create the final rpc client
+    RpcClient getConnectionRegistryRpcClient = 
RpcClientFactory.createClient(noAuthConf, null);
+    // user and rpcTimeout are both not important here, as we will not 
actually send any rpc calls
+    // out, only a preamble connection header, but if we pass null as user, 
there will be NPE in
+    // some code paths...
+    RpcChannel channel =
+      
getConnectionRegistryRpcClient.createRpcChannel(bootstrapServers.get(index), 
user, 0);
+    ConnectionRegistryService.Interface stub = 
ConnectionRegistryService.newStub(channel);
+    HBaseRpcController controller = rpcControllerFactory.newController();
+    stub.getConnectionRegistry(controller, 
GetConnectionRegistryRequest.getDefaultInstance(),
+      new RpcCallback<GetConnectionRegistryResponse>() {
+
+        @Override
+        public void run(GetConnectionRegistryResponse resp) {
+          synchronized (ConnectionRegistryRpcStubHolder.this) {
+            addr2StubFuture = null;
+            if (controller.failed()) {
+              if 
(ConnectionUtils.isUnexpectedPreambleHeaderException(controller.getFailed())) {
+                // this means we have connected to an old server where it does 
not support passing
+                // cluster id through preamble connnection header, so we 
fallback to use null
+                // cluster id, which is the old behavior
+                LOG.debug("Failed to get connection registry info, should be 
an old server,"
+                  + " fallback to use null cluster id", 
controller.getFailed());
+                createStubsAndComplete(null, future);
+              } else {
+                LOG.debug("Failed to get connection registry info", 
controller.getFailed());
+                if (index == bootstrapServers.size() - 1) {
+                  future.completeExceptionally(controller.getFailed());
+                } else {
+                  // try next bootstrap server
+                  createStubs(bootstrapServers, index + 1, future);
+                }
+              }
+            } else {
+              LOG.debug("Got connection registry info: {}", resp);
+              String clusterId = resp.getClusterId();
+              createStubsAndComplete(clusterId, future);
+            }
+          }
+          getConnectionRegistryRpcClient.close();
+        }
+      });
+  }
+
+  private CompletableFuture<ImmutableMap<ServerName, 
ClientMetaService.Interface>> createStubs() {
+    CompletableFuture<ImmutableMap<ServerName, ClientMetaService.Interface>> 
future =
+      new CompletableFuture<>();
+    List<ServerName> bootstrapServers = new 
ArrayList<ServerName>(bootstrapNodes);
+    Collections.shuffle(bootstrapServers);
+    createStubs(bootstrapServers, 0, future);
+    return future;
+  }
+
+  CompletableFuture<ImmutableMap<ServerName, ClientMetaService.Interface>> 
getStubs() {
+    ImmutableMap<ServerName, ClientMetaService.Interface> s = this.addr2Stub;
+    if (s != null) {
+      return CompletableFuture.completedFuture(s);
+    }
+    synchronized (this) {
+      s = this.addr2Stub;
+      if (s != null) {
+        return CompletableFuture.completedFuture(s);
+      }
+      if (addr2StubFuture != null) {
+        return addr2StubFuture;
+      }
+      addr2StubFuture = createStubs();
+      return addr2StubFuture;
+    }
+  }
+
+  void refreshStubs(IOExceptionSupplier<Collection<ServerName>> 
fetchEndpoints) throws IOException {
+    // There is no actual call yet so we have not initialize the rpc client 
and related stubs yet,
+    // give up refreshing
+    if (addr2Stub == null) {
+      LOG.debug("Skip refreshing stubs as we have not initialized rpc client 
yet");
+      return;
+    }
+    LOG.debug("Going to refresh stubs");
+    assert rpcClient != null;
+    addr2Stub = createStubs(rpcClient, fetchEndpoints.get());

Review Comment:
   I now see above that we are synchronizing in the callback of createStubs, 
but still seems like we need it here too



##########
hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java:
##########
@@ -663,4 +664,13 @@ static void setCoprocessorError(RpcController controller, 
Throwable error) {
       controller.setFailed(error.toString());
     }
   }
+
+  static boolean isUnexpectedPreambleHeaderException(IOException e) {
+    if (!(e instanceof RemoteException)) {
+      return false;
+    }
+    RemoteException re = (RemoteException) e;
+    return FatalConnectionException.class.getName().equals(re.getClassName())

Review Comment:
   Do we want a specific exception for this? Seems brittle to check the message. 
I know we do that in other places, but just checking here in this new code.



##########
hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryRpcStubHolder.java:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.RpcClientFactory;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.IOExceptionSupplier;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
+
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ConnectionRegistryService;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetConnectionRegistryRequest;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetConnectionRegistryResponse;
+
+/**
+ * A class for creating {@link RpcClient} and related stubs used by
+ * {@link AbstractRpcBasedConnectionRegistry}. We need to connect to bootstrap 
nodes to get the
+ * cluster id first, before creating the final {@link RpcClient} and related 
stubs.
+ * <p>
+ * See HBASE-25051 for more details.
+ */
[email protected]
+class ConnectionRegistryRpcStubHolder implements Closeable {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ConnectionRegistryRpcStubHolder.class);
+
+  private final Configuration conf;
+
+  // used for getting cluster id
+  private final Configuration noAuthConf;
+
+  private final User user;
+
+  private final RpcControllerFactory rpcControllerFactory;
+
+  private final Set<ServerName> bootstrapNodes;
+
+  private final int rpcTimeoutMs;
+
+  private volatile ImmutableMap<ServerName, ClientMetaService.Interface> 
addr2Stub;
+
+  private volatile RpcClient rpcClient;
+
+  private CompletableFuture<ImmutableMap<ServerName, 
ClientMetaService.Interface>> addr2StubFuture;
+
+  ConnectionRegistryRpcStubHolder(Configuration conf, User user,
+    RpcControllerFactory rpcControllerFactory, Set<ServerName> bootstrapNodes) 
{
+    this.conf = conf;
+    if (User.isHBaseSecurityEnabled(conf)) {
+      this.noAuthConf = new Configuration(conf);
+      this.noAuthConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
+    } else {
+      this.noAuthConf = conf;
+    }
+    this.user = user;
+    this.rpcControllerFactory = rpcControllerFactory;
+    this.bootstrapNodes = Collections.unmodifiableSet(bootstrapNodes);
+    this.rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
+      conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, 
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
+  }
+
+  private ImmutableMap<ServerName, ClientMetaService.Interface> 
createStubs(RpcClient rpcClient,
+    Collection<ServerName> addrs) {
+    LOG.debug("Going to use new servers to create stubs: {}", addrs);
+    Preconditions.checkNotNull(addrs);
+    ImmutableMap.Builder<ServerName, ClientMetaService.Interface> builder =
+      ImmutableMap.builderWithExpectedSize(addrs.size());
+    for (ServerName masterAddr : addrs) {
+      builder.put(masterAddr,
+        ClientMetaService.newStub(rpcClient.createRpcChannel(masterAddr, user, 
rpcTimeoutMs)));
+    }
+    return builder.build();
+  }
+
+  private void createStubsAndComplete(String clusterId,
+    CompletableFuture<ImmutableMap<ServerName, ClientMetaService.Interface>> 
future) {
+    RpcClient c = RpcClientFactory.createClient(conf, clusterId);
+    ImmutableMap<ServerName, ClientMetaService.Interface> m = createStubs(c, 
bootstrapNodes);
+    rpcClient = c;
+    addr2Stub = m;
+    future.complete(m);
+  }
+
+  private void createStubs(List<ServerName> bootstrapServers, int index,
+    CompletableFuture<ImmutableMap<ServerName, ClientMetaService.Interface>> 
future) {
+    // use null cluster id here as we do not know the cluster id yet, we will 
fetch it through this
+    // rpc client and use it to create the final rpc client
+    RpcClient getConnectionRegistryRpcClient = 
RpcClientFactory.createClient(noAuthConf, null);
+    // user and rpcTimeout are both not important here, as we will not 
actually send any rpc calls
+    // out, only a preamble connection header, but if we pass null as user, 
there will be NPE in
+    // some code paths...
+    RpcChannel channel =
+      
getConnectionRegistryRpcClient.createRpcChannel(bootstrapServers.get(index), 
user, 0);

Review Comment:
   Even if the rpcTimeout is not used, I think it's preferable to pass a real 
value in. We have a real value in this class, let's pass it in? Maybe something 
changes down the line, I don't like passing a 0 value for timeout



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java:
##########
@@ -688,41 +691,75 @@ private void doBadPreambleHandling(String msg) throws 
IOException {
   }
 
   private void doBadPreambleHandling(String msg, Exception e) throws 
IOException {
-    RpcServer.LOG.warn(msg);
+    RpcServer.LOG.warn(msg, e);
     doRespond(getErrorResponse(msg, e));
   }
 
+  private boolean doConnectionRegistryResponse() throws IOException {
+    if (!(rpcServer.server instanceof ConnectionRegistryEndpoint)) {
+      // should be in tests or some scenarios where we should not reach here
+      return false;
+    }
+    // on backup masters, this request may be blocked since we need to fetch 
it from filesystem,
+    // but since it is just backup master, it is not a critical problem
+    String clusterId = ((ConnectionRegistryEndpoint) 
rpcServer.server).getClusterId();

Review Comment:
   is there no way to protect this from blocking? maybe in a followup since 
this PR is already large. 
   
   my concern is that backup masters may soon become active, we don't want them 
blocked.



##########
hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java:
##########
@@ -127,88 +120,15 @@ public void write(ChannelHandlerContext ctx, Object msg, 
ChannelPromise promise)
     }
   }
 
-  private void finishCall(ResponseHeader responseHeader, ByteBufInputStream 
in, Call call)
-    throws IOException {
-    Message value;
-    if (call.responseDefaultType != null) {
-      Message.Builder builder = call.responseDefaultType.newBuilderForType();
-      if (!builder.mergeDelimitedFrom(in)) {
-        // The javadoc of mergeDelimitedFrom says returning false means the 
stream reaches EOF
-        // before reading any bytes out, so here we need to manually finish 
create the EOFException
-        // and finish the call
-        call.setException(new EOFException("EOF while reading response with 
type: "
-          + call.responseDefaultType.getClass().getName()));
-        return;
-      }
-      value = builder.build();
-    } else {
-      value = null;
-    }
-    CellScanner cellBlockScanner;
-    if (responseHeader.hasCellBlockMeta()) {
-      int size = responseHeader.getCellBlockMeta().getLength();
-      // Maybe we could read directly from the ByteBuf.
-      // The problem here is that we do not know when to release it.
-      byte[] cellBlock = new byte[size];
-      in.readFully(cellBlock);
-      cellBlockScanner = cellBlockBuilder.createCellScanner(this.codec, 
this.compressor, cellBlock);
-    } else {
-      cellBlockScanner = null;
-    }
-    call.setResponse(value, cellBlockScanner);
-  }
-
   private void readResponse(ChannelHandlerContext ctx, ByteBuf buf) throws 
IOException {
-    int totalSize = buf.readInt();
-    ByteBufInputStream in = new ByteBufInputStream(buf);
-    ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(in);
-    int id = responseHeader.getCallId();
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("got response header " + 
TextFormat.shortDebugString(responseHeader)
-        + ", totalSize: " + totalSize + " bytes");
-    }
-    RemoteException remoteExc;
-    if (responseHeader.hasException()) {
-      ExceptionResponse exceptionResponse = responseHeader.getException();
-      remoteExc = IPCUtil.createRemoteException(exceptionResponse);
-      if (IPCUtil.isFatalConnectionException(exceptionResponse)) {
-        // Here we will cleanup all calls so do not need to fall back, just 
return.
-        exceptionCaught(ctx, remoteExc);
-        return;
-      }
-    } else {
-      remoteExc = null;
-    }
-    Call call = id2Call.remove(id);
-    if (call == null) {
-      // So we got a response for which we have no corresponding 'call' here 
on the client-side.
-      // We probably timed out waiting, cleaned up all references, and now the 
server decides
-      // to return a response. There is nothing we can do w/ the response at 
this stage. Clean
-      // out the wire of the response so its out of the way and we can get 
other responses on
-      // this connection.
-      if (LOG.isDebugEnabled()) {
-        int readSoFar = 
IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);
-        int whatIsLeftToRead = totalSize - readSoFar;
-        LOG.debug("Unknown callId: " + id + ", skipping over this response of 
" + whatIsLeftToRead
-          + " bytes");
-      }
-      return;
-    }
-    call.callStats.setResponseSizeBytes(totalSize);
-    if (remoteExc != null) {
-      call.setException(remoteExc);
-      return;
-    }
     try {
-      finishCall(responseHeader, in, call);
+      conn.readResponse(new ByteBufInputStream(buf), id2Call,
+        remoteExc -> exceptionCaught(ctx, remoteExc));
     } catch (IOException e) {
-      // As the call has been removed from id2Call map, if we hit an exception 
here, the
-      // exceptionCaught method can not help us finish the call, so here we 
need to catch the
-      // exception and finish it
-      // And in netty, the decoding the frame based, when reaching here we 
have already read a full
+      // In netty, the decoding the frame based, when reaching here we have 
already read a full
       // frame, so hitting exception here does not mean the stream decoding is 
broken, thus we do
       // not need to throw the exception out and close the connection.
-      call.setException(e);
+      LOG.warn("failed to process response", e);

Review Comment:
   I'm not sure I follow this change -- why is it OK to now remove the 
call.setException? Was it never necessary, or which part of the other changes 
here negates the need?



##########
hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryRpcStubHolder.java:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.RpcClientFactory;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.IOExceptionSupplier;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
+
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ConnectionRegistryService;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetConnectionRegistryRequest;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetConnectionRegistryResponse;
+
+/**
+ * A class for creating {@link RpcClient} and related stubs used by
+ * {@link AbstractRpcBasedConnectionRegistry}. We need to connect to bootstrap 
nodes to get the
+ * cluster id first, before creating the final {@link RpcClient} and related 
stubs.
+ * <p>
+ * See HBASE-25051 for more details.
+ */
[email protected]
+class ConnectionRegistryRpcStubHolder implements Closeable {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ConnectionRegistryRpcStubHolder.class);
+
+  private final Configuration conf;
+
+  // used for getting cluster id
+  private final Configuration noAuthConf;
+
+  private final User user;
+
+  private final RpcControllerFactory rpcControllerFactory;
+
+  private final Set<ServerName> bootstrapNodes;
+
+  private final int rpcTimeoutMs;
+
+  private volatile ImmutableMap<ServerName, ClientMetaService.Interface> 
addr2Stub;
+
+  private volatile RpcClient rpcClient;
+
+  private CompletableFuture<ImmutableMap<ServerName, 
ClientMetaService.Interface>> addr2StubFuture;
+
+  ConnectionRegistryRpcStubHolder(Configuration conf, User user,
+    RpcControllerFactory rpcControllerFactory, Set<ServerName> bootstrapNodes) 
{
+    this.conf = conf;
+    if (User.isHBaseSecurityEnabled(conf)) {
+      this.noAuthConf = new Configuration(conf);
+      this.noAuthConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
+    } else {
+      this.noAuthConf = conf;
+    }
+    this.user = user;
+    this.rpcControllerFactory = rpcControllerFactory;
+    this.bootstrapNodes = Collections.unmodifiableSet(bootstrapNodes);
+    this.rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
+      conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, 
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
+  }
+
+  private ImmutableMap<ServerName, ClientMetaService.Interface> 
createStubs(RpcClient rpcClient,
+    Collection<ServerName> addrs) {
+    LOG.debug("Going to use new servers to create stubs: {}", addrs);
+    Preconditions.checkNotNull(addrs);
+    ImmutableMap.Builder<ServerName, ClientMetaService.Interface> builder =
+      ImmutableMap.builderWithExpectedSize(addrs.size());
+    for (ServerName masterAddr : addrs) {
+      builder.put(masterAddr,
+        ClientMetaService.newStub(rpcClient.createRpcChannel(masterAddr, user, 
rpcTimeoutMs)));
+    }
+    return builder.build();
+  }
+
+  private void createStubsAndComplete(String clusterId,
+    CompletableFuture<ImmutableMap<ServerName, ClientMetaService.Interface>> 
future) {
+    RpcClient c = RpcClientFactory.createClient(conf, clusterId);
+    ImmutableMap<ServerName, ClientMetaService.Interface> m = createStubs(c, 
bootstrapNodes);
+    rpcClient = c;
+    addr2Stub = m;
+    future.complete(m);
+  }
+
+  private void createStubs(List<ServerName> bootstrapServers, int index,
+    CompletableFuture<ImmutableMap<ServerName, ClientMetaService.Interface>> 
future) {
+    // use null cluster id here as we do not know the cluster id yet, we will 
fetch it through this
+    // rpc client and use it to create the final rpc client
+    RpcClient getConnectionRegistryRpcClient = 
RpcClientFactory.createClient(noAuthConf, null);
+    // user and rpcTimeout are both not important here, as we will not 
actually send any rpc calls
+    // out, only a preamble connection header, but if we pass null as user, 
there will be NPE in
+    // some code paths...
+    RpcChannel channel =
+      
getConnectionRegistryRpcClient.createRpcChannel(bootstrapServers.get(index), 
user, 0);
+    ConnectionRegistryService.Interface stub = 
ConnectionRegistryService.newStub(channel);
+    HBaseRpcController controller = rpcControllerFactory.newController();
+    stub.getConnectionRegistry(controller, 
GetConnectionRegistryRequest.getDefaultInstance(),
+      new RpcCallback<GetConnectionRegistryResponse>() {
+
+        @Override
+        public void run(GetConnectionRegistryResponse resp) {
+          synchronized (ConnectionRegistryRpcStubHolder.this) {
+            addr2StubFuture = null;
+            if (controller.failed()) {
+              if 
(ConnectionUtils.isUnexpectedPreambleHeaderException(controller.getFailed())) {
+                // this means we have connected to an old server where it does 
not support passing
+                // cluster id through preamble connnection header, so we 
fallback to use null
+                // cluster id, which is the old behavior
+                LOG.debug("Failed to get connection registry info, should be 
an old server,"
+                  + " fallback to use null cluster id", 
controller.getFailed());
+                createStubsAndComplete(null, future);

Review Comment:
   I wonder: do we want to try the next bootstrap server? We could fall through 
to the old behavior if all bootstrap servers fail the preamble.



##########
hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryRpcStubHolder.java:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.RpcClientFactory;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.IOExceptionSupplier;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
+
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ConnectionRegistryService;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetConnectionRegistryRequest;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetConnectionRegistryResponse;
+
+/**
+ * A class for creating {@link RpcClient} and related stubs used by
+ * {@link AbstractRpcBasedConnectionRegistry}. We need to connect to bootstrap 
nodes to get the
+ * cluster id first, before creating the final {@link RpcClient} and related 
stubs.
+ * <p>
+ * See HBASE-25051 for more details.
+ */
[email protected]
+class ConnectionRegistryRpcStubHolder implements Closeable {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ConnectionRegistryRpcStubHolder.class);
+
+  private final Configuration conf;
+
+  // used for getting cluster id
+  private final Configuration noAuthConf;
+
+  private final User user;
+
+  private final RpcControllerFactory rpcControllerFactory;
+
+  private final Set<ServerName> bootstrapNodes;
+
+  private final int rpcTimeoutMs;
+
+  private volatile ImmutableMap<ServerName, ClientMetaService.Interface> 
addr2Stub;
+
+  private volatile RpcClient rpcClient;
+
+  private CompletableFuture<ImmutableMap<ServerName, 
ClientMetaService.Interface>> addr2StubFuture;
+
+  ConnectionRegistryRpcStubHolder(Configuration conf, User user,
+    RpcControllerFactory rpcControllerFactory, Set<ServerName> bootstrapNodes) 
{
+    this.conf = conf;
+    if (User.isHBaseSecurityEnabled(conf)) {
+      this.noAuthConf = new Configuration(conf);
+      this.noAuthConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
+    } else {
+      this.noAuthConf = conf;
+    }
+    this.user = user;
+    this.rpcControllerFactory = rpcControllerFactory;
+    this.bootstrapNodes = Collections.unmodifiableSet(bootstrapNodes);
+    this.rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
+      conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, 
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
+  }
+
+  private ImmutableMap<ServerName, ClientMetaService.Interface> 
createStubs(RpcClient rpcClient,
+    Collection<ServerName> addrs) {
+    LOG.debug("Going to use new servers to create stubs: {}", addrs);
+    Preconditions.checkNotNull(addrs);
+    ImmutableMap.Builder<ServerName, ClientMetaService.Interface> builder =
+      ImmutableMap.builderWithExpectedSize(addrs.size());
+    for (ServerName masterAddr : addrs) {
+      builder.put(masterAddr,
+        ClientMetaService.newStub(rpcClient.createRpcChannel(masterAddr, user, 
rpcTimeoutMs)));
+    }
+    return builder.build();
+  }
+
+  private void createStubsAndComplete(String clusterId,
+    CompletableFuture<ImmutableMap<ServerName, ClientMetaService.Interface>> 
future) {
+    RpcClient c = RpcClientFactory.createClient(conf, clusterId);
+    ImmutableMap<ServerName, ClientMetaService.Interface> m = createStubs(c, 
bootstrapNodes);
+    rpcClient = c;
+    addr2Stub = m;
+    future.complete(m);
+  }
+
+  private void createStubs(List<ServerName> bootstrapServers, int index,
+    CompletableFuture<ImmutableMap<ServerName, ClientMetaService.Interface>> 
future) {
+    // use null cluster id here as we do not know the cluster id yet, we will 
fetch it through this
+    // rpc client and use it to create the final rpc client
+    RpcClient getConnectionRegistryRpcClient = 
RpcClientFactory.createClient(noAuthConf, null);
+    // user and rpcTimeout are both not important here, as we will not 
actually send any rpc calls
+    // out, only a preamble connection header, but if we pass null as user, 
there will be NPE in
+    // some code paths...
+    RpcChannel channel =
+      
getConnectionRegistryRpcClient.createRpcChannel(bootstrapServers.get(index), 
user, 0);
+    ConnectionRegistryService.Interface stub = 
ConnectionRegistryService.newStub(channel);
+    HBaseRpcController controller = rpcControllerFactory.newController();
+    stub.getConnectionRegistry(controller, 
GetConnectionRegistryRequest.getDefaultInstance(),
+      new RpcCallback<GetConnectionRegistryResponse>() {
+
+        @Override
+        public void run(GetConnectionRegistryResponse resp) {
+          synchronized (ConnectionRegistryRpcStubHolder.this) {
+            addr2StubFuture = null;
+            if (controller.failed()) {
+              if 
(ConnectionUtils.isUnexpectedPreambleHeaderException(controller.getFailed())) {
+                // this means we have connected to an old server where it does 
not support passing
+                // cluster id through preamble connnection header, so we 
fallback to use null
+                // cluster id, which is the old behavior
+                LOG.debug("Failed to get connection registry info, should be 
an old server,"
+                  + " fallback to use null cluster id", 
controller.getFailed());
+                createStubsAndComplete(null, future);
+              } else {
+                LOG.debug("Failed to get connection registry info", 
controller.getFailed());
+                if (index == bootstrapServers.size() - 1) {
+                  future.completeExceptionally(controller.getFailed());
+                } else {
+                  // try next bootstrap server
+                  createStubs(bootstrapServers, index + 1, future);
+                }
+              }
+            } else {
+              LOG.debug("Got connection registry info: {}", resp);
+              String clusterId = resp.getClusterId();
+              createStubsAndComplete(clusterId, future);
+            }
+          }
+          getConnectionRegistryRpcClient.close();

Review Comment:
   Do you want to do this in a `finally` block? I think some of the methods 
above could throw an exception.



##########
hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java:
##########
@@ -303,12 +309,16 @@ protected void initChannel(Channel ch) throws Exception {
       .addListener(new ChannelFutureListener() {
 
         private void succeed(Channel ch) throws IOException {
-          ch.writeAndFlush(connectionHeaderPreamble.retainedDuplicate());
+          if (connectionRegistryCall != null) {
+            getConnectionRegistry(ch);
+            return;
+          }
+          NettyFutureUtils.safeWriteAndFlush(ch, 
connectionHeaderPreamble.retainedDuplicate());
           if (useSasl) {
             saslNegotiate(ch);
           } else {
             // send the connection header to server
-            ch.write(connectionHeaderWithLength.retainedDuplicate());
+            NettyFutureUtils.safeWrite(ch, 
connectionHeaderWithLength.retainedDuplicate());

Review Comment:
   just checking -- was this change just a cleanup you noticed, or related to 
the change?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to