This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new c08e877 HBASE-26235 We could start RegionServerTracker before
becoming active master (#3645)
c08e877 is described below
commit c08e877d6ab73006913f0e827d3dd4e8c2978e56
Author: Duo Zhang <[email protected]>
AuthorDate: Mon Aug 30 12:14:10 2021 +0800
HBASE-26235 We could start RegionServerTracker before becoming active
master (#3645)
Signed-off-by: Yulin Niu <[email protected]>
---
.../apache/hadoop/hbase/zookeeper/ZNodePaths.java | 5 +
.../org/apache/hadoop/hbase/master/HMaster.java | 11 +-
.../hadoop/hbase/master/RegionServerTracker.java | 136 +++++++++++----------
.../hadoop/hbase/regionserver/HRegionServer.java | 12 +-
4 files changed, 89 insertions(+), 75 deletions(-)
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java
index a0065a9..1affd9e 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java
@@ -22,6 +22,7 @@ import static
org.apache.hadoop.hbase.HConstants.SPLIT_LOGDIR_NAME;
import static org.apache.hadoop.hbase.HConstants.ZOOKEEPER_ZNODE_PARENT;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.yetus.audience.InterfaceAudience;
@@ -207,6 +208,10 @@ public class ZNodePaths {
path.equals(tableZNode) || path.startsWith(tableZNode + "/");
}
+ public String getRsPath(ServerName sn) {
+ return joinZNode(rsZNode, sn.toString());
+ }
+
/**
* Join the prefix znode name with the suffix znode name to generate a
proper full znode name.
* <p>
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 4d8721a..e6ab627 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -268,7 +268,7 @@ public class HMaster extends HRegionServer implements
MasterServices {
// Manager and zk listener for master election
private final ActiveMasterManager activeMasterManager;
// Region server tracker
- private RegionServerTracker regionServerTracker;
+ private final RegionServerTracker regionServerTracker;
// Draining region server tracker
private DrainingServerTracker drainingServerTracker;
// Tracker for load balancer state
@@ -483,6 +483,8 @@ public class HMaster extends HRegionServer implements
MasterServices {
this.activeMasterManager = createActiveMasterManager(zooKeeper,
serverName, this);
cachedClusterId = new CachedClusterId(this, conf);
+
+ this.regionServerTracker = new RegionServerTracker(zooKeeper, this);
} catch (Throwable t) {
// Make sure we log the exception. HMaster is often started via
reflection and the
// cause of failed startup is lost.
@@ -908,8 +910,7 @@ public class HMaster extends HRegionServer implements
MasterServices {
// filesystem that COULD BE 'alive' (we'll schedule SCPs for each and let
SCP figure it out).
// We also pass dirs that are already 'splitting'... so we can do some
checks down in tracker.
// TODO: Generate the splitting and live Set in one pass instead of two as
we currently do.
- this.regionServerTracker = new RegionServerTracker(zooKeeper, this,
this.serverManager);
- this.regionServerTracker.start(
+ this.regionServerTracker.upgrade(
procsByType.getOrDefault(ServerCrashProcedure.class,
Collections.emptyList()).stream()
.map(p -> (ServerCrashProcedure) p).map(p ->
p.getServerName()).collect(Collectors.toSet()),
walManager.getLiveServersFromWALDir(),
walManager.getSplittingServersFromWALDir());
@@ -2718,8 +2719,8 @@ public class HMaster extends HRegionServer implements
MasterServices {
}
@Override
- public List<ServerName> getRegionServers() {
- return serverManager.getOnlineServersList();
+ public Collection<ServerName> getRegionServers() {
+ return regionServerTracker.getRegionServers();
}
/**
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java
index 336f9dc..65cc7ae 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java
@@ -20,8 +20,6 @@ package org.apache.hadoop.hbase.master;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
@@ -31,18 +29,17 @@ import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerMetricsBuilder;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.VersionInfoUtil;
-import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
-import
org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import
org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import
org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo;
@@ -61,27 +58,27 @@ import
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServe
@InterfaceAudience.Private
public class RegionServerTracker extends ZKListener {
private static final Logger LOG =
LoggerFactory.getLogger(RegionServerTracker.class);
- private final Set<ServerName> regionServers = new HashSet<>();
- private final ServerManager serverManager;
+ // indicate whether we are active master
+ private boolean active;
+ private volatile Set<ServerName> regionServers = Collections.emptySet();
private final MasterServices server;
// As we need to send request to zk when processing the nodeChildrenChanged
event, we'd better
// move the operation to a single threaded thread pool in order to not block
the zk event
// processing since all the zk listener across HMaster will be called in one
thread sequentially.
private final ExecutorService executor;
- public RegionServerTracker(ZKWatcher watcher, MasterServices server,
- ServerManager serverManager) {
+ public RegionServerTracker(ZKWatcher watcher, MasterServices server) {
super(watcher);
this.server = server;
- this.serverManager = serverManager;
this.executor = Executors.newSingleThreadExecutor(
new
ThreadFactoryBuilder().setDaemon(true).setNameFormat("RegionServerTracker-%d").build());
+ watcher.registerListener(this);
+ refresh();
}
- private Pair<ServerName, RegionServerInfo> getServerInfo(String name)
- throws KeeperException, IOException {
- ServerName serverName = ServerName.parseServerName(name);
- String nodePath = ZNodePaths.joinZNode(watcher.getZNodePaths().rsZNode,
name);
+ private RegionServerInfo getServerInfo(ServerName serverName)
+ throws KeeperException, IOException {
+ String nodePath = watcher.getZNodePaths().getRsPath(serverName);
byte[] data;
try {
data = ZKUtil.getData(watcher, nodePath);
@@ -91,24 +88,26 @@ public class RegionServerTracker extends ZKListener {
if (data == null) {
// we should receive a children changed event later and then we will
expire it, so we still
// need to add it to the region server set.
- LOG.warn("Server node {} does not exist, already dead?", name);
- return Pair.newPair(serverName, null);
+ LOG.warn("Server node {} does not exist, already dead?", serverName);
+ return null;
}
if (data.length == 0 || !ProtobufUtil.isPBMagicPrefix(data)) {
// this should not happen actually, unless we have bugs or someone has
messed zk up.
- LOG.warn("Invalid data for region server node {} on zookeeper, data
length = {}", name,
+ LOG.warn("Invalid data for region server node {} on zookeeper, data
length = {}", serverName,
data.length);
- return Pair.newPair(serverName, null);
+ return null;
}
RegionServerInfo.Builder builder = RegionServerInfo.newBuilder();
int magicLen = ProtobufUtil.lengthOfPBMagic();
ProtobufUtil.mergeFrom(builder, data, magicLen, data.length - magicLen);
- return Pair.newPair(serverName, builder.build());
+ return builder.build();
}
/**
- * Starts the tracking of online RegionServers. All RSes will be tracked
after this method is
- * called.
+ * Upgrade to active master mode, where, besides tracking changes to the region server set, we
+ * also start adding new region servers to ServerManager and scheduling an SCP when a region
+ * server dies. Tracking of the online RegionServers themselves already begins when this tracker
+ * is constructed; after this method is called, every tracked RS is also recorded in ServerManager.
* <p/>
* In this method, we will also construct the region server sets in {@link
ServerManager}. If a
* region server is dead between the crash of the previous master instance
and the start of the
@@ -119,38 +118,32 @@ public class RegionServerTracker extends ZKListener {
* @param liveServersFromWALDir the live region servers from wal directory.
* @param splittingServersFromWALDir Servers whose WALs are being actively
'split'.
*/
- public void start(Set<ServerName> deadServersFromPE, Set<ServerName>
liveServersFromWALDir,
- Set<ServerName> splittingServersFromWALDir)
- throws KeeperException, IOException {
- LOG.info("Starting RegionServerTracker; {} have existing
ServerCrashProcedures, {} " +
- "possibly 'live' servers, and {} 'splitting'.",
deadServersFromPE.size(),
- liveServersFromWALDir.size(), splittingServersFromWALDir.size());
+ public void upgrade(Set<ServerName> deadServersFromPE, Set<ServerName>
liveServersFromWALDir,
+ Set<ServerName> splittingServersFromWALDir) throws KeeperException,
IOException {
+ LOG.info(
+ "Upgrading RegionServerTracker to active master mode; {} have existing" +
+ "ServerCrashProcedures, {} possibly 'live' servers, and {}
'splitting'.",
+ deadServersFromPE.size(), liveServersFromWALDir.size(),
splittingServersFromWALDir.size());
// deadServersFromPE is made from a list of outstanding
ServerCrashProcedures.
// splittingServersFromWALDir are being actively split -- the directory in
the FS ends in
// '-SPLITTING'. Each splitting server should have a corresponding SCP.
Log if not.
splittingServersFromWALDir.stream().filter(s ->
!deadServersFromPE.contains(s)).
forEach(s -> LOG.error("{} has no matching ServerCrashProcedure", s));
- //create ServerNode for all possible live servers from wal directory
+ // create ServerNode for all possible live servers from wal directory
liveServersFromWALDir
.forEach(sn ->
server.getAssignmentManager().getRegionStates().getOrCreateServer(sn));
- watcher.registerListener(this);
+ ServerManager serverManager = server.getServerManager();
synchronized (this) {
- List<String> servers =
- ZKUtil.listChildrenAndWatchForNewChildren(watcher,
watcher.getZNodePaths().rsZNode);
- if (null != servers) {
- for (String n : servers) {
- Pair<ServerName, RegionServerInfo> pair = getServerInfo(n);
- ServerName serverName = pair.getFirst();
- RegionServerInfo info = pair.getSecond();
- regionServers.add(serverName);
- ServerMetrics serverMetrics = info != null ?
- ServerMetricsBuilder.of(serverName,
VersionInfoUtil.getVersionNumber(info.getVersionInfo()),
- info.getVersionInfo().getVersion()) :
- ServerMetricsBuilder.of(serverName);
- serverManager.checkAndRecordNewServer(serverName, serverMetrics);
- }
+ Set<ServerName> liveServers = regionServers;
+ for (ServerName serverName : liveServers) {
+ RegionServerInfo info = getServerInfo(serverName);
+ ServerMetrics serverMetrics = info != null ?
ServerMetricsBuilder.of(serverName,
+ VersionInfoUtil.getVersionNumber(info.getVersionInfo()),
+ info.getVersionInfo().getVersion()) :
ServerMetricsBuilder.of(serverName);
+ serverManager.checkAndRecordNewServer(serverName, serverMetrics);
}
serverManager.findDeadServersAndProcess(deadServersFromPE,
liveServersFromWALDir);
+ active = true;
}
}
@@ -158,32 +151,24 @@ public class RegionServerTracker extends ZKListener {
executor.shutdownNow();
}
- private synchronized void refresh() {
- List<String> names;
- try {
- names = ZKUtil.listChildrenAndWatchForNewChildren(watcher,
watcher.getZNodePaths().rsZNode);
- } catch (KeeperException e) {
- // here we need to abort as we failed to set watcher on the rs node
which means that we can
- // not track the node deleted evetnt any more.
- server.abort("Unexpected zk exception getting RS nodes", e);
- return;
- }
- Set<ServerName> servers = CollectionUtils.isEmpty(names) ?
Collections.emptySet() :
-
names.stream().map(ServerName::parseServerName).collect(Collectors.toSet());
+ public Set<ServerName> getRegionServers() {
+ return regionServers;
+ }
- for (Iterator<ServerName> iter = regionServers.iterator();
iter.hasNext();) {
- ServerName sn = iter.next();
- if (!servers.contains(sn)) {
- LOG.info("RegionServer ephemeral node deleted, processing expiration
[{}]", sn);
- serverManager.expireServer(sn);
- iter.remove();
- }
+ // Execute the operations that are only needed by the active master, such as expiring old
+ // servers, logging newly added servers, etc.
+ // add new servers, etc.
+ private void processAsActiveMaster(Set<ServerName> newServers) {
+ Set<ServerName> oldServers = regionServers;
+ ServerManager serverManager = server.getServerManager();
+ // expire dead servers
+ for (ServerName crashedServer : Sets.difference(oldServers, newServers)) {
+ LOG.info("RegionServer ephemeral node deleted, processing expiration
[{}]", crashedServer);
+ serverManager.expireServer(crashedServer);
}
- // here we do not need to parse the region server info as it is useless
now, we only need the
- // server name.
+ // check whether there are new servers, log them
boolean newServerAdded = false;
- for (ServerName sn : servers) {
- if (regionServers.add(sn)) {
+ for (ServerName sn : newServers) {
+ if (!oldServers.contains(sn)) {
newServerAdded = true;
LOG.info("RegionServer ephemeral node created, adding [" + sn + "]");
}
@@ -195,6 +180,25 @@ public class RegionServerTracker extends ZKListener {
}
}
+ private synchronized void refresh() {
+ List<String> names;
+ try {
+ names = ZKUtil.listChildrenAndWatchForNewChildren(watcher,
watcher.getZNodePaths().rsZNode);
+ } catch (KeeperException e) {
+ // here we need to abort as we failed to set watcher on the rs node
which means that we can
+ // not track the node deleted event any more.
+ server.abort("Unexpected zk exception getting RS nodes", e);
+ return;
+ }
+ Set<ServerName> newServers = CollectionUtils.isEmpty(names) ?
Collections.emptySet() :
+ names.stream().map(ServerName::parseServerName)
+ .collect(Collectors.collectingAndThen(Collectors.toSet(),
Collections::unmodifiableSet));
+ if (active) {
+ processAsActiveMaster(newServers);
+ }
+ this.regionServers = newServers;
+ }
+
@Override
public void nodeChildrenChanged(String path) {
if (path.equals(watcher.getZNodePaths().rsZNode) && !server.isAborted() &&
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 816b8f3..5bfdc13 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -189,7 +189,6 @@ import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
@@ -695,7 +694,12 @@ public class HRegionServer extends Thread implements
}
this.rpcServices.start(zooKeeper);
this.metaRegionLocationCache = new MetaRegionLocationCache(zooKeeper);
- this.regionServerAddressTracker = new
RegionServerAddressTracker(zooKeeper, this);
+ if (!(this instanceof HMaster)) {
+ // do not create this field for HMaster, we have another region server
tracker for HMaster.
+ this.regionServerAddressTracker = new
RegionServerAddressTracker(zooKeeper, this);
+ } else {
+ this.regionServerAddressTracker = null;
+ }
// This violates 'no starting stuff in Constructor' but Master depends
on the below chore
// and executor being created and takes a different startup route. Lots
of overlap between HRS
// and M (An M IS A HRS now). Need to refactor so less duplication
between M and its super
@@ -3616,7 +3620,7 @@ public class HRegionServer extends Thread implements
}
private String getMyEphemeralNodePath() {
- return ZNodePaths.joinZNode(this.zooKeeper.getZNodePaths().rsZNode,
getServerName().toString());
+ return zooKeeper.getZNodePaths().getRsPath(serverName);
}
private boolean isHealthCheckerConfigured() {
@@ -3995,7 +3999,7 @@ public class HRegionServer extends Thread implements
return masterAddressTracker.getBackupMasters();
}
- public List<ServerName> getRegionServers() {
+ public Collection<ServerName> getRegionServers() {
return regionServerAddressTracker.getRegionServers();
}