virajjasani commented on a change in pull request #2130:
URL: https://github.com/apache/hbase/pull/2130#discussion_r460075547



##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
##########
@@ -115,20 +129,50 @@
   MasterRegistry(Configuration conf) throws IOException {
     this.hedgedReadFanOut = Math.max(1, 
conf.getInt(MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
       MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT));
-    int rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
+    rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
       conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, 
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
     // XXX: we pass cluster id as null here since we do not have a cluster id 
yet, we have to fetch
     // this through the master registry...
     // This is a problem as we will use the cluster id to determine the 
authentication method
     rpcClient = RpcClientFactory.createClient(conf, null);
     rpcControllerFactory = RpcControllerFactory.instantiate(conf);
-    Set<ServerName> masterAddrs = parseMasterAddrs(conf);
+    // Generate the seed list of master stubs. Subsequent RPCs try to keep a 
live list of masters
+    // by fetching the end points from this list.
+    populateMasterStubs(parseMasterAddrs(conf));
+    Runnable masterEndPointRefresher = () -> {
+      while (!Thread.interrupted()) {
+        try {
+          // Spurious wake ups are okay, worst case we make an extra RPC call 
to refresh. We won't
+          // have duplicate refreshes because once the thread is past the 
wait(), notify()s are
+          // ignored until the thread is back to the waiting state.
+          synchronized (refreshMasters) {
+            refreshMasters.wait(WAIT_TIME_OUT_MS);
+          }
+          LOG.debug("Attempting to refresh master address end points.");
+          Set<ServerName> newMasters = new HashSet<>(getMasters().get());
+          populateMasterStubs(newMasters);
+          LOG.debug("Finished refreshing master end points. {}", newMasters);
+        } catch (InterruptedException e) {
+          LOG.debug("Interrupted during wait, aborting 
refresh-masters-thread.", e);
+          break;
+        } catch (ExecutionException | IOException e) {
+          LOG.debug("Error populating latest list of masters.", e);
+        }
+      }
+    };
+    masterAddrRefresherThread = Threads.newDaemonThreadFactory(
+        "MasterRegistry refresh 
end-points").newThread(masterEndPointRefresher);

Review comment:
       Can you please use guava library's ThreadFactoryBuilder? So far the 
consensus on [HBASE-24750](https://issues.apache.org/jira/browse/HBASE-24750) 
is to get rid of our internally maintained ThreadFactory :)

##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
##########
@@ -115,20 +129,50 @@
   MasterRegistry(Configuration conf) throws IOException {
     this.hedgedReadFanOut = Math.max(1, 
conf.getInt(MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
       MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT));
-    int rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
+    rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
       conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, 
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
     // XXX: we pass cluster id as null here since we do not have a cluster id 
yet, we have to fetch
     // this through the master registry...
     // This is a problem as we will use the cluster id to determine the 
authentication method
     rpcClient = RpcClientFactory.createClient(conf, null);
     rpcControllerFactory = RpcControllerFactory.instantiate(conf);
-    Set<ServerName> masterAddrs = parseMasterAddrs(conf);
+    // Generate the seed list of master stubs. Subsequent RPCs try to keep a 
live list of masters
+    // by fetching the end points from this list.
+    populateMasterStubs(parseMasterAddrs(conf));
+    Runnable masterEndPointRefresher = () -> {
+      while (!Thread.interrupted()) {
+        try {
+          // Spurious wake ups are okay, worst case we make an extra RPC call 
to refresh. We won't
+          // have duplicate refreshes because once the thread is past the 
wait(), notify()s are
+          // ignored until the thread is back to the waiting state.
+          synchronized (refreshMasters) {
+            refreshMasters.wait(WAIT_TIME_OUT_MS);
+          }
+          LOG.debug("Attempting to refresh master address end points.");
+          Set<ServerName> newMasters = new HashSet<>(getMasters().get());
+          populateMasterStubs(newMasters);
+          LOG.debug("Finished refreshing master end points. {}", newMasters);
+        } catch (InterruptedException e) {
+          LOG.debug("Interrupted during wait, aborting 
refresh-masters-thread.", e);
+          break;
+        } catch (ExecutionException | IOException e) {
+          LOG.debug("Error populating latest list of masters.", e);
+        }
+      }
+    };
+    masterAddrRefresherThread = Threads.newDaemonThreadFactory(
+        "MasterRegistry refresh 
end-points").newThread(masterEndPointRefresher);
+    masterAddrRefresherThread.start();

Review comment:
       Don't want to use SingleThreadExecutor.submit()?

##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
##########
@@ -2931,6 +2935,27 @@ public GetActiveMasterResponse 
getActiveMaster(RpcController rpcController,
     return resp.build();
   }
 
+  @Override
+  public GetMastersResponse getMasters(RpcController rpcController, 
GetMastersRequest request)
+      throws ServiceException {
+    GetMastersResponse.Builder resp = GetMastersResponse.newBuilder();
+    // Active master
+    Optional<ServerName> serverName = master.getActiveMaster();
+    serverName.ifPresent(name -> 
resp.addMasterServers(GetMastersResponseEntry.newBuilder()
+        
.setServerName(ProtobufUtil.toServerName(name)).setIsActive(true).build()));
+    // Backup masters
+    try {
+      // TODO: Cache the backup masters to avoid a ZK RPC for each 
getMasters() call.

Review comment:
       We are planning to have a cache with ZKWatcher for backupMasters ZNode 
right? I believe as of now, we don't subscribe for any event.

##########
File path: 
hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java
##########
@@ -126,4 +131,46 @@ public void testRegistryRPCs() throws Exception {
       }
     }
   }
+
+  /**
+   * Tests that the list of masters configured in the MasterRegistry is 
dynamically refreshed in the
+   * event of errors.
+   */
+  @Test
+  public void testDynamicMasterConfigurationRefresh() throws Exception {
+    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+    String currentMasterAddrs = 
Preconditions.checkNotNull(conf.get(HConstants.MASTER_ADDRS_KEY));
+    HMaster activeMaster = TEST_UTIL.getHBaseCluster().getMaster();
+    String clusterId = activeMaster.getClusterId();
+    // Add a non-working master
+    ServerName badServer = ServerName.valueOf("localhost", 1234, -1);
+    conf.set(HConstants.MASTER_ADDRS_KEY, badServer.toShortString() + "," + 
currentMasterAddrs);
+    // Set the hedging fan out so that all masters are queried.
+    conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 4);
+    try (MasterRegistry registry = new MasterRegistry(conf)) {
+      final Set<ServerName> masters = registry.getParsedMasterServers();
+      assertTrue(masters.contains(badServer));
+      // Make a registry RPC, this should trigger a refresh since one of the 
hedged RPC fails.
+      assertEquals(registry.getClusterId().get(), clusterId);
+      // Wait for new set of masters to be populated.
+      TEST_UTIL.waitFor(5000,
+          (Waiter.Predicate<Exception>) () -> 
!registry.getParsedMasterServers().equals(masters));
+      // new set of masters should not include the bad server
+      final Set<ServerName> newMasters = registry.getParsedMasterServers();
+      // Bad one should be out.
+      assertEquals(3, newMasters.size());
+      assertFalse(newMasters.contains(badServer));
+      // Kill the active master
+      activeMaster.stopMaster();
+      TEST_UTIL.waitFor(10000,
+        () -> TEST_UTIL.getMiniHBaseCluster().getLiveMasterThreads().size() == 
2);
+      assertEquals(registry.getClusterId().get(), clusterId);
+      // Make another registry RPC call which should trigger another refresh.
+      TEST_UTIL.waitFor(100000, (Waiter.Predicate<Exception>) () ->
+          registry.getParsedMasterServers().size() == 2);
+      final Set<ServerName> newMasters2 = registry.getParsedMasterServers();
+      assertEquals(2, newMasters2.size());
+      assertFalse(newMasters2.contains(activeMaster));

Review comment:
       `newMasters2.contains(activeMaster.getServerName())`

##########
File path: 
hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java
##########
@@ -126,4 +131,46 @@ public void testRegistryRPCs() throws Exception {
       }
     }
   }
+
+  /**
+   * Tests that the list of masters configured in the MasterRegistry is 
dynamically refreshed in the
+   * event of errors.
+   */
+  @Test
+  public void testDynamicMasterConfigurationRefresh() throws Exception {
+    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+    String currentMasterAddrs = 
Preconditions.checkNotNull(conf.get(HConstants.MASTER_ADDRS_KEY));
+    HMaster activeMaster = TEST_UTIL.getHBaseCluster().getMaster();
+    String clusterId = activeMaster.getClusterId();
+    // Add a non-working master
+    ServerName badServer = ServerName.valueOf("localhost", 1234, -1);
+    conf.set(HConstants.MASTER_ADDRS_KEY, badServer.toShortString() + "," + 
currentMasterAddrs);
+    // Set the hedging fan out so that all masters are queried.
+    conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 4);
+    try (MasterRegistry registry = new MasterRegistry(conf)) {
+      final Set<ServerName> masters = registry.getParsedMasterServers();
+      assertTrue(masters.contains(badServer));
+      // Make a registry RPC, this should trigger a refresh since one of the 
hedged RPC fails.
+      assertEquals(registry.getClusterId().get(), clusterId);
+      // Wait for new set of masters to be populated.
+      TEST_UTIL.waitFor(5000,
+          (Waiter.Predicate<Exception>) () -> 
!registry.getParsedMasterServers().equals(masters));

Review comment:
       nit: upto you if you want to use `ExplainingPredicate` to throw 
Exception with specific message

##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
##########
@@ -170,6 +214,11 @@ public static String getMasterAddr(Configuration conf) 
throws UnknownHostExcepti
     callable.call(controller, stub, resp -> {
       if (controller.failed()) {
         future.completeExceptionally(controller.getFailed());
+        // RPC has failed, trigger a refresh of master end points. We can have 
some spurious
+        // refreshes, but that is okay since the RPC is not expensive and not 
in a hot path.
+        synchronized (refreshMasters) {
+          refreshMasters.notify();

Review comment:
       For any generic RPC failure, we want to expedite populating masters with 
another RPC call.
   
   Let's say there are some sequence of events:
   1. getClusterId() RPC call failed
   2. master refresher thread was in `waiting` state, so we notify it and it 
will trigger getMasters() call
   3. the call fails again and we `notify` refreshMasters but no one is waiting 
on it, notify is ignored
   4. master refresher thread again waits for 5 min before populating masters.
   
   Do we really want step 4 to wait for 5 min (assuming no other RPC call 
happens and masters list is stale)? Maybe we can expedite populating masters 
with the help of AtomicBoolean check (and also avoid `synchronized + wait` 
calls i.e 5 min wait)?
   
   Even if we have network issue, we don't want to delay populate masters by 5 
min right?

##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
##########
@@ -115,20 +129,50 @@
   MasterRegistry(Configuration conf) throws IOException {
     this.hedgedReadFanOut = Math.max(1, 
conf.getInt(MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
       MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT));
-    int rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
+    rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
       conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, 
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
     // XXX: we pass cluster id as null here since we do not have a cluster id 
yet, we have to fetch
     // this through the master registry...
     // This is a problem as we will use the cluster id to determine the 
authentication method
     rpcClient = RpcClientFactory.createClient(conf, null);
     rpcControllerFactory = RpcControllerFactory.instantiate(conf);
-    Set<ServerName> masterAddrs = parseMasterAddrs(conf);
+    // Generate the seed list of master stubs. Subsequent RPCs try to keep a 
live list of masters
+    // by fetching the end points from this list.
+    populateMasterStubs(parseMasterAddrs(conf));
+    Runnable masterEndPointRefresher = () -> {
+      while (!Thread.interrupted()) {
+        try {
+          // Spurious wake ups are okay, worst case we make an extra RPC call 
to refresh. We won't
+          // have duplicate refreshes because once the thread is past the 
wait(), notify()s are
+          // ignored until the thread is back to the waiting state.
+          synchronized (refreshMasters) {
+            refreshMasters.wait(WAIT_TIME_OUT_MS);
+          }
+          LOG.debug("Attempting to refresh master address end points.");
+          Set<ServerName> newMasters = new HashSet<>(getMasters().get());
+          populateMasterStubs(newMasters);
+          LOG.debug("Finished refreshing master end points. {}", newMasters);
+        } catch (InterruptedException e) {
+          LOG.debug("Interrupted during wait, aborting 
refresh-masters-thread.", e);
+          break;
+        } catch (ExecutionException | IOException e) {
+          LOG.debug("Error populating latest list of masters.", e);
+        }
+      }
+    };
+    masterAddrRefresherThread = Threads.newDaemonThreadFactory(
+        "MasterRegistry refresh 
end-points").newThread(masterEndPointRefresher);

Review comment:
       nit: avoid space in Thread prefix name?

##########
File path: 
hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java
##########
@@ -126,4 +131,46 @@ public void testRegistryRPCs() throws Exception {
       }
     }
   }
+
+  /**
+   * Tests that the list of masters configured in the MasterRegistry is 
dynamically refreshed in the
+   * event of errors.
+   */
+  @Test
+  public void testDynamicMasterConfigurationRefresh() throws Exception {
+    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+    String currentMasterAddrs = 
Preconditions.checkNotNull(conf.get(HConstants.MASTER_ADDRS_KEY));
+    HMaster activeMaster = TEST_UTIL.getHBaseCluster().getMaster();
+    String clusterId = activeMaster.getClusterId();
+    // Add a non-working master
+    ServerName badServer = ServerName.valueOf("localhost", 1234, -1);
+    conf.set(HConstants.MASTER_ADDRS_KEY, badServer.toShortString() + "," + 
currentMasterAddrs);
+    // Set the hedging fan out so that all masters are queried.
+    conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 4);
+    try (MasterRegistry registry = new MasterRegistry(conf)) {
+      final Set<ServerName> masters = registry.getParsedMasterServers();
+      assertTrue(masters.contains(badServer));
+      // Make a registry RPC, this should trigger a refresh since one of the 
hedged RPC fails.
+      assertEquals(registry.getClusterId().get(), clusterId);
+      // Wait for new set of masters to be populated.
+      TEST_UTIL.waitFor(5000,
+          (Waiter.Predicate<Exception>) () -> 
!registry.getParsedMasterServers().equals(masters));
+      // new set of masters should not include the bad server
+      final Set<ServerName> newMasters = registry.getParsedMasterServers();
+      // Bad one should be out.
+      assertEquals(3, newMasters.size());
+      assertFalse(newMasters.contains(badServer));
+      // Kill the active master
+      activeMaster.stopMaster();
+      TEST_UTIL.waitFor(10000,
+        () -> TEST_UTIL.getMiniHBaseCluster().getLiveMasterThreads().size() == 
2);
+      assertEquals(registry.getClusterId().get(), clusterId);
+      // Make another registry RPC call which should trigger another refresh.
+      TEST_UTIL.waitFor(100000, (Waiter.Predicate<Exception>) () ->
+          registry.getParsedMasterServers().size() == 2);
+      final Set<ServerName> newMasters2 = registry.getParsedMasterServers();
+      assertEquals(2, newMasters2.size());

Review comment:
       After stopping activeMaster, maybe add an extra check to confirm list 
contains one Active and one Backup?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to