This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-22514
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit ce72633e88d27a4255b569457504504763848b38
Author: Duo Zhang <zhang...@apache.org>
AuthorDate: Wed Sep 11 22:10:52 2019 +0800

    HBASE-22987 Calculate the region servers in default group in foreground (#599)
    
    Signed-off-by: Guanghao Zhang <zg...@apache.org>
---
 .../hbase/rsgroup/RSGroupInfoManagerImpl.java      | 138 +++++----------------
 1 file changed, 32 insertions(+), 106 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
index 6725066..7224869 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
@@ -30,6 +30,7 @@ import java.util.OptionalLong;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.stream.Collectors;
 import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
@@ -174,8 +175,6 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
   private final RSGroupStartupWorker rsGroupStartupWorker;
   // contains list of groups that were last flushed to persistent store
   private Set<String> prevRSGroups = new HashSet<>();
-  private final ServerEventsListenerThread serverEventsListenerThread =
-    new ServerEventsListenerThread();
 
   private RSGroupInfoManagerImpl(MasterServices masterServices) throws 
IOException {
     this.masterServices = masterServices;
@@ -184,11 +183,34 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
     this.rsGroupStartupWorker = new RSGroupStartupWorker();
   }
 
+  private synchronized void updateDefaultServers() {
+    LOG.info("Updating default servers.");
+    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(holder.groupName2Group);
+    RSGroupInfo oldDefaultGroupInfo = getRSGroup(RSGroupInfo.DEFAULT_GROUP);
+    assert oldDefaultGroupInfo != null;
+    RSGroupInfo newDefaultGroupInfo =
+      new RSGroupInfo(RSGroupInfo.DEFAULT_GROUP, getDefaultServers());
+    newDefaultGroupInfo.addAllTables(oldDefaultGroupInfo.getTables());
+    newGroupMap.put(RSGroupInfo.DEFAULT_GROUP, newDefaultGroupInfo);
+    // do not need to persist, as we do not persist default group.
+    resetRSGroupMap(newGroupMap);
+    LOG.info("Updated default servers, {} servers", newDefaultGroupInfo.getServers().size());
+  }
 
   private synchronized void init() throws IOException {
     refresh(false);
-    serverEventsListenerThread.start();
-    masterServices.getServerManager().registerListener(serverEventsListenerThread);
+    masterServices.getServerManager().registerListener(new ServerListener() {
+
+      @Override
+      public void serverAdded(ServerName serverName) {
+        updateDefaultServers();
+      }
+
+      @Override
+      public void serverRemoved(ServerName serverName) {
+        updateDefaultServers();
+      }
+    });
     migrate();
   }
 
@@ -225,19 +247,11 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
   }
 
   /**
-   * @param master the master to get online servers for
    * @return Set of online Servers named for their hostname and port (not ServerName).
    */
