This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch TUBEMQ-314
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/TUBEMQ-314 by this push:
     new c636e9f  [TUBEMQ-411]Adjust Broker implementation based on interval consumption (#313)
c636e9f is described below

commit c636e9f63a4710bdabc7d089ea2059d60acd4b5f
Author: gosonzhang <[email protected]>
AuthorDate: Fri Nov 13 17:00:33 2020 +0800

    [TUBEMQ-411]Adjust Broker implementation based on interval consumption (#313)
    
    Co-authored-by: gosonzhang <[email protected]>
---
 .../server/broker/msgstore/MessageStore.java       | 33 +++++++++---------
 .../server/broker/msgstore/disk/MsgFileStore.java  | 31 ++++++++---------
 .../server/broker/msgstore/mem/MsgMemStore.java    | 39 ++++++++++++----------
 .../tubemq/server/broker/nodeinfo/AssignInfo.java  |  2 +-
 .../server/broker/nodeinfo/FlowLimitInfo.java      |  2 ++
 .../broker/msgstore/mem/MsgMemStoreTest.java       |  4 ++-
 6 files changed, 57 insertions(+), 54 deletions(-)

diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
index b63169b..d5a2384 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
@@ -172,6 +172,10 @@ public class MessageStore implements Closeable {
                     .append("[Data Store] Closed MessageStore for storeKey ")
                     .append(this.storeKey).toString());
         }
+        if (requestOffset >= nodeInfo.getRightOffset()) {
+            return new GetMessageResult(false, TErrCodeConstants.CONSUME_REACHED_RIGHT_BOUNDARY,
+                    requestOffset, 0, "The request offset reached right boundary!");
+        }
         int result = 0;
         boolean inMemCache = false;
         int maxIndexReadLength = memMaxIndexReadCnt.get();
@@ -194,20 +198,16 @@ public class MessageStore implements Closeable {
                             if (reqSwitch > 2) {
                                 memMsgRlt =
                                         // read from main memory.
-                                        
msgMemStore.getMessages(nodeInfo.getLastDataRdOffset(),
-                                                requestOffset, 
msgStoreMgr.getMaxMsgTransferSize(),
-                                                maxIndexReadLength, 
nodeInfo.getPartitionId(), false,
-                                                nodeInfo.isFilterConsume(),
-                                                
nodeInfo.getFilterCondCodeSet());
+                                        msgMemStore.getMessages(nodeInfo, 
requestOffset,
+                                                
msgStoreMgr.getMaxMsgTransferSize(),
+                                                maxIndexReadLength, false);
                             }
                         } else {
                             // read from backup memory.
                             memMsgRlt =
-                                    
msgMemStoreBeingFlush.getMessages(nodeInfo.getLastDataRdOffset(),
-                                            requestOffset, 
msgStoreMgr.getMaxMsgTransferSize(),
-                                            maxIndexReadLength, 
nodeInfo.getPartitionId(), true,
-                                            nodeInfo.isFilterConsume(),
-                                            nodeInfo.getFilterCondCodeSet());
+                                    
msgMemStoreBeingFlush.getMessages(nodeInfo, requestOffset,
+                                            
msgStoreMgr.getMaxMsgTransferSize(),
+                                            maxIndexReadLength, true);
                         }
                     }
                 } finally {
@@ -246,8 +246,7 @@ public class MessageStore implements Closeable {
             }
         }
         // before read from file, adjust request's offset.
