This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch TUBEMQ-314
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/TUBEMQ-314 by this push:
new c636e9f [TUBEMQ-411]Adjust Broker implementation based on interval
consumption (#313)
c636e9f is described below
commit c636e9f63a4710bdabc7d089ea2059d60acd4b5f
Author: gosonzhang <[email protected]>
AuthorDate: Fri Nov 13 17:00:33 2020 +0800
[TUBEMQ-411]Adjust Broker implementation based on interval consumption
(#313)
Co-authored-by: gosonzhang <[email protected]>
---
.../server/broker/msgstore/MessageStore.java | 33 +++++++++---------
.../server/broker/msgstore/disk/MsgFileStore.java | 31 ++++++++---------
.../server/broker/msgstore/mem/MsgMemStore.java | 39 ++++++++++++----------
.../tubemq/server/broker/nodeinfo/AssignInfo.java | 2 +-
.../server/broker/nodeinfo/FlowLimitInfo.java | 2 ++
.../broker/msgstore/mem/MsgMemStoreTest.java | 4 ++-
6 files changed, 57 insertions(+), 54 deletions(-)
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
index b63169b..d5a2384 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
@@ -172,6 +172,10 @@ public class MessageStore implements Closeable {
.append("[Data Store] Closed MessageStore for storeKey ")
.append(this.storeKey).toString());
}
+ if (requestOffset >= nodeInfo.getRightOffset()) {
+ return new GetMessageResult(false,
TErrCodeConstants.CONSUME_REACHED_RIGHT_BOUNDARY,
+ requestOffset, 0, "The request offset reached right
boundary!");
+ }
int result = 0;
boolean inMemCache = false;
int maxIndexReadLength = memMaxIndexReadCnt.get();
@@ -194,20 +198,16 @@ public class MessageStore implements Closeable {
if (reqSwitch > 2) {
memMsgRlt =
// read from main memory.
-
msgMemStore.getMessages(nodeInfo.getLastDataRdOffset(),
- requestOffset,
msgStoreMgr.getMaxMsgTransferSize(),
- maxIndexReadLength,
nodeInfo.getPartitionId(), false,
- nodeInfo.isFilterConsume(),
-
nodeInfo.getFilterCondCodeSet());
+ msgMemStore.getMessages(nodeInfo,
requestOffset,
+
msgStoreMgr.getMaxMsgTransferSize(),
+ maxIndexReadLength, false);
}
} else {
// read from backup memory.
memMsgRlt =
-
msgMemStoreBeingFlush.getMessages(nodeInfo.getLastDataRdOffset(),
- requestOffset,
msgStoreMgr.getMaxMsgTransferSize(),
- maxIndexReadLength,
nodeInfo.getPartitionId(), true,
- nodeInfo.isFilterConsume(),
- nodeInfo.getFilterCondCodeSet());
+
msgMemStoreBeingFlush.getMessages(nodeInfo, requestOffset,
+
msgStoreMgr.getMaxMsgTransferSize(),
+ maxIndexReadLength, true);
}
}
} finally {
@@ -246,8 +246,7 @@ public class MessageStore implements Closeable {
}
}
// before read from file, adjust request's offset.
- long reqNewOffset = requestOffset <
this.msgFileStore.getIndexMinOffset()
- ? this.msgFileStore.getIndexMinOffset() : requestOffset;
+ long reqNewOffset = Math.max(requestOffset,
this.msgFileStore.getIndexMinOffset());
if (reqSwitch <= 1 && reqNewOffset >= getFileIndexMaxOffset()) {
return new GetMessageResult(false, TErrCodeConstants.NOT_FOUND,
reqNewOffset, 0, "current offset is exceed max file
offset");
@@ -266,6 +265,10 @@ public class MessageStore implements Closeable {
reqNewOffset, 0, "current offset is exceed max
offset!");
}
}
+ if (reqNewOffset >= nodeInfo.getRightOffset()) {
+ return new GetMessageResult(false,
TErrCodeConstants.CONSUME_REACHED_RIGHT_BOUNDARY,
+ requestOffset, 0, "The request offset reached right
boundary!");
+ }
indexRecordView.read(indexBuffer, reqNewOffset);
indexBuffer.flip();
indexRecordView.relViewRef();
@@ -275,11 +278,7 @@ public class MessageStore implements Closeable {
msgSizeLimit = this.maxAllowRdSize;
}
GetMessageResult retResult =
- msgFileStore.getMessages(nodeInfo.getPartitionId(),
- nodeInfo.getLastDataRdOffset(), reqNewOffset,
- indexBuffer, nodeInfo.isFilterConsume(),
- nodeInfo.getFilterCondCodeSet(),
- nodeInfo.getStatisKey(), msgSizeLimit);
+ msgFileStore.getMessages(nodeInfo, reqNewOffset, indexBuffer,
msgSizeLimit);
if (reqSwitch <= 1) {
retResult.setMaxOffset(getFileIndexMaxOffset());
} else {
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/MsgFileStore.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/MsgFileStore.java
index ed762d2..b942585 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/MsgFileStore.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/MsgFileStore.java
@@ -26,7 +26,6 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
-import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -36,6 +35,7 @@ import
org.apache.tubemq.corebase.protobuf.generated.ClientBroker;
import org.apache.tubemq.corebase.utils.ServiceStatusHolder;
import org.apache.tubemq.server.broker.BrokerConfig;
import org.apache.tubemq.server.broker.msgstore.MessageStore;
+import org.apache.tubemq.server.broker.nodeinfo.ConsumerNodeInfo;
import org.apache.tubemq.server.broker.stats.CountItem;
import org.apache.tubemq.server.broker.utils.DataStoreUtils;
import org.apache.tubemq.server.broker.utils.DiskSamplePrint;
@@ -223,21 +223,14 @@ public class MsgFileStore implements Closeable {
/***
* Get message from index and data files.
*
- * @param partitionId
- * @param lastRdOffset
* @param reqOffset
* @param indexBuffer
- * @param isFilterConsume
- * @param filterKeySet
- * @param statisKeyBase
* @param maxMsgTransferSize
* @return
*/
- public GetMessageResult getMessages(final int partitionId, final long
lastRdOffset,
- final long reqOffset, final ByteBuffer
indexBuffer,
- final boolean isFilterConsume,
- final Set<Integer> filterKeySet,
- final String statisKeyBase,
+ public GetMessageResult getMessages(final ConsumerNodeInfo nodeInfo,
+ final long reqOffset,
+ final ByteBuffer indexBuffer,
final int maxMsgTransferSize) {
// #lizard forgives
// Orderly read from index file, then random read from data file.
@@ -259,6 +252,7 @@ public class MsgFileStore implements Closeable {
final StringBuilder sBuilder = new StringBuilder(512);
final long curDataMaxOffset = getDataMaxOffset();
final long curDataMinOffset = getDataMinOffset();
+ long rightBoundary = nodeInfo.getRightOffset();
HashMap<String, CountItem> countMap = new HashMap<>();
ByteBuffer dataBuffer =
ByteBuffer.allocate(TServerConstants.CFG_STORE_DEFAULT_MSG_READ_UNIT);
@@ -267,6 +261,9 @@ public class MsgFileStore implements Closeable {
// read data file by index.
for (curIndexOffset = 0; curIndexOffset < indexBuffer.remaining();
curIndexOffset += DataStoreUtils.STORE_INDEX_HEAD_LEN) {
+ if (curIndexOffset + reqOffset >= rightBoundary) {
+ break;
+ }
curIndexPartitionId = indexBuffer.getInt();
curIndexDataOffset = indexBuffer.getLong();
curIndexDataSize = indexBuffer.getInt();
@@ -288,9 +285,9 @@ public class MsgFileStore implements Closeable {
break;
}
// conduct filter operation.
- if (curIndexPartitionId != partitionId
- || (isFilterConsume
- && !filterKeySet.contains(curIndexKeyCode))) {
+ if (curIndexPartitionId != nodeInfo.getPartitionId()
+ || (nodeInfo.isFilterConsume()
+ &&
!nodeInfo.getFilterCondCodeSet().contains(curIndexKeyCode))) {
lastRdDataOffset = maxDataLimitOffset;
readedOffset = curIndexOffset +
DataStoreUtils.STORE_INDEX_HEAD_LEN;
continue;
@@ -330,7 +327,7 @@ public class MsgFileStore implements Closeable {
ServiceStatusHolder.addReadIOErrCnt();
}
samplePrintCtrl.printExceptionCaught(e2,
- messageStore.getStoreKey(), String.valueOf(partitionId));
+ messageStore.getStoreKey(),
String.valueOf(nodeInfo.getPartitionId()));
retCode = TErrCodeConstants.INTERNAL_SERVER_ERROR;
sBuilder.delete(0, sBuilder.length());
errInfo = sBuilder.append("Get message from file failure : ")
@@ -344,7 +341,7 @@ public class MsgFileStore implements Closeable {
lastRdDataOffset = maxDataLimitOffset;
ClientBroker.TransferedMessage transferedMessage =
DataStoreUtils.getTransferMsg(dataBuffer,
- curIndexDataSize, countMap, statisKeyBase,
sBuilder);
+ curIndexDataSize, countMap,
nodeInfo.getStatisKey(), sBuilder);
if (transferedMessage == null) {
continue;
}
@@ -366,7 +363,7 @@ public class MsgFileStore implements Closeable {
}
}
if (lastRdDataOffset <= 0L) {
- lastRdDataOffset = lastRdOffset;
+ lastRdDataOffset = nodeInfo.getLastDataRdOffset();
}
// return result.
return new GetMessageResult(result, retCode, errInfo,
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/mem/MsgMemStore.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/mem/MsgMemStore.java
index 25c98f2..ea21d81 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/mem/MsgMemStore.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/mem/MsgMemStore.java
@@ -22,7 +22,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -31,6 +30,7 @@ import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.TErrCodeConstants;
import org.apache.tubemq.server.broker.BrokerConfig;
import org.apache.tubemq.server.broker.msgstore.disk.MsgFileStore;
+import org.apache.tubemq.server.broker.nodeinfo.ConsumerNodeInfo;
import org.apache.tubemq.server.broker.utils.DataStoreUtils;
import org.apache.tubemq.server.common.utils.AppendResult;
import org.slf4j.Logger;
@@ -133,21 +133,17 @@ public class MsgMemStore implements Closeable {
/***
* Read from memory, read index, then data.
*
- * @param lstRdDataOffset
* @param lstRdIndexOffset
* @param maxReadSize
* @param maxReadCount
- * @param partitionId
* @param isSecond
- * @param isFilterConsume
- * @param filterKeySet
* @return
*/
- public GetCacheMsgResult getMessages(final long lstRdDataOffset, final
long lstRdIndexOffset,
- final int maxReadSize, final int
maxReadCount,
- final int partitionId, final boolean
isSecond,
- final boolean isFilterConsume,
- final Set<Integer> filterKeySet) {
+ public GetCacheMsgResult getMessages(final ConsumerNodeInfo nodeInfo,
+ final long lstRdIndexOffset,
+ final int maxReadSize,
+ final int maxReadCount,
+ final boolean isSecond) {
// #lizard forgives
Integer lastWritePos = 0;
boolean hasMsg = false;
@@ -161,16 +157,20 @@ public class MsgMemStore implements Closeable {
return new GetCacheMsgResult(false, TErrCodeConstants.NOT_FOUND,
lstRdIndexOffset, "Request offset reached cache
maxOffset");
}
+ if (lstRdIndexOffset >= nodeInfo.getRightOffset()) {
+ return new GetCacheMsgResult(false,
TErrCodeConstants.CONSUME_REACHED_RIGHT_BOUNDARY,
+ lstRdIndexOffset, "The request offset reached right
boundary!");
+ }
int totalReadSize = 0;
int currIndexOffset;
int currDataOffset;
- long lastDataRdOff = lstRdDataOffset;
+ long lastDataRdOff = nodeInfo.getLastDataRdOffset();
int startReadOff = (int) (lstRdIndexOffset - this.writeIndexStartPos);
this.writeLock.lock();
try {
- if (isFilterConsume) {
+ if (nodeInfo.isFilterConsume()) {
// filter conduct. accelerate by keysMap.
- for (Integer keyCode : filterKeySet) {
+ for (Integer keyCode : nodeInfo.getFilterCondCodeSet()) {
if (keyCode != null) {
lastWritePos = this.keysMap.get(keyCode);
if ((lastWritePos != null) && (lastWritePos >=
startReadOff)) {
@@ -181,7 +181,7 @@ public class MsgMemStore implements Closeable {
}
} else {
// orderly consume by partition id.
- lastWritePos = this.queuesMap.get(partitionId);
+ lastWritePos = this.queuesMap.get(nodeInfo.getPartitionId());
if ((lastWritePos != null) && (lastWritePos >= startReadOff)) {
hasMsg = true;
}
@@ -195,7 +195,7 @@ public class MsgMemStore implements Closeable {
int limitReadSize = currIndexOffset - startReadOff;
// cannot find message, return not found
if (!hasMsg) {
- if (isSecond && !isFilterConsume) {
+ if (isSecond && !nodeInfo.isFilterConsume()) {
return new GetCacheMsgResult(true, 0, "Ok2",
lstRdIndexOffset, limitReadSize, lastDataRdOff,
totalReadSize, cacheMsgList);
} else {
@@ -212,6 +212,7 @@ public class MsgMemStore implements Closeable {
int cKeyCode = 0;
long cTimeRecv = 0L;
int cDataOffset = 0;
+ long rightBoundary = nodeInfo.getRightOffset();
ByteBuffer tmpIndexRdBuf = this.cachedIndexSegment.asReadOnlyBuffer();
ByteBuffer tmpDataRdBuf = this.cacheDataSegment.asReadOnlyBuffer();
// loop read by index
@@ -219,7 +220,8 @@ public class MsgMemStore implements Closeable {
count++, startReadOff += DataStoreUtils.STORE_INDEX_HEAD_LEN) {
// cannot find matched message, return
if ((startReadOff >= currIndexOffset)
- || (startReadOff + DataStoreUtils.STORE_INDEX_HEAD_LEN >
currIndexOffset)) {
+ || (startReadOff + DataStoreUtils.STORE_INDEX_HEAD_LEN >
currIndexOffset)
+ || (startReadOff + this.writeIndexStartPos >=
rightBoundary)) {
break;
}
// read index content.
@@ -239,8 +241,9 @@ public class MsgMemStore implements Closeable {
readedSize += DataStoreUtils.STORE_INDEX_HEAD_LEN;
continue;
}
- if ((cPartitionId != partitionId)
- || (isFilterConsume &&
(!filterKeySet.contains(cKeyCode)))) {
+ if ((cPartitionId != nodeInfo.getPartitionId())
+ || (nodeInfo.isFilterConsume()
+ && (!nodeInfo.getFilterCondCodeSet().contains(cKeyCode))))
{
readedSize += DataStoreUtils.STORE_INDEX_HEAD_LEN;
continue;
}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/nodeinfo/AssignInfo.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/nodeinfo/AssignInfo.java
index 307feed..2fbecc7 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/nodeinfo/AssignInfo.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/nodeinfo/AssignInfo.java
@@ -92,6 +92,6 @@ public class AssignInfo {
|| rangeType == RangeType.RANGE_SET_BOTH_DEFINED)) {
return targetTuple.getRightValue();
}
- return TBaseConstants.META_VALUE_UNDEFINED;
+ return Long.MAX_VALUE;
}
}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/nodeinfo/FlowLimitInfo.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/nodeinfo/FlowLimitInfo.java
index b32afe6..3e5f2dd 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/nodeinfo/FlowLimitInfo.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/nodeinfo/FlowLimitInfo.java
@@ -137,4 +137,6 @@ public class FlowLimitInfo {
return curFlowCtrlVal.freqLtInMs;
}
+
+
}
diff --git
a/tubemq-server/src/test/java/org/apache/tubemq/server/broker/msgstore/mem/MsgMemStoreTest.java
b/tubemq-server/src/test/java/org/apache/tubemq/server/broker/msgstore/mem/MsgMemStoreTest.java
index 3b3702a..8c8da2a 100644
---
a/tubemq-server/src/test/java/org/apache/tubemq/server/broker/msgstore/mem/MsgMemStoreTest.java
+++
b/tubemq-server/src/test/java/org/apache/tubemq/server/broker/msgstore/mem/MsgMemStoreTest.java
@@ -19,6 +19,7 @@ package org.apache.tubemq.server.broker.msgstore.mem;
import java.nio.ByteBuffer;
+import org.apache.tubemq.server.broker.nodeinfo.ConsumerNodeInfo;
import org.apache.tubemq.server.common.utils.AppendResult;
import org.junit.Test;
@@ -52,7 +53,8 @@ public class MsgMemStoreTest {
AppendResult appendResult = new AppendResult();
msgMemStore.appendMsg(msgMemStatisInfo, 0, 0,
System.currentTimeMillis(), 3, bf, appendResult);
+ ConsumerNodeInfo nodeInfo = new ConsumerNodeInfo(null, 0, 1024, null);
// get messages
- GetCacheMsgResult getCacheMsgResult = msgMemStore.getMessages(0, 2,
1024, 1000, 0, false, false, null);
+ GetCacheMsgResult getCacheMsgResult =
msgMemStore.getMessages(nodeInfo, 0, 1024, 1000, false);
}
}