This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 08a12a64e [INLONG-6038][TubeMQ] Optimize
FlowCtrlRuleHandler.updateFlowCtrlInfo() (#6039)
08a12a64e is described below
commit 08a12a64e133fbed5ac6583b4f2d6da871651e45
Author: Goson Zhang <[email protected]>
AuthorDate: Tue Sep 27 18:51:32 2022 +0800
[INLONG-6038][TubeMQ] Optimize FlowCtrlRuleHandler.updateFlowCtrlInfo()
(#6039)
---
.../tubemq/client/config/ConsumerConfig.java | 2 +-
.../client/consumer/BaseMessageConsumer.java | 19 +-
.../tubemq/client/consumer/RmtDataCache.java | 209 +++++++++++----------
.../consumer/SimpleClientBalanceConsumer.java | 4 +-
.../corebase/policies/FlowCtrlRuleHandler.java | 34 ++--
.../corebase/policies/TestFlowCtrlRuleHandler.java | 3 +-
.../inlong/tubemq/server/broker/TubeBroker.java | 48 ++---
7 files changed, 166 insertions(+), 153 deletions(-)
diff --git
a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/ConsumerConfig.java
b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/ConsumerConfig.java
index 6dd19b8d3..590032d92 100644
---
a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/ConsumerConfig.java
+++
b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/ConsumerConfig.java
@@ -245,7 +245,7 @@ public class ConsumerConfig extends TubeClientConfig {
}
public void setMaxSubInfoReportIntvlTimes(int maxSubInfoReportIntvlTimes) {
- this.maxSubInfoReportIntvlTimes = maxSubInfoReportIntvlTimes;
+ this.maxSubInfoReportIntvlTimes = Math.max(maxSubInfoReportIntvlTimes,
3);
}
private void validConsumerGroupParameter(String consumerGroup) {
diff --git
a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java
b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java
index 177fcbb82..5840c28e4 100644
---
a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java
+++
b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java
@@ -111,7 +111,7 @@ public class BaseMessageConsumer implements MessageConsumer
{
// -1: Unsubscribed
// 0: In Process
// 1: Subscribed
- private AtomicInteger subStatus = new AtomicInteger(-1);
+ private final AtomicInteger subStatus = new AtomicInteger(-1);
// rebalance
private int reportIntervalTimes = 0;
private int rebalanceRetryTimes = 0;
@@ -610,7 +610,7 @@ public class BaseMessageConsumer implements MessageConsumer
{
masterService.consumerRegisterC2M(createMasterRegisterRequest(),
AddressUtils.getLocalAddress(),
consumerConfig.isTlsEnable());
if (response != null && response.getSuccess()) {
- processRegisterAllocAndRspFlowRules(response);
+ processRegisterAllocAndRspFlowRules(response, strBuffer);
processRegAuthorizedToken(response);
break;
}
@@ -1094,11 +1094,12 @@ public class BaseMessageConsumer implements
MessageConsumer {
return builder.build();
}
- private void
processRegisterAllocAndRspFlowRules(ClientMaster.RegisterResponseM2C response) {
+ private void
processRegisterAllocAndRspFlowRules(ClientMaster.RegisterResponseM2C response,
+ StringBuilder strBuffer) {
if (response.hasNotAllocated() && !response.getNotAllocated()) {
consumeSubInfo.compareAndSetIsNotAllocated(true, false);
}
- rmtDataCache.updFlowCtrlInfoInfo(response);
+ rmtDataCache.updFlowCtrlInfoInfo(response, strBuffer);
}
private void processRegAuthorizedToken(ClientMaster.RegisterResponseM2C
response) {
@@ -1107,11 +1108,12 @@ public class BaseMessageConsumer implements
MessageConsumer {
}
}
- private void
procHeartBeatRspAllocAndFlowRules(ClientMaster.HeartResponseM2C response) {
+ private void
procHeartBeatRspAllocAndFlowRules(ClientMaster.HeartResponseM2C response,
+ StringBuilder strBuffer) {
if (response.hasNotAllocated() && !response.getNotAllocated()) {
consumeSubInfo.compareAndSetIsNotAllocated(true, false);
}
- rmtDataCache.updFlowCtrlInfoInfo(response);
+ rmtDataCache.updFlowCtrlInfoInfo(response, strBuffer);
}
private ClientMaster.MasterCertificateInfo
genMasterCertificateInfo(boolean force) {
@@ -1478,7 +1480,7 @@ public class BaseMessageConsumer implements
MessageConsumer {
} else {
// Process the successful response. Record the
response information,
// including control rules and latest auth
token.
-
processRegisterAllocAndRspFlowRules(regResponse);
+
processRegisterAllocAndRspFlowRules(regResponse, strBuffer);
processRegAuthorizedToken(regResponse);
logger.info(strBuffer.append("[Re-register] ")
.append(consumerId).toString());
@@ -1505,7 +1507,7 @@ public class BaseMessageConsumer implements
MessageConsumer {
// Process the heartbeat success response
heartbeatRetryTimes = 0;
// Get the authorization rules and update the local rules
- procHeartBeatRspAllocAndFlowRules(response);
+ procHeartBeatRspAllocAndFlowRules(response, strBuffer);
// Get the latest authorized token
processHeartBeatAuthorizedToken(response);
// Check if master requires to check authorization next time.
If so, set the flag
@@ -1695,5 +1697,4 @@ public class BaseMessageConsumer implements
MessageConsumer {
}
}
}
-
}
diff --git
a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/RmtDataCache.java
b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/RmtDataCache.java
index d3dd0762f..3170f7db2 100644
---
a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/RmtDataCache.java
+++
b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/RmtDataCache.java
@@ -73,7 +73,7 @@ public class RmtDataCache implements Closeable {
private long lastEmptyBrokerPrintTime = 0;
private long lastEmptyTopicPrintTime = 0;
private long lastBrokerUpdatedTime = System.currentTimeMillis();
- private AtomicLong lstBrokerConfigId =
+ private final AtomicLong lstBrokerConfigId =
new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
private Map<Integer, BrokerInfo> brokersMap =
new ConcurrentHashMap<>();
@@ -96,12 +96,15 @@ public class RmtDataCache implements Closeable {
new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
private boolean isFirstReport = true;
private long reportIntCount = 0;
+ private final long maxReportTimes;
// partition cache
private final AtomicInteger waitCont = new AtomicInteger(0);
private final ConcurrentHashMap<String, Timeout> timeouts =
new ConcurrentHashMap<>();
private final ConcurrentLinkedQueue<String> indexPartition =
new ConcurrentLinkedQueue<String>();
+ private volatile long lstReportTime = 0;
+ private final AtomicLong partMapChgTime = new AtomicLong(0);
private final ConcurrentHashMap<String /* index */, PartitionExt>
partitionMap =
new ConcurrentHashMap<>();
private final ConcurrentHashMap<String /* index */, Long> partitionUsedMap
=
@@ -116,7 +119,7 @@ public class RmtDataCache implements Closeable {
new ConcurrentHashMap<>();
private final ConcurrentHashMap<String/* partitionKey */, Integer>
partRegisterBookMap =
new ConcurrentHashMap<>();
- private AtomicBoolean isClosed = new AtomicBoolean(false);
+ private final AtomicBoolean isClosed = new AtomicBoolean(false);
private CountDownLatch dataProcessSync = new CountDownLatch(0);
/**
@@ -130,6 +133,7 @@ public class RmtDataCache implements Closeable {
if (refCont.incrementAndGet() == 1) {
timer = new HashedWheelTimer();
}
+ this.maxReportTimes = consumerConfig.getMaxSubInfoReportIntvlTimes() *
10L;
Map<Partition, ConsumeOffsetInfo> tmpPartOffsetMap = new HashMap<>();
if (partitionList != null) {
for (Partition partition : partitionList) {
@@ -163,41 +167,39 @@ public class RmtDataCache implements Closeable {
* update ops task in cache
*
* @param opsTaskInfo ops task info
+ * @param strBuff the string buffer
*
*/
- public void updOpsTaskInfo(ClientMaster.OpsTaskInfo opsTaskInfo) {
+ public void updOpsTaskInfo(ClientMaster.OpsTaskInfo opsTaskInfo,
StringBuilder strBuff) {
if (opsTaskInfo == null) {
return;
}
- // update flowctrl info
- if (opsTaskInfo.hasGroupFlowCheckId()) {
- if (opsTaskInfo.getGroupFlowCheckId() >= 0
- && opsTaskInfo.getGroupFlowCheckId() !=
groupFlowCtrlRuleHandler.getFlowCtrlId()) {
- try {
-
groupFlowCtrlRuleHandler.updateFlowCtrlInfo(TBaseConstants.META_VALUE_UNDEFINED,
- opsTaskInfo.getGroupFlowCheckId(),
opsTaskInfo.getGroupFlowControlInfo());
- } catch (Exception e1) {
- logger.warn("[Remote Data Cache] found parse group
flowCtrl rules failure", e1);
- }
- }
- }
- if (opsTaskInfo.hasDefFlowCheckId()) {
- if (opsTaskInfo.getDefFlowCheckId() >= 0
- && opsTaskInfo.getDefFlowCheckId() !=
defFlowCtrlRuleHandler.getFlowCtrlId()) {
- try {
-
defFlowCtrlRuleHandler.updateFlowCtrlInfo(TBaseConstants.META_VALUE_UNDEFINED,
- opsTaskInfo.getDefFlowCheckId(),
opsTaskInfo.getDefFlowControlInfo());
- } catch (Exception e1) {
- logger.warn("[Remote Data Cache] found parse default
flowCtrl rules failure", e1);
- }
+ // update group flowctrl info
+ if (opsTaskInfo.hasGroupFlowCheckId()
+ && opsTaskInfo.getGroupFlowCheckId() >= 0
+ && opsTaskInfo.getGroupFlowCheckId() !=
groupFlowCtrlRuleHandler.getFlowCtrlId()) {
+ try {
+ groupFlowCtrlRuleHandler.updateFlowCtrlInfo(
+ opsTaskInfo.getQryPriorityId(),
+ opsTaskInfo.getGroupFlowCheckId(),
+ opsTaskInfo.getGroupFlowControlInfo(), strBuff);
+ } catch (Exception e1) {
+ logger.warn("[Remote Data Cache] found parse group flowCtrl
rules failure", e1);
+ }
+ }
+ // update default flowctrl info
+ if (opsTaskInfo.hasDefFlowCheckId()
+ && opsTaskInfo.getDefFlowCheckId() >= 0
+ && opsTaskInfo.getDefFlowCheckId() !=
defFlowCtrlRuleHandler.getFlowCtrlId()) {
+ try {
+ defFlowCtrlRuleHandler.updateFlowCtrlInfo(
+ TBaseConstants.META_VALUE_UNDEFINED,
+ opsTaskInfo.getDefFlowCheckId(),
+ opsTaskInfo.getDefFlowControlInfo(), strBuff);
+ } catch (Exception e1) {
+ logger.warn("[Remote Data Cache] found parse default flowCtrl
rules failure", e1);
}
}
- // update priority id
- int qryPriorityId = opsTaskInfo.hasQryPriorityId()
- ? opsTaskInfo.getQryPriorityId() :
groupFlowCtrlRuleHandler.getQryPriorityId();
- if (qryPriorityId != groupFlowCtrlRuleHandler.getQryPriorityId()) {
- groupFlowCtrlRuleHandler.setQryPriorityId(qryPriorityId);
- }
// update consume control info
if (opsTaskInfo.hasCsmFrmMaxOffsetCtrlId()
&& opsTaskInfo.getCsmFrmMaxOffsetCtrlId() >= 0) {
@@ -217,83 +219,81 @@ public class RmtDataCache implements Closeable {
/**
* update ops task in cache
*
- * @param response master register response
+ * @param response master register response
+ * @param strBuff the string buffer
*
*/
- public void updFlowCtrlInfoInfo(ClientMaster.RegisterResponseM2C response)
{
+ public void updFlowCtrlInfoInfo(ClientMaster.RegisterResponseM2C response,
+ StringBuilder strBuff) {
if (response == null) {
return;
}
- // update flowctrl info
- if (response.hasGroupFlowCheckId()) {
- if (response.getGroupFlowCheckId() >= 0
- && response.getGroupFlowCheckId() !=
groupFlowCtrlRuleHandler.getFlowCtrlId()) {
- try {
-
groupFlowCtrlRuleHandler.updateFlowCtrlInfo(TBaseConstants.META_VALUE_UNDEFINED,
- response.getGroupFlowCheckId(),
response.getGroupFlowControlInfo());
- } catch (Exception e1) {
- logger.warn("[Remote Data Cache] found parse group
flowCtrl rules failure", e1);
- }
- }
- }
- if (response.hasDefFlowCheckId()) {
- if (response.getDefFlowCheckId() >= 0
- && response.getDefFlowCheckId() !=
defFlowCtrlRuleHandler.getFlowCtrlId()) {
- try {
-
defFlowCtrlRuleHandler.updateFlowCtrlInfo(TBaseConstants.META_VALUE_UNDEFINED,
- response.getDefFlowCheckId(),
response.getDefFlowControlInfo());
- } catch (Exception e1) {
- logger.warn("[Remote Data Cache] found parse default
flowCtrl rules failure", e1);
- }
+ // update group flowctrl info
+ if (response.hasGroupFlowCheckId()
+ && response.getGroupFlowCheckId() >= 0
+ && response.getGroupFlowCheckId() !=
groupFlowCtrlRuleHandler.getFlowCtrlId()) {
+ try {
+ groupFlowCtrlRuleHandler.updateFlowCtrlInfo(
+ response.getQryPriorityId(),
+ response.getGroupFlowCheckId(),
+ response.getGroupFlowControlInfo(), strBuff);
+ } catch (Exception e1) {
+ logger.warn("[Remote Data Cache] found parse group flowCtrl
rules failure", e1);
+ }
+ }
+ // update default flowctrl info
+ if (response.hasDefFlowCheckId()
+ && response.getDefFlowCheckId() >= 0
+ && response.getDefFlowCheckId() !=
defFlowCtrlRuleHandler.getFlowCtrlId()) {
+ try {
+ defFlowCtrlRuleHandler.updateFlowCtrlInfo(
+ TBaseConstants.META_VALUE_UNDEFINED,
+ response.getDefFlowCheckId(),
+ response.getDefFlowControlInfo(), strBuff);
+ } catch (Exception e1) {
+ logger.warn("[Remote Data Cache] found parse default flowCtrl
rules failure", e1);
}
}
- // update priority id
- int qryPriorityId = response.hasQryPriorityId()
- ? response.getQryPriorityId() :
groupFlowCtrlRuleHandler.getQryPriorityId();
- if (qryPriorityId != groupFlowCtrlRuleHandler.getQryPriorityId()) {
- groupFlowCtrlRuleHandler.setQryPriorityId(qryPriorityId);
- }
}
/**
* update ops task in cache
*
- * @param response master register response
+ * @param response master register response
+ * @param strBuff the string buffer
*
*/
- public void updFlowCtrlInfoInfo(ClientMaster.HeartResponseM2C response) {
+ public void updFlowCtrlInfoInfo(ClientMaster.HeartResponseM2C response,
+ StringBuilder strBuff) {
if (response == null) {
return;
}
- // update flowctrl info
- if (response.hasGroupFlowCheckId()) {
- if (response.getGroupFlowCheckId() >= 0
- && response.getGroupFlowCheckId() !=
groupFlowCtrlRuleHandler.getFlowCtrlId()) {
- try {
-
groupFlowCtrlRuleHandler.updateFlowCtrlInfo(TBaseConstants.META_VALUE_UNDEFINED,
- response.getGroupFlowCheckId(),
response.getGroupFlowControlInfo());
- } catch (Exception e1) {
- logger.warn("[Remote Data Cache] found parse group
flowCtrl rules failure", e1);
- }
- }
- }
- if (response.hasDefFlowCheckId()) {
- if (response.getDefFlowCheckId() >= 0
- && response.getDefFlowCheckId() !=
defFlowCtrlRuleHandler.getFlowCtrlId()) {
- try {
-
defFlowCtrlRuleHandler.updateFlowCtrlInfo(TBaseConstants.META_VALUE_UNDEFINED,
- response.getDefFlowCheckId(),
response.getDefFlowControlInfo());
- } catch (Exception e1) {
- logger.warn("[Remote Data Cache] found parse default
flowCtrl rules failure", e1);
- }
+ // update group flowctrl info
+ if (response.hasGroupFlowCheckId()
+ && response.getGroupFlowCheckId() >= 0
+ && response.getGroupFlowCheckId() !=
groupFlowCtrlRuleHandler.getFlowCtrlId()) {
+ try {
+ groupFlowCtrlRuleHandler.updateFlowCtrlInfo(
+ response.getQryPriorityId(),
+ response.getGroupFlowCheckId(),
+ response.getGroupFlowControlInfo(), strBuff);
+ } catch (Exception e1) {
+ logger.warn("[Remote Data Cache] found parse group flowCtrl
rules failure", e1);
+ }
+ }
+ // update default flowctrl info
+ if (response.hasDefFlowCheckId()
+ && response.getDefFlowCheckId() >= 0
+ && response.getDefFlowCheckId() !=
defFlowCtrlRuleHandler.getFlowCtrlId()) {
+ try {
+ defFlowCtrlRuleHandler.updateFlowCtrlInfo(
+ TBaseConstants.META_VALUE_UNDEFINED,
+ response.getDefFlowCheckId(),
+ response.getDefFlowControlInfo(), strBuff);
+ } catch (Exception e1) {
+ logger.warn("[Remote Data Cache] found parse default flowCtrl
rules failure", e1);
}
}
- // update priority id
- int qryPriorityId = response.hasQryPriorityId()
- ? response.getQryPriorityId() :
groupFlowCtrlRuleHandler.getQryPriorityId();
- if (qryPriorityId != groupFlowCtrlRuleHandler.getQryPriorityId()) {
- groupFlowCtrlRuleHandler.setQryPriorityId(qryPriorityId);
- }
}
public boolean isCsmFromMaxOffset() {
@@ -566,12 +566,16 @@ public class RmtDataCache implements Closeable {
if (!this.partitionMap.isEmpty()) {
isFirstReport = false;
builder.setReportSubInfo(true);
+ lstReportTime = partMapChgTime.get();
+ builder.addAllPartSubInfo(getSubscribedPartitionInfo());
+ }
+ } else {
+ if (lstReportTime != partMapChgTime.get()
+ || ((++this.reportIntCount) % this.maxReportTimes == 0)) {
+ builder.setReportSubInfo(true);
+ lstReportTime = partMapChgTime.get();
builder.addAllPartSubInfo(getSubscribedPartitionInfo());
}
- } else if ((++this.reportIntCount)
- % consumerConfig.getMaxSubInfoReportIntvlTimes() == 0) {
- builder.setReportSubInfo(true);
- builder.addAllPartSubInfo(getSubscribedPartitionInfo());
}
return builder.build();
}
@@ -1068,7 +1072,7 @@ public class RmtDataCache implements Closeable {
for (Map.Entry<BrokerInfo, List<Partition>> entry :
unRegisterInfoMap.entrySet()) {
for (Partition partition : entry.getValue()) {
PartitionExt partitionExt =
- partitionMap.remove(partition.getPartitionKey());
+ rmvPartitionFromMap(partition.getPartitionKey());
if (partitionExt != null) {
lastPackConsumed = partitionExt.isLastPackConsumed();
if (!cancelTimeTask(partition.getPartitionKey())
@@ -1137,7 +1141,7 @@ public class RmtDataCache implements Closeable {
try {
waitPartitions(partitionKeys, inUseWaitPeriodMs);
PartitionExt partitionExt =
- partitionMap.remove(partitionKey);
+ rmvPartitionFromMap(partitionKey);
if (partitionExt == null) {
result.setSuccResult(null);
return result.isSuccess();
@@ -1187,7 +1191,7 @@ public class RmtDataCache implements Closeable {
* @param partition partition to be removed
*/
public void removePartition(Partition partition) {
- partitionMap.remove(partition.getPartitionKey());
+ rmvPartitionFromMap(partition.getPartitionKey());
cancelTimeTask(partition.getPartitionKey());
indexPartition.remove(partition.getPartitionKey());
partitionUsedMap.remove(partition.getPartitionKey());
@@ -1505,7 +1509,7 @@ public class RmtDataCache implements Closeable {
}
updateOffsetCache(partition.getPartitionKey(),
entry.getValue().getCurrOffset(),
entry.getValue().getMaxOffset());
- partitionMap.put(partition.getPartitionKey(),
+ addPartitionToMap(partition.getPartitionKey(),
new PartitionExt(this.groupFlowCtrlRuleHandler,
this.defFlowCtrlRuleHandler, partition.getBroker(),
partition.getTopic(), partition.getPartitionId()));
@@ -1530,6 +1534,19 @@ public class RmtDataCache implements Closeable {
&& this.dataProcessSync.getCount() != 0);
}
+ private void addPartitionToMap(String partKey, PartitionExt partitionExt) {
+ partitionMap.put(partKey, partitionExt);
+ partMapChgTime.set(System.currentTimeMillis());
+ }
+
+ private PartitionExt rmvPartitionFromMap(String partKey) {
+ PartitionExt tmpPartExt = partitionMap.remove(partKey);
+ if (tmpPartExt != null) {
+ partMapChgTime.set(System.currentTimeMillis());
+ }
+ return tmpPartExt;
+ }
+
private void pauseProcess() {
this.dataProcessSync = new CountDownLatch(1);
}
diff --git
a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimpleClientBalanceConsumer.java
b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimpleClientBalanceConsumer.java
index d1a1a68aa..330ab1d6b 100644
---
a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimpleClientBalanceConsumer.java
+++
b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimpleClientBalanceConsumer.java
@@ -1035,7 +1035,7 @@ public class SimpleClientBalanceConsumer implements
ClientBalanceConsumer {
lstMetaQueryTime.set(System.currentTimeMillis());
}
// Get the authorization rules and update the local rules
- clientRmtDataCache.updOpsTaskInfo(response.getOpsTaskInfo());
+ clientRmtDataCache.updOpsTaskInfo(response.getOpsTaskInfo(),
strBuffer);
// Get the latest authorized token
processHeartBeatAuthorizedToken(response);
// Warning if heartbeat interval is too long
@@ -1171,7 +1171,7 @@ public class SimpleClientBalanceConsumer implements
ClientBalanceConsumer {
clientRmtDataCache.updateReg2MasterTime();
clientRmtDataCache.updateBrokerInfoList(response.getBrokerConfigId(),
response.getBrokerConfigListList(), sBuffer);
- clientRmtDataCache.updOpsTaskInfo(response.getOpsTaskInfo());
+ clientRmtDataCache.updOpsTaskInfo(response.getOpsTaskInfo(),
sBuffer);
processRegAuthorizedToken(response);
result.setSuccResult();
return result.isSuccess();
diff --git
a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/policies/FlowCtrlRuleHandler.java
b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/policies/FlowCtrlRuleHandler.java
index 31ecf1180..93cab8faa 100644
---
a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/policies/FlowCtrlRuleHandler.java
+++
b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/policies/FlowCtrlRuleHandler.java
@@ -88,30 +88,27 @@ public class FlowCtrlRuleHandler {
/**
* Parse flow control information and update stored cached old content
*
- * @param qyrPriorityId query priority id
+ * @param qryPriorityId the query priority id
* @param flowCtrlId flow control information id
* @param flowCtrlInfo flow control information content
- * @throws Exception Exception thrown
+ * @param strBuff the string buffer
+ * @throws Exception the exception thrown
*/
- public void updateFlowCtrlInfo(final int qyrPriorityId,
- final long flowCtrlId,
- final String flowCtrlInfo) throws Exception
{
+ public void updateFlowCtrlInfo(int qryPriorityId, long flowCtrlId,
+ String flowCtrlInfo, StringBuilder strBuff)
throws Exception {
if (flowCtrlId == this.flowCtrlId.get()) {
return;
}
+ long befFlowCtrlId;
+ int befQryPriorityId = TBaseConstants.META_VALUE_UNDEFINED;
Map<Integer, List<FlowCtrlItem>> flowCtrlItemsMap = null;
if (TStringUtils.isNotBlank(flowCtrlInfo)) {
flowCtrlItemsMap = parseFlowCtrlInfo(flowCtrlInfo);
}
writeLock.lock();
try {
- this.flowCtrlId.set(flowCtrlId);
+ befFlowCtrlId = this.flowCtrlId.getAndSet(flowCtrlId);
this.strFlowCtrlInfo = flowCtrlInfo;
- logger.info(new StringBuilder(512)
- .append("[Flow Ctrl] Updated ").append(flowCtrlName)
- .append(" to flowId=").append(flowCtrlId)
- .append(",qyrPriorityId=").append(qyrPriorityId).toString());
- this.qryPriorityId.set(qyrPriorityId);
clearStatisData();
if (flowCtrlItemsMap == null
|| flowCtrlItemsMap.isEmpty()) {
@@ -120,10 +117,24 @@ public class FlowCtrlRuleHandler {
flowCtrlRuleSet = flowCtrlItemsMap;
initialStatisData();
}
+ if (qryPriorityId != TBaseConstants.META_VALUE_UNDEFINED
+ && qryPriorityId != this.qryPriorityId.get()) {
+ befQryPriorityId = this.qryPriorityId.getAndSet(qryPriorityId);
+ }
this.lastUpdateTime = System.currentTimeMillis();
} finally {
writeLock.unlock();
}
+ strBuff.append("[Flow Ctrl] Update ").append(flowCtrlName)
+ .append(", flowId from ").append(befFlowCtrlId)
+ .append(" to ").append(flowCtrlId);
+ if (qryPriorityId != TBaseConstants.META_VALUE_UNDEFINED
+ && qryPriorityId != befQryPriorityId) {
+ strBuff.append(", qryPriorityId from ").append(befQryPriorityId)
+ .append(" to ").append(qryPriorityId);
+ }
+ logger.info(strBuff.toString());
+ strBuff.delete(0, strBuff.length());
}
/**
@@ -782,5 +793,4 @@ public class FlowCtrlRuleHandler {
}
return timeHour * 100 + timeMin;
}
-
}
diff --git
a/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corebase/policies/TestFlowCtrlRuleHandler.java
b/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corebase/policies/TestFlowCtrlRuleHandler.java
index 09fc784fa..0ea34a238 100644
---
a/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corebase/policies/TestFlowCtrlRuleHandler.java
+++
b/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corebase/policies/TestFlowCtrlRuleHandler.java
@@ -39,8 +39,9 @@ public class TestFlowCtrlRuleHandler {
@Test
public void testFlowCtrlRuleHandler() {
try {
+ StringBuilder strBuff = new StringBuilder(512);
FlowCtrlRuleHandler handler = new FlowCtrlRuleHandler(true);
- handler.updateFlowCtrlInfo(2, 10, mockFlowCtrlInfo());
+ handler.updateFlowCtrlInfo(2, 10, mockFlowCtrlInfo(), strBuff);
TimeZone timeZone = TimeZone.getTimeZone("GMT+8:00");
Calendar rightNow = Calendar.getInstance(timeZone);
int hour = rightNow.get(Calendar.HOUR_OF_DAY);
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/TubeBroker.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/TubeBroker.java
index 397ccfd1f..437b51f48 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/TubeBroker.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/TubeBroker.java
@@ -329,24 +329,14 @@ public class TubeBroker implements Stoppable {
// update default flow controller rules
FlowCtrlRuleHandler defFlowCtrlHandler =
metadataManager.getFlowCtrlRuleHandler();
- long flowCheckId = defFlowCtrlHandler.getFlowCtrlId();
- int qryPriorityId = defFlowCtrlHandler.getQryPriorityId();
- if (response.hasFlowCheckId()) {
- qryPriorityId = response.hasQryPriorityId()
- ? response.getQryPriorityId() : qryPriorityId;
- if (response.getFlowCheckId() != flowCheckId) {
- flowCheckId = response.getFlowCheckId();
- try {
- defFlowCtrlHandler
- .updateFlowCtrlInfo(qryPriorityId,
- flowCheckId,
response.getFlowControlInfo());
- } catch (Exception e1) {
- logger.warn(
- "[HeartBeat response] found parse flowCtrl rules
failure", e1);
- }
- }
- if (qryPriorityId != defFlowCtrlHandler.getQryPriorityId()) {
- defFlowCtrlHandler.setQryPriorityId(qryPriorityId);
+ if (response.hasFlowCheckId()
+ && response.getFlowCheckId() >= 0
+ && response.getFlowCheckId() !=
defFlowCtrlHandler.getFlowCtrlId()) {
+ try {
+
defFlowCtrlHandler.updateFlowCtrlInfo(response.getQryPriorityId(),
+ response.getFlowCheckId(),
response.getFlowControlInfo(), strBuff);
+ } catch (Exception e1) {
+ logger.warn("[HeartBeat response] found update flowCtrl rules
failure", e1);
}
}
// update configure report requirement
@@ -450,20 +440,14 @@ public class TubeBroker implements Stoppable {
// process default flow controller rules
FlowCtrlRuleHandler defFlowCtrlHandler =
metadataManager.getFlowCtrlRuleHandler();
- if (response.hasFlowCheckId()) {
- int qryPriorityId = response.hasQryPriorityId()
- ? response.getQryPriorityId() :
defFlowCtrlHandler.getQryPriorityId();
- if (response.getFlowCheckId() !=
defFlowCtrlHandler.getFlowCtrlId()) {
- try {
- defFlowCtrlHandler
- .updateFlowCtrlInfo(response.getQryPriorityId(),
- response.getFlowCheckId(),
response.getFlowControlInfo());
- } catch (Exception e1) {
- logger.warn("[Register response] found parse flowCtrl
rules failure", e1);
- }
- }
- if (qryPriorityId != defFlowCtrlHandler.getQryPriorityId()) {
- defFlowCtrlHandler.setQryPriorityId(qryPriorityId);
+ if (response.hasFlowCheckId()
+ && response.getFlowCheckId() >= 0
+ && response.getFlowCheckId() !=
defFlowCtrlHandler.getFlowCtrlId()) {
+ try {
+
defFlowCtrlHandler.updateFlowCtrlInfo(response.getQryPriorityId(),
+ response.getFlowCheckId(),
response.getFlowControlInfo(), strBuff);
+ } catch (Exception e1) {
+ logger.warn("[Register response] update default flowCtrl rules
failure", e1);
}
}
// update auth info