This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new c960960f9 [INLONG-7106][TubeMQ] Add reset offset API by time (#7108)
c960960f9 is described below
commit c960960f923136a615b31e494965bb2aea847384
Author: Goson Zhang <[email protected]>
AuthorDate: Tue Jan 3 10:14:34 2023 +0800
[INLONG-7106][TubeMQ] Add reset offset API by time (#7108)
---
.../tubemq/corebase/utils/ServiceStatusHolder.java | 24 +--
.../server/broker/offset/OffsetHistoryInfo.java | 67 ++++++
.../server/broker/web/BrokerAdminServlet.java | 226 +++++++++++++++++++++
.../tubemq/server/common/fielddef/WebFieldDef.java | 8 +-
.../nodemanage/nodebroker/BrokerAbnHolder.java | 43 ++--
5 files changed, 330 insertions(+), 38 deletions(-)
diff --git
a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/ServiceStatusHolder.java
b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/ServiceStatusHolder.java
index 378bf6de7..2c9ea1013 100644
---
a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/ServiceStatusHolder.java
+++
b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/ServiceStatusHolder.java
@@ -70,16 +70,10 @@ public class ServiceStatusHolder {
}
public static boolean addWriteIOErrCnt() {
- long curTime = lastWriteStatsTime.get();
- if (System.currentTimeMillis() - curTime > statsDurationMs) {
- if (lastWriteStatsTime.compareAndSet(curTime,
System.currentTimeMillis())) {
- curWriteIOExcptCnt.getAndSet(0);
- if (isPauseWrite.get()) {
- isPauseWrite.compareAndSet(true, false);
- }
- }
- }
if (curWriteIOExcptCnt.incrementAndGet() > allowedWriteIOExcptCnt) {
+ if (isPauseWrite.get()) {
+ return true;
+ }
isPauseWrite.set(true);
return true;
}
@@ -95,16 +89,10 @@ public class ServiceStatusHolder {
}
public static boolean addReadIOErrCnt() {
- long curTime = lastReadStatsTime.get();
- if (System.currentTimeMillis() - curTime > statsDurationMs) {
- if (lastReadStatsTime.compareAndSet(curTime,
System.currentTimeMillis())) {
- curReadIOExcptCnt.getAndSet(0);
- if (isPauseRead.get()) {
- isPauseRead.compareAndSet(true, false);
- }
- }
- }
if (curReadIOExcptCnt.incrementAndGet() > allowedReadIOExcptCnt) {
+ if (isPauseRead.get()) {
+ return true;
+ }
isPauseRead.set(true);
return true;
}
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetHistoryInfo.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetHistoryInfo.java
index 3c6b90072..2d5f1e468 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetHistoryInfo.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetHistoryInfo.java
@@ -17,10 +17,17 @@
package org.apache.inlong.tubemq.server.broker.offset;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.apache.inlong.tubemq.corebase.TBaseConstants;
+import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
import org.apache.inlong.tubemq.corebase.utils.DateTimeConvertUtils;
+import org.apache.inlong.tubemq.corebase.utils.Tuple3;
import org.apache.inlong.tubemq.server.common.TServerConstants;
/**
@@ -159,4 +166,64 @@ public class OffsetHistoryInfo {
}
strBuff.append("]}");
}
+
+ /**
+ * Parse history offset record info
+ *
+ * @param jsonData string offset information
+ * @param result process result
+ */
+ public static boolean parseRecordInfo(String jsonData, ProcessResult
result) {
+ JsonObject jsonObject = null;
+ try {
+ jsonObject = JsonParser.parseString(jsonData).getAsJsonObject();
+ } catch (Throwable e1) {
+ result.setFailResult(String.format(
+ "Parse history offset value failure, reason is %s",
e1.getMessage()));
+ return result.isSuccess();
+ }
+ if (jsonObject == null) {
+ result.setFailResult("Parse error, history offset value must be
valid json format!");
+ return result.isSuccess();
+ }
+ if (!jsonObject.has("ver")) {
+ result.setFailResult("FIELD ver is required in history offset
value!");
+ return result.isSuccess();
+ }
+ int verValue = jsonObject.get("ver").getAsInt();
+ if (verValue < TServerConstants.OFFSET_HISTORY_RECORD_SHORT_VERSION) {
+ result.setFailResult("Only support v2 or next version in history
offset value!");
+ return result.isSuccess();
+ }
+ if (!jsonObject.has("records")) {
+ result.setFailResult("FIELD records is required in history offset
value!");
+ return result.isSuccess();
+ }
+ List<Tuple3<String, Integer, Long>> resetOffsets = new ArrayList<>();
+ JsonArray records = jsonObject.get("records").getAsJsonArray();
+ for (int i = 0; i < records.size(); i++) {
+ JsonObject itemInfo = records.get(i).getAsJsonObject();
+ if (itemInfo == null) {
+ continue;
+ }
+ String topicName = itemInfo.get("topic").getAsString();
+ JsonArray offsets = itemInfo.get("offsets").getAsJsonArray();
+ for (int j = 0; j < offsets.size(); j++) {
+ JsonObject storeInfo = offsets.get(j).getAsJsonObject();
+ if (storeInfo == null) {
+ continue;
+ }
+ JsonArray partInfos = storeInfo.get("parts").getAsJsonArray();
+ for (int k = 0; k < partInfos.size(); k++) {
+ JsonObject partItem = partInfos.get(k).getAsJsonObject();
+ int partId = partItem.get("partId").getAsInt();
+ long offsetVal = partItem.get("iCfm").getAsLong();
+ resetOffsets.add(new Tuple3<>(topicName, partId,
offsetVal));
+ }
+ }
+ }
+ result.setSuccResult(resetOffsets);
+ return true;
+ }
+
}
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java
index 3e723c541..2a73b0e67 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java
@@ -44,6 +44,7 @@ import
org.apache.inlong.tubemq.server.broker.msgstore.MessageStore;
import org.apache.inlong.tubemq.server.broker.msgstore.MessageStoreManager;
import org.apache.inlong.tubemq.server.broker.msgstore.disk.GetMessageResult;
import org.apache.inlong.tubemq.server.broker.nodeinfo.ConsumerNodeInfo;
+import org.apache.inlong.tubemq.server.broker.offset.OffsetHistoryInfo;
import org.apache.inlong.tubemq.server.broker.offset.OffsetService;
import org.apache.inlong.tubemq.server.broker.stats.BrokerStatsType;
import org.apache.inlong.tubemq.server.broker.stats.BrokerSrvStatsHolder;
@@ -118,6 +119,9 @@ public class BrokerAdminServlet extends AbstractWebHandler {
// set or update group's offset info
innRegisterWebMethod("admin_set_offset",
"adminSetGroupOffSet", false);
+ // set or update group's offset info by history offset time
+ innRegisterWebMethod("admin_set_offset_by_time",
+ "adminSetGroupOffSetByTime", false);
// remove group's offset info
innRegisterWebMethod("admin_rmv_offset",
"adminRemoveGroupOffSet", false);
@@ -1001,6 +1005,228 @@ public class BrokerAdminServlet extends
AbstractWebHandler {
sBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
}
+ /**
+ * Add or Modify consumer group offset by group's history offset time.
+ *
+ * @param req request
+ * @param sBuffer process result
+ */
+ public void adminSetGroupOffSetByTime(HttpServletRequest req,
StringBuilder sBuffer) {
+ ProcessResult result = new ProcessResult();
+ // get group name
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.GROUPNAME, true, null, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
+ return;
+ }
+ final String groupName = (String) result.getRetData();
+ // get modify user
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.MODIFYUSER, true, null, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
+ return;
+ }
+ final String modifyUser = (String) result.getRetData();
+ // get the left timestamp to be set
+ if (!WebParameterUtils.getDateParameter(req,
+ WebFieldDef.RECORDTIME, true, null, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
+ return;
+ }
+ final Date tmpDataTime1 = (Date) result.getRetData();
+ final long recStartTime = tmpDataTime1.getTime();
+ // get the right timestamp to be set
+ if (!WebParameterUtils.getDateParameter(req,
+ WebFieldDef.ENDTIME, false, null, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
+ return;
+ }
+ final Date tmpDataTime2 = (Date) result.getRetData();
+ long recEndTime = recStartTime + 5 * 60 * 1000L;
+ if (tmpDataTime2 != null) {
+ recEndTime = tmpDataTime2.getTime();
+ if (recEndTime < recStartTime) {
+ WebParameterUtils.buildFailResult(sBuffer,
+ String.format("Parameter %s value must >= %s",
+ WebFieldDef.ENDTIME.name,
WebFieldDef.RECORDTIME.name));
+ return;
+ }
+ }
+ // check storage status
+ if (ServiceStatusHolder.isReadServiceStop()) {
+ WebParameterUtils.buildFailResult(sBuffer,
+ "Read StoreService temporary unavailable!");
+ return;
+ }
+ // get offset history storage
+ MessageStore msgStore;
+ MessageStoreManager storeManager = broker.getStoreManager();
+ try {
+ msgStore = storeManager.getOrCreateMessageStore(
+ TServerConstants.OFFSET_HISTORY_NAME, 0);
+ } catch (Throwable ex) {
+ WebParameterUtils.buildFailResult(sBuffer,
+ String.format("Get offset history store fail, reason=%s",
ex.getMessage()));
+ return;
+ }
+ // get the history offset in the time range
+ // read history data
+ int totalCnt = 0;
+ // locate start offset
+ int maxRetryCnt = 50;
+ long requestOffset = msgStore.getStartOffsetByTimeStamp(recStartTime);
+ if (!getStoredGroupHisOffsets(groupName, msgStore,
+ requestOffset, maxRetryCnt, recStartTime, recEndTime, result))
{
+ WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
+ return;
+ }
+ List<Tuple3<String, Integer, Long>> resetOffsets =
+ (List<Tuple3<String, Integer, Long>>) result.getRetData();
+ if (resetOffsets.isEmpty()) {
+ WebParameterUtils.buildFailResult(sBuffer, "Not found history
offset value!");
+ return;
+ }
+ Set<String> groupNameSet = new HashSet<>();
+ groupNameSet.add(groupName);
+ Set<String> topicSet = new HashSet<>();
+ // before
+ Map<String, Map<String, Map<Integer, GroupOffsetInfo>>>
befGroupOffsetMap =
+ getGroupOffsetInfo(WebFieldDef.COMPSGROUPNAME, groupNameSet,
topicSet);
+ Map<String, Map<Integer, GroupOffsetInfo>> befTopicPartMap =
+ befGroupOffsetMap.get(groupName);
+ // change
+ broker.getOffsetManager().modifyGroupOffset(groupNameSet,
resetOffsets, modifyUser);
+ // after
+ Map<String, Map<String, Map<Integer, GroupOffsetInfo>>>
aftGroupOffsetMap =
+ getGroupOffsetInfo(WebFieldDef.COMPSGROUPNAME, groupNameSet,
topicSet);
+ Map<String, Map<Integer, GroupOffsetInfo>> aftTopicPartMap =
+ aftGroupOffsetMap.get(groupName);
+ // build result
+ WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
+ int topicCnt = 0;
+
sBuffer.append("{\"groupName\":\"").append(groupName).append("\",\"before\":[");
+ for (Map.Entry<String, Map<Integer, GroupOffsetInfo>> entry1 :
befTopicPartMap.entrySet()) {
+ if (topicCnt++ > 0) {
+ sBuffer.append(",");
+ }
+ Map<Integer, GroupOffsetInfo> partOffMap = entry1.getValue();
+ sBuffer.append("{\"topicName\":\"").append(entry1.getKey())
+ .append("\",\"offsets\":[");
+ int partCnt = 0;
+ for (Map.Entry<Integer, GroupOffsetInfo> entry2 :
partOffMap.entrySet()) {
+ if (partCnt++ > 0) {
+ sBuffer.append(",");
+ }
+ GroupOffsetInfo offsetInfo = entry2.getValue();
+ offsetInfo.buildOffsetInfo(sBuffer);
+ }
+ sBuffer.append("],\"partCount\":").append(partCnt).append("}");
+ }
+ sBuffer.append("],\"after\":[");
+ topicCnt = 0;
+ for (Map.Entry<String, Map<Integer, GroupOffsetInfo>> entry1 :
aftTopicPartMap.entrySet()) {
+ if (topicCnt++ > 0) {
+ sBuffer.append(",");
+ }
+ Map<Integer, GroupOffsetInfo> partOffMap = entry1.getValue();
+ sBuffer.append("{\"topicName\":\"").append(entry1.getKey())
+ .append("\",\"offsets\":[");
+ int partCnt = 0;
+ for (Map.Entry<Integer, GroupOffsetInfo> entry2 :
partOffMap.entrySet()) {
+ if (partCnt++ > 0) {
+ sBuffer.append(",");
+ }
+ GroupOffsetInfo offsetInfo = entry2.getValue();
+ offsetInfo.buildOffsetInfo(sBuffer);
+ }
+ sBuffer.append("],\"partCount\":").append(partCnt).append("}");
+ }
+ sBuffer.append("]}");
+ WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, totalCnt);
+ }
+
+ /**
+ * Query group's offset records stored in broker.
+ *
+ * @param groupName group name
+ * @param msgStore history offset store
+ * @param requestOffset request offset
+ * @param maxRetryCnt max query turns
+ * @param recStartTime record start timestamp
+ * @param recEndTime record end timestamp
+ * @param result query result
+ * @return whether success
+ */
+ private boolean getStoredGroupHisOffsets(String groupName, MessageStore
msgStore,
+ long requestOffset, int maxRetryCnt,
+ long recStartTime, long recEndTime,
+ ProcessResult result) {
+ int msgTypeCode;
+ int partitionId;
+ Throwable qryThrow;
+ GetMessageResult getMessageResult;
+ // locate partitionId and filter-item
+ msgTypeCode = groupName.hashCode();
+ partitionId = Math.abs(msgTypeCode) %
TServerConstants.OFFSET_HISTORY_NUMPARTS;
+ Set<String> filterCodes = new HashSet<>();
+ filterCodes.add(groupName);
+ // build consumer node information
+ ConsumerNodeInfo consumerNodeInfo = new
ConsumerNodeInfo(broker.getStoreManager(),
+ groupName, "offsetConsumer", filterCodes, "",
System.currentTimeMillis(), "", "");
+ // query records from storage
+ int qryRetryCount = 0;
+ long itemInitOffset = requestOffset;
+ int maxTransferSize = broker.getStoreManager().getMaxMsgTransferSize();
+ do {
+ qryThrow = null;
+ try {
+ getMessageResult = msgStore.getMessages(303, itemInitOffset,
+ partitionId, consumerNodeInfo,
TServerConstants.OFFSET_HISTORY_NAME,
+ maxTransferSize, recStartTime);
+ } catch (Throwable e2) {
+ qryThrow = e2;
+ continue;
+ }
+ // check query result
+ if (getMessageResult.transferedMessageList == null
+ || getMessageResult.transferedMessageList.isEmpty()) {
+ itemInitOffset += getMessageResult.lastReadOffset;
+ continue;
+ }
+ // build record to return result
+ List<Message> messageList = DataConverterUtil.convertMessage(
+ TServerConstants.OFFSET_HISTORY_NAME,
getMessageResult.transferedMessageList);
+ for (Message message : messageList) {
+ if (message == null) {
+ continue;
+ }
+ long recAppTime = DateTimeConvertUtils.yyyyMMddHHmm2ms(
+ message.getAttrValue(TokenConstants.TOKEN_MSG_TIME));
+ if (recAppTime > recEndTime) {
+ result.setFailResult(String.format(
+ "Over required endTime range, current time is %s",
+
message.getAttrValue(TokenConstants.TOKEN_MSG_TIME)));
+ return result.isSuccess();
+ }
+ if (!groupName.equals(message.getAttrValue(
+ TServerConstants.TOKEN_OFFSET_GROUP))) {
+ continue;
+ }
+ return OffsetHistoryInfo.parseRecordInfo(
+ StringUtils.newStringUtf8(message.getData()), result);
+ }
+ itemInitOffset += getMessageResult.lastReadOffset;
+ } while (++qryRetryCount < maxRetryCnt);
+ // check query result
+ if (qryThrow == null) {
+ result.setFailResult("Not found record in required search range");
+ } else {
+ result.setFailResult(String.format(
+ "Query record failure, reason is :%s",
qryThrow.getMessage()));
+ }
+ return result.isSuccess();
+ }
+
/**
* Clone consume group offset, clone A group's offset to other group.
*
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/fielddef/WebFieldDef.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/fielddef/WebFieldDef.java
index f40fc0e81..4a8d140de 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/fielddef/WebFieldDef.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/fielddef/WebFieldDef.java
@@ -259,7 +259,13 @@ public enum WebFieldDef {
MAXRETRYCOUNT(93, "maxRetryCnt", "mrc", WebFieldType.INT,
"Max retry query turns", RegexDef.TMP_NUMBER),
STATSTYPE(94, "statsType", "st", WebFieldType.STRING,
- "Statistics type", TServerConstants.META_MAX_STATSTYPE_LENGTH);
+ "Statistics type", TServerConstants.META_MAX_STATSTYPE_LENGTH),
+
+ RESETVALUE(95, "resetValue", "rv", WebFieldType.BOOLEAN,
+ "Reset value, default is false"),
+ ENDTIME(96, "endTime", "et", WebFieldType.STRING,
+ "The end record time of the historical offset of the consume
group",
+ DateTimeConvertUtils.LENGTH_YYYYMMDDHHMMSS);
public final int id;
public final String name;
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java
index 3a31afae0..2f9f0abfd 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java
@@ -69,7 +69,9 @@ public class BrokerAbnHolder {
int reportReadStatus,
int reportWriteStatus) {
StringBuilder sBuffer = new StringBuilder(512);
- if (reportReadStatus == 0 && reportWriteStatus == 0) {
+ ManageStatus reqMngStatus =
+ getManageStatus(reportWriteStatus, reportReadStatus);
+ if (reqMngStatus == ManageStatus.STATUS_MANAGE_ONLINE) {
BrokerAbnInfo brokerAbnInfo = brokerAbnormalMap.get(brokerId);
if (brokerAbnInfo != null) {
if (brokerForbiddenMap.get(brokerId) == null) {
@@ -89,13 +91,6 @@ public class BrokerAbnHolder {
if (curEntry == null) {
return;
}
- ManageStatus reqMngStatus =
- getManageStatus(reportWriteStatus, reportReadStatus);
- if ((curEntry.getManageStatus() == reqMngStatus)
- || ((reqMngStatus == ManageStatus.STATUS_MANAGE_OFFLINE)
- && (curEntry.getManageStatus().getCode() <
ManageStatus.STATUS_MANAGE_ONLINE.getCode()))) {
- return;
- }
BrokerAbnInfo brokerAbnInfo = brokerAbnormalMap.get(brokerId);
if (brokerAbnInfo == null) {
if (brokerAbnormalMap.putIfAbsent(brokerId,
@@ -110,20 +105,30 @@ public class BrokerAbnHolder {
} else {
brokerAbnInfo.updateLastRepStatus(reportReadStatus,
reportWriteStatus);
}
+ ManageStatus curStatus = curEntry.getManageStatus();
+ if (curStatus == reqMngStatus
+ || curStatus.getCode() <
ManageStatus.STATUS_MANAGE_ONLINE.getCode()
+ || curStatus.getCode() >=
ManageStatus.STATUS_MANAGE_OFFLINE.getCode()) {
+ return;
+ }
+ ManageStatus newStatus = reqMngStatus;
+ if (reqMngStatus == ManageStatus.STATUS_MANAGE_ONLINE_NOT_WRITE
+ && curStatus == ManageStatus.STATUS_MANAGE_ONLINE_NOT_READ) {
+ newStatus = ManageStatus.STATUS_MANAGE_OFFLINE;
+ }
BrokerFbdInfo brokerFbdInfo = brokerForbiddenMap.get(brokerId);
if (brokerFbdInfo == null) {
- BrokerFbdInfo tmpBrokerFbdInfo =
- new BrokerFbdInfo(brokerId, curEntry.getManageStatus(),
- reqMngStatus, System.currentTimeMillis());
+ BrokerFbdInfo tmpFbdInfo = new BrokerFbdInfo(brokerId,
+ curStatus, newStatus, System.currentTimeMillis());
if (reportReadStatus > 0 || reportWriteStatus > 0) {
- if (updateCurManageStatus(brokerId, reqMngStatus, sBuffer)) {
- if (brokerForbiddenMap.putIfAbsent(brokerId,
tmpBrokerFbdInfo) == null) {
+ if (updateCurManageStatus(brokerId, newStatus, sBuffer)) {
+ if (brokerForbiddenMap.putIfAbsent(brokerId, tmpFbdInfo)
== null) {
brokerForbiddenCount.incrementAndGet();
MasterSrvStatsHolder.incBrokerForbiddenCnt();
logger.warn(sBuffer
.append("[Broker AutoForbidden] master add
missing forbidden broker, ")
.append(brokerId).append("'s manage status to
")
-
.append(reqMngStatus.getDescription()).toString());
+
.append(newStatus.getDescription()).toString());
sBuffer.delete(0, sBuffer.length());
}
}
@@ -132,8 +137,8 @@ public class BrokerAbnHolder {
brokerForbiddenCount.decrementAndGet();
return;
}
- if (updateCurManageStatus(brokerId, reqMngStatus, sBuffer)) {
- if (brokerForbiddenMap.putIfAbsent(brokerId,
tmpBrokerFbdInfo) != null) {
+ if (updateCurManageStatus(brokerId, newStatus, sBuffer)) {
+ if (brokerForbiddenMap.putIfAbsent(brokerId, tmpFbdInfo)
!= null) {
brokerForbiddenCount.decrementAndGet();
return;
}
@@ -141,15 +146,15 @@ public class BrokerAbnHolder {
logger.warn(sBuffer
.append("[Broker AutoForbidden] master auto
forbidden broker, ")
.append(brokerId).append("'s manage status to ")
- .append(reqMngStatus.getDescription()).toString());
+ .append(newStatus.getDescription()).toString());
sBuffer.delete(0, sBuffer.length());
} else {
brokerForbiddenCount.decrementAndGet();
}
}
} else {
- if (updateCurManageStatus(brokerId, reqMngStatus, sBuffer)) {
- brokerFbdInfo.updateInfo(curEntry.getManageStatus(),
reqMngStatus);
+ if (updateCurManageStatus(brokerId, newStatus, sBuffer)) {
+ brokerFbdInfo.updateInfo(curStatus, newStatus);
}
}
}