This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new c1e1dcd [INLONG-2232][tubemq] Add start and end timestamp of segment
(#2235)
c1e1dcd is described below
commit c1e1dcd4d1e1cc07badf6a0523c12b7673e1ecf5
Author: gosonzhang <[email protected]>
AuthorDate: Thu Jan 20 18:44:48 2022 +0800
[INLONG-2232][tubemq] Add start and end timestamp of segment (#2235)
---
.../tubemq/server/broker/BrokerServiceServer.java | 2 +-
.../server/broker/msgstore/MessageStore.java | 193 ++++++++++++++-------
.../broker/msgstore/MessageStoreManager.java | 6 +-
.../server/broker/msgstore/disk/FileSegment.java | 158 +++++++++++++----
.../broker/msgstore/disk/FileSegmentList.java | 101 +++++++++--
.../server/broker/msgstore/disk/MsgFileStore.java | 164 +++++++++++++----
.../server/broker/msgstore/disk/Segment.java | 35 +++-
.../server/broker/msgstore/disk/SegmentList.java | 3 +
.../server/broker/msgstore/mem/MsgMemStore.java | 115 ++++++++----
.../tubemq/server/tools/StoreRepairAdmin.java | 2 +-
.../broker/msgstore/disk/FileSegmentListTest.java | 6 +-
.../broker/msgstore/disk/FileSegmentTest.java | 8 +-
.../broker/msgstore/mem/MsgMemStoreTest.java | 2 +-
13 files changed, 605 insertions(+), 190 deletions(-)
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
index f6cfa46..86c66e4 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
@@ -466,7 +466,7 @@ public class BrokerServiceServer implements
BrokerReadService, BrokerWriteServic
sb.delete(0, sb.length());
GetMessageResult msgQueryResult =
msgStore.getMessages(reqSwitch, requestOffset,
- partitionId, consumerNodeInfo, baseKey,
msgDataSizeLimit);
+ partitionId, consumerNodeInfo, baseKey,
msgDataSizeLimit, 0);
offsetManager.bookOffset(group, topic, partitionId,
msgQueryResult.lastReadOffset, isManualCommitOffset,
msgQueryResult.transferedMessageList.isEmpty(), sb);
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java
index 22b330f..d27e78e 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java
@@ -80,27 +80,27 @@ public class MessageStore implements Closeable {
private final ExecutorService executor =
Executors.newSingleThreadExecutor();
private final AtomicBoolean closed = new AtomicBoolean(false);
private volatile int partitionNum;
- private AtomicInteger unflushInterval = new AtomicInteger(0);
- private AtomicInteger unflushThreshold = new AtomicInteger(0);
- private AtomicInteger unflushDataHold = new AtomicInteger(0);
+ private final AtomicInteger unflushInterval = new AtomicInteger(0);
+ private final AtomicInteger unflushThreshold = new AtomicInteger(0);
+ private final AtomicInteger unflushDataHold = new AtomicInteger(0);
private volatile int writeCacheMaxSize;
private volatile int writeCacheMaxCnt;
private volatile int writeCacheFlushIntvl;
- private AtomicLong maxFileValidDurMs = new AtomicLong(0);
+ private final AtomicLong maxFileValidDurMs = new AtomicLong(0);
private int maxAllowRdSize = 262144;
- private AtomicInteger memMaxIndexReadCnt = new AtomicInteger(6000);
- private AtomicInteger fileMaxIndexReadCnt = new AtomicInteger(8000);
- private AtomicInteger memMaxFilterIndexReadCnt
+ private final AtomicInteger memMaxIndexReadCnt = new AtomicInteger(6000);
+ private final AtomicInteger fileMaxIndexReadCnt = new AtomicInteger(8000);
+ private final AtomicInteger memMaxFilterIndexReadCnt
= new AtomicInteger(memMaxIndexReadCnt.get() * 2);
- private AtomicInteger fileMaxFilterIndexReadCnt
+ private final AtomicInteger fileMaxFilterIndexReadCnt
= new AtomicInteger(fileMaxIndexReadCnt.get() * 3);
- private AtomicInteger fileLowReqMaxFilterIndexReadCnt
+ private final AtomicInteger fileLowReqMaxFilterIndexReadCnt
= new AtomicInteger(fileMaxIndexReadCnt.get() * 10);
- private AtomicInteger fileMaxIndexReadSize
+ private final AtomicInteger fileMaxIndexReadSize
= new AtomicInteger(this.fileMaxIndexReadCnt.get() *
DataStoreUtils.STORE_INDEX_HEAD_LEN);
- private AtomicInteger fileMaxFilterIndexReadSize
+ private final AtomicInteger fileMaxFilterIndexReadSize
= new AtomicInteger(this.fileMaxFilterIndexReadCnt.get() *
DataStoreUtils.STORE_INDEX_HEAD_LEN);
- private AtomicInteger fileLowReqMaxFilterIndexReadSize
+ private final AtomicInteger fileLowReqMaxFilterIndexReadSize
= new AtomicInteger(this.fileLowReqMaxFilterIndexReadCnt.get() *
DataStoreUtils.STORE_INDEX_HEAD_LEN);
private MsgMemStore msgMemStore;
private MsgMemStore msgMemStoreBeingFlush;
@@ -154,21 +154,20 @@ public class MessageStore implements Closeable {
/***
* Get message from message store. Support the given offset, filter.
*
- * @param reqSwitch
- * @param requestOffset
- * @param partitionId
- * @param consumerNodeInfo
- * @param statisKeyBase
- * @param msgSizeLimit
- * @return
- * @throws IOException
+ * @param reqSwitch read message from where
+ * @param requestOffset the request offset to read
+ * @param partitionId the partitionId for reading messages
+ * @param consumerNodeInfo the consumer object
+ * @param statisKeyBase the statistical key prefix
+ * @param msgSizeLimit the max read size
+ * @param reqRcvTime the timestamp of the record to be checked
+ * @return read result
+ * @throws IOException exception while process
*/
- public GetMessageResult getMessages(int reqSwitch,
- final long requestOffset,
- final int partitionId,
- final ConsumerNodeInfo
consumerNodeInfo,
- final String statisKeyBase,
- int msgSizeLimit) throws IOException {
+ public GetMessageResult getMessages(int reqSwitch, long requestOffset,
+ int partitionId, ConsumerNodeInfo
consumerNodeInfo,
+ String statisKeyBase, int msgSizeLimit,
+ long reqRcvTime) throws IOException {
// #lizard forgives
if (this.closed.get()) {
throw new IllegalStateException(new StringBuilder(512)
@@ -201,7 +200,7 @@ public class MessageStore implements Closeable {
requestOffset,
msgStoreMgr.getMaxMsgTransferSize(),
maxIndexReadLength,
partitionId, false,
consumerNodeInfo.isFilterConsume(),
-
consumerNodeInfo.getFilterCondCodeSet());
+
consumerNodeInfo.getFilterCondCodeSet(), reqRcvTime);
}
} else {
// read from backup memory.
@@ -210,7 +209,7 @@ public class MessageStore implements Closeable {
requestOffset,
msgStoreMgr.getMaxMsgTransferSize(),
maxIndexReadLength, partitionId,
true,
consumerNodeInfo.isFilterConsume(),
-
consumerNodeInfo.getFilterCondCodeSet());
+
consumerNodeInfo.getFilterCondCodeSet(), reqRcvTime);
}
}
} finally {
@@ -281,7 +280,7 @@ public class MessageStore implements Closeable {
consumerNodeInfo.getLastDataRdOffset(), reqNewOffset,
indexBuffer, consumerNodeInfo.isFilterConsume(),
consumerNodeInfo.getFilterCondCodeSet(),
- statisKeyBase, msgSizeLimit);
+ statisKeyBase, msgSizeLimit, reqRcvTime);
if (reqSwitch <= 1) {
retResult.setMaxOffset(getFileIndexMaxOffset());
} else {
@@ -300,23 +299,81 @@ public class MessageStore implements Closeable {
}
/***
+ * Get start offset by timestamp.
+ *
+ * @param timestamp timestamp
+ * @return start offset
+ */
+ public long getStartOffsetByTimeStamp(long timestamp) {
+ if (this.closed.get()) {
+ throw new IllegalStateException(new StringBuilder(512)
+ .append("[Data Store] Closed MessageStore for storeKey ")
+ .append(this.storeKey).toString());
+ }
+ if (timestamp <= this.msgFileStore.getIndexMaxAppendTime()) {
+ return this.msgFileStore.getStartOffsetByTimeStamp(timestamp);
+ }
+ this.writeCacheMutex.readLock().lock();
+ try {
+ // read from backup memory.
+ if (timestamp <= this.msgMemStoreBeingFlush.getRightAppendTime()) {
+ return this.msgMemStoreBeingFlush.getIndexStartWritePos();
+ }
+ // read from main memory.
+ return this.msgMemStore.getIndexStartWritePos();
+ } finally {
+ this.writeCacheMutex.readLock().unlock();
+ }
+ }
+
+ /***
* Append msg to store.
*
- * @param appendResult
- * @param dataLength
- * @param dataCheckSum
- * @param data
- * @param msgTypeCode
- * @param msgFlag
- * @param partitionId
- * @param sentAddr
- * @return
- * @throws IOException
+ * @param appendResult the append result
+ * @param dataLength the data length
+ * @param dataCheckSum the check sum of message data
+ * @param data the message data
+ * @param msgTypeCode the filter item hash code
+ * @param msgFlag the message flag
+ * @param partitionId the partitionId for append messages
+ * @param sentAddr the address to send the message to
+ *
+ * @return the process result
+ * @throws IOException exception while process
*/
- public boolean appendMsg(final AppendResult appendResult, final int
dataLength,
- final int dataCheckSum, final byte[] data,
- final int msgTypeCode, final int msgFlag,
- final int partitionId, final int sentAddr) throws
IOException {
+ public boolean appendMsg(AppendResult appendResult, int dataLength,
+ int dataCheckSum, byte[] data,
+ int msgTypeCode, int msgFlag,
+ int partitionId, int sentAddr) throws IOException
{
+ return appendMsg2(appendResult, dataLength, dataCheckSum, data,
+ msgTypeCode, msgFlag, partitionId, sentAddr,
+ System.currentTimeMillis(), 3, 2);
+ }
+
+ /***
+ * Append msg to store.
+ *
+ * @param appendResult the append result
+ * @param dataLength the data length
+ * @param dataCheckSum the check sum of message data
+ * @param data the message data
+ * @param msgTypeCode the filter item hash code
+ * @param msgFlag the message flag
+ * @param partitionId the partitionId for append messages
+ * @param sentAddr the address to send the message to
+ * @param receivedTime the received time of message
+ * @param count the retry count while full
+ * @param waitRetryMs the wait duration while retry
+ *
+ * @return the process result
+ * @throws IOException exception while process
+ */
+ public boolean appendMsg2(AppendResult appendResult, int dataLength,
+ int dataCheckSum, byte[] data,
+ int msgTypeCode, int msgFlag,
+ int partitionId, int sentAddr,
+ long receivedTime, int count,
+ long waitRetryMs) throws IOException {
if (this.closed.get()) {
throw new IllegalStateException(new StringBuilder(512)
.append("[Data Store] Closed MessageStore for storeKey ")
@@ -324,7 +381,6 @@ public class MessageStore implements Closeable {
}
long messageId = this.idWorker.nextId();
int msgBufLen = DataStoreUtils.STORE_DATA_HEADER_LEN + dataLength;
- final long receivedTime = System.currentTimeMillis();
final ByteBuffer buffer = ByteBuffer.allocate(msgBufLen);
buffer.putInt(DataStoreUtils.STORE_DATA_PREFX_LEN + dataLength);
buffer.putInt(DataStoreUtils.STORE_DATA_TOKER_BEGIN_VALUE);
@@ -339,7 +395,6 @@ public class MessageStore implements Closeable {
buffer.put(data);
buffer.flip();
appendResult.putReceivedInfo(messageId, receivedTime);
- int count = 3;
do {
this.writeCacheMutex.readLock().lock();
try {
@@ -356,7 +411,7 @@ public class MessageStore implements Closeable {
buffer, false, appendResult)) {
return true;
}
- ThreadUtils.sleep(1);
+ ThreadUtils.sleep(waitRetryMs);
} while (count-- >= 0);
msgMemStatisInfo.addWriteFailCount();
return false;
@@ -377,8 +432,9 @@ public class MessageStore implements Closeable {
/***
* Execute cleanup policy.
*
- * @param onlyCheck
- * @return
+ * @param onlyCheck whether only check status
+ *
+ * @return whether clear up segments
*/
public boolean runClearupPolicy(boolean onlyCheck) {
if (this.closed.get()) {
@@ -392,7 +448,7 @@ public class MessageStore implements Closeable {
/***
* Refresh unflush threshold
*
- * @param topicMetadata
+ * @param topicMetadata topic meta data
*/
public void refreshUnflushThreshold(TopicMetadata topicMetadata) {
if (this.closed.get()) {
@@ -428,7 +484,7 @@ public class MessageStore implements Closeable {
/***
* Flush file store to disk.
*
- * @throws IOException
+ * @throws IOException exception while process
*/
public void flushFile() throws IOException {
if (this.closed.get()) {
@@ -442,7 +498,7 @@ public class MessageStore implements Closeable {
/***
* Flush memory store to file.
*
- * @throws IOException
+ * @throws IOException exception while process
*/
public void flushMemCacheData() throws IOException {
if (this.closed.get()) {
@@ -603,6 +659,11 @@ public class MessageStore implements Closeable {
private int validAndGetMemCacheSize(TopicMetadata topicMetadata) {
int memCacheSize = topicMetadata.getMemCacheMsgSize();
if (memCacheSize < topicMetadata.getMinMemCacheSize()) {
+ logger.info(new StringBuilder(512)
+ .append("[Data Store] ").append(getTopic())
+ .append(" writeCacheMaxSize changed, from ")
+ .append(memCacheSize).append(" to ")
+ .append(topicMetadata.getMinMemCacheSize()).toString());
memCacheSize = topicMetadata.getMinMemCacheSize();
}
return memCacheSize;
@@ -611,21 +672,23 @@ public class MessageStore implements Closeable {
/***
* Append message and trigger flush operation.
*
- * @param partitionId
- * @param keyCode
- * @param receivedTime
- * @param entryLength
- * @param needAdd
- * @param entry
- * @param isTimeTrigger
- * @return
- * @throws IOException
+ * @param partitionId the partitionId for reading messages
+ * @param keyCode the filter item hash code
+ * @param receivedTime the received time of message
+ * @param entryLength the stored entry length
+ * @param needAdd whether to add a message
+ * @param entry the stored entry
+ * @param isTimeTrigger whether is timer trigger
+ * @param appendResult the append result
+ *
+ * @return the append result
+ * @throws IOException exception while process
*/
- private boolean triggerFlushAndAddMsg(final int partitionId, final int
keyCode,
- final long receivedTime, final int
entryLength,
- final boolean needAdd, final
ByteBuffer entry,
- final boolean isTimeTrigger,
- final AppendResult appendResult)
throws IOException {
+ private boolean triggerFlushAndAddMsg(int partitionId, int keyCode,
+ long receivedTime, int entryLength,
+ boolean needAdd, ByteBuffer entry,
+ boolean isTimeTrigger,
+ AppendResult appendResult) throws
IOException {
writeCacheMutex.writeLock().lock();
try {
if (!isFlushOngoing.get() &&
hasFlushBeenTriggered.compareAndSet(false, true)) {
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
index 6b8e973..5414f09 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
@@ -79,9 +79,9 @@ public class MessageStoreManager implements StoreService {
// message on memory sink to disk operation scheduler.
private final ScheduledExecutorService unFlushMemScheduler;
// max transfer size.
- private int maxMsgTransferSize;
+ private final int maxMsgTransferSize;
// the status that is deleting topic.
- private AtomicBoolean isRemovingTopic = new AtomicBoolean(false);
+ private final AtomicBoolean isRemovingTopic = new AtomicBoolean(false);
public MessageStoreManager(final TubeBroker tubeBroker,
final BrokerConfig tubeConfig) throws
IOException {
@@ -366,7 +366,7 @@ public class MessageStoreManager implements StoreService {
}
requestOffset = maxOffset - maxIndexReadSize < 0 ? 0L : maxOffset
- maxIndexReadSize;
return msgStore.getMessages(303, requestOffset, partitionId,
- consumerNodeInfo, topic, this.maxMsgTransferSize);
+ consumerNodeInfo, topic, this.maxMsgTransferSize, 0);
} catch (Throwable e1) {
return new GetMessageResult(false,
TErrCodeConstants.INTERNAL_SERVER_ERROR,
requestOffset, 0, "Get message failure, errMsg=" +
e1.getMessage());
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/FileSegment.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/FileSegment.java
index 42580bd..6839a2a 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/FileSegment.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/FileSegment.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.inlong.tubemq.corebase.TBaseConstants;
import org.apache.inlong.tubemq.corebase.utils.CheckSum;
import org.apache.inlong.tubemq.corebase.utils.ServiceStatusHolder;
import org.apache.inlong.tubemq.server.broker.metrics.BrokerMetricsHolder;
@@ -40,33 +41,38 @@ public class FileSegment implements Segment {
LoggerFactory.getLogger(FileSegment.class);
private final long start;
private final File file;
- private RandomAccessFile randFile;
- private FileChannel channel;
+ private final RandomAccessFile randFile;
+ private final FileChannel channel;
private final AtomicLong cachedSize;
private final AtomicLong flushedSize;
private final SegmentType segmentType;
- private boolean mutable;
+ private volatile boolean mutable = false;
private long expiredTime = 0;
- private AtomicBoolean expired = new AtomicBoolean(false);
- private AtomicBoolean closed = new AtomicBoolean(false);
-
- public FileSegment(final long start, final File file, SegmentType type)
throws IOException {
+ private final AtomicBoolean expired = new AtomicBoolean(false);
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+ // the first record append time
+ private final AtomicLong leftAppendTime =
+ new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
+ // the latest record append time
+ private final AtomicLong rightAppendTime =
+ new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
+
+ public FileSegment(long start, File file, SegmentType type) throws
IOException {
this(start, file, true, type, Long.MAX_VALUE);
}
- public FileSegment(final long start, final File file,
- final boolean mutable, SegmentType type) throws
IOException {
+ public FileSegment(long start, File file,
+ boolean mutable, SegmentType type) throws IOException {
this(start, file, mutable, type, Long.MAX_VALUE);
}
- public FileSegment(final long start, final File file,
- SegmentType type, final long checkOffset) throws
IOException {
+ public FileSegment(long start, File file,
+ SegmentType type, long checkOffset) throws IOException {
this(start, file, true, type, checkOffset);
}
- private FileSegment(final long start, final File file,
- final boolean mutable, SegmentType type,
- final long checkOffset) throws IOException {
+ private FileSegment(long start, File file, boolean mutable,
+ SegmentType type, long checkOffset) throws IOException
{
super();
this.segmentType = type;
this.start = start;
@@ -124,6 +130,18 @@ public class FileSegment implements Segment {
}
}
}
+ if (this.segmentType == SegmentType.INDEX) {
+ if (this.cachedSize.get() == 0) {
+ if (this.mutable) {
+ this.leftAppendTime.set(System.currentTimeMillis());
+ this.rightAppendTime.set(System.currentTimeMillis());
+ }
+ } else {
+ this.leftAppendTime.set(getRecordTime(this.start));
+ this.rightAppendTime.set(getRecordTime(this.start
+ + this.cachedSize.get() -
DataStoreUtils.STORE_INDEX_HEAD_LEN));
+ }
+ }
}
@Override
@@ -182,14 +200,17 @@ public class FileSegment implements Segment {
}
/***
- * Messages can only be appended to the last FileSegment. The last
FileSegment is writable, the others are mutable.
+ * Messages can only be appended to the last FileSegment.
+ * The last FileSegment is writable, the others are immutable.
*
- * @param buf
- * @return
- * @throws IOException
+ * @param buf data buffer
+ * @param leftTime the first record timestamp
+ * @param rightTime the latest record timestamp
+ * @return latest writable position
+ * @throws IOException exception while force data to disk
*/
@Override
- public long append(final ByteBuffer buf) throws IOException {
+ public long append(ByteBuffer buf, long leftTime, long rightTime) throws
IOException {
if (!this.mutable) {
if (this.segmentType == SegmentType.DATA) {
throw new UnsupportedOperationException("[File Store] Data
Segment is immutable!");
@@ -207,15 +228,22 @@ public class FileSegment implements Segment {
sizeInBytes += this.channel.write(buf);
}
this.cachedSize.addAndGet(sizeInBytes);
+ if (segmentType == SegmentType.INDEX) {
+ this.rightAppendTime.set(rightTime);
+ if (offset == 0) {
+ this.leftAppendTime.set(leftTime);
+ }
+ }
return this.start + offset;
}
/***
* Flush file cache to disk.
*
- * @param force
- * @return
- * @throws IOException
+ * @param force whether to force flushing to disk
+ * @return the latest writable position
+ *
+ * @throws IOException exception while force data to disk
*/
@Override
public long flush(boolean force) throws IOException {
@@ -240,7 +268,7 @@ public class FileSegment implements Segment {
}
@Override
- public boolean contains(final long offset) {
+ public boolean contains(long offset) {
return (this.getCachedSize() == 0
&& offset == this.start
|| this.getCachedSize() > 0
@@ -249,7 +277,8 @@ public class FileSegment implements Segment {
}
/***
- * Release reference to this FileSegment. File's channel will be closed
when the reference decreased to 0.
+ * Release reference to this FileSegment.
+ * File's channel will be closed when the reference decreased to 0.
*/
@Override
public void relViewRef() {
@@ -269,7 +298,7 @@ public class FileSegment implements Segment {
/***
* Return the position that have been flushed to disk.
*
- * @return
+ * @return the position that has been flushed to disk
*/
@Override
public long getCommitLast() {
@@ -284,7 +313,7 @@ public class FileSegment implements Segment {
/***
* Set FileSegment to readonly.
*
- * @param mutable
+ * @param mutable mutable or immutable
*/
@Override
public void setMutable(boolean mutable) {
@@ -292,6 +321,30 @@ public class FileSegment implements Segment {
}
@Override
+ public long getLeftAppendTime() {
+ return leftAppendTime.get();
+ }
+
+ @Override
+ public long getRightAppendTime() {
+ return rightAppendTime.get();
+ }
+
+ @Override
+ public boolean containTime(long timestamp) {
+ if (this.getCachedSize() == 0) {
+ return this.mutable;
+ }
+ if (timestamp >= this.leftAppendTime.get()) {
+ if (this.mutable) {
+ return true;
+ }
+ return timestamp <= this.rightAppendTime.get();
+ }
+ return false;
+ }
+
+ @Override
public long getCachedSize() {
return this.cachedSize.get();
}
@@ -307,29 +360,66 @@ public class FileSegment implements Segment {
}
@Override
- public void read(final ByteBuffer bf, final long reqOffset) throws
IOException {
+ public void read(ByteBuffer bf, long absOffset) throws IOException {
if (this.isExpired()) {
//Todo: conduct file closed and expired cases.
}
int size = 0;
+ long startPos = absOffset - start;
while (bf.hasRemaining()) {
- final int l = this.channel.read(bf, reqOffset - start + size);
+ final int l = this.channel.read(bf, startPos + size);
+ if (l < 0) {
+ break;
+ }
+ size += l;
+ }
+ }
+
+ @Override
+ public void relRead(final ByteBuffer bf, long relOffset) throws
IOException {
+ if (this.isExpired()) {
+ //Todo: conduct file closed and expired cases.
+ }
+ int size = 0;
+ while (bf.hasRemaining()) {
+ final int l = this.channel.read(bf, relOffset + size);
+ if (l < 0) {
+ break;
+ }
+ size += l;
+ }
+ }
+
+ /***
+ * read index record's append time.
+ * @param reqOffset request offset.
+ * @return message append time.
+ */
+ @Override
+ public long getRecordTime(long reqOffset) throws IOException {
+ ByteBuffer readUnit =
ByteBuffer.allocate(DataStoreUtils.STORE_INDEX_HEAD_LEN);
+ int size = 0;
+ while (readUnit.hasRemaining()) {
+ final int l = this.channel.read(readUnit, reqOffset - start +
size);
if (l < 0) {
break;
}
size += l;
}
+ readUnit.flip();
+ return readUnit.getLong(DataStoreUtils.INDEX_POS_TIME_RECV);
}
/***
- * Check whether this FileSegment is expired, and set expire status. The
last FileSegment cannot be marked expired.
+ * Check whether this FileSegment is expired, and set expire status.
+ * The last FileSegment cannot be marked expired.
*
* @param checkTimestamp check timestamp.
* @param maxValidTimeMs the max expire interval in milliseconds.
* @return -1 means already expired, 0 means the last FileSegment, 1 means
expired.
*/
@Override
- public int checkAndSetExpired(final long checkTimestamp, final long
maxValidTimeMs) {
+ public int checkAndSetExpired(long checkTimestamp, long maxValidTimeMs) {
if (expired.get()) {
return -1;
}
@@ -347,7 +437,7 @@ public class FileSegment implements Segment {
return 0;
}
- private RecoverResult recoverData(final long checkOffset) throws
IOException {
+ private RecoverResult recoverData(long checkOffset) throws IOException {
if (!this.mutable) {
throw new UnsupportedOperationException(
"[File Store] The Data Segment must be mutable!");
@@ -414,7 +504,7 @@ public class FileSegment implements Segment {
return new RecoverResult(totalBytes - validBytes, false);
}
- private RecoverResult recoverIndex(final long checkOffset) throws
IOException {
+ private RecoverResult recoverIndex(long checkOffset) throws IOException {
if (!this.mutable) {
throw new UnsupportedOperationException(
"[File Store] The Index Segment must be mutable!");
@@ -475,8 +565,8 @@ public class FileSegment implements Segment {
}
private static class RecoverResult {
- private long truncated;
- private boolean isEqual;
+ private final long truncated;
+ private final boolean isEqual;
public RecoverResult(long truncated, boolean isEqual) {
this.truncated = truncated;
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/FileSegmentList.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/FileSegmentList.java
index 386a4c9..09a0763 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/FileSegmentList.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/FileSegmentList.java
@@ -29,7 +29,7 @@ public class FileSegmentList implements SegmentList {
private static final Logger logger =
LoggerFactory.getLogger(FileSegmentList.class);
// list of segments.
- private AtomicReference<Segment[]> segmentList =
+ private final AtomicReference<Segment[]> segmentList =
new AtomicReference<>();
public FileSegmentList(final Segment[] s) {
@@ -59,9 +59,9 @@ public class FileSegmentList implements SegmentList {
/***
* Return segment by the given offset.
*
- * @param offset
- * @return
- * @throws IOException
+ * @param offset the position to search
+ * @return the segment that includes the position
+ * @throws IOException the exception while searching
*/
@Override
public Segment getRecordSeg(final long offset) throws IOException {
@@ -88,12 +88,12 @@ public class FileSegmentList implements SegmentList {
/***
* Check each FileSegment whether is expired, and set expire status.
*
- * @param checkTimestamp
- * @param fileValidTimeMs
- * @return
+ * @param checkTimestamp current check timestamp
+ * @param fileValidTimeMs the max expire interval
+ * @return whether is expired
*/
@Override
- public boolean checkExpiredSegments(final long checkTimestamp, final long
fileValidTimeMs) {
+ public boolean checkExpiredSegments(long checkTimestamp, long
fileValidTimeMs) {
boolean hasExpired = false;
for (Segment segment : segmentList.get()) {
if (segment == null) {
@@ -110,7 +110,7 @@ public class FileSegmentList implements SegmentList {
/***
* Check FileSegments whether is expired, close all expired FileSegments,
and then delete these files.
*
- * @param sb
+ * @param sb string buffer
*/
@Override
public void delExpiredSegments(final StringBuilder sb) {
@@ -177,7 +177,7 @@ public class FileSegmentList implements SegmentList {
/***
* Return the start position of these FileSegments.
*
- * @return
+ * @return the first position
*/
@Override
public long getMinOffset() {
@@ -202,7 +202,7 @@ public class FileSegmentList implements SegmentList {
/***
* Return the max position of these FileSegments.
*
- * @return
+ * @return the latest position
*/
@Override
public long getMaxOffset() {
@@ -217,10 +217,23 @@ public class FileSegmentList implements SegmentList {
return last.getLast();
}
+ @Override
+ public long getMaxAppendTime() {
+ final Segment[] curViews = segmentList.get();
+ if (curViews.length == 0) {
+ return 0L;
+ }
+ Segment last = curViews[curViews.length - 1];
+ if (last == null) {
+ return 0L;
+ }
+ return last.getRightAppendTime();
+ }
+
/***
* Return the max position that have been flushed to disk.
*
- * @return
+ * @return the latest committed offset
*/
@Override
public long getCommitMaxOffset() {
@@ -254,8 +267,9 @@ public class FileSegmentList implements SegmentList {
/**
* Binary search the segment that contains the offset
- * @param offset
- * @return
+ *
+ * @param offset offset to search
+ * @return the segment includes the searched offset
*/
@Override
public Segment findSegment(final long offset) {
@@ -299,4 +313,63 @@ public class FileSegmentList implements SegmentList {
return null;
}
+ @Override
+ public Segment findSegmentByTimeStamp(long timestamp) {
+ final Segment[] curViews = segmentList.get();
+ if (curViews.length == 0) {
+ return null;
+ }
+ int minStart = 0;
+ for (minStart = 0; minStart < curViews.length; minStart++) {
+ if (curViews[minStart] == null
+ || curViews[minStart].isExpired()) {
+ continue;
+ }
+ break;
+ }
+ // Check boundaries
+ if (minStart >= curViews.length) {
+ minStart = curViews.length - 1;
+ }
+ int hight = curViews.length - 1;
+ final Segment startSeg = curViews[minStart];
+ if ((minStart == hight)
+ || (timestamp <= startSeg.getLeftAppendTime())) {
+ return startSeg;
+ }
+ Segment last = curViews[hight];
+ Segment before = curViews[hight - 1];
+ if (last.getCachedSize() > 0) {
+ if (timestamp > last.getLeftAppendTime()) {
+ return last;
+ }
+ }
+ if (timestamp > before.getRightAppendTime()) {
+ return last;
+ } else if (timestamp > before.getLeftAppendTime()) {
+ return before;
+ }
+ int mid = 0;
+ Segment found = null;
+ // Use binary search to find the first segment that contains the specified
timestamp
+ while (minStart <= hight) {
+ mid = hight + minStart >>> 1;
+ found = curViews[mid];
+ if (found.containTime(timestamp)) {
+ before = curViews[mid - 1];
+ if (timestamp > before.getRightAppendTime()) {
+ return found;
+ } else if (timestamp > before.getLeftAppendTime()) {
+ return before;
+ }
+ hight = mid - 1;
+ } else if (timestamp < found.getLeftAppendTime()) {
+ hight = mid - 1;
+ } else {
+ minStart = mid + 1;
+ }
+ }
+ return null;
+ }
+
}
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
index 33495b4..2ea3f49 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
@@ -106,16 +106,19 @@ public class MsgFileStore implements Closeable {
/***
* Batch append message to file segment
*
- * @param sb
- * @param msgCnt
- * @param indexSize
- * @param indexBuffer
- * @param dataSize
- * @param dataBuffer
+ * @param sb string buffer
+ * @param msgCnt the record count to append
+ * @param indexSize the index buffer length
+ * @param indexBuffer the index buffer to append
+ * @param dataSize the data buffer length
+ * @param dataBuffer the data buffer to append
+ * @param leftTime the first record timestamp
+ * @param rightTime the latest record timestamp
*/
- public void batchAppendMsg(final StringBuilder sb, final int msgCnt,
- final int indexSize, final ByteBuffer
indexBuffer,
- final int dataSize, final ByteBuffer
dataBuffer) throws Throwable {
+ public void batchAppendMsg(StringBuilder sb, int msgCnt,
+ int indexSize, ByteBuffer indexBuffer,
+ int dataSize, ByteBuffer dataBuffer,
+ long leftTime, long rightTime) throws Throwable
{
// append message, put in data file first, then index file.
if (this.closed.get()) {
throw new IllegalStateException(new StringBuilder(512)
@@ -135,7 +138,7 @@ public class MsgFileStore implements Closeable {
// filling data segment.
final Segment curDataSeg = this.dataSegments.last();
this.curUnflushSize.addAndGet(dataSize);
- final long dataOffset = curDataSeg.append(dataBuffer);
+ final long dataOffset = curDataSeg.append(dataBuffer, leftTime,
rightTime);
// judge whether need to create a new data segment.
if (curDataSeg.getCachedSize() >=
this.tubeConfig.getMaxSegmentSize()) {
isDataSegFlushed = true;
@@ -152,7 +155,7 @@ public class MsgFileStore implements Closeable {
// filling index data.
final long inDataOffset =
indexBuffer.getLong(DataStoreUtils.INDEX_POS_DATAOFFSET);
final Segment curIndexSeg = this.indexSegments.last();
- final long indexOffset = curIndexSeg.append(indexBuffer);
+ final long indexOffset = curIndexSeg.append(indexBuffer, leftTime,
rightTime);
// judge whether need to create a new index segment.
if (curIndexSeg.getCachedSize()
>= this.tubeConfig.getMaxIndexSegmentSize()) {
@@ -222,22 +225,25 @@ public class MsgFileStore implements Closeable {
/***
* Get message from index and data files.
*
- * @param partitionId
- * @param lastRdOffset
- * @param reqOffset
- * @param indexBuffer
- * @param isFilterConsume
- * @param filterKeySet
- * @param statisKeyBase
- * @param maxMsgTransferSize
- * @return
+ * @param partitionId the partitionId for reading messages
+ * @param lastRdOffset the recent data offset read before
+ * @param reqOffset the request index offset
+ * @param indexBuffer the index read buffer
+ * @param isFilterConsume whether to filter consumption
+ * @param filterKeySet filter item set
+ * @param statisKeyBase the statistical key prefix
+ * @param maxMsgTransferSize the max read message size
+ * @param reqRcvTime the timestamp of the record to be checked
+ *
+ * @return read result
*/
- public GetMessageResult getMessages(final int partitionId, final long
lastRdOffset,
- final long reqOffset, final ByteBuffer
indexBuffer,
- final boolean isFilterConsume,
- final Set<Integer> filterKeySet,
- final String statisKeyBase,
- final int maxMsgTransferSize) {
+ public GetMessageResult getMessages(int partitionId, long lastRdOffset,
+ long reqOffset, ByteBuffer indexBuffer,
+ boolean isFilterConsume,
+ Set<Integer> filterKeySet,
+ String statisKeyBase,
+ int maxMsgTransferSize,
+ long reqRcvTime) {
// #lizard forgives
// Orderly read from index file, then random read from data file.
int retCode = 0;
@@ -294,6 +300,9 @@ public class MsgFileStore implements Closeable {
readedOffset = curIndexOffset +
DataStoreUtils.STORE_INDEX_HEAD_LEN;
continue;
}
+ if (reqRcvTime != 0 && recvTimeInMillsec < reqRcvTime) {
+ continue;
+ }
try {
// get data from data file by index one by one.
if (recordSeg == null
@@ -374,6 +383,65 @@ public class MsgFileStore implements Closeable {
totalSize, countMap, transferedMessageList);
}
+ /***
+ * Get the segment start Offset that contains the specified timestamp
+ *
+ * @param timestamp the specified timestamp
+ *
+ * @return the start offset
+ */
+ public long getStartOffsetByTimeStamp(long timestamp) {
+ Segment recordSeg = indexSegments.findSegmentByTimeStamp(timestamp);
+ if (recordSeg == null || this.closed.get()) {
+ return -1;
+ }
+ long endPos = (recordSeg.getCommitLast() - recordSeg.getStart())
+ / DataStoreUtils.STORE_INDEX_HEAD_LEN - 1;
+ final long curDataMinOffset = getDataMinOffset();
+ final ByteBuffer readBuffer =
+ ByteBuffer.allocate(DataStoreUtils.STORE_INDEX_HEAD_LEN);
+ // check boundaries
+ if (endPos <= 0) {
+ return recordSeg.getStart();
+ }
+ long foundTime = getTimeStamp(recordSeg,
+ 0, curDataMinOffset, readBuffer);
+ if (timestamp < foundTime) {
+ return recordSeg.getStart();
+ }
+ foundTime = getTimeStamp(recordSeg,
+ endPos * DataStoreUtils.STORE_INDEX_HEAD_LEN,
+ curDataMinOffset, readBuffer);
+ if (timestamp > foundTime) {
+ return recordSeg.getStart() + endPos *
DataStoreUtils.STORE_INDEX_HEAD_LEN;
+ }
+ long midPos = 0;
+ long startPos = 0;
+ long firstLowPos = 0;
+ long firstEqualPos = -1;
+ // Dichotomy finds the first offset position less than the specified
time
+ while (startPos <= endPos) {
+ midPos = endPos + startPos >>> 1;
+ foundTime = getTimeStamp(recordSeg,
+ midPos * DataStoreUtils.STORE_INDEX_HEAD_LEN,
+ curDataMinOffset, readBuffer);
+ if (foundTime < timestamp) {
+ firstLowPos = midPos;
+ startPos = midPos + 1;
+ } else {
+ endPos = midPos - 1;
+ if (foundTime == timestamp) {
+ firstEqualPos = midPos;
+ }
+ }
+ }
+ if (firstEqualPos != -1) {
+ return recordSeg.getStart() + firstEqualPos *
DataStoreUtils.STORE_INDEX_HEAD_LEN;
+ } else {
+ return recordSeg.getStart() + firstLowPos *
DataStoreUtils.STORE_INDEX_HEAD_LEN;
+ }
+ }
+
@Override
public void close() throws IOException {
if (this.closed.compareAndSet(false, true)) {
@@ -390,8 +458,8 @@ public class MsgFileStore implements Closeable {
/***
* Clean expired data files and index files.
*
- * @param onlyCheck
- * @return
+ * @param onlyCheck whether to check only
+ * @return whether found expired segments
*/
public boolean runClearupPolicy(boolean onlyCheck) {
final StringBuilder sBuilder = new StringBuilder(512);
@@ -415,7 +483,7 @@ public class MsgFileStore implements Closeable {
/***
* Flush data to disk at interval.
*
- * @throws IOException
+ * @throws IOException exception while process
*/
public void flushDiskFile() throws IOException {
long checkTimestamp = System.currentTimeMillis();
@@ -444,7 +512,6 @@ public class MsgFileStore implements Closeable {
this.writeLock.unlock();
}
}
- return;
}
public long getDataSizeInBytes() {
@@ -471,6 +538,10 @@ public class MsgFileStore implements Closeable {
return this.indexSegments.getMaxOffset();
}
+ public long getIndexMaxAppendTime() {
+ return this.indexSegments.getMaxAppendTime();
+ }
+
public long getIndexMaxHighOffset() {
return this.indexSegments.getCommitMaxOffset();
}
@@ -483,7 +554,7 @@ public class MsgFileStore implements Closeable {
return indexSegments.getRecordSeg(offset);
}
- private void loadSegments(final SegmentType segType, long offsetIfCreate,
+ private void loadSegments(SegmentType segType, long offsetIfCreate,
StringBuilder sBuilder) throws IOException {
String segTypeStr = "Data";
File segListDir = this.dataDir;
@@ -577,7 +648,7 @@ public class MsgFileStore implements Closeable {
sBuilder.delete(0, sBuilder.length());
}
- private void validateSegments(final String segTypeStr, final List<Segment>
segments) {
+ private void validateSegments(String segTypeStr, final List<Segment>
segments) {
// valid segments, continuous
for (int i = 0; i < segments.size() - 1; i++) {
final Segment curr = segments.get(i);
@@ -592,4 +663,33 @@ public class MsgFileStore implements Closeable {
}
}
+ private long getTimeStamp(Segment recordSeg, long relReadPos,
+ long curDataMinOffset, ByteBuffer readBuffer) {
+ int curIndexPartitionId = 0;
+ long curIndexDataOffset = 0L;
+ int curIndexDataSize = 0;
+ int curIndexKeyCode = 0;
+ long recvTimeInMillsec = 0L;
+ try {
+ readBuffer.clear();
+ recordSeg.relRead(readBuffer, relReadPos);
+ readBuffer.flip();
+ curIndexPartitionId = readBuffer.getInt();
+ curIndexDataOffset = readBuffer.getLong();
+ curIndexDataSize = readBuffer.getInt();
+ curIndexKeyCode = readBuffer.getInt();
+ recvTimeInMillsec = readBuffer.getLong();
+ // skip when mismatch condition
+ if (curIndexDataOffset < 0
+ || curIndexDataSize <= 0
+ || curIndexDataSize >
DataStoreUtils.STORE_MAX_MESSAGE_STORE_LEN
+ || curIndexDataOffset < curDataMinOffset) {
+ return -1;
+ }
+ return recvTimeInMillsec;
+ } catch (Throwable ex) {
+ samplePrintCtrl.printExceptionCaught(ex);
+ return -1;
+ }
+ }
}
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/Segment.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/Segment.java
index 97ed0de..6ecd417 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/Segment.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/Segment.java
@@ -28,7 +28,17 @@ public interface Segment {
void close();
- long append(ByteBuffer buf) throws IOException;
+ /***
+ * Messages can only be appended to the last FileSegment.
+ * The last FileSegment is writable, the others are read-only.
+ *
+ * @param buf data buffer
+ * @param leftTime the first record timestamp
+ * @param rightTime the latest record timestamp
+ * @return latest writable position
+ * @throws IOException exception while force data to disk
+ */
+ long append(ByteBuffer buf, long leftTime, long rightTime) throws
IOException;
long flush(boolean force) throws IOException;
@@ -62,6 +72,27 @@ public interface Segment {
void relViewRef();
- void read(ByteBuffer bf, long offset) throws IOException;
+ /***
+ * Read data to buffer from absolute position.
+ *
+ * @param bf buffer to store data
+ * @param absOffset absolute read position
+ */
+ void read(ByteBuffer bf, long absOffset) throws IOException;
+ /***
+ * Read data to buffer from relative position.
+ *
+ * @param bf buffer to store data
+ * @param relOffset relative read position
+ */
+ void relRead(ByteBuffer bf, long relOffset) throws IOException;
+
+ long getLeftAppendTime();
+
+ long getRightAppendTime();
+
+ boolean containTime(long timestamp);
+
+ long getRecordTime(long reqOffset) throws IOException;
}
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/SegmentList.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/SegmentList.java
index fbd703e..cf4bed4 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/SegmentList.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/SegmentList.java
@@ -36,6 +36,8 @@ public interface SegmentList {
long getMaxOffset();
+ long getMaxAppendTime();
+
boolean checkExpiredSegments(long checkTimestamp, long fileValidTimeMs);
void delExpiredSegments(StringBuilder sb);
@@ -52,4 +54,5 @@ public interface SegmentList {
Segment getRecordSeg(long offset) throws IOException;
+ Segment findSegmentByTimeStamp(long timestamp);
}
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java
index e662112..d5c9c15 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.inlong.tubemq.corebase.TBaseConstants;
import org.apache.inlong.tubemq.corebase.TErrCodeConstants;
@@ -55,13 +56,17 @@ public class MsgMemStore implements Closeable {
private final ConcurrentHashMap<Integer, Integer> keysMap =
new ConcurrentHashMap<>(100);
// where messages in memory will sink to disk
- private int maxDataCacheSize;
+ private final int maxDataCacheSize;
private long writeDataStartPos = -1;
- private ByteBuffer cacheDataSegment;
- private int maxIndexCacheSize;
+ private final ByteBuffer cacheDataSegment;
+ private final int maxIndexCacheSize;
private long writeIndexStartPos = -1;
- private ByteBuffer cachedIndexSegment;
- private int maxAllowedMsgCount;
+ private final ByteBuffer cachedIndexSegment;
+ private final int maxAllowedMsgCount;
+ private final AtomicLong leftAppendTime =
+ new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
+ private final AtomicLong rightAppendTime =
+ new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
public MsgMemStore(int maxCacheSize, int maxMsgCount, final BrokerConfig
tubeConfig) {
this.maxDataCacheSize = maxCacheSize;
@@ -69,6 +74,8 @@ public class MsgMemStore implements Closeable {
this.maxIndexCacheSize = this.maxAllowedMsgCount *
DataStoreUtils.STORE_INDEX_HEAD_LEN;
this.cacheDataSegment =
ByteBuffer.allocateDirect(this.maxDataCacheSize);
this.cachedIndexSegment =
ByteBuffer.allocateDirect(this.maxIndexCacheSize);
+ this.leftAppendTime.set(System.currentTimeMillis());
+ this.rightAppendTime.set(System.currentTimeMillis());
}
public void resetStartPos(long writeDataStartPos, long writeIndexStartPos)
{
@@ -77,10 +84,23 @@ public class MsgMemStore implements Closeable {
this.writeIndexStartPos = writeIndexStartPos;
}
- public boolean appendMsg(final MsgMemStatisInfo msgMemStatisInfo,
- final int partitionId, final int keyCode,
- final long timeRecv, final int entryLength,
- final ByteBuffer entry, final AppendResult
appendResult) {
+ /***
+ * Append message to memory cache
+ *
+ * @param msgMemStatisInfo statistical information object
+ * @param partitionId the partitionId for append messages
+ * @param keyCode the filter item hash code
+ * @param timeRecv the received timestamp
+ * @param entryLength the stored entry length
+ * @param entry the stored entry
+ * @param appendResult the append result
+ *
+ * @return the process result
+ */
+ public boolean appendMsg(MsgMemStatisInfo msgMemStatisInfo,
+ int partitionId, int keyCode,
+ long timeRecv, int entryLength,
+ ByteBuffer entry, AppendResult appendResult) {
boolean fullDataSize = false;
boolean fullIndexSize = false;
boolean fullCount = false;
@@ -114,6 +134,10 @@ public class MsgMemStore implements Closeable {
this.keysMap.put(keyCode, indexSizePos);
this.curMessageCount.getAndAdd(1);
msgMemStatisInfo.addMsgSizeStatis(timeRecv, entryLength);
+ this.rightAppendTime.set(timeRecv);
+ if (indexSizePos == 0) {
+ this.leftAppendTime.set(timeRecv);
+ }
} finally {
this.writeLock.unlock();
}
@@ -124,21 +148,23 @@ public class MsgMemStore implements Closeable {
/***
* Read from memory, read index, then data.
*
- * @param lstRdDataOffset
- * @param lstRdIndexOffset
- * @param maxReadSize
- * @param maxReadCount
- * @param partitionId
- * @param isSecond
- * @param isFilterConsume
- * @param filterKeySet
- * @return
+ * @param lstRdDataOffset the recent data offset read before
+ * @param lstRdIndexOffset the recent index offset read before
+ * @param maxReadSize the max read size
+ * @param maxReadCount the max read count
+ * @param partitionId the partitionId for reading messages
+ * @param isSecond whether read from secondary cache
+ * @param isFilterConsume whether to filter consumption
+ * @param filterKeySet filter item set
+ * @param reqRcvTime the timestamp of the record to be checked
+ *
+ * @return read result
*/
- public GetCacheMsgResult getMessages(final long lstRdDataOffset, final
long lstRdIndexOffset,
- final int maxReadSize, final int
maxReadCount,
- final int partitionId, final boolean
isSecond,
- final boolean isFilterConsume,
- final Set<Integer> filterKeySet) {
+ public GetCacheMsgResult getMessages(long lstRdDataOffset, long
lstRdIndexOffset,
+ int maxReadSize, int maxReadCount,
+ int partitionId, boolean isSecond,
+ boolean isFilterConsume, Set<Integer>
filterKeySet,
+ long reqRcvTime) {
// #lizard forgives
Integer lastWritePos = 0;
boolean hasMsg = false;
@@ -235,6 +261,9 @@ public class MsgMemStore implements Closeable {
readedSize += DataStoreUtils.STORE_INDEX_HEAD_LEN;
continue;
}
+ if (reqRcvTime != 0 && cTimeRecv < reqRcvTime) {
+ continue;
+ }
// read data file.
byte[] tmpArray = new byte[cDataSize];
final ByteBuffer buffer = ByteBuffer.wrap(tmpArray);
@@ -258,15 +287,14 @@ public class MsgMemStore implements Closeable {
/***
* Batch flush memory data to disk.
*
- * @param msgFileStore
- * @param strBuffer
- * @return
- * @throws IOException
+ * @param msgFileStore the file storage
+ * @param strBuffer the message buffer
+ * @throws IOException the exception while process
*/
- public boolean batchFlush(MsgFileStore msgFileStore,
- final StringBuilder strBuffer) throws Throwable {
+ public void batchFlush(MsgFileStore msgFileStore,
+ StringBuilder strBuffer) throws Throwable {
if (this.curMessageCount.get() == 0) {
- return true;
+ return;
}
ByteBuffer tmpIndexBuffer = this.cachedIndexSegment.asReadOnlyBuffer();
final ByteBuffer tmpDataReadBuf =
this.cacheDataSegment.asReadOnlyBuffer();
@@ -274,9 +302,9 @@ public class MsgMemStore implements Closeable {
tmpDataReadBuf.flip();
long startTime = System.currentTimeMillis();
msgFileStore.batchAppendMsg(strBuffer, curMessageCount.get(),
- cacheIndexOffset.get(), tmpIndexBuffer, cacheDataOffset.get(),
tmpDataReadBuf);
+ cacheIndexOffset.get(), tmpIndexBuffer, cacheDataOffset.get(),
+ tmpDataReadBuf, leftAppendTime.get(), rightAppendTime.get());
BrokerMetricsHolder.updSyncDataDurations(System.currentTimeMillis() -
startTime);
- return true;
}
public int getCurMsgCount() {
@@ -316,6 +344,27 @@ public class MsgMemStore implements Closeable {
return this.writeIndexStartPos + this.cacheIndexOffset.get();
}
+ /***
+  * Get the index start position currently set for this cache.
+  *
+  * @return the value of writeIndexStartPos
+  */
+ public long getIndexStartWritePos() {
+     return this.writeIndexStartPos;
+ }
+
+ /***
+  * Get the left (earliest) append time held by this cache.
+  *
+  * @return the current value of leftAppendTime
+  */
+ public long getLeftAppendTime() {
+     return this.leftAppendTime.get();
+ }
+
+ /***
+  * Get the right (latest) append time held by this cache.
+  *
+  * @return the current value of rightAppendTime
+  */
+ public long getRightAppendTime() {
+     return this.rightAppendTime.get();
+ }
+
+ /***
+  * Compare a timestamp with the append-time range held by this cache.
+  *
+  * @param timestamp the timestamp to check
+  *
+  * @return -1 if the timestamp is earlier than the left append time,
+  *         1 if it is later than the right append time, 0 otherwise
+  */
+ public int isTimestampInHold(long timestamp) {
+     if (timestamp < leftAppendTime.get()) {
+         return -1;
+     }
+     return (timestamp > rightAppendTime.get()) ? 1 : 0;
+ }
+
public void clear() {
this.writeDataStartPos = -1;
this.writeIndexStartPos = -1;
@@ -324,6 +373,8 @@ public class MsgMemStore implements Closeable {
this.curMessageCount.set(0);
this.queuesMap.clear();
this.keysMap.clear();
+ this.leftAppendTime.set(System.currentTimeMillis());
+ this.rightAppendTime.set(System.currentTimeMillis());
}
@Override
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/StoreRepairAdmin.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/StoreRepairAdmin.java
index 14e7715..82f78ba 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/StoreRepairAdmin.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/StoreRepairAdmin.java
@@ -379,7 +379,7 @@ public class StoreRepairAdmin {
curPartSeg =
new FileSegment(queueOffset, newFile,
SegmentType.INDEX);
}
- curPartSeg.append(indexBuffer);
+ curPartSeg.append(indexBuffer, timeRecv, timeRecv);
gQueueOffset +=
DataStoreUtils.STORE_INDEX_HEAD_LEN;
if (curPartSeg.getCachedSize() >=
maxIndexSegmentSize) {
curPartSeg.flush(true);
diff --git
a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/FileSegmentListTest.java
b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/FileSegmentListTest.java
index 38d575c..0c38660 100644
---
a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/FileSegmentListTest.java
+++
b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/FileSegmentListTest.java
@@ -43,7 +43,8 @@ public class FileSegmentListTest {
Segment fileSegment = fileSegmentList.last();
String data = "abc";
// append data to last FileSegment.
- fileSegment.append(ByteBuffer.wrap(data.getBytes()));
+ long appendTime = System.currentTimeMillis();
+ fileSegment.append(ByteBuffer.wrap(data.getBytes()), appendTime,
appendTime);
fileSegment.flush(true);
// get view
Segment[] segmentList = fileSegmentList.getView();
@@ -70,7 +71,8 @@ public class FileSegmentListTest {
Segment fileSegment = new FileSegment(100L, file, true,
SegmentType.DATA);
String data = "abc";
// append data to last FileSegment.
- fileSegment.append(ByteBuffer.wrap(data.getBytes()));
+ long appendTime = System.currentTimeMillis();
+ fileSegment.append(ByteBuffer.wrap(data.getBytes()), appendTime,
appendTime);
fileSegment.flush(true);
fileSegmentList.append(fileSegment);
Segment[] segmentList = fileSegmentList.getView();
diff --git
a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/FileSegmentTest.java
b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/FileSegmentTest.java
index ca7c972..959fd83 100644
---
a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/FileSegmentTest.java
+++
b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/FileSegmentTest.java
@@ -40,8 +40,9 @@ public class FileSegmentTest {
byte[] bytes = data.getBytes();
ByteBuffer buf = ByteBuffer.wrap(bytes);
// append data to FileSegment.
- fileSegment.append(buf);
- fileSegment.append(buf);
+ long appendTime = System.currentTimeMillis();
+ fileSegment.append(buf, appendTime, appendTime);
+ fileSegment.append(buf, appendTime, appendTime);
} catch (IOException e) {
e.printStackTrace();
} finally {
@@ -64,7 +65,8 @@ public class FileSegmentTest {
byte[] bytes = data.getBytes();
ByteBuffer buf = ByteBuffer.wrap(bytes);
// append data to fileSegment.
- long offset = fileSegment.append(buf);
+ long appendTime = System.currentTimeMillis();
+ long offset = fileSegment.append(buf, appendTime, appendTime);
int limit = 1000;
// get view of fileSegment.
ByteBuffer readBuffer = ByteBuffer.allocate(limit);
diff --git
a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStoreTest.java
b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStoreTest.java
index 4bccc36..c08ab34 100644
---
a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStoreTest.java
+++
b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStoreTest.java
@@ -53,6 +53,6 @@ public class MsgMemStoreTest {
msgMemStore.appendMsg(msgMemStatisInfo, 0, 0,
System.currentTimeMillis(), 3, bf, appendResult);
// get messages
- GetCacheMsgResult getCacheMsgResult = msgMemStore.getMessages(0, 2,
1024, 1000, 0, false, false, null);
+ GetCacheMsgResult getCacheMsgResult = msgMemStore.getMessages(0, 2,
1024, 1000, 0, false, false, null, 0);
}
}