This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch TUBEMQ-314
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/TUBEMQ-314 by this push:
new 5cadb27 [TUBEMQ-334]Optimize BrokerServiceServer and OffsetService
classes logic[addendum] (#312)
5cadb27 is described below
commit 5cadb2735435b58d1b38be417504c935d488031a
Author: gosonzhang <[email protected]>
AuthorDate: Mon Nov 9 18:03:05 2020 +0800
[TUBEMQ-334]Optimize BrokerServiceServer and OffsetService classes
logic[addendum] (#312)
Co-authored-by: gosonzhang <[email protected]>
---
.../tubemq/server/broker/BrokerServiceServer.java | 50 ++++++++--------------
.../server/broker/msgstore/MessageStore.java | 44 +++++++++----------
.../broker/msgstore/MessageStoreManager.java | 9 ++--
.../server/broker/nodeinfo/ConsumerNodeInfo.java | 32 ++++++++++++--
4 files changed, 70 insertions(+), 65 deletions(-)
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java
index 4ccfc56..bafd454 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java
@@ -315,9 +315,9 @@ public class BrokerServiceServer implements
BrokerReadService, BrokerWriteServic
boolean isEscFlowCtrl = request.hasEscFlowCtrl() &&
request.getEscFlowCtrl();
String partStr = getPartStr(groupName, topicName, partitionId);
String consumerId = null;
- ConsumerNodeInfo consumerNodeInfo = consumerRegisterMap.get(partStr);
- if (consumerNodeInfo != null) {
- consumerId = consumerNodeInfo.getConsumerId();
+ ConsumerNodeInfo nodeInfo = consumerRegisterMap.get(partStr);
+ if (nodeInfo != null) {
+ consumerId = nodeInfo.getConsumerId();
}
if (consumerId == null) {
logger.warn(strBuffer.append("[UnRegistered
Consumer]").append(clientId)
@@ -339,9 +339,8 @@ public class BrokerServiceServer implements
BrokerReadService, BrokerWriteServic
builder.setErrMsg(strBuffer.toString());
return builder.build();
}
- String rmtAddrInfo = consumerNodeInfo.getRmtAddrInfo();
try {
-
heartbeatManager.updConsumerNode(consumerNodeInfo.getHeartbeatNodeId());
+ heartbeatManager.updConsumerNode(nodeInfo.getHeartbeatNodeId());
} catch (HeartbeatException e) {
logger.warn(strBuffer.append("[Invalid Request]").append(clientId)
.append(TokenConstants.SEGMENT_SEP).append(topicName)
@@ -368,11 +367,10 @@ public class BrokerServiceServer implements
BrokerReadService, BrokerWriteServic
dataStore = this.storeManager.getOrCreateMessageStore(topicName,
partitionId);
isGetStore = true;
GetMessageResult msgResult =
- getMessages(dataStore, consumerNodeInfo, groupName,
topicName, partitionId,
- request.getLastPackConsumed(),
request.getManualCommitOffset(),
- clientId, this.tubeConfig.getHostName(),
rmtAddrInfo, isEscFlowCtrl, strBuffer);
+ getMessages(dataStore, nodeInfo,
request.getLastPackConsumed(),
+ request.getManualCommitOffset(), isEscFlowCtrl,
strBuffer);
if (msgResult.isSuccess) {
- consumerNodeInfo.recordConsumeInfo(
+ nodeInfo.recordConsumeInfo(
msgResult.lastRdDataOffset,
msgResult.totalMsgSize);
getCounterGroup.add(msgResult.tmpCounters);
@@ -417,14 +415,8 @@ public class BrokerServiceServer implements
BrokerReadService, BrokerWriteServic
*
* @param msgStore
* @param nodeInfo
- * @param group
- * @param topic
- * @param partitionId
* @param lastConsumed
* @param isManualCommitOffset
- * @param sentAddr
- * @param brokerAddr
- * @param rmtAddrInfo
* @param isEscFlowCtrl
* @param sb
* @return
@@ -432,10 +424,8 @@ public class BrokerServiceServer implements
BrokerReadService, BrokerWriteServic
*/
private GetMessageResult getMessages(final MessageStore msgStore,
final ConsumerNodeInfo nodeInfo,
- final String group, final String
topic,
- final int partitionId, final boolean
lastConsumed,
- final boolean isManualCommitOffset,
final String sentAddr,
- final String brokerAddr, final String
rmtAddrInfo,
+ final boolean lastConsumed,
+ final boolean isManualCommitOffset,
boolean isEscFlowCtrl, final
StringBuilder sb) throws IOException {
long requestOffset =
offsetManager.getOffset(msgStore, nodeInfo,
@@ -458,13 +448,9 @@ public class BrokerServiceServer implements
BrokerReadService, BrokerWriteServic
}
}
try {
- String baseKey = sb.append(topic).append("#").append(brokerAddr)
-
.append("#").append(sentAddr).append("#").append(rmtAddrInfo)
-
.append("#").append(group).append("#").append(partitionId).toString();
- sb.delete(0, sb.length());
GetMessageResult msgQueryResult =
- msgStore.getMessages(reqSwitch, requestOffset,
- partitionId, nodeInfo, baseKey, msgDataSizeLimit);
+ msgStore.getMessages(reqSwitch,
+ requestOffset, nodeInfo, msgDataSizeLimit);
offsetManager.bookOffset(nodeInfo,
msgQueryResult.lastReadOffset, isManualCommitOffset,
msgQueryResult.transferedMessageList.isEmpty(), sb);
@@ -473,8 +459,8 @@ public class BrokerServiceServer implements
BrokerReadService, BrokerWriteServic
} catch (Throwable e1) {
sb.delete(0, sb.length());
logger.warn(sb.append("[Store Manager] get message failure,
requestOffset=")
-
.append(requestOffset).append(",group=").append(group).append(",topic=").append(topic)
- .append(",partitionId=").append(partitionId).toString(),
e1);
+ .append(requestOffset).append(", partstr=")
+ .append(nodeInfo.getPartStr()).toString(), e1);
sb.delete(0, sb.length());
return new GetMessageResult(false,
TErrCodeConstants.INTERNAL_SERVER_ERROR,
requestOffset, 0, sb.append("Get message failure, errMsg=")
@@ -537,7 +523,7 @@ public class BrokerServiceServer implements
BrokerReadService, BrokerWriteServic
}
}
GetMessageResult getMessageResult =
- storeManager.getMessages(dataStore, topicName,
partitionId, msgCount, filterCondSet);
+ storeManager.getMessages(dataStore, partitionId, msgCount,
filterCondSet);
if ((getMessageResult.transferedMessageList == null)
|| (getMessageResult.transferedMessageList.isEmpty())) {
sb.append("{\"result\":false,\"errCode\":401,\"errMsg\":\"")
@@ -765,16 +751,16 @@ public class BrokerServiceServer implements
BrokerReadService, BrokerWriteServic
builder.setErrMsg(authorizeResult.errInfo);
return builder.build();
}
- String partStr = getPartStr(groupName, topicName,
request.getPartitionId());
ConsumerNodeInfo nodeInfo =
- new ConsumerNodeInfo(partStr, isRegister, clientId,
- groupName, topicName, request.getPartitionId(),
filterCondSet, overtls);
+ new ConsumerNodeInfo(tubeConfig, isRegister, clientId,
groupName,
+ topicName, request.getPartitionId(), filterCondSet,
rmtAddress, overtls);
Integer lid = null;
Integer partLock = null;
try {
lid = brokerRowLock.getLock(null,
StringUtils.getBytesUtf8(clientId), true);
try {
- partLock = brokerRowLock.getLock(null,
StringUtils.getBytesUtf8(partStr), true);
+ partLock = brokerRowLock.getLock(null,
+ StringUtils.getBytesUtf8(nodeInfo.getPartStr()), true);
switch (request.getOpType()) {
case RpcConstants.MSG_OPTYPE_REGISTER: {
return inProcessConsumerRegister(nodeInfo, request,
builder, strBuffer);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
index a827723..b63169b 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
@@ -157,18 +157,14 @@ public class MessageStore implements Closeable {
*
* @param reqSwitch
* @param requestOffset
- * @param partitionId
- * @param consumerNodeInfo
- * @param statisKeyBase
+ * @param nodeInfo
* @param msgSizeLimit
* @return
* @throws IOException
*/
public GetMessageResult getMessages(int reqSwitch,
final long requestOffset,
- final int partitionId,
- final ConsumerNodeInfo
consumerNodeInfo,
- final String statisKeyBase,
+ final ConsumerNodeInfo nodeInfo,
int msgSizeLimit) throws IOException {
// #lizard forgives
if (this.closed.get()) {
@@ -183,7 +179,7 @@ public class MessageStore implements Closeable {
requestOffset, "Can't found Message by index in cache");
// determine position to read.
reqSwitch = (reqSwitch <= 0)
- ? 0 : (consumerNodeInfo.isFilterConsume() ? (reqSwitch % 100)
: (reqSwitch / 100));
+ ? 0 : (nodeInfo.isFilterConsume() ? (reqSwitch % 100) :
(reqSwitch / 100));
if (reqSwitch > 1) {
// in read memory situation, read main memory or backup memory by
consumer's config.
long maxIndexOffset = TBaseConstants.META_VALUE_UNDEFINED;
@@ -198,20 +194,20 @@ public class MessageStore implements Closeable {
if (reqSwitch > 2) {
memMsgRlt =
// read from main memory.
-
msgMemStore.getMessages(consumerNodeInfo.getLastDataRdOffset(),
+
msgMemStore.getMessages(nodeInfo.getLastDataRdOffset(),
requestOffset,
msgStoreMgr.getMaxMsgTransferSize(),
- maxIndexReadLength,
partitionId, false,
-
consumerNodeInfo.isFilterConsume(),
-
consumerNodeInfo.getFilterCondCodeSet());
+ maxIndexReadLength,
nodeInfo.getPartitionId(), false,
+ nodeInfo.isFilterConsume(),
+
nodeInfo.getFilterCondCodeSet());
}
} else {
// read from backup memory.
memMsgRlt =
-
msgMemStoreBeingFlush.getMessages(consumerNodeInfo.getLastDataRdOffset(),
+
msgMemStoreBeingFlush.getMessages(nodeInfo.getLastDataRdOffset(),
requestOffset,
msgStoreMgr.getMaxMsgTransferSize(),
- maxIndexReadLength, partitionId,
true,
- consumerNodeInfo.isFilterConsume(),
-
consumerNodeInfo.getFilterCondCodeSet());
+ maxIndexReadLength,
nodeInfo.getPartitionId(), true,
+ nodeInfo.isFilterConsume(),
+ nodeInfo.getFilterCondCodeSet());
}
}
} finally {
@@ -231,7 +227,7 @@ public class MessageStore implements Closeable {
ClientBroker.TransferedMessage transferedMessage =
DataStoreUtils.getTransferMsg(dataBuffer,
dataBuffer.array().length,
- countMap, statisKeyBase,
strBuffer);
+ countMap, nodeInfo.getStatisKey(),
strBuffer);
if (transferedMessage != null) {
transferedMessageList.add(transferedMessage);
}
@@ -256,7 +252,7 @@ public class MessageStore implements Closeable {
return new GetMessageResult(false, TErrCodeConstants.NOT_FOUND,
reqNewOffset, 0, "current offset is exceed max file
offset");
}
- maxIndexReadLength = consumerNodeInfo.isFilterConsume()
+ maxIndexReadLength = nodeInfo.isFilterConsume()
? fileMaxFilterIndexReadSize.get() :
fileMaxIndexReadSize.get();
final ByteBuffer indexBuffer = ByteBuffer.allocate(maxIndexReadLength);
Segment indexRecordView =
@@ -273,23 +269,23 @@ public class MessageStore implements Closeable {
indexRecordView.read(indexBuffer, reqNewOffset);
indexBuffer.flip();
indexRecordView.relViewRef();
- if ((msgFileStore.getDataHighMaxOffset() -
consumerNodeInfo.getLastDataRdOffset()
+ if ((msgFileStore.getDataHighMaxOffset() -
nodeInfo.getLastDataRdOffset()
>= this.tubeConfig.getDoubleDefaultDeduceReadSize())
&& msgSizeLimit > this.maxAllowRdSize) {
msgSizeLimit = this.maxAllowRdSize;
}
GetMessageResult retResult =
- msgFileStore.getMessages(partitionId,
- consumerNodeInfo.getLastDataRdOffset(), reqNewOffset,
- indexBuffer, consumerNodeInfo.isFilterConsume(),
- consumerNodeInfo.getFilterCondCodeSet(),
- statisKeyBase, msgSizeLimit);
+ msgFileStore.getMessages(nodeInfo.getPartitionId(),
+ nodeInfo.getLastDataRdOffset(), reqNewOffset,
+ indexBuffer, nodeInfo.isFilterConsume(),
+ nodeInfo.getFilterCondCodeSet(),
+ nodeInfo.getStatisKey(), msgSizeLimit);
if (reqSwitch <= 1) {
retResult.setMaxOffset(getFileIndexMaxOffset());
} else {
retResult.setMaxOffset(getIndexMaxOffset());
}
- if (consumerNodeInfo.isFilterConsume()
+ if (nodeInfo.isFilterConsume()
&& retResult.isSuccess
&& retResult.getLastReadOffset() > 0) {
if ((getFileIndexMaxOffset()
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStoreManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStoreManager.java
index 9feb5b2..c0b937f 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStoreManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStoreManager.java
@@ -341,7 +341,6 @@ public class MessageStoreManager implements StoreService {
* Get message from store.
*
* @param msgStore
- * @param topic
* @param partitionId
* @param msgCount
* @param filterCondSet
@@ -349,7 +348,6 @@ public class MessageStoreManager implements StoreService {
* @throws IOException
*/
public GetMessageResult getMessages(final MessageStore msgStore,
- final String topic,
final int partitionId,
final int msgCount,
final Set<String> filterCondSet)
throws IOException {
@@ -357,15 +355,16 @@ public class MessageStoreManager implements StoreService {
try {
final long maxOffset = msgStore.getIndexMaxOffset();
ConsumerNodeInfo consumerNodeInfo =
- new ConsumerNodeInfo(metadataManager, maxMsgTransferSize,
filterCondSet);
+ new ConsumerNodeInfo(metadataManager,
+ partitionId, maxMsgTransferSize, filterCondSet);
int maxIndexReadSize = (msgCount + 1)
* DataStoreUtils.STORE_INDEX_HEAD_LEN *
msgStore.getPartitionNum();
if (filterCondSet != null && !filterCondSet.isEmpty()) {
maxIndexReadSize *= 5;
}
requestOffset = maxOffset - maxIndexReadSize < 0 ? 0L : maxOffset
- maxIndexReadSize;
- return msgStore.getMessages(303, requestOffset, partitionId,
- consumerNodeInfo, topic, this.maxMsgTransferSize);
+ return msgStore.getMessages(303,
+ requestOffset, consumerNodeInfo, this.maxMsgTransferSize);
} catch (Throwable e1) {
return new GetMessageResult(false,
TErrCodeConstants.INTERNAL_SERVER_ERROR,
requestOffset, 0, "Get message failure, errMsg=" +
e1.getMessage());
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java
index 3c043f8..8c9004f 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java
@@ -22,6 +22,7 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.TokenConstants;
+import org.apache.tubemq.server.broker.BrokerConfig;
import org.apache.tubemq.server.broker.metadata.MetadataManager;
/***
@@ -36,7 +37,9 @@ public class ConsumerNodeInfo {
private String topicName = "";
private String offsetCacheKey;
private String heartbeatNodeId;
+ private String statisKey;
private String rmtAddrInfo;
+ private BrokerConfig tubeConfig;
private boolean overTls = false;
private int partitionId
= TBaseConstants.META_VALUE_UNDEFINED;
@@ -58,7 +61,7 @@ public class ConsumerNodeInfo {
public ConsumerNodeInfo(final MetadataManager metaManager,
- int maxXfeSize, Set<String> filterCodes) {
+ int partitionId, int maxXfeSize, Set<String>
filterCodes) {
if (filterCodes != null) {
for (String filterItem : filterCodes) {
this.filterCondStrs.add(filterItem);
@@ -68,19 +71,22 @@ public class ConsumerNodeInfo {
this.isFilterConsume = true;
}
}
+ this.partitionId = partitionId;
this.qryPriorityId.set(TBaseConstants.META_VALUE_UNDEFINED);
this.flowLimitInfo =
new FlowLimitInfo(metaManager, false, maxXfeSize);
}
- public ConsumerNodeInfo(String partStr, boolean isRegister,
+ public ConsumerNodeInfo(BrokerConfig tubeConfig, boolean isRegister,
String consumerId, String groupName, String
topicName,
- int partitionId, Set<String> filterCodes, boolean
overTls) {
- this.partStr = partStr;
+ int partitionId, Set<String> filterCodes, String
rmtAddress,
+ boolean overTls) {
+ this.tubeConfig = tubeConfig;
this.consumerId = consumerId;
this.groupName = groupName;
this.topicName = topicName;
this.partitionId = partitionId;
+ this.rmtAddrInfo = rmtAddress;
this.overTls = overTls;
buildSearchKeyInfo();
if (isRegister && filterCodes != null) {
@@ -105,6 +111,10 @@ public class ConsumerNodeInfo {
return assignInfo.getLeftOffset();
}
+ public long getRightOffset() {
+ return assignInfo.getRightOffset();
+ }
+
public String getPartStr() {
return partStr;
}
@@ -133,6 +143,10 @@ public class ConsumerNodeInfo {
return partitionId;
}
+ public String getStatisKey() {
+ return statisKey;
+ }
+
public boolean isOverTls() {
return overTls;
}
@@ -195,12 +209,22 @@ public class ConsumerNodeInfo {
private void buildSearchKeyInfo() {
StringBuilder sBuilder = new StringBuilder(512);
+ this.partStr =
sBuilder.append(groupName).append(TokenConstants.ATTR_SEP)
+
.append(topicName).append(TokenConstants.ATTR_SEP).append(partitionId).toString();
+ sBuilder.delete(0, sBuilder.length());
this.offsetCacheKey = sBuilder.append(topicName)
.append(TokenConstants.HYPHEN).append(partitionId).toString();
sBuilder.delete(0, sBuilder.length());
this.heartbeatNodeId = sBuilder.append(consumerId)
.append(TokenConstants.SEGMENT_SEP).append(partStr).toString();
sBuilder.delete(0, sBuilder.length());
+ this.statisKey =
sBuilder.append(topicName).append(TokenConstants.SEGMENT_SEP)
+
.append(tubeConfig.getHostName()).append(TokenConstants.SEGMENT_SEP)
+ .append(consumerId).append(TokenConstants.SEGMENT_SEP)
+ .append(rmtAddrInfo).append(TokenConstants.SEGMENT_SEP)
+ .append(groupName).append(TokenConstants.SEGMENT_SEP)
+ .append(partitionId).toString();
+ sBuilder.delete(0, sBuilder.length());
if (consumerId.lastIndexOf("_") != -1) {
String targetStr =
consumerId.substring(consumerId.lastIndexOf("_") + 1);
String[] strInfos = targetStr.split("-");