ndimiduk commented on a change in pull request #2130:
URL: https://github.com/apache/hbase/pull/2130#discussion_r464597532



##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
##########
@@ -115,20 +129,50 @@
   MasterRegistry(Configuration conf) throws IOException {
     this.hedgedReadFanOut = Math.max(1, 
conf.getInt(MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
       MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT));
-    int rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
+    rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
       conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, 
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
     // XXX: we pass cluster id as null here since we do not have a cluster id 
yet, we have to fetch
     // this through the master registry...
     // This is a problem as we will use the cluster id to determine the 
authentication method
     rpcClient = RpcClientFactory.createClient(conf, null);
     rpcControllerFactory = RpcControllerFactory.instantiate(conf);
-    Set<ServerName> masterAddrs = parseMasterAddrs(conf);
+    // Generate the seed list of master stubs. Subsequent RPCs try to keep a 
live list of masters
+    // by fetching the end points from this list.
+    populateMasterStubs(parseMasterAddrs(conf));
+    Runnable masterEndPointRefresher = () -> {
+      while (!Thread.interrupted()) {
+        try {
+          // Spurious wake ups are okay, worst case we make an extra RPC call 
to refresh. We won't
+          // have duplicate refreshes because once the thread is past the 
wait(), notify()s are
+          // ignored until the thread is back to the waiting state.
+          synchronized (refreshMasters) {
+            refreshMasters.wait(WAIT_TIME_OUT_MS);
+          }
+          LOG.debug("Attempting to refresh master address end points.");
+          Set<ServerName> newMasters = new HashSet<>(getMasters().get());

Review comment:
       Where would I find metrics regarding calls to `getMasters()`? I suppose 
either client or server-side would be good.

##########
File path: 
hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistryHedgedReads.java
##########
@@ -121,6 +121,11 @@ public boolean hasCellBlockSupport() {
     @Override
     public void callMethod(MethodDescriptor method, RpcController controller, 
Message request,
       Message responsePrototype, RpcCallback<Message> done) {
+      if (!method.getName().equals("GetClusterId")) {
+        // Master registry internally runs other RPCs to keep the master list 
up to date. This check

Review comment:
       Can you say more here? Why is it that the internal RPCs that keep the 
master list up to date are sufficient to skip a call to "GetClusterId"? Can you 
provide a "see also" comment that points the reader off to the counting logic, 
or at least the counter that this condition protects?

##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
##########
@@ -115,20 +129,50 @@
   MasterRegistry(Configuration conf) throws IOException {
     this.hedgedReadFanOut = Math.max(1, 
conf.getInt(MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
       MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT));
-    int rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
+    rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
       conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, 
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
     // XXX: we pass cluster id as null here since we do not have a cluster id 
yet, we have to fetch
     // this through the master registry...
     // This is a problem as we will use the cluster id to determine the 
authentication method
     rpcClient = RpcClientFactory.createClient(conf, null);
     rpcControllerFactory = RpcControllerFactory.instantiate(conf);
-    Set<ServerName> masterAddrs = parseMasterAddrs(conf);
+    // Generate the seed list of master stubs. Subsequent RPCs try to keep a 
live list of masters
+    // by fetching the end points from this list.
+    populateMasterStubs(parseMasterAddrs(conf));
+    Runnable masterEndPointRefresher = () -> {
+      while (!Thread.interrupted()) {
+        try {
+          // Spurious wake ups are okay, worst case we make an extra RPC call 
to refresh. We won't
+          // have duplicate refreshes because once the thread is past the 
wait(), notify()s are
+          // ignored until the thread is back to the waiting state.
+          synchronized (refreshMasters) {
+            refreshMasters.wait(WAIT_TIME_OUT_MS);
+          }
+          LOG.debug("Attempting to refresh master address end points.");
+          Set<ServerName> newMasters = new HashSet<>(getMasters().get());
+          populateMasterStubs(newMasters);
+          LOG.debug("Finished refreshing master end points. {}", newMasters);
+        } catch (InterruptedException e) {
+          LOG.debug("Interrupted during wait, aborting 
refresh-masters-thread.", e);

Review comment:
       Oh, reading below, there is no thread pool. I +1 Viraj's suggestion of a 
single thread executor service.

##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
##########
@@ -115,20 +129,50 @@
   MasterRegistry(Configuration conf) throws IOException {
     this.hedgedReadFanOut = Math.max(1, 
conf.getInt(MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
       MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT));
-    int rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
+    rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
       conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, 
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
     // XXX: we pass cluster id as null here since we do not have a cluster id 
yet, we have to fetch
     // this through the master registry...
     // This is a problem as we will use the cluster id to determine the 
authentication method
     rpcClient = RpcClientFactory.createClient(conf, null);
     rpcControllerFactory = RpcControllerFactory.instantiate(conf);
-    Set<ServerName> masterAddrs = parseMasterAddrs(conf);
+    // Generate the seed list of master stubs. Subsequent RPCs try to keep a 
live list of masters
+    // by fetching the end points from this list.
+    populateMasterStubs(parseMasterAddrs(conf));
+    Runnable masterEndPointRefresher = () -> {
+      while (!Thread.interrupted()) {
+        try {
+          // Spurious wake ups are okay, worst case we make an extra RPC call 
to refresh. We won't
+          // have duplicate refreshes because once the thread is past the 
wait(), notify()s are
+          // ignored until the thread is back to the waiting state.
+          synchronized (refreshMasters) {
+            refreshMasters.wait(WAIT_TIME_OUT_MS);
+          }
+          LOG.debug("Attempting to refresh master address end points.");
+          Set<ServerName> newMasters = new HashSet<>(getMasters().get());
+          populateMasterStubs(newMasters);
+          LOG.debug("Finished refreshing master end points. {}", newMasters);
+        } catch (InterruptedException e) {
+          LOG.debug("Interrupted during wait, aborting 
refresh-masters-thread.", e);
+          break;
+        } catch (ExecutionException | IOException e) {
+          LOG.debug("Error populating latest list of masters.", e);
+        }
+      }
+    };
+    masterAddrRefresherThread = Threads.newDaemonThreadFactory(
+        "MasterRegistry refresh 
end-points").newThread(masterEndPointRefresher);
+    masterAddrRefresherThread.start();

Review comment:
       Should the thread be started in the constructor? Now I see that 
`ConnectionRegistry` defines a `close()` but not a `start()` :(

##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
##########
@@ -115,20 +129,50 @@
   MasterRegistry(Configuration conf) throws IOException {
     this.hedgedReadFanOut = Math.max(1, 
conf.getInt(MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
       MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT));
-    int rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
+    rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
       conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, 
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
     // XXX: we pass cluster id as null here since we do not have a cluster id 
yet, we have to fetch
     // this through the master registry...
     // This is a problem as we will use the cluster id to determine the 
authentication method
     rpcClient = RpcClientFactory.createClient(conf, null);
     rpcControllerFactory = RpcControllerFactory.instantiate(conf);
-    Set<ServerName> masterAddrs = parseMasterAddrs(conf);
+    // Generate the seed list of master stubs. Subsequent RPCs try to keep a 
live list of masters
+    // by fetching the end points from this list.
+    populateMasterStubs(parseMasterAddrs(conf));
+    Runnable masterEndPointRefresher = () -> {
+      while (!Thread.interrupted()) {
+        try {
+          // Spurious wake ups are okay, worst case we make an extra RPC call 
to refresh. We won't
+          // have duplicate refreshes because once the thread is past the 
wait(), notify()s are
+          // ignored until the thread is back to the waiting state.
+          synchronized (refreshMasters) {
+            refreshMasters.wait(WAIT_TIME_OUT_MS);
+          }
+          LOG.debug("Attempting to refresh master address end points.");
+          Set<ServerName> newMasters = new HashSet<>(getMasters().get());
+          populateMasterStubs(newMasters);
+          LOG.debug("Finished refreshing master end points. {}", newMasters);
+        } catch (InterruptedException e) {
+          LOG.debug("Interrupted during wait, aborting 
refresh-masters-thread.", e);

Review comment:
       Should this interrupt be accompanied by a shutdown of the managing 
thread pool? Maybe in an attached exception handler. Or it's okay to leave the 
pool as abandoned, on the assumption that there was only the single thread in 
the pool and the whole process is being terminated.

##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
##########
@@ -170,6 +214,11 @@ public static String getMasterAddr(Configuration conf) 
throws UnknownHostExcepti
     callable.call(controller, stub, resp -> {
       if (controller.failed()) {
         future.completeExceptionally(controller.getFailed());
+        // RPC has failed, trigger a refresh of master end points. We can have 
some spurious

Review comment:
       You sure that _any_ failure should require refreshing the list? A 
mechanism wherein RPC failure results in more RPCs has me suspicious...




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to