bharathv commented on a change in pull request #2130:
URL: https://github.com/apache/hbase/pull/2130#discussion_r466035689



##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
##########
@@ -115,20 +129,50 @@
   MasterRegistry(Configuration conf) throws IOException {
    this.hedgedReadFanOut = Math.max(1, conf.getInt(MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
      MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT));
-    int rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
+    rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
      conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
    // XXX: we pass cluster id as null here since we do not have a cluster id yet, we have to fetch
    // this through the master registry...
    // This is a problem as we will use the cluster id to determine the authentication method
    rpcClient = RpcClientFactory.createClient(conf, null);
    rpcControllerFactory = RpcControllerFactory.instantiate(conf);
-    Set<ServerName> masterAddrs = parseMasterAddrs(conf);
+    // Generate the seed list of master stubs. Subsequent RPCs try to keep a live list of masters
+    // by fetching the end points from this list.
+    populateMasterStubs(parseMasterAddrs(conf));
+    Runnable masterEndPointRefresher = () -> {
+      while (!Thread.interrupted()) {
+        try {
+          // Spurious wake ups are okay, worst case we make an extra RPC call to refresh. We won't
+          // have duplicate refreshes because once the thread is past the wait(), notify()s are
+          // ignored until the thread is back to the waiting state.
+          synchronized (refreshMasters) {
+            refreshMasters.wait(WAIT_TIME_OUT_MS);
+          }
+          LOG.debug("Attempting to refresh master address end points.");
+          Set<ServerName> newMasters = new HashSet<>(getMasters().get());

Review comment:
       Okay. Looks like we have all the plumbing for client metrics already in place via HBASE-12911. I can add metrics for this RPC.
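A minimal sketch of what "metrics for this RPC" could count: every getMasters() attempt and every failed one. The class and method names here are hypothetical and do not reflect the actual client metrics API from HBASE-12911; the real change would register equivalent counters there instead.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

/** Hypothetical counters for the master refresh RPC (not the HBASE-12911 API). */
final class RefreshRpcMetrics {
  private final AtomicLong attempts = new AtomicLong();
  private final AtomicLong failures = new AtomicLong();

  /** Wraps an async RPC, counting each attempt and each exceptional completion. */
  <T> CompletableFuture<T> track(Supplier<CompletableFuture<T>> rpc) {
    attempts.incrementAndGet();
    CompletableFuture<T> future;
    try {
      future = rpc.get();
    } catch (RuntimeException e) {
      // The RPC failed before it could even return a future.
      failures.incrementAndGet();
      CompletableFuture<T> failed = new CompletableFuture<>();
      failed.completeExceptionally(e);
      return failed;
    }
    // whenComplete runs before the returned future completes, so the counter is
    // already updated by the time a caller observes the result.
    return future.whenComplete((result, error) -> {
      if (error != null) {
        failures.incrementAndGet();
      }
    });
  }

  long attempts() {
    return attempts.get();
  }

  long failures() {
    return failures.get();
  }
}
```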

##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
##########
@@ -170,6 +214,11 @@ public static String getMasterAddr(Configuration conf) throws UnknownHostExcepti
     callable.call(controller, stub, resp -> {
       if (controller.failed()) {
         future.completeExceptionally(controller.getFailed());
+        // RPC has failed, trigger a refresh of master end points. We can have some spurious

Review comment:
       Ya, I think that's a valid concern. I was thinking about a similar edge case, hence the exponential back-off proposal. I think we can either implement a back-off or wrap the refresh in a simple rate limiter (e.g. no more than one getMasters() call in a 2-minute window). WDYT?
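A minimal sketch of the rate-limiter option, assuming refresh requests that arrive inside the window are simply dropped rather than queued (an exponential back-off would instead grow the window after each consecutive failure). `RefreshRateLimiter` is a hypothetical name, and the injectable clock exists only to make the behavior testable.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/** Hypothetical limiter: at most one master refresh per window; extra requests are dropped. */
final class RefreshRateLimiter {
  private final long windowNanos;
  private final LongSupplier nanoClock;
  // Timestamp of the last permitted refresh, guarded by "this".
  private long lastRefreshNanos;

  RefreshRateLimiter(long window, TimeUnit unit, LongSupplier nanoClock) {
    this.windowNanos = unit.toNanos(window);
    this.nanoClock = nanoClock;
    // Back-date the last refresh by one window so the very first request is permitted.
    this.lastRefreshNanos = nanoClock.getAsLong() - windowNanos;
  }

  RefreshRateLimiter(long window, TimeUnit unit) {
    this(window, unit, System::nanoTime);
  }

  /** Returns true if the caller may trigger a getMasters() refresh now. */
  synchronized boolean tryAcquire() {
    long now = nanoClock.getAsLong();
    // Compare an elapsed-time difference, which stays correct if nanoTime() wraps.
    if (now - lastRefreshNanos < windowNanos) {
      return false;
    }
    lastRefreshNanos = now;
    return true;
  }
}
```

In the registry, the RPC failure callback would call `tryAcquire()` and only notify the refresher when it returns true, so a burst of failing RPCs causes at most one getMasters() call per window.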

##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
##########
@@ -115,20 +129,50 @@
   MasterRegistry(Configuration conf) throws IOException {
    this.hedgedReadFanOut = Math.max(1, conf.getInt(MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
      MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT));
-    int rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
+    rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
      conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
    // XXX: we pass cluster id as null here since we do not have a cluster id yet, we have to fetch
    // this through the master registry...
    // This is a problem as we will use the cluster id to determine the authentication method
    rpcClient = RpcClientFactory.createClient(conf, null);
    rpcControllerFactory = RpcControllerFactory.instantiate(conf);
-    Set<ServerName> masterAddrs = parseMasterAddrs(conf);
+    // Generate the seed list of master stubs. Subsequent RPCs try to keep a live list of masters
+    // by fetching the end points from this list.
+    populateMasterStubs(parseMasterAddrs(conf));
+    Runnable masterEndPointRefresher = () -> {
+      while (!Thread.interrupted()) {
+        try {
+          // Spurious wake ups are okay, worst case we make an extra RPC call to refresh. We won't
+          // have duplicate refreshes because once the thread is past the wait(), notify()s are
+          // ignored until the thread is back to the waiting state.
+          synchronized (refreshMasters) {
+            refreshMasters.wait(WAIT_TIME_OUT_MS);
+          }
+          LOG.debug("Attempting to refresh master address end points.");
+          Set<ServerName> newMasters = new HashSet<>(getMasters().get());
+          populateMasterStubs(newMasters);
+          LOG.debug("Finished refreshing master end points. {}", newMasters);
+        } catch (InterruptedException e) {
+          LOG.debug("Interrupted during wait, aborting refresh-masters-thread.", e);

Review comment:
       > is there a reason the thread could be interrupted (and thus terminated) while the connection remains active

   I don't think so. The only interrupter for this thread should be the shutdown in registry#close(), which is called during connection cleanup.

   > So can a connection enter a state where it's still alive with no masterRefreshThread running?

   AFAICT that is not possible. The only way the loop can exit is via interruption.
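A simplified sketch of the invariant described here, assuming a refresher whose loop exits only on interruption and whose only interrupter is `close()`. `MasterRefresher`, `requestRefresh()` and `isRunning()` are hypothetical names, not the PR's code; the sketch also swallows runtime failures from the refresh itself so that a failed getMasters() call cannot end the loop, which the "only via interruption" claim relies on.

```java
/**
 * Hypothetical stand-in for the registry's refresh loop. The loop exits only when the
 * thread is interrupted, and close() is the only caller of interrupt().
 */
final class MasterRefresher implements AutoCloseable {
  private final Object refreshMasters = new Object();
  private final Runnable refreshAction;
  private final long waitTimeoutMs;
  private final Thread thread;

  MasterRefresher(Runnable refreshAction, long waitTimeoutMs) {
    this.refreshAction = refreshAction;
    this.waitTimeoutMs = waitTimeoutMs;
    this.thread = new Thread(this::refreshLoop, "refresh-masters-thread");
    this.thread.setDaemon(true);
    this.thread.start();
  }

  /** Called when an RPC fails; wakes the loop early if it is currently waiting. */
  void requestRefresh() {
    synchronized (refreshMasters) {
      refreshMasters.notify();
    }
  }

  private void refreshLoop() {
    while (!Thread.interrupted()) {
      try {
        synchronized (refreshMasters) {
          // Returns on notify(), on timeout, or spuriously; an extra refresh is harmless.
          refreshMasters.wait(waitTimeoutMs);
        }
        refreshAction.run();
      } catch (InterruptedException e) {
        // Interrupted while waiting: close() was called, so leave the loop.
        return;
      } catch (RuntimeException e) {
        // A failed refresh must not end the loop; try again on the next wake-up.
      }
    }
  }

  boolean isRunning() {
    return thread.isAlive();
  }

  @Override
  public void close() throws InterruptedException {
    thread.interrupt();
    thread.join();
  }
}
```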

##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
##########
@@ -115,20 +132,49 @@
   MasterRegistry(Configuration conf) throws IOException {
    this.hedgedReadFanOut = Math.max(1, conf.getInt(MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
      MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT));
-    int rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
+    rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
      conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
    // XXX: we pass cluster id as null here since we do not have a cluster id yet, we have to fetch
    // this through the master registry...
    // This is a problem as we will use the cluster id to determine the authentication method
    rpcClient = RpcClientFactory.createClient(conf, null);
    rpcControllerFactory = RpcControllerFactory.instantiate(conf);
-    Set<ServerName> masterAddrs = parseMasterAddrs(conf);
+    // Generate the seed list of master stubs. Subsequent RPCs try to keep a live list of masters
+    // by fetching the end points from this list.
+    populateMasterStubs(parseMasterAddrs(conf));
+    masterAddrRefresher = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
+        .setNameFormat("MasterRegistry refresh end-points").setDaemon(true).build());

Review comment:
       will do
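The hunk above builds the refresher executor with Guava's `ThreadFactoryBuilder` (`setNameFormat`, `setDaemon`). As a sketch only, a JDK-only equivalent might look like this; `DaemonExecutors` and its methods are hypothetical helpers. `shutdownNow()` interrupts the running task, which is the interruption path from registry#close() discussed above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

/** Hypothetical JDK-only equivalent of the Guava ThreadFactoryBuilder usage in the diff. */
final class DaemonExecutors {
  private DaemonExecutors() {}

  static ExecutorService newNamedDaemonSingleThreadExecutor(String name) {
    ThreadFactory factory = runnable -> {
      Thread t = new Thread(runnable, name);
      // A daemon thread does not keep the JVM alive if the registry is never closed.
      t.setDaemon(true);
      return t;
    };
    return Executors.newSingleThreadExecutor(factory);
  }

  /** Interrupts the running task via shutdownNow() and waits up to timeoutMs for exit. */
  static boolean shutdown(ExecutorService executor, long timeoutMs) throws InterruptedException {
    executor.shutdownNow();
    return executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
  }
}
```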