-  private static Set<Address> getOnlineServers(final MasterServices master) {
-    Set<Address> onlineServers = new HashSet<Address>();
-    if (master == null) {
-      return onlineServers;
-    }
-
-    for (ServerName server : 
master.getServerManager().getOnlineServers().keySet()) {
-      onlineServers.add(server.getAddress());
-    }
-    return onlineServers;
+  private Set<Address> getOnlineServers() {
+    return masterServices.getServerManager().getOnlineServers().keySet().stream()
+      .map(ServerName::getAddress).collect(Collectors.toSet());
   }
 
   @Override
@@ -249,8 +263,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
     // it. If not 'default' group, add server to 'dst' rsgroup EVEN IF IT IS 
NOT online (could be a
     // rsgroup of dead servers that are to come back later).
     Set<Address> onlineServers =
-      dst.getName().equals(RSGroupInfo.DEFAULT_GROUP) ? 
getOnlineServers(this.masterServices)
-        : null;
+      dst.getName().equals(RSGroupInfo.DEFAULT_GROUP) ? getOnlineServers() : null;
     for (Address el : servers) {
       src.removeServer(el);
       if (onlineServers != null) {
@@ -617,25 +630,8 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
     this.prevRSGroups.addAll(currentGroups);
   }
 
-  // Called by getDefaultServers. Presume it has lock in place.
-  private List<ServerName> getOnlineRS() throws IOException {
-    if (masterServices != null) {
-      return masterServices.getServerManager().getOnlineServersList();
-    }
-    LOG.debug("Reading online RS from zookeeper");
-    List<ServerName> servers = new ArrayList<>();
-    try {
-      for (String el : ZKUtil.listChildrenNoWatch(watcher, 
watcher.getZNodePaths().rsZNode)) {
-        servers.add(ServerName.parseServerName(el));
-      }
-    } catch (KeeperException e) {
-      throw new IOException("Failed to retrieve server list from zookeeper", 
e);
-    }
-    return servers;
-  }
-
   // Called by ServerEventsListenerThread. Presume it has lock on this manager 
when it runs.
-  private SortedSet<Address> getDefaultServers() throws IOException {
+  private SortedSet<Address> getDefaultServers() {
     // Build a list of servers in other groups than default group, from 
rsGroupMap
     Set<Address> serversInOtherGroup = new HashSet<>();
     for (RSGroupInfo group : listRSGroups() /* get from rsGroupMap */) {
@@ -646,7 +642,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
 
     // Get all online servers from Zookeeper and find out servers in default 
group
     SortedSet<Address> defaultServers = Sets.newTreeSet();
-    for (ServerName serverName : getOnlineRS()) {
+    for (ServerName serverName : masterServices.getServerManager().getOnlineServers().keySet()) {
       Address server = Address.fromParts(serverName.getHostname(), 
serverName.getPort());
       if (!serversInOtherGroup.contains(server)) { // not in other groups
         defaultServers.add(server);
@@ -655,76 +651,6 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
     return defaultServers;
   }
 
-  // Called by ServerEventsListenerThread. Synchronize on this because redoing
-  // the rsGroupMap then writing it out.
-  private synchronized void updateDefaultServers(SortedSet<Address> servers) {
-    Map<String, RSGroupInfo> rsGroupMap = holder.groupName2Group;
-    RSGroupInfo info = rsGroupMap.get(RSGroupInfo.DEFAULT_GROUP);
-    RSGroupInfo newInfo = new RSGroupInfo(info.getName(), servers);
-    HashMap<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
-    newGroupMap.put(newInfo.getName(), newInfo);
-    resetRSGroupMap(newGroupMap);
-  }
-
-  /**
-   * Calls {@link RSGroupInfoManagerImpl#updateDefaultServers(SortedSet)} to 
update list of known
-   * servers. Notifications about server changes are received by registering 
{@link ServerListener}.
-   * As a listener, we need to return immediately, so the real work of 
updating the servers is done
-   * asynchronously in this thread.
-   */
-  private class ServerEventsListenerThread extends Thread implements 
ServerListener {
-    private final Logger LOG = 
LoggerFactory.getLogger(ServerEventsListenerThread.class);
-    private boolean changed = false;
-
-    ServerEventsListenerThread() {
-      setDaemon(true);
-    }
-
-    @Override
-    public void serverAdded(ServerName serverName) {
-      serverChanged();
-    }
-
-    @Override
-    public void serverRemoved(ServerName serverName) {
-      serverChanged();
-    }
-
-    private synchronized void serverChanged() {
-      changed = true;
-      this.notify();
-    }
-
-    @Override
-    public void run() {
-      setName(ServerEventsListenerThread.class.getName() + "-" + 
masterServices.getServerName());
-      SortedSet<Address> prevDefaultServers = new TreeSet<>();
-      while (isMasterRunning(masterServices)) {
-        try {
-          LOG.info("Updating default servers.");
-          SortedSet<Address> servers = 
RSGroupInfoManagerImpl.this.getDefaultServers();
-          if (!servers.equals(prevDefaultServers)) {
-            RSGroupInfoManagerImpl.this.updateDefaultServers(servers);
-            prevDefaultServers = servers;
-            LOG.info("Updated with servers: " + servers.size());
-          }
-          try {
-            synchronized (this) {
-              while (!changed) {
-                wait();
-              }
-              changed = false;
-            }
-          } catch (InterruptedException e) {
-            LOG.warn("Interrupted", e);
-          }
-        } catch (IOException e) {
-          LOG.warn("Failed to update default servers", e);
-        }
-      }
-    }
-  }
-
   private class RSGroupStartupWorker extends Thread {
     private final Logger LOG = 
LoggerFactory.getLogger(RSGroupStartupWorker.class);
     private volatile boolean online = false;

Reply via email to