This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch TUBEMQ-314
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/TUBEMQ-314 by this push:
     new 5cadb27  [TUBEMQ-334] Optimize BrokerServiceServer and OffsetService classes logic [addendum] (#312)
5cadb27 is described below

commit 5cadb2735435b58d1b38be417504c935d488031a
Author: gosonzhang <[email protected]>
AuthorDate: Mon Nov 9 18:03:05 2020 +0800

    [TUBEMQ-334] Optimize BrokerServiceServer and OffsetService classes logic [addendum] (#312)
    
    Co-authored-by: gosonzhang <[email protected]>
---
 .../tubemq/server/broker/BrokerServiceServer.java  | 50 ++++++++--------------
 .../server/broker/msgstore/MessageStore.java       | 44 +++++++++----------
 .../broker/msgstore/MessageStoreManager.java       |  9 ++--
 .../server/broker/nodeinfo/ConsumerNodeInfo.java   | 32 ++++++++++++--
 4 files changed, 70 insertions(+), 65 deletions(-)

diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java
index 4ccfc56..bafd454 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java
@@ -315,9 +315,9 @@ public class BrokerServiceServer implements 
BrokerReadService, BrokerWriteServic
         boolean isEscFlowCtrl = request.hasEscFlowCtrl() && 
request.getEscFlowCtrl();
         String partStr = getPartStr(groupName, topicName, partitionId);
         String consumerId = null;
-        ConsumerNodeInfo consumerNodeInfo = consumerRegisterMap.get(partStr);
-        if (consumerNodeInfo != null) {
-            consumerId = consumerNodeInfo.getConsumerId();
+        ConsumerNodeInfo nodeInfo = consumerRegisterMap.get(partStr);
+        if (nodeInfo != null) {
+            consumerId = nodeInfo.getConsumerId();
         }
         if (consumerId == null) {
             logger.warn(strBuffer.append("[UnRegistered 
Consumer]").append(clientId)
@@ -339,9 +339,8 @@ public class BrokerServiceServer implements 
BrokerReadService, BrokerWriteServic
             builder.setErrMsg(strBuffer.toString());
             return builder.build();
         }
-        String rmtAddrInfo = consumerNodeInfo.getRmtAddrInfo();
         try {
-            
heartbeatManager.updConsumerNode(consumerNodeInfo.getHeartbeatNodeId());
+            heartbeatManager.updConsumerNode(nodeInfo.getHeartbeatNodeId());
         } catch (HeartbeatException e) {
             logger.warn(strBuffer.append("[Invalid Request]").append(clientId)
                     .append(TokenConstants.SEGMENT_SEP).append(topicName)
@@ -368,11 +367,10 @@ public class BrokerServiceServer implements 
BrokerReadService, BrokerWriteServic
             dataStore = this.storeManager.getOrCreateMessageStore(topicName, 
partitionId);
             isGetStore = true;
             GetMessageResult msgResult =
-                    getMessages(dataStore, consumerNodeInfo, groupName, 
topicName, partitionId,
-                            request.getLastPackConsumed(), 
request.getManualCommitOffset(),
-                            clientId, this.tubeConfig.getHostName(), 
rmtAddrInfo, isEscFlowCtrl, strBuffer);
+                    getMessages(dataStore, nodeInfo, 
request.getLastPackConsumed(),
+                            request.getManualCommitOffset(), isEscFlowCtrl, 
strBuffer);
             if (msgResult.isSuccess) {
-                consumerNodeInfo.recordConsumeInfo(
+                nodeInfo.recordConsumeInfo(
                         msgResult.lastRdDataOffset,
                         msgResult.totalMsgSize);
                 getCounterGroup.add(msgResult.tmpCounters);
@@ -417,14 +415,8 @@ public class BrokerServiceServer implements 
BrokerReadService, BrokerWriteServic
      *
      * @param msgStore
      * @param nodeInfo
-     * @param group
-     * @param topic
-     * @param partitionId
      * @param lastConsumed
      * @param isManualCommitOffset
-     * @param sentAddr
-     * @param brokerAddr
-     * @param rmtAddrInfo
      * @param isEscFlowCtrl
      * @param sb
      * @return
@@ -432,10 +424,8 @@ public class BrokerServiceServer implements 
BrokerReadService, BrokerWriteServic
      */
     private GetMessageResult getMessages(final MessageStore msgStore,
                                          final ConsumerNodeInfo nodeInfo,
-                                         final String group, final String 
topic,
-                                         final int partitionId, final boolean 
lastConsumed,
-                                         final boolean isManualCommitOffset, 
final String sentAddr,
-                                         final String brokerAddr, final String 
rmtAddrInfo,
+                                         final boolean lastConsumed,
+                                         final boolean isManualCommitOffset,
                                          boolean isEscFlowCtrl, final 
StringBuilder sb) throws IOException {
         long requestOffset =
                 offsetManager.getOffset(msgStore, nodeInfo,
@@ -458,13 +448,9 @@ public class BrokerServiceServer implements 
BrokerReadService, BrokerWriteServic
             }
         }
         try {
-            String baseKey = sb.append(topic).append("#").append(brokerAddr)
-                    
.append("#").append(sentAddr).append("#").append(rmtAddrInfo)
-                    
.append("#").append(group).append("#").append(partitionId).toString();
-            sb.delete(0, sb.length());
             GetMessageResult msgQueryResult =
-                    msgStore.getMessages(reqSwitch, requestOffset,
-                            partitionId, nodeInfo, baseKey, msgDataSizeLimit);
+                    msgStore.getMessages(reqSwitch,
+                            requestOffset, nodeInfo, msgDataSizeLimit);
             offsetManager.bookOffset(nodeInfo,
                     msgQueryResult.lastReadOffset, isManualCommitOffset,
                     msgQueryResult.transferedMessageList.isEmpty(), sb);
@@ -473,8 +459,8 @@ public class BrokerServiceServer implements 
BrokerReadService, BrokerWriteServic
         } catch (Throwable e1) {
             sb.delete(0, sb.length());
             logger.warn(sb.append("[Store Manager] get message failure, 
requestOffset=")
-                    
.append(requestOffset).append(",group=").append(group).append(",topic=").append(topic)
-                    .append(",partitionId=").append(partitionId).toString(), 
e1);
+                    .append(requestOffset).append(", partstr=")
+                    .append(nodeInfo.getPartStr()).toString(), e1);
             sb.delete(0, sb.length());
             return new GetMessageResult(false, 
TErrCodeConstants.INTERNAL_SERVER_ERROR,
                     requestOffset, 0, sb.append("Get message failure, errMsg=")
@@ -537,7 +523,7 @@ public class BrokerServiceServer implements 
BrokerReadService, BrokerWriteServic
                 }
             }
             GetMessageResult getMessageResult =
-                    storeManager.getMessages(dataStore, topicName, 
partitionId, msgCount, filterCondSet);
+                    storeManager.getMessages(dataStore, partitionId, msgCount, 
filterCondSet);
             if ((getMessageResult.transferedMessageList == null)
                     || (getMessageResult.transferedMessageList.isEmpty())) {
                 sb.append("{\"result\":false,\"errCode\":401,\"errMsg\":\"")
@@ -765,16 +751,16 @@ public class BrokerServiceServer implements 
BrokerReadService, BrokerWriteServic
             builder.setErrMsg(authorizeResult.errInfo);
             return builder.build();
         }
-        String partStr = getPartStr(groupName, topicName, 
request.getPartitionId());
         ConsumerNodeInfo nodeInfo =
-                new ConsumerNodeInfo(partStr, isRegister, clientId,
-                        groupName, topicName, request.getPartitionId(), 
filterCondSet, overtls);
+                new ConsumerNodeInfo(tubeConfig, isRegister, clientId, 
groupName,
+                        topicName, request.getPartitionId(), filterCondSet, 
rmtAddress, overtls);
         Integer lid = null;
         Integer partLock = null;
         try {
             lid = brokerRowLock.getLock(null, 
StringUtils.getBytesUtf8(clientId), true);
             try {
-                partLock = brokerRowLock.getLock(null, 
StringUtils.getBytesUtf8(partStr), true);
+                partLock = brokerRowLock.getLock(null,
+                        StringUtils.getBytesUtf8(nodeInfo.getPartStr()), true);
                 switch (request.getOpType()) {
                     case RpcConstants.MSG_OPTYPE_REGISTER: {
                         return inProcessConsumerRegister(nodeInfo, request, 
builder, strBuffer);
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
index a827723..b63169b 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
@@ -157,18 +157,14 @@ public class MessageStore implements Closeable {
      *
      * @param reqSwitch
      * @param requestOffset
-     * @param partitionId
-     * @param consumerNodeInfo
-     * @param statisKeyBase
+     * @param nodeInfo
      * @param msgSizeLimit
      * @return
      * @throws IOException
      */
     public GetMessageResult getMessages(int reqSwitch,
                                         final long requestOffset,
-                                        final int partitionId,
-                                        final ConsumerNodeInfo 
consumerNodeInfo,
-                                        final String statisKeyBase,
+                                        final ConsumerNodeInfo nodeInfo,
                                         int msgSizeLimit) throws IOException {
         // #lizard forgives
         if (this.closed.get()) {
@@ -183,7 +179,7 @@ public class MessageStore implements Closeable {
                 requestOffset, "Can't found Message by index in cache");
         // determine position to read.
         reqSwitch = (reqSwitch <= 0)
-                ? 0 : (consumerNodeInfo.isFilterConsume() ? (reqSwitch % 100) 
: (reqSwitch / 100));
+                ? 0 : (nodeInfo.isFilterConsume() ? (reqSwitch % 100) : 
(reqSwitch / 100));
         if (reqSwitch > 1) {
             // in read memory situation, read main memory or backup memory by 
consumer's config.
             long maxIndexOffset = TBaseConstants.META_VALUE_UNDEFINED;
@@ -198,20 +194,20 @@ public class MessageStore implements Closeable {
                             if (reqSwitch > 2) {
                                 memMsgRlt =
                                         // read from main memory.
-                                        
msgMemStore.getMessages(consumerNodeInfo.getLastDataRdOffset(),
+                                        
msgMemStore.getMessages(nodeInfo.getLastDataRdOffset(),
                                                 requestOffset, 
msgStoreMgr.getMaxMsgTransferSize(),
-                                                maxIndexReadLength, 
partitionId, false,
-                                                
consumerNodeInfo.isFilterConsume(),
-                                                
consumerNodeInfo.getFilterCondCodeSet());
+                                                maxIndexReadLength, 
nodeInfo.getPartitionId(), false,
+                                                nodeInfo.isFilterConsume(),
+                                                
nodeInfo.getFilterCondCodeSet());
                             }
                         } else {
                             // read from backup memory.
                             memMsgRlt =
-                                    
msgMemStoreBeingFlush.getMessages(consumerNodeInfo.getLastDataRdOffset(),
+                                    
msgMemStoreBeingFlush.getMessages(nodeInfo.getLastDataRdOffset(),
                                             requestOffset, 
msgStoreMgr.getMaxMsgTransferSize(),
-                                            maxIndexReadLength, partitionId, 
true,
-                                            consumerNodeInfo.isFilterConsume(),
-                                            
consumerNodeInfo.getFilterCondCodeSet());
+                                            maxIndexReadLength, 
nodeInfo.getPartitionId(), true,
+                                            nodeInfo.isFilterConsume(),
+                                            nodeInfo.getFilterCondCodeSet());
                         }
                     }
                 } finally {
@@ -231,7 +227,7 @@ public class MessageStore implements Closeable {
                             ClientBroker.TransferedMessage transferedMessage =
                                     DataStoreUtils.getTransferMsg(dataBuffer,
                                             dataBuffer.array().length,
-                                            countMap, statisKeyBase, 
strBuffer);
+                                            countMap, nodeInfo.getStatisKey(), 
strBuffer);
                             if (transferedMessage != null) {
                                 transferedMessageList.add(transferedMessage);
                             }
@@ -256,7 +252,7 @@ public class MessageStore implements Closeable {
             return new GetMessageResult(false, TErrCodeConstants.NOT_FOUND,
                     reqNewOffset, 0, "current offset is exceed max file 
offset");
         }
-        maxIndexReadLength = consumerNodeInfo.isFilterConsume()
+        maxIndexReadLength = nodeInfo.isFilterConsume()
                 ? fileMaxFilterIndexReadSize.get() : 
fileMaxIndexReadSize.get();
         final ByteBuffer indexBuffer = ByteBuffer.allocate(maxIndexReadLength);
         Segment indexRecordView =
@@ -273,23 +269,23 @@ public class MessageStore implements Closeable {
         indexRecordView.read(indexBuffer, reqNewOffset);
         indexBuffer.flip();
         indexRecordView.relViewRef();
-        if ((msgFileStore.getDataHighMaxOffset() - 
consumerNodeInfo.getLastDataRdOffset()
+        if ((msgFileStore.getDataHighMaxOffset() - 
nodeInfo.getLastDataRdOffset()
             >= this.tubeConfig.getDoubleDefaultDeduceReadSize())
             && msgSizeLimit > this.maxAllowRdSize) {
             msgSizeLimit = this.maxAllowRdSize;
         }
         GetMessageResult retResult =
-            msgFileStore.getMessages(partitionId,
-                consumerNodeInfo.getLastDataRdOffset(), reqNewOffset,
-                indexBuffer, consumerNodeInfo.isFilterConsume(),
-                consumerNodeInfo.getFilterCondCodeSet(),
-                statisKeyBase, msgSizeLimit);
+            msgFileStore.getMessages(nodeInfo.getPartitionId(),
+                    nodeInfo.getLastDataRdOffset(), reqNewOffset,
+                    indexBuffer, nodeInfo.isFilterConsume(),
+                    nodeInfo.getFilterCondCodeSet(),
+                    nodeInfo.getStatisKey(), msgSizeLimit);
         if (reqSwitch <= 1) {
             retResult.setMaxOffset(getFileIndexMaxOffset());
         } else {
             retResult.setMaxOffset(getIndexMaxOffset());
         }
-        if (consumerNodeInfo.isFilterConsume()
+        if (nodeInfo.isFilterConsume()
             && retResult.isSuccess
             && retResult.getLastReadOffset() > 0) {
             if ((getFileIndexMaxOffset()
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStoreManager.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStoreManager.java
index 9feb5b2..c0b937f 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStoreManager.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStoreManager.java
@@ -341,7 +341,6 @@ public class MessageStoreManager implements StoreService {
      * Get message from store.
      *
      * @param msgStore
-     * @param topic
      * @param partitionId
      * @param msgCount
      * @param filterCondSet
@@ -349,7 +348,6 @@ public class MessageStoreManager implements StoreService {
      * @throws IOException
      */
     public GetMessageResult getMessages(final MessageStore msgStore,
-                                        final String topic,
                                         final int partitionId,
                                         final int msgCount,
                                         final Set<String> filterCondSet) 
throws IOException {
@@ -357,15 +355,16 @@ public class MessageStoreManager implements StoreService {
         try {
             final long maxOffset = msgStore.getIndexMaxOffset();
             ConsumerNodeInfo consumerNodeInfo =
-                    new ConsumerNodeInfo(metadataManager, maxMsgTransferSize, 
filterCondSet);
+                    new ConsumerNodeInfo(metadataManager,
+                            partitionId, maxMsgTransferSize, filterCondSet);
             int maxIndexReadSize = (msgCount + 1)
                     * DataStoreUtils.STORE_INDEX_HEAD_LEN * 
msgStore.getPartitionNum();
             if (filterCondSet != null && !filterCondSet.isEmpty()) {
                 maxIndexReadSize *= 5;
             }
             requestOffset = maxOffset - maxIndexReadSize < 0 ? 0L : maxOffset 
- maxIndexReadSize;
-            return msgStore.getMessages(303, requestOffset, partitionId,
-                    consumerNodeInfo, topic, this.maxMsgTransferSize);
+            return msgStore.getMessages(303,
+                    requestOffset, consumerNodeInfo, this.maxMsgTransferSize);
         } catch (Throwable e1) {
             return new GetMessageResult(false, 
TErrCodeConstants.INTERNAL_SERVER_ERROR,
                     requestOffset, 0, "Get message failure, errMsg=" + 
e1.getMessage());
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java
index 3c043f8..8c9004f 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java
@@ -22,6 +22,7 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.tubemq.corebase.TBaseConstants;
 import org.apache.tubemq.corebase.TokenConstants;
+import org.apache.tubemq.server.broker.BrokerConfig;
 import org.apache.tubemq.server.broker.metadata.MetadataManager;
 
 /***
@@ -36,7 +37,9 @@ public class ConsumerNodeInfo {
     private String topicName = "";
     private String offsetCacheKey;
     private String heartbeatNodeId;
+    private String statisKey;
     private String rmtAddrInfo;
+    private BrokerConfig tubeConfig;
     private boolean overTls = false;
     private int partitionId
             = TBaseConstants.META_VALUE_UNDEFINED;
@@ -58,7 +61,7 @@ public class ConsumerNodeInfo {
 
 
     public ConsumerNodeInfo(final MetadataManager metaManager,
-                            int maxXfeSize, Set<String> filterCodes) {
+                            int partitionId, int maxXfeSize, Set<String> 
filterCodes) {
         if (filterCodes != null) {
             for (String filterItem : filterCodes) {
                 this.filterCondStrs.add(filterItem);
@@ -68,19 +71,22 @@ public class ConsumerNodeInfo {
                 this.isFilterConsume = true;
             }
         }
+        this.partitionId = partitionId;
         this.qryPriorityId.set(TBaseConstants.META_VALUE_UNDEFINED);
         this.flowLimitInfo =
                 new FlowLimitInfo(metaManager, false, maxXfeSize);
     }
 
-    public ConsumerNodeInfo(String partStr, boolean isRegister,
+    public ConsumerNodeInfo(BrokerConfig tubeConfig, boolean isRegister,
                             String consumerId, String groupName, String 
topicName,
-                            int partitionId, Set<String> filterCodes, boolean 
overTls) {
-        this.partStr = partStr;
+                            int partitionId, Set<String> filterCodes, String 
rmtAddress,
+                            boolean overTls) {
+        this.tubeConfig = tubeConfig;
         this.consumerId = consumerId;
         this.groupName = groupName;
         this.topicName = topicName;
         this.partitionId = partitionId;
+        this.rmtAddrInfo = rmtAddress;
         this.overTls = overTls;
         buildSearchKeyInfo();
         if (isRegister && filterCodes != null) {
@@ -105,6 +111,10 @@ public class ConsumerNodeInfo {
         return assignInfo.getLeftOffset();
     }
 
+    public long getRightOffset() {
+        return assignInfo.getRightOffset();
+    }
+
     public String getPartStr() {
         return partStr;
     }
@@ -133,6 +143,10 @@ public class ConsumerNodeInfo {
         return partitionId;
     }
 
+    public String getStatisKey() {
+        return statisKey;
+    }
+
     public boolean isOverTls() {
         return overTls;
     }
@@ -195,12 +209,22 @@ public class ConsumerNodeInfo {
 
     private void buildSearchKeyInfo() {
         StringBuilder sBuilder = new StringBuilder(512);
+        this.partStr = 
sBuilder.append(groupName).append(TokenConstants.ATTR_SEP)
+                
.append(topicName).append(TokenConstants.ATTR_SEP).append(partitionId).toString();
+        sBuilder.delete(0, sBuilder.length());
         this.offsetCacheKey = sBuilder.append(topicName)
                 .append(TokenConstants.HYPHEN).append(partitionId).toString();
         sBuilder.delete(0, sBuilder.length());
         this.heartbeatNodeId = sBuilder.append(consumerId)
                 .append(TokenConstants.SEGMENT_SEP).append(partStr).toString();
         sBuilder.delete(0, sBuilder.length());
+        this.statisKey =  
sBuilder.append(topicName).append(TokenConstants.SEGMENT_SEP)
+                
.append(tubeConfig.getHostName()).append(TokenConstants.SEGMENT_SEP)
+                .append(consumerId).append(TokenConstants.SEGMENT_SEP)
+                .append(rmtAddrInfo).append(TokenConstants.SEGMENT_SEP)
+                .append(groupName).append(TokenConstants.SEGMENT_SEP)
+                .append(partitionId).toString();
+        sBuilder.delete(0, sBuilder.length());
         if (consumerId.lastIndexOf("_") != -1) {
             String targetStr = 
consumerId.substring(consumerId.lastIndexOf("_") + 1);
             String[] strInfos = targetStr.split("-");

Reply via email to