-        long reqNewOffset = requestOffset < this.msgFileStore.getIndexMinOffset()
-                ? this.msgFileStore.getIndexMinOffset() : requestOffset;
+        long reqNewOffset = Math.max(requestOffset, this.msgFileStore.getIndexMinOffset());
         if (reqSwitch <= 1 && reqNewOffset >= getFileIndexMaxOffset()) {
             return new GetMessageResult(false, TErrCodeConstants.NOT_FOUND,
                     reqNewOffset, 0, "current offset is exceed max file 
offset");
@@ -266,6 +265,10 @@ public class MessageStore implements Closeable {
                         reqNewOffset, 0, "current offset is exceed max 
offset!");
             }
         }
+        if (reqNewOffset >= nodeInfo.getRightOffset()) {
+            return new GetMessageResult(false, TErrCodeConstants.CONSUME_REACHED_RIGHT_BOUNDARY,
+                    requestOffset, 0, "The request offset reached right boundary!");
+        }
         indexRecordView.read(indexBuffer, reqNewOffset);
         indexBuffer.flip();
         indexRecordView.relViewRef();
@@ -275,11 +278,7 @@ public class MessageStore implements Closeable {
             msgSizeLimit = this.maxAllowRdSize;
         }
         GetMessageResult retResult =
-            msgFileStore.getMessages(nodeInfo.getPartitionId(),
-                    nodeInfo.getLastDataRdOffset(), reqNewOffset,
-                    indexBuffer, nodeInfo.isFilterConsume(),
-                    nodeInfo.getFilterCondCodeSet(),
-                    nodeInfo.getStatisKey(), msgSizeLimit);
+                msgFileStore.getMessages(nodeInfo, reqNewOffset, indexBuffer, 
msgSizeLimit);
         if (reqSwitch <= 1) {
             retResult.setMaxOffset(getFileIndexMaxOffset());
         } else {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/MsgFileStore.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/MsgFileStore.java
index ed762d2..b942585 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/MsgFileStore.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/MsgFileStore.java
@@ -26,7 +26,6 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -36,6 +35,7 @@ import org.apache.tubemq.corebase.protobuf.generated.ClientBroker;
 import org.apache.tubemq.corebase.utils.ServiceStatusHolder;
 import org.apache.tubemq.server.broker.BrokerConfig;
 import org.apache.tubemq.server.broker.msgstore.MessageStore;
+import org.apache.tubemq.server.broker.nodeinfo.ConsumerNodeInfo;
 import org.apache.tubemq.server.broker.stats.CountItem;
 import org.apache.tubemq.server.broker.utils.DataStoreUtils;
 import org.apache.tubemq.server.broker.utils.DiskSamplePrint;
@@ -223,21 +223,14 @@ public class MsgFileStore implements Closeable {
     /***
      * Get message from index and data files.
      *
-     * @param partitionId
-     * @param lastRdOffset
      * @param reqOffset
      * @param indexBuffer
-     * @param isFilterConsume
-     * @param filterKeySet
-     * @param statisKeyBase
      * @param maxMsgTransferSize
      * @return
      */
-    public GetMessageResult getMessages(final int partitionId, final long 
lastRdOffset,
-                                        final long reqOffset, final ByteBuffer 
indexBuffer,
-                                        final boolean isFilterConsume,
-                                        final Set<Integer> filterKeySet,
-                                        final String statisKeyBase,
+    public GetMessageResult getMessages(final ConsumerNodeInfo nodeInfo,
+                                        final long reqOffset,
+                                        final ByteBuffer indexBuffer,
                                         final int maxMsgTransferSize) {
         // #lizard forgives
         // Orderly read from index file, then random read from data file.
@@ -259,6 +252,7 @@ public class MsgFileStore implements Closeable {
         final StringBuilder sBuilder = new StringBuilder(512);
         final long curDataMaxOffset = getDataMaxOffset();
         final long curDataMinOffset = getDataMinOffset();
+        long rightBoundary = nodeInfo.getRightOffset();
         HashMap<String, CountItem> countMap = new HashMap<>();
         ByteBuffer dataBuffer =
                 
ByteBuffer.allocate(TServerConstants.CFG_STORE_DEFAULT_MSG_READ_UNIT);
@@ -267,6 +261,9 @@ public class MsgFileStore implements Closeable {
         // read data file by index.
         for (curIndexOffset = 0; curIndexOffset < indexBuffer.remaining();
              curIndexOffset += DataStoreUtils.STORE_INDEX_HEAD_LEN) {
+            if (curIndexOffset + reqOffset >= rightBoundary) {
+                break;
+            }
             curIndexPartitionId = indexBuffer.getInt();
             curIndexDataOffset = indexBuffer.getLong();
             curIndexDataSize = indexBuffer.getInt();
@@ -288,9 +285,9 @@ public class MsgFileStore implements Closeable {
                 break;
             }
             // conduct filter operation.
-            if (curIndexPartitionId != partitionId
-                    || (isFilterConsume
-                    && !filterKeySet.contains(curIndexKeyCode))) {
+            if (curIndexPartitionId != nodeInfo.getPartitionId()
+                    || (nodeInfo.isFilterConsume()
+                    && 
!nodeInfo.getFilterCondCodeSet().contains(curIndexKeyCode))) {
                 lastRdDataOffset = maxDataLimitOffset;
                 readedOffset = curIndexOffset + 
DataStoreUtils.STORE_INDEX_HEAD_LEN;
                 continue;
@@ -330,7 +327,7 @@ public class MsgFileStore implements Closeable {
                     ServiceStatusHolder.addReadIOErrCnt();
                 }
                 samplePrintCtrl.printExceptionCaught(e2,
-                    messageStore.getStoreKey(), String.valueOf(partitionId));
+                    messageStore.getStoreKey(), 
String.valueOf(nodeInfo.getPartitionId()));
                 retCode = TErrCodeConstants.INTERNAL_SERVER_ERROR;
                 sBuilder.delete(0, sBuilder.length());
                 errInfo = sBuilder.append("Get message from file failure : ")
@@ -344,7 +341,7 @@ public class MsgFileStore implements Closeable {
             lastRdDataOffset = maxDataLimitOffset;
             ClientBroker.TransferedMessage transferedMessage =
                     DataStoreUtils.getTransferMsg(dataBuffer,
-                            curIndexDataSize, countMap, statisKeyBase, 
sBuilder);
+                            curIndexDataSize, countMap, 
nodeInfo.getStatisKey(), sBuilder);
             if (transferedMessage == null) {
                 continue;
             }
@@ -366,7 +363,7 @@ public class MsgFileStore implements Closeable {
             }
         }
         if (lastRdDataOffset <= 0L) {
-            lastRdDataOffset = lastRdOffset;
+            lastRdDataOffset = nodeInfo.getLastDataRdOffset();
         }
         // return result.
         return new GetMessageResult(result, retCode, errInfo,
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/mem/MsgMemStore.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/mem/MsgMemStore.java
index 25c98f2..ea21d81 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/mem/MsgMemStore.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/mem/MsgMemStore.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -31,6 +30,7 @@ import org.apache.tubemq.corebase.TBaseConstants;
 import org.apache.tubemq.corebase.TErrCodeConstants;
 import org.apache.tubemq.server.broker.BrokerConfig;
 import org.apache.tubemq.server.broker.msgstore.disk.MsgFileStore;
+import org.apache.tubemq.server.broker.nodeinfo.ConsumerNodeInfo;
 import org.apache.tubemq.server.broker.utils.DataStoreUtils;
 import org.apache.tubemq.server.common.utils.AppendResult;
 import org.slf4j.Logger;
@@ -133,21 +133,17 @@ public class MsgMemStore implements Closeable {
     /***
      * Read from memory, read index, then data.
      *
-     * @param lstRdDataOffset
      * @param lstRdIndexOffset
      * @param maxReadSize
      * @param maxReadCount
-     * @param partitionId
      * @param isSecond
-     * @param isFilterConsume
-     * @param filterKeySet
      * @return
      */
-    public GetCacheMsgResult getMessages(final long lstRdDataOffset, final 
long lstRdIndexOffset,
-                                         final int maxReadSize, final int 
maxReadCount,
-                                         final int partitionId, final boolean 
isSecond,
-                                         final boolean isFilterConsume,
-                                         final Set<Integer> filterKeySet) {
+    public GetCacheMsgResult getMessages(final ConsumerNodeInfo nodeInfo,
+                                         final long lstRdIndexOffset,
+                                         final int maxReadSize,
+                                         final int maxReadCount,
+                                         final boolean isSecond) {
         // #lizard forgives
         Integer lastWritePos = 0;
         boolean hasMsg = false;
@@ -161,16 +157,20 @@ public class MsgMemStore implements Closeable {
             return new GetCacheMsgResult(false, TErrCodeConstants.NOT_FOUND,
                     lstRdIndexOffset, "Request offset reached cache 
maxOffset");
         }
+        if (lstRdIndexOffset >= nodeInfo.getRightOffset()) {
+            return new GetCacheMsgResult(false, TErrCodeConstants.CONSUME_REACHED_RIGHT_BOUNDARY,
+                    lstRdIndexOffset, "The request offset reached right boundary!");
+        }
         int totalReadSize = 0;
         int currIndexOffset;
         int currDataOffset;
-        long lastDataRdOff = lstRdDataOffset;
+        long lastDataRdOff = nodeInfo.getLastDataRdOffset();
         int startReadOff = (int) (lstRdIndexOffset - this.writeIndexStartPos);
         this.writeLock.lock();
         try {
-            if (isFilterConsume) {
+            if (nodeInfo.isFilterConsume()) {
                 // filter conduct. accelerate by keysMap.
-                for (Integer keyCode : filterKeySet) {
+                for (Integer keyCode : nodeInfo.getFilterCondCodeSet()) {
                     if (keyCode != null) {
                         lastWritePos = this.keysMap.get(keyCode);
                         if ((lastWritePos != null) && (lastWritePos >= 
startReadOff)) {
@@ -181,7 +181,7 @@ public class MsgMemStore implements Closeable {
                 }
             } else {
                 // orderly consume by partition id.
-                lastWritePos = this.queuesMap.get(partitionId);
+                lastWritePos = this.queuesMap.get(nodeInfo.getPartitionId());
                 if ((lastWritePos != null) && (lastWritePos >= startReadOff)) {
                     hasMsg = true;
                 }
@@ -195,7 +195,7 @@ public class MsgMemStore implements Closeable {
         int limitReadSize = currIndexOffset - startReadOff;
         // cannot find message, return not found
         if (!hasMsg) {
-            if (isSecond && !isFilterConsume) {
+            if (isSecond && !nodeInfo.isFilterConsume()) {
                 return new GetCacheMsgResult(true, 0, "Ok2",
                         lstRdIndexOffset, limitReadSize, lastDataRdOff, 
totalReadSize, cacheMsgList);
             } else {
@@ -212,6 +212,7 @@ public class MsgMemStore implements Closeable {
         int cKeyCode = 0;
         long cTimeRecv = 0L;
         int cDataOffset = 0;
+        long rightBoundary = nodeInfo.getRightOffset();
         ByteBuffer tmpIndexRdBuf = this.cachedIndexSegment.asReadOnlyBuffer();
         ByteBuffer tmpDataRdBuf = this.cacheDataSegment.asReadOnlyBuffer();
         // loop read by index
@@ -219,7 +220,8 @@ public class MsgMemStore implements Closeable {
              count++, startReadOff += DataStoreUtils.STORE_INDEX_HEAD_LEN) {
             // cannot find matched message, return
             if ((startReadOff >= currIndexOffset)
-                || (startReadOff + DataStoreUtils.STORE_INDEX_HEAD_LEN > 
currIndexOffset)) {
+                    || (startReadOff + DataStoreUtils.STORE_INDEX_HEAD_LEN > 
currIndexOffset)
+                    || (startReadOff + this.writeIndexStartPos >= 
rightBoundary)) {
                 break;
             }
             // read index content.
@@ -239,8 +241,9 @@ public class MsgMemStore implements Closeable {
                 readedSize += DataStoreUtils.STORE_INDEX_HEAD_LEN;
                 continue;
             }
-            if ((cPartitionId != partitionId)
-                    || (isFilterConsume && 
(!filterKeySet.contains(cKeyCode)))) {
+            if ((cPartitionId != nodeInfo.getPartitionId())
+                    || (nodeInfo.isFilterConsume()
+                    && (!nodeInfo.getFilterCondCodeSet().contains(cKeyCode)))) 
{
                 readedSize += DataStoreUtils.STORE_INDEX_HEAD_LEN;
                 continue;
             }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/nodeinfo/AssignInfo.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/nodeinfo/AssignInfo.java
index 307feed..2fbecc7 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/nodeinfo/AssignInfo.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/nodeinfo/AssignInfo.java
@@ -92,6 +92,6 @@ public class AssignInfo {
                 || rangeType == RangeType.RANGE_SET_BOTH_DEFINED)) {
             return targetTuple.getRightValue();
         }
-        return TBaseConstants.META_VALUE_UNDEFINED;
+        return Long.MAX_VALUE;
     }
 }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/nodeinfo/FlowLimitInfo.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/nodeinfo/FlowLimitInfo.java
index b32afe6..3e5f2dd 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/nodeinfo/FlowLimitInfo.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/nodeinfo/FlowLimitInfo.java
@@ -137,4 +137,6 @@ public class FlowLimitInfo {
         return curFlowCtrlVal.freqLtInMs;
     }
 
+
+
 }
diff --git a/tubemq-server/src/test/java/org/apache/tubemq/server/broker/msgstore/mem/MsgMemStoreTest.java b/tubemq-server/src/test/java/org/apache/tubemq/server/broker/msgstore/mem/MsgMemStoreTest.java
index 3b3702a..8c8da2a 100644
--- a/tubemq-server/src/test/java/org/apache/tubemq/server/broker/msgstore/mem/MsgMemStoreTest.java
+++ b/tubemq-server/src/test/java/org/apache/tubemq/server/broker/msgstore/mem/MsgMemStoreTest.java
@@ -19,6 +19,7 @@ package org.apache.tubemq.server.broker.msgstore.mem;
 
 import java.nio.ByteBuffer;
 
+import org.apache.tubemq.server.broker.nodeinfo.ConsumerNodeInfo;
 import org.apache.tubemq.server.common.utils.AppendResult;
 import org.junit.Test;
 
@@ -52,7 +53,8 @@ public class MsgMemStoreTest {
         AppendResult appendResult = new AppendResult();
         msgMemStore.appendMsg(msgMemStatisInfo, 0, 0,
                 System.currentTimeMillis(), 3, bf, appendResult);
+        ConsumerNodeInfo nodeInfo = new ConsumerNodeInfo(null, 0, 1024, null);
         // get messages
-        GetCacheMsgResult getCacheMsgResult = msgMemStore.getMessages(0, 2, 1024, 1000, 0, false, false, null);
+        GetCacheMsgResult getCacheMsgResult = msgMemStore.getMessages(nodeInfo, 0, 1024, 1000, false);
     }
 }

Reply via email to