This is an automated email from the ASF dual-hosted git repository. yuanbo pushed a commit to branch TUBEMQ-421 in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
commit 380436e5b802b5d3f02c316d6ea2931a0a49aaf0 Author: gosonzhang <[email protected]> AuthorDate: Sun Dec 20 16:46:04 2020 +0800 [TUBEMQ-463]Adjust Master rebalance process implementation (#355) Co-authored-by: gosonzhang <[email protected]> --- .../apache/tubemq/server/master/MasterConfig.java | 10 ++ .../org/apache/tubemq/server/master/TMaster.java | 180 ++++++++++++--------- 2 files changed, 115 insertions(+), 75 deletions(-) diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/MasterConfig.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/MasterConfig.java index 84ff2ed..b112d66 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/MasterConfig.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/MasterConfig.java @@ -79,6 +79,7 @@ public class MasterConfig extends AbstractFileConfig { private String visitName = ""; private String visitPassword = ""; private long authValidTimeStampPeriodMs = TBaseConstants.CFG_DEFAULT_AUTH_TIMESTAMP_VALID_INTERVAL; + private int rebalanceParallel = 4; /** * getters @@ -253,6 +254,10 @@ public class MasterConfig extends AbstractFileConfig { return authValidTimeStampPeriodMs; } + public int getRebalanceParallel() { + return rebalanceParallel; + } + /** * Load file section attributes * @@ -460,6 +465,10 @@ public class MasterConfig extends AbstractFileConfig { this.visitName = masterConf.get("visitName").trim(); this.visitPassword = masterConf.get("visitPassword").trim(); } + if (TStringUtils.isNotBlank(masterConf.get("rebalanceParallel"))) { + int tmpParallel = this.getInt(masterConf, "rebalanceParallel"); + this.rebalanceParallel = (tmpParallel <= 0) ? 1 : (Math.min(tmpParallel, 20)); + } } /** @@ -606,6 +615,7 @@ public class MasterConfig extends AbstractFileConfig { .append("useWebProxy", useWebProxy) .append("visitName", visitName) .append("visitPassword", visitPassword) + .append("rebalanceParallel", rebalanceParallel) .append(",").append(replicationConfig.toString()) .append(",").append(tlsConfig.toString()) .append(",").append(zkConfig.toString()) diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java index 8811cb0..34570de 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java @@ -27,7 +27,10 @@ import java.util.Map; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.codec.binary.StringUtils; import org.apache.commons.collections.CollectionUtils; @@ -126,6 +129,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable { new RpcServiceFactory(); private final ConsumerEventManager consumerEventManager; //consumer event manager private final TopicPSInfoManager topicPSInfoManager; //topic publish/subscribe info manager + private final ExecutorService executor; private final BrokerInfoHolder brokerHolder; //broker holder private final ProducerInfoHolder producerHolder; //producer holder private final ConsumerInfoHolder consumerHolder; //consumer holder @@ -144,11 +148,10 @@ public class TMaster extends HasThread implements MasterService, Stoppable { private AtomicLong idGenerator = new AtomicLong(0); //id generator private volatile boolean stopped = false; //stop flag private Thread balancerChore; //balance chore - private Thread resetBalancerChore; //reset balance chore private boolean initialized = false; private boolean startupBalance = true; - private boolean startupResetBalance = true; private int balanceDelayTimes = 0; + private AtomicInteger curBalanceParal = new AtomicInteger(0); private Sleeper stopSleeper = new Sleeper(1000, this); private SimpleVisitTokenManager visitTokenManager; @@ -165,6 +168,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable { this.checkAndCreateBdbDataPath(); this.masterAddInfo = new NodeAddrInfo(masterConfig.getHostName(), masterConfig.getPort()); + this.executor = Executors.newFixedThreadPool(this.masterConfig.getRebalanceParallel()); this.visitTokenManager = new SimpleVisitTokenManager(this.masterConfig); this.serverAuthHandler = new SimpleCertificateMasterHandler(this.masterConfig); this.producerHolder = new ProducerInfoHolder(); @@ -1832,7 +1836,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable { if (!this.stopped) { Thread.sleep(masterConfig.getFirstBalanceDelayAfterStartMs()); this.balancerChore = startBalancerChore(this); - this.resetBalancerChore = startResetBalancerChore(this); initialized = true; while (!this.stopped) { stopSleeper.sleep(); @@ -1851,10 +1854,9 @@ public class TMaster extends HasThread implements MasterService, Stoppable { /** * Load balance */ - private void balance() { - // #lizard forgives + private void balance(final TMaster tMaster) { final StringBuilder strBuffer = new StringBuilder(512); - long rebalanceId = idGenerator.incrementAndGet(); + final long rebalanceId = idGenerator.incrementAndGet(); if (defaultBdbStoreService != null) { logger.info(strBuffer.append("[Rebalance Start] ").append(rebalanceId) .append(", isMaster=").append(defaultBdbStoreService.isMaster()) @@ -1865,20 +1867,86 @@ public class TMaster extends HasThread implements MasterService, Stoppable { .append(", BDB service is null isMaster= false, isPrimaryNodeActive=false").toString()); } strBuffer.delete(0, strBuffer.length()); + int curDoingTasks = this.curBalanceParal.get(); + if (curDoingTasks > 0) { + logger.info(strBuffer.append("[Rebalance End] ").append(rebalanceId) + .append(", the previous rebalance has ") + .append(curDoingTasks).append(" task(s) in progress!").toString()); + return; + } + final boolean isStartBalance = startupBalance; + List<String> groupsNeedToBalance = isStartBalance ? + consumerHolder.getAllGroup() : getNeedToBalanceGroupList(strBuffer); + strBuffer.delete(0, strBuffer.length()); + if (!groupsNeedToBalance.isEmpty()) { + // set parallel rebalance signal + curBalanceParal.set(masterConfig.getRebalanceParallel()); + // calculate process count + int unitNum = (groupsNeedToBalance.size() + masterConfig.getRebalanceParallel() - 1) + / masterConfig.getRebalanceParallel(); + // start processer to do reblance; + int startIndex = 0; + int endIndex = 0; + for (int i = 0; i < masterConfig.getRebalanceParallel(); i++) { + // get groups need to rebalance + startIndex = Math.min((i) * unitNum, groupsNeedToBalance.size()); + endIndex = Math.min((i + 1) * unitNum, groupsNeedToBalance.size()); + final List<String> subGroups = groupsNeedToBalance.subList(startIndex, endIndex); + // execute rebalance + this.executor.execute(new Runnable() { + @Override + public void run() { + try { + if (subGroups.isEmpty()) { + return; + } + // first process reset rebalance task; + try { + tMaster.processResetbalance(rebalanceId, + isStartBalance, subGroups); + } catch (Throwable e) { + logger.warn("[Rebalance processor] Error during reset-reb", e); + } + if (tMaster.isStopped()) { + return; + } + // second process normal rebalance task; + try { + tMaster.processRebalance(rebalanceId, + isStartBalance, subGroups); + } catch (Throwable e) { + logger.warn("[Rebalance processor] Error during normal-reb", e); + } + } catch (Throwable e) { + logger.warn("[Rebalance processor] Error during process", e); + } finally { + curBalanceParal.decrementAndGet(); + } + } + }); + } + } + startupBalance = false; + logger.info(strBuffer.append("[Rebalance End] ").append(rebalanceId).toString()); + } + + // process unReset group rebalance + public void processRebalance(long rebalanceId, boolean isFirstReb, List<String> groups) { + // #lizard forgives Map<String, Map<String, List<Partition>>> finalSubInfoMap = null; - if (startupBalance) { - finalSubInfoMap = - this.loadBalancer.bukAssign(consumerHolder, topicPSInfoManager, - consumerHolder.getAllGroup(), defaultBrokerConfManager, - masterConfig.getMaxGroupBrokerConsumeRate(), strBuffer); - startupBalance = false; + final StringBuilder strBuffer = new StringBuilder(512); + // choose different load balance strategy + if (isFirstReb) { + finalSubInfoMap = this.loadBalancer.bukAssign(consumerHolder, + topicPSInfoManager, groups, defaultBrokerConfManager, + masterConfig.getMaxGroupBrokerConsumeRate(), strBuffer); } else { - List<String> groupsNeedToBalance = getNeedToBalanceGroupList(strBuffer); - finalSubInfoMap = - this.loadBalancer.balanceCluster(currentSubInfo, consumerHolder, brokerHolder, - topicPSInfoManager, groupsNeedToBalance, defaultBrokerConfManager, - masterConfig.getMaxGroupBrokerConsumeRate(), strBuffer); + finalSubInfoMap = this.loadBalancer.balanceCluster(currentSubInfo, + consumerHolder, brokerHolder, topicPSInfoManager, groups, + defaultBrokerConfManager, masterConfig.getMaxGroupBrokerConsumeRate(), + strBuffer); } + // allocate partitions to consumers for (Map.Entry<String, Map<String, List<Partition>>> entry : finalSubInfoMap.entrySet()) { String consumerId = entry.getKey(); if (consumerId == null) { @@ -1891,7 +1959,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable { || tupleInfo.f1 == null) { continue; } - List<String> blackTopicList = this.defaultBrokerConfManager.getBdbBlackTopicList(tupleInfo.f0); + List<String> blackTopicList = + this.defaultBrokerConfManager.getBdbBlackTopicList(tupleInfo.f0); Map<String, List<Partition>> topicSubPartMap = entry.getValue(); List<SubscribeInfo> deletedSubInfoList = new ArrayList<>(); List<SubscribeInfo> addedSubInfoList = new ArrayList<>(); @@ -1899,7 +1968,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable { String topic = topicEntry.getKey(); List<Partition> finalPartList = topicEntry.getValue(); Map<String, Partition> currentPartMap = null; - Map<String, Map<String, Partition>> curTopicSubInfoMap = currentSubInfo.get(consumerId); + Map<String, Map<String, Partition>> curTopicSubInfoMap = + currentSubInfo.get(consumerId); if (curTopicSubInfoMap == null || curTopicSubInfoMap.get(topic) == null) { currentPartMap = new HashMap<>(); } else { @@ -1948,8 +2018,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable { for (Partition currentPart : currentPartMap.values()) { if ((blackTopicList.contains(currentPart.getTopic())) || (!finalPartList.contains(currentPart))) { - deletedSubInfoList - .add(new SubscribeInfo(consumerId, tupleInfo.f0, false, currentPart)); + deletedSubInfoList.add(new SubscribeInfo(consumerId, + tupleInfo.f0, false, currentPart)); } } for (Partition finalPart : finalPartList) { @@ -1990,42 +2060,24 @@ public class TMaster extends HasThread implements MasterService, Stoppable { } } } - logger.info(strBuffer.append("[Rebalance End] ") - .append(rebalanceId).toString()); } /** - * Reset balance + * process Reset balance */ - private void resetBalance() { + public void processResetbalance(long rebalanceId, boolean isFirstReb, List<String> groups) { // #lizard forgives - //consumer need reset offset final StringBuilder strBuffer = new StringBuilder(512); - long rebalanceId = idGenerator.incrementAndGet(); - if (defaultBdbStoreService != null) { - logger.info(strBuffer.append("[ResetRebalance Start] ").append(rebalanceId) - .append(", isMaster=").append(defaultBdbStoreService.isMaster()) - .append(", isPrimaryNodeActive=") - .append(defaultBdbStoreService.isPrimaryNodeActive()).toString()); - } else { - logger.info(strBuffer.append("[ResetRebalance Start] ").append(rebalanceId) - .append(", BDB service is null isMaster= false, isPrimaryNodeActive=false").toString()); - } - strBuffer.delete(0, strBuffer.length()); Map<String, Map<String, Map<String, Partition>>> finalSubInfoMap = null; // choose different load balance strategy - if (startupResetBalance) { - finalSubInfoMap = - this.loadBalancer.resetBukAssign(consumerHolder, topicPSInfoManager, - consumerHolder.getAllGroup(), this.zkOffsetStorage, - this.defaultBrokerConfManager, strBuffer); - startupResetBalance = false; + if (isFirstReb) { + finalSubInfoMap = this.loadBalancer.resetBukAssign(consumerHolder, + topicPSInfoManager, groups, this.zkOffsetStorage, + this.defaultBrokerConfManager, strBuffer); } else { - List<String> groupsNeedToBalance = getNeedToBalanceGroupList(strBuffer); - finalSubInfoMap = - this.loadBalancer.resetBalanceCluster(currentSubInfo, consumerHolder, - topicPSInfoManager, groupsNeedToBalance, this.zkOffsetStorage, - this.defaultBrokerConfManager, strBuffer); + finalSubInfoMap = this.loadBalancer.resetBalanceCluster(currentSubInfo, + consumerHolder, topicPSInfoManager, groups, this.zkOffsetStorage, + this.defaultBrokerConfManager, strBuffer); } // filter for (Map.Entry<String, Map<String, Map<String, Partition>>> entry @@ -2041,7 +2093,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable { || tupleInfo.f1 == null) { continue; } - + // allocate partitions to consumers List<String> blackTopicList = this.defaultBrokerConfManager.getBdbBlackTopicList(tupleInfo.f0); Map<String, Map<String, Partition>> topicSubPartMap = entry.getValue(); @@ -2107,7 +2159,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable { } } } - logger.info(strBuffer.append("[ResetRebalance End] ").append(rebalanceId).toString()); } /** @@ -2240,9 +2291,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable { if (this.balancerChore != null) { this.balancerChore.interrupt(); } - if (this.resetBalancerChore != null) { - this.resetBalancerChore.interrupt(); - } } /** @@ -2252,11 +2300,12 @@ public class TMaster extends HasThread implements MasterService, Stoppable { * @return */ private Thread startBalancerChore(final TMaster master) { - Chore chore = new Chore("BalancerChore", masterConfig.getConsumerBalancePeriodMs(), master) { + Chore chore = new Chore("BalancerChore", + masterConfig.getConsumerBalancePeriodMs(), master) { @Override protected void chore() { try { - master.balance(); + master.balance(master); } catch (Throwable e) { logger.warn("Rebalance throwable error: ", e); } @@ -2265,26 +2314,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable { return ThreadUtils.setDaemonThreadRunning(chore.getThread()); } - /** - * Start reset balance chore - * - * @param master - * @return - */ - private Thread startResetBalancerChore(final TMaster master) { - Chore chore = new Chore("ResetBalancerChore", masterConfig.getConsumerBalancePeriodMs(), master) { - @Override - protected void chore() { - try { - master.resetBalance(); - } catch (Throwable e) { - logger.warn("Reset Rebalance throwable error: ", e); - } - } - }; - return ThreadUtils.setDaemonThreadRunning(chore.getThread()); - } - public void stop() { stop(""); } @@ -2301,6 +2330,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable { try { webServer.stop(); rpcServiceFactory.destroy(); + executor.shutdown(); stopChores(); heartbeatManager.stop(); zkOffsetStorage.close();
