Reidddddd commented on a change in pull request #3911:
URL: https://github.com/apache/hbase/pull/3911#discussion_r761614848
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
##########
@@ -242,21 +276,150 @@ public boolean isAborted() {
if (children == null) {
return Collections.emptyList();
}
+
+ Configuration conf = HBaseConfiguration.create();
+
+ /** if use other balancer, return all regionservers */
+ if (!conf.get(HConstants.HBASE_MASTER_LOADBALANCER_CLASS)
+ .equals(RSGroupBasedLoadBalancer.class.getName())
+ || hostServerName == null) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Use replication random choose policy...");
+ }
+ return parseServerNameFromList(children);
+ } else {
+ /** if use rsgroup balancer,
+ * just return regionservers belong to the same rsgroup or default
rsgroup */
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Use replication rsgroup choose policy...");
+ }
+ Map<String, String> serverNameHostPortMapping = new HashMap<>();
+ for (String serverName : children) {
+ String mappingKey =
+ serverName.split(",")[0] + ServerName.SERVERNAME_SEPARATOR +
serverName.split(",")[1];
+ serverNameHostPortMapping.put(mappingKey, serverName);
+ }
+
+ String groupName = null;
+ RSGroupInfo rsGroupInfo = null;
+ try {
+ rsGroupInfo = getRSGroupInfoOfServer(conn.toConnection(),
hostServerName.getAddress());
+ }catch (IOException e) {
+ e.printStackTrace();
+ LOG.error("rsGroupInfo error!", e);
+ }
+ if (rsGroupInfo != null) {
+ groupName = rsGroupInfo.getName();
+ }
+ try {
+ List<ServerName> serverList =
+ getGroupServerListFromTargetZkCluster(groupName, zkw,
serverNameHostPortMapping);
+ if (serverList.size() > 0) {
+ // if target cluster open group balancer, serverList must has
server(s)
+ LOG.debug("group list > 0");
+ threadLocal.get().getAndSet(true);
+ return serverList;
+ }
+ else {
+ // if not, choose sinkers from all regionservers
+ LOG.debug("target group list <= 0");
+ return parseServerNameFromList(children);
+ }
+ }catch (IOException | KeeperException e) {
+ LOG.error("Get server list from target zk error", e);
+ return Collections.emptyList();
+ }
+ }
+ }
+
+ protected List<ServerName> parseServerNameFromList(List<String> children) {
+ if (children == null) {
+ return Collections.emptyList();
+ }
+ StringBuffer sb = new StringBuffer();
List<ServerName> addresses = new ArrayList<>(children.size());
for (String child : children) {
addresses.add(ServerName.parseServerName(child));
+ sb.append(ServerName.parseServerName(child)).append("/");
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Find " + addresses.size() + " child znodes from target cluster zk. "
+ sb.toString());
}
return addresses;
}
+ protected List<ServerName> getGroupServerListFromTargetZkCluster(String
groupName,
+ ZKWatcher zkw, Map<String, String> serverNameHostPortMapping)
+ throws KeeperException, IOException {
+ /** get group info from slave cluster zk */
+ List<String> groupInfos = ZKUtil.listChildrenAndWatchForNewChildren(
+ zkw, ZNodePaths.joinZNode(zkw.getZNodePaths().baseZNode, "rsgroup"));
+ /** if target cluster have same name group */
+ if(groupInfos == null){
+ if(LOG.isDebugEnabled()){
+ LOG.debug("groupInfos == null");
+ }
+ return Collections.emptyList();
+ }else{
+ if (groupInfos.size() > 0) {
+ if (groupInfos.contains(groupName)) {
+ return getServerListFromWithRSGroupName(groupName, zkw,
serverNameHostPortMapping);
+ } else if (!groupInfos.contains(groupName)) {
+ /** if target cluster does not have same name group, return a empty
list */
+ return Collections.emptyList();
+ }
+ } else {
+ /** if target cluster does not use group balancer, return a empty list
*/
+ return Collections.emptyList();
+ }
+ }
+
+ return Collections.emptyList();
+ }
+
+ protected List<ServerName> getServerListFromWithRSGroupName(
+ String groupName, ZKWatcher zkw, Map<String, String>
serverNameHostPortMapping)
+ throws IOException {
+ List<ServerName> serverList = new ArrayList<>();
+ RSGroupInfo detail = retrieveGroupInfo(
+ zkw, ZNodePaths.joinZNode(zkw.getZNodePaths().baseZNode, "rsgroup"),
groupName);
+ // choose server from rsZNode children which also in same group with local
mashine
+ for (Address serverInfo : detail.getServers()) {
+ String serverPrefix =
+ serverInfo.getHostname() + ServerName.SERVERNAME_SEPARATOR +
serverInfo.getPort();
+ if (serverNameHostPortMapping.containsKey(serverPrefix)) {
+ ServerName sn =
ServerName.parseServerName(serverNameHostPortMapping.get(serverPrefix));
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Match server in " + groupName + " success " +
serverPrefix + "/" + sn);
+ }
+ serverList.add(sn);
+ }
+ }
+ return serverList;
+ }
+
protected synchronized void chooseSinks() {
List<ServerName> slaveAddresses = fetchSlavesAddresses();
if (slaveAddresses.isEmpty()) {
LOG.warn("No sinks available at peer. Will not be able to replicate");
}
Collections.shuffle(slaveAddresses, ThreadLocalRandom.current());
- int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
+ float actualRatio=ratio;
+ if(getIsGroup() && conf.get(HConstants.HBASE_MASTER_LOADBALANCER_CLASS,
LoadBalancerFactory
+
.getDefaultLoadBalancerClass().getName()).equals(RSGroupBasedLoadBalancer.class.getName())
+ && hostServerName != null) {
+ actualRatio=groupRatio;
+ }
+
+ int numSinks = (int) Math.ceil(slaveAddresses.size() * actualRatio);
this.sinkServers = slaveAddresses.subList(0, numSinks);
+ StringBuffer sb = new StringBuffer();
Review comment:
ditto.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]