This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new c1e1dcd  [INLONG-2232][tubemq] Add start and end timestamp of segment 
(#2235)
c1e1dcd is described below

commit c1e1dcd4d1e1cc07badf6a0523c12b7673e1ecf5
Author: gosonzhang <[email protected]>
AuthorDate: Thu Jan 20 18:44:48 2022 +0800

    [INLONG-2232][tubemq] Add start and end timestamp of segment (#2235)
---
 .../tubemq/server/broker/BrokerServiceServer.java  |   2 +-
 .../server/broker/msgstore/MessageStore.java       | 193 ++++++++++++++-------
 .../broker/msgstore/MessageStoreManager.java       |   6 +-
 .../server/broker/msgstore/disk/FileSegment.java   | 158 +++++++++++++----
 .../broker/msgstore/disk/FileSegmentList.java      | 101 +++++++++--
 .../server/broker/msgstore/disk/MsgFileStore.java  | 164 +++++++++++++----
 .../server/broker/msgstore/disk/Segment.java       |  35 +++-
 .../server/broker/msgstore/disk/SegmentList.java   |   3 +
 .../server/broker/msgstore/mem/MsgMemStore.java    | 115 ++++++++----
 .../tubemq/server/tools/StoreRepairAdmin.java      |   2 +-
 .../broker/msgstore/disk/FileSegmentListTest.java  |   6 +-
 .../broker/msgstore/disk/FileSegmentTest.java      |   8 +-
 .../broker/msgstore/mem/MsgMemStoreTest.java       |   2 +-
 13 files changed, 605 insertions(+), 190 deletions(-)

diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
index f6cfa46..86c66e4 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
@@ -466,7 +466,7 @@ public class BrokerServiceServer implements 
BrokerReadService, BrokerWriteServic
             sb.delete(0, sb.length());
             GetMessageResult msgQueryResult =
                     msgStore.getMessages(reqSwitch, requestOffset,
-                            partitionId, consumerNodeInfo, baseKey, 
msgDataSizeLimit);
+                            partitionId, consumerNodeInfo, baseKey, 
msgDataSizeLimit, 0);
             offsetManager.bookOffset(group, topic, partitionId,
                     msgQueryResult.lastReadOffset, isManualCommitOffset,
                     msgQueryResult.transferedMessageList.isEmpty(), sb);
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java
index 22b330f..d27e78e 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java
@@ -80,27 +80,27 @@ public class MessageStore implements Closeable {
     private final ExecutorService executor = 
Executors.newSingleThreadExecutor();
     private final AtomicBoolean closed = new AtomicBoolean(false);
     private volatile int partitionNum;
-    private AtomicInteger unflushInterval = new AtomicInteger(0);
-    private AtomicInteger unflushThreshold = new AtomicInteger(0);
-    private AtomicInteger unflushDataHold = new AtomicInteger(0);
+    private final AtomicInteger unflushInterval = new AtomicInteger(0);
+    private final AtomicInteger unflushThreshold = new AtomicInteger(0);
+    private final AtomicInteger unflushDataHold = new AtomicInteger(0);
     private volatile int writeCacheMaxSize;
     private volatile int writeCacheMaxCnt;
     private volatile int writeCacheFlushIntvl;
-    private AtomicLong maxFileValidDurMs = new AtomicLong(0);
+    private final AtomicLong maxFileValidDurMs = new AtomicLong(0);
     private int maxAllowRdSize = 262144;
-    private AtomicInteger memMaxIndexReadCnt = new AtomicInteger(6000);
-    private AtomicInteger fileMaxIndexReadCnt = new AtomicInteger(8000);
-    private AtomicInteger memMaxFilterIndexReadCnt
+    private final AtomicInteger memMaxIndexReadCnt = new AtomicInteger(6000);
+    private final AtomicInteger fileMaxIndexReadCnt = new AtomicInteger(8000);
+    private final AtomicInteger memMaxFilterIndexReadCnt
             = new AtomicInteger(memMaxIndexReadCnt.get() * 2);
-    private AtomicInteger fileMaxFilterIndexReadCnt
+    private final AtomicInteger fileMaxFilterIndexReadCnt
             = new AtomicInteger(fileMaxIndexReadCnt.get() * 3);
-    private AtomicInteger fileLowReqMaxFilterIndexReadCnt
+    private final AtomicInteger fileLowReqMaxFilterIndexReadCnt
             = new AtomicInteger(fileMaxIndexReadCnt.get() * 10);
-    private AtomicInteger fileMaxIndexReadSize
+    private final AtomicInteger fileMaxIndexReadSize
             = new AtomicInteger(this.fileMaxIndexReadCnt.get() * 
DataStoreUtils.STORE_INDEX_HEAD_LEN);
-    private AtomicInteger fileMaxFilterIndexReadSize
+    private final AtomicInteger fileMaxFilterIndexReadSize
             = new AtomicInteger(this.fileMaxFilterIndexReadCnt.get() * 
DataStoreUtils.STORE_INDEX_HEAD_LEN);
-    private AtomicInteger fileLowReqMaxFilterIndexReadSize
+    private final AtomicInteger fileLowReqMaxFilterIndexReadSize
             = new AtomicInteger(this.fileLowReqMaxFilterIndexReadCnt.get() * 
DataStoreUtils.STORE_INDEX_HEAD_LEN);
     private MsgMemStore msgMemStore;
     private MsgMemStore msgMemStoreBeingFlush;
@@ -154,21 +154,20 @@ public class MessageStore implements Closeable {
     /***
      * Get message from message store. Support the given offset, filter.
      *
-     * @param reqSwitch
-     * @param requestOffset
-     * @param partitionId
-     * @param consumerNodeInfo
-     * @param statisKeyBase
-     * @param msgSizeLimit
-     * @return
-     * @throws IOException
+     * @param reqSwitch            read message from where
+     * @param requestOffset        the request offset to read
+     * @param partitionId          the partitionId for reading messages
+     * @param consumerNodeInfo     the consumer object
+     * @param statisKeyBase        the statistical key prefix
+     * @param msgSizeLimit         the max read size
+     * @param reqRcvTime           the timestamp of the record to be checked
+     * @return                     read result
+     * @throws IOException         exception while process
      */
-    public GetMessageResult getMessages(int reqSwitch,
-                                        final long requestOffset,
-                                        final int partitionId,
-                                        final ConsumerNodeInfo 
consumerNodeInfo,
-                                        final String statisKeyBase,
-                                        int msgSizeLimit) throws IOException {
+    public GetMessageResult getMessages(int reqSwitch, long requestOffset,
+                                        int partitionId, ConsumerNodeInfo 
consumerNodeInfo,
+                                        String statisKeyBase, int msgSizeLimit,
+                                        long reqRcvTime) throws IOException {
         // #lizard forgives
         if (this.closed.get()) {
             throw new IllegalStateException(new StringBuilder(512)
@@ -201,7 +200,7 @@ public class MessageStore implements Closeable {
                                                 requestOffset, 
msgStoreMgr.getMaxMsgTransferSize(),
                                                 maxIndexReadLength, 
partitionId, false,
                                                 
consumerNodeInfo.isFilterConsume(),
-                                                
consumerNodeInfo.getFilterCondCodeSet());
+                                                
consumerNodeInfo.getFilterCondCodeSet(), reqRcvTime);
                             }
                         } else {
                             // read from backup memory.
@@ -210,7 +209,7 @@ public class MessageStore implements Closeable {
                                             requestOffset, 
msgStoreMgr.getMaxMsgTransferSize(),
                                             maxIndexReadLength, partitionId, 
true,
                                             consumerNodeInfo.isFilterConsume(),
-                                            
consumerNodeInfo.getFilterCondCodeSet());
+                                            
consumerNodeInfo.getFilterCondCodeSet(), reqRcvTime);
                         }
                     }
                 } finally {
@@ -281,7 +280,7 @@ public class MessageStore implements Closeable {
                 consumerNodeInfo.getLastDataRdOffset(), reqNewOffset,
                 indexBuffer, consumerNodeInfo.isFilterConsume(),
                 consumerNodeInfo.getFilterCondCodeSet(),
-                statisKeyBase, msgSizeLimit);
+                statisKeyBase, msgSizeLimit, reqRcvTime);
         if (reqSwitch <= 1) {
             retResult.setMaxOffset(getFileIndexMaxOffset());
         } else {
@@ -300,23 +299,81 @@ public class MessageStore implements Closeable {
     }
 
     /***
+     * Get start offset by timestamp.
+     *
+     * @param timestamp  timestamp
+     * @return start offset
+     */
+    public long getStartOffsetByTimeStamp(long timestamp) {
+        if (this.closed.get()) {
+            throw new IllegalStateException(new StringBuilder(512)
+                    .append("[Data Store] Closed MessageStore for storeKey ")
+                    .append(this.storeKey).toString());
+        }
+        if (timestamp <= this.msgFileStore.getIndexMaxAppendTime()) {
+            return this.msgFileStore.getStartOffsetByTimeStamp(timestamp);
+        }
+        this.writeCacheMutex.readLock().lock();
+        try {
+            // read from backup memory.
+            if (timestamp <= this.msgMemStoreBeingFlush.getRightAppendTime()) {
+                return this.msgMemStoreBeingFlush.getIndexStartWritePos();
+            }
+            // read from main memory.
+            return this.msgMemStore.getIndexStartWritePos();
+        } finally {
+            this.writeCacheMutex.readLock().unlock();
+        }
+    }
+
+    /***
      * Append msg to store.
      *
-     * @param appendResult
-     * @param dataLength
-     * @param dataCheckSum
-     * @param data
-     * @param msgTypeCode
-     * @param msgFlag
-     * @param partitionId
-     * @param sentAddr
-     * @return
-     * @throws IOException
+     * @param appendResult    the append result
+     * @param dataLength      the data length
+     * @param dataCheckSum    the check sum of message data
+     * @param data            the message data
+     * @param msgTypeCode     the filter item hash code
+     * @param msgFlag         the message flag
+     * @param partitionId     the partitionId for append messages
+     * @param sentAddr        the address to send the message to
+     *
+     * @return                the process result
+     * @throws IOException    exception while process
      */
-    public boolean appendMsg(final AppendResult appendResult, final int 
dataLength,
-                             final int dataCheckSum, final byte[] data,
-                             final int msgTypeCode, final int msgFlag,
-                             final int partitionId, final int sentAddr) throws 
IOException {
+    public boolean appendMsg(AppendResult appendResult, int dataLength,
+                             int dataCheckSum, byte[] data,
+                             int msgTypeCode, int msgFlag,
+                             int partitionId, int sentAddr) throws IOException 
{
+        return appendMsg2(appendResult, dataLength, dataCheckSum, data,
+                msgTypeCode, msgFlag, partitionId, sentAddr,
+                System.currentTimeMillis(), 3, 2);
+    }
+
+    /***
+     * Append msg to store.
+     *
+     * @param appendResult    the append result
+     * @param dataLength      the data length
+     * @param dataCheckSum    the check sum of message data
+     * @param data            the message data
+     * @param msgTypeCode     the filter item hash code
+     * @param msgFlag         the message flag
+     * @param partitionId     the partitionId for append messages
+     * @param sentAddr        the address to send the message to
+     * @param receivedTime    the received time of message
+     * @param count           the retry count while full
+     * @param waitRetryMs     the wait duration while retry
+     *
+     * @return                the process result
+     * @throws IOException    exception while process
+     */
+    public boolean appendMsg2(AppendResult appendResult, int dataLength,
+                              int dataCheckSum, byte[] data,
+                              int msgTypeCode, int msgFlag,
+                              int partitionId, int sentAddr,
+                              long receivedTime, int count,
+                              long waitRetryMs) throws IOException {
         if (this.closed.get()) {
             throw new IllegalStateException(new StringBuilder(512)
                     .append("[Data Store] Closed MessageStore for storeKey ")
@@ -324,7 +381,6 @@ public class MessageStore implements Closeable {
         }
         long messageId = this.idWorker.nextId();
         int msgBufLen = DataStoreUtils.STORE_DATA_HEADER_LEN + dataLength;
-        final long receivedTime = System.currentTimeMillis();
         final ByteBuffer buffer = ByteBuffer.allocate(msgBufLen);
         buffer.putInt(DataStoreUtils.STORE_DATA_PREFX_LEN + dataLength);
         buffer.putInt(DataStoreUtils.STORE_DATA_TOKER_BEGIN_VALUE);
@@ -339,7 +395,6 @@ public class MessageStore implements Closeable {
         buffer.put(data);
         buffer.flip();
         appendResult.putReceivedInfo(messageId, receivedTime);
-        int count = 3;
         do {
             this.writeCacheMutex.readLock().lock();
             try {
@@ -356,7 +411,7 @@ public class MessageStore implements Closeable {
                     buffer, false, appendResult)) {
                 return true;
             }
-            ThreadUtils.sleep(1);
+            ThreadUtils.sleep(waitRetryMs);
         } while (count-- >= 0);
         msgMemStatisInfo.addWriteFailCount();
         return false;
@@ -377,8 +432,9 @@ public class MessageStore implements Closeable {
     /***
      * Execute cleanup policy.
      *
-     * @param onlyCheck
-     * @return
+     * @param onlyCheck   whether only check status
+     *
+     * @return whether clear up segments
      */
     public boolean runClearupPolicy(boolean onlyCheck) {
         if (this.closed.get()) {
@@ -392,7 +448,7 @@ public class MessageStore implements Closeable {
     /***
      * Refresh unflush threshold
      *
-     * @param topicMetadata
+     * @param topicMetadata   topic meta data
      */
     public void refreshUnflushThreshold(TopicMetadata topicMetadata) {
         if (this.closed.get()) {
@@ -428,7 +484,7 @@ public class MessageStore implements Closeable {
     /***
      * Flush file store to disk.
      *
-     * @throws IOException
+     * @throws IOException exception while process
      */
     public void flushFile() throws IOException {
         if (this.closed.get()) {
@@ -442,7 +498,7 @@ public class MessageStore implements Closeable {
     /***
      * Flush memory store to file.
      *
-     * @throws IOException
+     * @throws IOException exception while process
      */
     public void flushMemCacheData() throws IOException {
         if (this.closed.get()) {
@@ -603,6 +659,11 @@ public class MessageStore implements Closeable {
     private int validAndGetMemCacheSize(TopicMetadata topicMetadata) {
         int memCacheSize = topicMetadata.getMemCacheMsgSize();
         if (memCacheSize < topicMetadata.getMinMemCacheSize()) {
+            logger.info(new StringBuilder(512)
+                    .append("[Data Store] ").append(getTopic())
+                    .append(" writeCacheMaxSize changed, from ")
+                    .append(memCacheSize).append(" to ")
+                    .append(topicMetadata.getMinMemCacheSize()).toString());
             memCacheSize = topicMetadata.getMinMemCacheSize();
         }
         return memCacheSize;
@@ -611,21 +672,23 @@ public class MessageStore implements Closeable {
     /***
      * Append message and trigger flush operation.
      *
-     * @param partitionId
-     * @param keyCode
-     * @param receivedTime
-     * @param entryLength
-     * @param needAdd
-     * @param entry
-     * @param isTimeTrigger
-     * @return
-     * @throws IOException
+     * @param partitionId       the partitionId for reading messages
+     * @param keyCode           the filter item hash code
+     * @param receivedTime      the received time of message
+     * @param entryLength       the stored entry length
+     * @param needAdd           whether to add a message
+     * @param entry             the stored entry
+     * @param isTimeTrigger     whether is timer trigger
+     * @param appendResult      the append result
+     *
+     * @return                  the append result
+     * @throws IOException      exception while process
      */
-    private boolean triggerFlushAndAddMsg(final int partitionId, final int 
keyCode,
-                                          final long receivedTime, final int 
entryLength,
-                                          final boolean needAdd, final 
ByteBuffer entry,
-                                          final boolean isTimeTrigger,
-                                          final AppendResult appendResult) 
throws IOException {
+    private boolean triggerFlushAndAddMsg(int partitionId, int keyCode,
+                                          long receivedTime, int entryLength,
+                                          boolean needAdd, ByteBuffer entry,
+                                          boolean isTimeTrigger,
+                                          AppendResult appendResult) throws 
IOException {
         writeCacheMutex.writeLock().lock();
         try {
             if (!isFlushOngoing.get() && 
hasFlushBeenTriggered.compareAndSet(false, true)) {
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
index 6b8e973..5414f09 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
@@ -79,9 +79,9 @@ public class MessageStoreManager implements StoreService {
     // message on memory sink to disk operation scheduler.
     private final ScheduledExecutorService unFlushMemScheduler;
     // max transfer size.
-    private int maxMsgTransferSize;
+    private final int maxMsgTransferSize;
     // the status that is deleting topic.
-    private AtomicBoolean isRemovingTopic = new AtomicBoolean(false);
+    private final AtomicBoolean isRemovingTopic = new AtomicBoolean(false);
 
     public MessageStoreManager(final TubeBroker tubeBroker,
                                final BrokerConfig tubeConfig) throws 
IOException {
@@ -366,7 +366,7 @@ public class MessageStoreManager implements StoreService {
             }
             requestOffset = maxOffset - maxIndexReadSize < 0 ? 0L : maxOffset 
- maxIndexReadSize;
             return msgStore.getMessages(303, requestOffset, partitionId,
-                    consumerNodeInfo, topic, this.maxMsgTransferSize);
+                    consumerNodeInfo, topic, this.maxMsgTransferSize, 0);
         } catch (Throwable e1) {
             return new GetMessageResult(false, 
TErrCodeConstants.INTERNAL_SERVER_ERROR,
                     requestOffset, 0, "Get message failure, errMsg=" + 
e1.getMessage());
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/FileSegment.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/FileSegment.java
index 42580bd..6839a2a 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/FileSegment.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/FileSegment.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.inlong.tubemq.corebase.TBaseConstants;
 import org.apache.inlong.tubemq.corebase.utils.CheckSum;
 import org.apache.inlong.tubemq.corebase.utils.ServiceStatusHolder;
 import org.apache.inlong.tubemq.server.broker.metrics.BrokerMetricsHolder;
@@ -40,33 +41,38 @@ public class FileSegment implements Segment {
             LoggerFactory.getLogger(FileSegment.class);
     private final long start;
     private final File file;
-    private RandomAccessFile randFile;
-    private FileChannel channel;
+    private final RandomAccessFile randFile;
+    private final FileChannel channel;
     private final AtomicLong cachedSize;
     private final AtomicLong flushedSize;
     private final SegmentType segmentType;
-    private boolean mutable;
+    private volatile boolean mutable = false;
     private long expiredTime = 0;
-    private AtomicBoolean expired = new AtomicBoolean(false);
-    private AtomicBoolean closed = new AtomicBoolean(false);
-
-    public FileSegment(final long start, final File file, SegmentType type) 
throws IOException {
+    private final AtomicBoolean expired = new AtomicBoolean(false);
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+    // the first record append time
+    private final AtomicLong leftAppendTime =
+            new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
+    // the latest record append time
+    private final AtomicLong rightAppendTime =
+            new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
+
+    public FileSegment(long start, File file, SegmentType type) throws 
IOException {
         this(start, file, true, type, Long.MAX_VALUE);
     }
 
-    public FileSegment(final long start, final File file,
-                       final boolean mutable, SegmentType type) throws 
IOException {
+    public FileSegment(long start, File file,
+                       boolean mutable, SegmentType type) throws IOException {
         this(start, file, mutable, type, Long.MAX_VALUE);
     }
 
-    public FileSegment(final long start, final File file,
-                       SegmentType type, final long checkOffset) throws 
IOException {
+    public FileSegment(long start, File file,
+                       SegmentType type, long checkOffset) throws IOException {
         this(start, file, true, type, checkOffset);
     }
 
-    private FileSegment(final long start, final File file,
-                        final boolean mutable, SegmentType type,
-                        final long checkOffset) throws IOException {
+    private FileSegment(long start, File file, boolean mutable,
+                        SegmentType type, long checkOffset) throws IOException 
{
         super();
         this.segmentType = type;
         this.start = start;
@@ -124,6 +130,18 @@ public class FileSegment implements Segment {
                 }
             }
         }
+        if (this.segmentType == SegmentType.INDEX) {
+            if (this.cachedSize.get() == 0) {
+                if (this.mutable) {
+                    this.leftAppendTime.set(System.currentTimeMillis());
+                    this.rightAppendTime.set(System.currentTimeMillis());
+                }
+            } else {
+                this.leftAppendTime.set(getRecordTime(this.start));
+                this.rightAppendTime.set(getRecordTime(this.start
+                        + this.cachedSize.get() - 
DataStoreUtils.STORE_INDEX_HEAD_LEN));
+            }
+        }
     }
 
     @Override
@@ -182,14 +200,17 @@ public class FileSegment implements Segment {
     }
 
     /***
-     * Messages can only be appended to the last FileSegment. The last 
FileSegment is writable, the others are mutable.
+     * Messages can only be appended to the last FileSegment.
+     * The last FileSegment is writable, the others are mutable.
      *
-     * @param buf
-     * @return
-     * @throws IOException
+     * @param buf            data buffer
+     * @param leftTime       the first record timestamp
+     * @param rightTime      the latest record timestamp
+     * @return               latest writable position
+     * @throws IOException   exception while force data to disk
      */
     @Override
-    public long append(final ByteBuffer buf) throws IOException {
+    public long append(ByteBuffer buf, long leftTime, long rightTime) throws 
IOException {
         if (!this.mutable) {
             if (this.segmentType == SegmentType.DATA) {
                 throw new UnsupportedOperationException("[File Store] Data 
Segment is immutable!");
@@ -207,15 +228,22 @@ public class FileSegment implements Segment {
             sizeInBytes += this.channel.write(buf);
         }
         this.cachedSize.addAndGet(sizeInBytes);
+        if (segmentType == SegmentType.INDEX) {
+            this.rightAppendTime.set(rightTime);
+            if (offset == 0) {
+                this.leftAppendTime.set(leftTime);
+            }
+        }
         return this.start + offset;
     }
 
     /***
      * Flush file cache to disk.
      *
-     * @param force
-     * @return
-     * @throws IOException
+     * @param force whether to brush
+     * @return the latest writable position
+     *
+     * @throws IOException exception while force data to disk
      */
     @Override
     public long flush(boolean force) throws IOException {
@@ -240,7 +268,7 @@ public class FileSegment implements Segment {
     }
 
     @Override
-    public boolean contains(final long offset) {
+    public boolean contains(long offset) {
         return (this.getCachedSize() == 0
                 && offset == this.start
                 || this.getCachedSize() > 0
@@ -249,7 +277,8 @@ public class FileSegment implements Segment {
     }
 
     /***
-     * Release reference to this FileSegment. File's channel will be closed 
when the reference decreased to 0.
+     * Release reference to this FileSegment.
+     * File's channel will be closed when the reference decreased to 0.
      */
     @Override
     public void relViewRef() {
@@ -269,7 +298,7 @@ public class FileSegment implements Segment {
     /***
      * Return the position that have been flushed to disk.
      *
-     * @return
+     * @return the position that have been flushed to disk
      */
     @Override
     public long getCommitLast() {
@@ -284,7 +313,7 @@ public class FileSegment implements Segment {
     /***
      * Set FileSegment to readonly.
      *
-     * @param mutable
+     * @param mutable mutable or immutable
      */
     @Override
     public void setMutable(boolean mutable) {
@@ -292,6 +321,30 @@ public class FileSegment implements Segment {
     }
 
     @Override
+    public long getLeftAppendTime() {
+        return leftAppendTime.get();
+    }
+
+    @Override
+    public long getRightAppendTime() {
+        return rightAppendTime.get();
+    }
+
+    @Override
+    public boolean containTime(long timestamp) {
+        if (this.getCachedSize() == 0) {
+            return this.mutable;
+        }
+        if (timestamp >= this.leftAppendTime.get()) {
+            if (this.mutable) {
+                return true;
+            }
+            return timestamp <= this.rightAppendTime.get();
+        }
+        return false;
+    }
+
+    @Override
     public long getCachedSize() {
         return this.cachedSize.get();
     }
@@ -307,29 +360,66 @@ public class FileSegment implements Segment {
     }
 
     @Override
-    public void read(final ByteBuffer bf, final long reqOffset) throws 
IOException {
+    public void read(ByteBuffer bf, long absOffset) throws IOException {
         if (this.isExpired()) {
             //Todo: conduct file closed and expired cases.
         }
         int size = 0;
+        long startPos  = absOffset - start;
         while (bf.hasRemaining()) {
-            final int l = this.channel.read(bf, reqOffset - start + size);
+            final int l = this.channel.read(bf, startPos + size);
+            if (l < 0) {
+                break;
+            }
+            size += l;
+        }
+    }
+
+    @Override
+    public void relRead(final ByteBuffer bf, long relOffset) throws 
IOException {
+        if (this.isExpired()) {
+            //Todo: conduct file closed and expired cases.
+        }
+        int size = 0;
+        while (bf.hasRemaining()) {
+            final int l = this.channel.read(bf, relOffset + size);
+            if (l < 0) {
+                break;
+            }
+            size += l;
+        }
+    }
+
+    /***
+     * read index record's append time.
+     * @param reqOffset request offset.
+     * @return message append time.
+     */
+    @Override
+    public long getRecordTime(long reqOffset) throws IOException {
+        ByteBuffer readUnit = 
ByteBuffer.allocate(DataStoreUtils.STORE_INDEX_HEAD_LEN);
+        int size = 0;
+        while (readUnit.hasRemaining()) {
+            final int l = this.channel.read(readUnit, reqOffset - start + 
size);
             if (l < 0) {
                 break;
             }
             size += l;
         }
+        readUnit.flip();
+        return readUnit.getLong(DataStoreUtils.INDEX_POS_TIME_RECV);
     }
 
     /***
-     * Check whether this FileSegment is expired, and set expire status. The 
last FileSegment cannot be marked expired.
+     * Check whether this FileSegment is expired, and set expire status.
+     * The last FileSegment cannot be marked expired.
      *
      * @param checkTimestamp check timestamp.
      * @param maxValidTimeMs the max expire interval in milliseconds.
      * @return -1 means already expired, 0 means the last FileSegment, 1 means 
expired.
      */
     @Override
-    public int checkAndSetExpired(final long checkTimestamp, final long 
maxValidTimeMs) {
+    public int checkAndSetExpired(long checkTimestamp, long maxValidTimeMs) {
         if (expired.get()) {
             return -1;
         }
@@ -347,7 +437,7 @@ public class FileSegment implements Segment {
         return 0;
     }
 
-    private RecoverResult recoverData(final long checkOffset) throws 
IOException {
+    private RecoverResult recoverData(long checkOffset) throws IOException {
         if (!this.mutable) {
             throw new UnsupportedOperationException(
                     "[File Store] The Data Segment must be mutable!");
@@ -414,7 +504,7 @@ public class FileSegment implements Segment {
         return new RecoverResult(totalBytes - validBytes, false);
     }
 
-    private RecoverResult recoverIndex(final long checkOffset) throws 
IOException {
+    private RecoverResult recoverIndex(long checkOffset) throws IOException {
         if (!this.mutable) {
             throw new UnsupportedOperationException(
                     "[File Store] The Index Segment must be mutable!");
@@ -475,8 +565,8 @@ public class FileSegment implements Segment {
     }
 
     private static class RecoverResult {
-        private long truncated;
-        private boolean isEqual;
+        private final long truncated;
+        private final boolean isEqual;
 
         public RecoverResult(long truncated, boolean isEqual) {
             this.truncated = truncated;
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/FileSegmentList.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/FileSegmentList.java
index 386a4c9..09a0763 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/FileSegmentList.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/FileSegmentList.java
@@ -29,7 +29,7 @@ public class FileSegmentList implements SegmentList {
     private static final Logger logger =
             LoggerFactory.getLogger(FileSegmentList.class);
     // list of segments.
-    private AtomicReference<Segment[]> segmentList =
+    private final AtomicReference<Segment[]> segmentList =
             new AtomicReference<>();
 
     public FileSegmentList(final Segment[] s) {
@@ -59,9 +59,9 @@ public class FileSegmentList implements SegmentList {
     /***
      * Return segment by the given offset.
      *
-     * @param offset
-     * @return
-     * @throws IOException
+     * @param offset     the position to search
+     * @return           the segment included the position
+     * @throws IOException  the exception while searching
      */
     @Override
     public Segment getRecordSeg(final long offset) throws IOException {
@@ -88,12 +88,12 @@ public class FileSegmentList implements SegmentList {
     /***
      * Check each FileSegment whether is expired, and set expire status.
      *
-     * @param checkTimestamp
-     * @param fileValidTimeMs
-     * @return
+     * @param checkTimestamp   current check timestamp
+     * @param fileValidTimeMs  the max expire interval
+     * @return                 whether is expired
      */
     @Override
-    public boolean checkExpiredSegments(final long checkTimestamp, final long 
fileValidTimeMs) {
+    public boolean checkExpiredSegments(long checkTimestamp, long 
fileValidTimeMs) {
         boolean hasExpired = false;
         for (Segment segment : segmentList.get()) {
             if (segment == null) {
@@ -110,7 +110,7 @@ public class FileSegmentList implements SegmentList {
     /***
      * Check FileSegments whether is expired, close all expired FileSegments, 
and then delete these files.
      *
-     * @param sb
+     * @param sb   string buffer
      */
     @Override
     public void delExpiredSegments(final StringBuilder sb) {
@@ -177,7 +177,7 @@ public class FileSegmentList implements SegmentList {
     /***
      * Return the start position of these FileSegments.
      *
-     * @return
+     * @return  the first position
      */
     @Override
     public long getMinOffset() {
@@ -202,7 +202,7 @@ public class FileSegmentList implements SegmentList {
     /***
      * Return the max position of these FileSegments.
      *
-     * @return
+     * @return   the latest position
      */
     @Override
     public long getMaxOffset() {
@@ -217,10 +217,23 @@ public class FileSegmentList implements SegmentList {
         return last.getLast();
     }
 
+    @Override
+    public long getMaxAppendTime() {
+        final Segment[] curViews = segmentList.get();
+        if (curViews.length == 0) {
+            return 0L;
+        }
+        Segment last = curViews[curViews.length - 1];
+        if (last == null) {
+            return 0L;
+        }
+        return last.getRightAppendTime();
+    }
+
     /***
      * Return the max position that have been flushed to disk.
      *
-     * @return
+     * @return  the latest committed offset
      */
     @Override
     public long getCommitMaxOffset() {
@@ -254,8 +267,9 @@ public class FileSegmentList implements SegmentList {
 
     /**
      *  Binary search the segment that contains the offset
-     * @param offset
-     * @return
+     *
+     * @param offset    offset to search
+     * @return          the segment includes the searched offset
      */
     @Override
     public Segment findSegment(final long offset) {
@@ -299,4 +313,63 @@ public class FileSegmentList implements SegmentList {
         return null;
     }
 
+    @Override
+    public Segment findSegmentByTimeStamp(long timestamp) {
+        final Segment[] curViews = segmentList.get();
+        if (curViews.length == 0) {
+            return null;
+        }
+        int minStart  = 0;
+        for (minStart = 0; minStart < curViews.length; minStart++) {
+            if (curViews[minStart] == null
+                    || curViews[minStart].isExpired()) {
+                continue;
+            }
+            break;
+        }
+        // Check boundaries
+        if (minStart >= curViews.length) {
+            minStart = curViews.length - 1;
+        }
+        int hight = curViews.length - 1;
+        final Segment startSeg = curViews[minStart];
+        if ((minStart == hight)
+                || (timestamp <= startSeg.getLeftAppendTime())) {
+            return startSeg;
+        }
+        Segment last = curViews[hight];
+        Segment before = curViews[hight - 1];
+        if (last.getCachedSize() > 0) {
+            if (timestamp > last.getLeftAppendTime()) {
+                return last;
+            }
+        }
+        if (timestamp > before.getRightAppendTime()) {
+            return last;
+        } else if (timestamp > before.getLeftAppendTime()) {
+            return before;
+        }
+        int mid = 0;
+        Segment found = null;
+        // Use dichotomy to find the first segment that contains the specified 
timestamp
+        while (minStart <= hight) {
+            mid = hight + minStart >>> 1;
+            found = curViews[mid];
+            if (found.containTime(timestamp)) {
+                before = curViews[mid - 1];
+                if (timestamp > before.getRightAppendTime()) {
+                    return found;
+                } else if (timestamp > before.getLeftAppendTime()) {
+                    return before;
+                }
+                hight = mid - 1;
+            } else if (timestamp < found.getLeftAppendTime()) {
+                hight = mid - 1;
+            } else {
+                minStart = mid + 1;
+            }
+        }
+        return null;
+    }
+
 }
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
index 33495b4..2ea3f49 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
@@ -106,16 +106,19 @@ public class MsgFileStore implements Closeable {
     /***
      * Batch append message to file segment
      *
-     * @param sb
-     * @param msgCnt
-     * @param indexSize
-     * @param indexBuffer
-     * @param dataSize
-     * @param dataBuffer
+     * @param sb             string buffer
+     * @param msgCnt         the record count to append
+     * @param indexSize      the index buffer length
+     * @param indexBuffer    the index buffer to append
+     * @param dataSize       the data buffer length
+     * @param dataBuffer     the data buffer to append
+     * @param leftTime       the first record timestamp
+     * @param rightTime      the latest record timestamp
      */
-    public void batchAppendMsg(final StringBuilder sb, final int msgCnt,
-                               final int indexSize, final ByteBuffer 
indexBuffer,
-                               final int dataSize, final ByteBuffer 
dataBuffer) throws Throwable {
+    public void batchAppendMsg(StringBuilder sb, int msgCnt,
+                               int indexSize, ByteBuffer indexBuffer,
+                               int dataSize, ByteBuffer dataBuffer,
+                               long leftTime, long rightTime) throws Throwable 
{
         // append message, put in data file first, then index file.
         if (this.closed.get()) {
             throw new IllegalStateException(new StringBuilder(512)
@@ -135,7 +138,7 @@ public class MsgFileStore implements Closeable {
             // filling data segment.
             final Segment curDataSeg = this.dataSegments.last();
             this.curUnflushSize.addAndGet(dataSize);
-            final long dataOffset = curDataSeg.append(dataBuffer);
+            final long dataOffset = curDataSeg.append(dataBuffer, leftTime, 
rightTime);
             // judge whether need to create a new data segment.
             if (curDataSeg.getCachedSize() >= 
this.tubeConfig.getMaxSegmentSize()) {
                 isDataSegFlushed = true;
@@ -152,7 +155,7 @@ public class MsgFileStore implements Closeable {
             // filling index data.
             final long inDataOffset = 
indexBuffer.getLong(DataStoreUtils.INDEX_POS_DATAOFFSET);
             final Segment curIndexSeg = this.indexSegments.last();
-            final long indexOffset = curIndexSeg.append(indexBuffer);
+            final long indexOffset = curIndexSeg.append(indexBuffer, leftTime, 
rightTime);
             // judge whether need to create a new index segment.
             if (curIndexSeg.getCachedSize()
                 >= this.tubeConfig.getMaxIndexSegmentSize()) {
@@ -222,22 +225,25 @@ public class MsgFileStore implements Closeable {
     /***
      * Get message from index and data files.
      *
-     * @param partitionId
-     * @param lastRdOffset
-     * @param reqOffset
-     * @param indexBuffer
-     * @param isFilterConsume
-     * @param filterKeySet
-     * @param statisKeyBase
-     * @param maxMsgTransferSize
-     * @return
+     * @param partitionId           the partitionId for reading messages
+     * @param lastRdOffset          the recent data offset read before
+     * @param reqOffset             the request index offset
+     * @param indexBuffer           the index read buffer
+     * @param isFilterConsume       whether to filter consumption
+     * @param filterKeySet          filter item set
+     * @param statisKeyBase         the statistical key prefix
+     * @param maxMsgTransferSize    the max read message size
+     * @param reqRcvTime            the timestamp of the record to be checked
+     *
+     * @return                      read result
      */
-    public GetMessageResult getMessages(final int partitionId, final long 
lastRdOffset,
-                                        final long reqOffset, final ByteBuffer 
indexBuffer,
-                                        final boolean isFilterConsume,
-                                        final Set<Integer> filterKeySet,
-                                        final String statisKeyBase,
-                                        final int maxMsgTransferSize) {
+    public GetMessageResult getMessages(int partitionId, long lastRdOffset,
+                                        long reqOffset, ByteBuffer indexBuffer,
+                                        boolean isFilterConsume,
+                                        Set<Integer> filterKeySet,
+                                        String statisKeyBase,
+                                        int maxMsgTransferSize,
+                                        long reqRcvTime) {
         // #lizard forgives
         // Orderly read from index file, then random read from data file.
         int retCode = 0;
@@ -294,6 +300,9 @@ public class MsgFileStore implements Closeable {
                 readedOffset = curIndexOffset + 
DataStoreUtils.STORE_INDEX_HEAD_LEN;
                 continue;
             }
+            if (reqRcvTime != 0 && recvTimeInMillsec < reqRcvTime) {
+                continue;
+            }
             try {
                 // get data from data file by index one by one.
                 if (recordSeg == null
@@ -374,6 +383,65 @@ public class MsgFileStore implements Closeable {
                 totalSize, countMap, transferedMessageList);
     }
 
+    /***
+     * Get the segment start Offset that contains the specified timestamp
+     *
+     * @param timestamp           the specified timestamp
+     *
+     * @return                    the start offset
+     */
+    public long getStartOffsetByTimeStamp(long timestamp) {
+        Segment recordSeg = indexSegments.findSegmentByTimeStamp(timestamp);
+        if (recordSeg == null || this.closed.get()) {
+            return -1;
+        }
+        long endPos = (recordSeg.getCommitLast() - recordSeg.getStart())
+                / DataStoreUtils.STORE_INDEX_HEAD_LEN - 1;
+        final long curDataMinOffset = getDataMinOffset();
+        final ByteBuffer readBuffer =
+                ByteBuffer.allocate(DataStoreUtils.STORE_INDEX_HEAD_LEN);
+        // check boundaries
+        if (endPos <= 0) {
+            return recordSeg.getStart();
+        }
+        long foundTime = getTimeStamp(recordSeg,
+                0, curDataMinOffset, readBuffer);
+        if (timestamp < foundTime) {
+            return recordSeg.getStart();
+        }
+        foundTime = getTimeStamp(recordSeg,
+                endPos * DataStoreUtils.STORE_INDEX_HEAD_LEN,
+                curDataMinOffset, readBuffer);
+        if (timestamp > foundTime) {
+            return recordSeg.getStart() + endPos * 
DataStoreUtils.STORE_INDEX_HEAD_LEN;
+        }
+        long midPos = 0;
+        long startPos = 0;
+        long firstLowPos = 0;
+        long firstEqualPos = -1;
+        // Dichotomy finds the first offset position less than the specified 
time
+        while (startPos <= endPos) {
+            midPos = endPos + startPos >>> 1;
+            foundTime = getTimeStamp(recordSeg,
+                    midPos * DataStoreUtils.STORE_INDEX_HEAD_LEN,
+                    curDataMinOffset, readBuffer);
+            if (foundTime < timestamp) {
+                firstLowPos = midPos;
+                startPos = midPos + 1;
+            } else {
+                endPos = midPos - 1;
+                if (foundTime == timestamp) {
+                    firstEqualPos = midPos;
+                }
+            }
+        }
+        if (firstEqualPos != -1) {
+            return recordSeg.getStart() + firstEqualPos * 
DataStoreUtils.STORE_INDEX_HEAD_LEN;
+        } else {
+            return recordSeg.getStart() + firstLowPos * 
DataStoreUtils.STORE_INDEX_HEAD_LEN;
+        }
+    }
+
     @Override
     public void close() throws IOException {
         if (this.closed.compareAndSet(false, true)) {
@@ -390,8 +458,8 @@ public class MsgFileStore implements Closeable {
     /***
      * Clean expired data files and index files.
      *
-     * @param onlyCheck
-     * @return
+     * @param onlyCheck   whether to check only
+     * @return            whether found expired segments
      */
     public boolean runClearupPolicy(boolean onlyCheck) {
         final StringBuilder sBuilder = new StringBuilder(512);
@@ -415,7 +483,7 @@ public class MsgFileStore implements Closeable {
     /***
      * Flush data to disk at interval.
      *
-     * @throws IOException
+     * @throws IOException exception while process
      */
     public void flushDiskFile() throws IOException {
         long checkTimestamp = System.currentTimeMillis();
@@ -444,7 +512,6 @@ public class MsgFileStore implements Closeable {
                 this.writeLock.unlock();
             }
         }
-        return;
     }
 
     public long getDataSizeInBytes() {
@@ -471,6 +538,10 @@ public class MsgFileStore implements Closeable {
         return this.indexSegments.getMaxOffset();
     }
 
+    public long getIndexMaxAppendTime() {
+        return this.indexSegments.getMaxAppendTime();
+    }
+
     public long getIndexMaxHighOffset() {
         return this.indexSegments.getCommitMaxOffset();
     }
@@ -483,7 +554,7 @@ public class MsgFileStore implements Closeable {
         return indexSegments.getRecordSeg(offset);
     }
 
-    private void loadSegments(final SegmentType segType, long offsetIfCreate,
+    private void loadSegments(SegmentType segType, long offsetIfCreate,
                               StringBuilder sBuilder) throws IOException {
         String segTypeStr = "Data";
         File   segListDir = this.dataDir;
@@ -577,7 +648,7 @@ public class MsgFileStore implements Closeable {
         sBuilder.delete(0, sBuilder.length());
     }
 
-    private void validateSegments(final String segTypeStr, final List<Segment> 
segments) {
+    private void validateSegments(String segTypeStr, final List<Segment> 
segments) {
         // valid segments, continuous
         for (int i = 0; i < segments.size() - 1; i++) {
             final Segment curr = segments.get(i);
@@ -592,4 +663,33 @@ public class MsgFileStore implements Closeable {
         }
     }
 
+    private long getTimeStamp(Segment recordSeg, long relReadPos,
+                              long curDataMinOffset, ByteBuffer readBuffer) {
+        int curIndexPartitionId = 0;
+        long curIndexDataOffset = 0L;
+        int curIndexDataSize = 0;
+        int curIndexKeyCode = 0;
+        long recvTimeInMillsec = 0L;
+        try {
+            readBuffer.clear();
+            recordSeg.relRead(readBuffer, relReadPos);
+            readBuffer.flip();
+            curIndexPartitionId = readBuffer.getInt();
+            curIndexDataOffset = readBuffer.getLong();
+            curIndexDataSize = readBuffer.getInt();
+            curIndexKeyCode = readBuffer.getInt();
+            recvTimeInMillsec = readBuffer.getLong();
+            // skip when mismatch condition
+            if (curIndexDataOffset < 0
+                    || curIndexDataSize <= 0
+                    || curIndexDataSize > 
DataStoreUtils.STORE_MAX_MESSAGE_STORE_LEN
+                    || curIndexDataOffset < curDataMinOffset) {
+                return -1;
+            }
+            return recvTimeInMillsec;
+        } catch (Throwable ex) {
+            samplePrintCtrl.printExceptionCaught(ex);
+            return -1;
+        }
+    }
 }
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/Segment.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/Segment.java
index 97ed0de..6ecd417 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/Segment.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/Segment.java
@@ -28,7 +28,17 @@ public interface Segment {
 
     void close();
 
-    long append(ByteBuffer buf) throws IOException;
+    /***
+     * Messages can only be appended to the last FileSegment.
+     * The last FileSegment is writable, the others are mutable.
+     *
+     * @param buf            data buffer
+     * @param leftTime       the first record timestamp
+     * @param rightTime      the latest record timestamp
+     * @return               latest writable position
+     * @throws IOException   exception while force data to disk
+     */
+    long append(ByteBuffer buf, long leftTime, long rightTime) throws 
IOException;
 
     long flush(boolean force) throws IOException;
 
@@ -62,6 +72,27 @@ public interface Segment {
 
     void relViewRef();
 
-    void read(ByteBuffer bf, long offset) throws IOException;
+    /***
+     * Read data to buffer from absolute position.
+     *
+     * @param bf          buffer to store data
+     * @param absOffset   absolute read position
+     */
+    void read(ByteBuffer bf, long absOffset) throws IOException;
 
+    /***
+     * Read data to buffer from relative position.
+     *
+     * @param bf          buffer to store data
+     * @param relOffset   relative read position
+     */
+    void relRead(ByteBuffer bf, long relOffset) throws IOException;
+
+    long getLeftAppendTime();
+
+    long getRightAppendTime();
+
+    boolean containTime(long timestamp);
+
+    long getRecordTime(long reqOffset) throws IOException;
 }
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/SegmentList.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/SegmentList.java
index fbd703e..cf4bed4 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/SegmentList.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/SegmentList.java
@@ -36,6 +36,8 @@ public interface SegmentList {
 
     long getMaxOffset();
 
+    long getMaxAppendTime();
+
     boolean checkExpiredSegments(long checkTimestamp, long fileValidTimeMs);
 
     void delExpiredSegments(StringBuilder sb);
@@ -52,4 +54,5 @@ public interface SegmentList {
 
     Segment getRecordSeg(long offset) throws IOException;
 
+    Segment findSegmentByTimeStamp(long timestamp);
 }
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java
index e662112..d5c9c15 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
 import org.apache.inlong.tubemq.corebase.TBaseConstants;
 import org.apache.inlong.tubemq.corebase.TErrCodeConstants;
@@ -55,13 +56,17 @@ public class MsgMemStore implements Closeable {
     private final ConcurrentHashMap<Integer, Integer> keysMap =
             new ConcurrentHashMap<>(100);
     // where messages in memory will sink to disk
-    private int maxDataCacheSize;
+    private final int maxDataCacheSize;
     private long writeDataStartPos = -1;
-    private ByteBuffer cacheDataSegment;
-    private int maxIndexCacheSize;
+    private final ByteBuffer cacheDataSegment;
+    private final int maxIndexCacheSize;
     private long writeIndexStartPos = -1;
-    private ByteBuffer cachedIndexSegment;
-    private int maxAllowedMsgCount;
+    private final ByteBuffer cachedIndexSegment;
+    private final int maxAllowedMsgCount;
+    private final AtomicLong leftAppendTime =
+            new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
+    private final AtomicLong rightAppendTime =
+            new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
 
     public MsgMemStore(int maxCacheSize, int maxMsgCount, final BrokerConfig 
tubeConfig) {
         this.maxDataCacheSize = maxCacheSize;
@@ -69,6 +74,8 @@ public class MsgMemStore implements Closeable {
         this.maxIndexCacheSize = this.maxAllowedMsgCount * 
DataStoreUtils.STORE_INDEX_HEAD_LEN;
         this.cacheDataSegment = 
ByteBuffer.allocateDirect(this.maxDataCacheSize);
         this.cachedIndexSegment = 
ByteBuffer.allocateDirect(this.maxIndexCacheSize);
+        this.leftAppendTime.set(System.currentTimeMillis());
+        this.rightAppendTime.set(System.currentTimeMillis());
     }
 
     public void resetStartPos(long writeDataStartPos, long writeIndexStartPos) 
{
@@ -77,10 +84,23 @@ public class MsgMemStore implements Closeable {
         this.writeIndexStartPos = writeIndexStartPos;
     }
 
-    public boolean appendMsg(final MsgMemStatisInfo msgMemStatisInfo,
-                             final int partitionId, final int keyCode,
-                             final long timeRecv, final int entryLength,
-                             final ByteBuffer entry, final AppendResult 
appendResult) {
+    /***
+     * Append message to memory cache
+     *
+     * @param msgMemStatisInfo  statistical information object
+     * @param partitionId       the partitionId for append messages
+     * @param keyCode           the filter item hash code
+     * @param timeRecv          the received timestamp
+     * @param entryLength       the stored entry length
+     * @param entry             the stored entry
+     * @param appendResult      the append result
+     *
+     * @return    the process result
+     */
+    public boolean appendMsg(MsgMemStatisInfo msgMemStatisInfo,
+                             int partitionId, int keyCode,
+                             long timeRecv, int entryLength,
+                             ByteBuffer entry, AppendResult appendResult) {
         boolean fullDataSize = false;
         boolean fullIndexSize = false;
         boolean fullCount = false;
@@ -114,6 +134,10 @@ public class MsgMemStore implements Closeable {
             this.keysMap.put(keyCode, indexSizePos);
             this.curMessageCount.getAndAdd(1);
             msgMemStatisInfo.addMsgSizeStatis(timeRecv, entryLength);
+            this.rightAppendTime.set(timeRecv);
+            if (indexSizePos == 0) {
+                this.leftAppendTime.set(timeRecv);
+            }
         } finally {
             this.writeLock.unlock();
         }
@@ -124,21 +148,23 @@ public class MsgMemStore implements Closeable {
     /***
      * Read from memory, read index, then data.
      *
-     * @param lstRdDataOffset
-     * @param lstRdIndexOffset
-     * @param maxReadSize
-     * @param maxReadCount
-     * @param partitionId
-     * @param isSecond
-     * @param isFilterConsume
-     * @param filterKeySet
-     * @return
+     * @param lstRdDataOffset       the recent data offset read before
+     * @param lstRdIndexOffset      the recent index offset read before
+     * @param maxReadSize           the max read size
+     * @param maxReadCount          the max read count
+     * @param partitionId           the partitionId for reading messages
+     * @param isSecond              whether read from secondary cache
+     * @param isFilterConsume       whether to filter consumption
+     * @param filterKeySet          filter item set
+     * @param reqRcvTime            the timestamp of the record to be checked
+     *
+     * @return                      read result
      */
-    public GetCacheMsgResult getMessages(final long lstRdDataOffset, final 
long lstRdIndexOffset,
-                                         final int maxReadSize, final int 
maxReadCount,
-                                         final int partitionId, final boolean 
isSecond,
-                                         final boolean isFilterConsume,
-                                         final Set<Integer> filterKeySet) {
+    public GetCacheMsgResult getMessages(long lstRdDataOffset, long 
lstRdIndexOffset,
+                                         int maxReadSize, int maxReadCount,
+                                         int partitionId, boolean isSecond,
+                                         boolean isFilterConsume, Set<Integer> 
filterKeySet,
+                                         long reqRcvTime) {
         // #lizard forgives
         Integer lastWritePos = 0;
         boolean hasMsg = false;
@@ -235,6 +261,9 @@ public class MsgMemStore implements Closeable {
                 readedSize += DataStoreUtils.STORE_INDEX_HEAD_LEN;
                 continue;
             }
+            if (reqRcvTime != 0 && cTimeRecv < reqRcvTime) {
+                continue;
+            }
             // read data file.
             byte[] tmpArray = new byte[cDataSize];
             final ByteBuffer buffer = ByteBuffer.wrap(tmpArray);
@@ -258,15 +287,14 @@ public class MsgMemStore implements Closeable {
     /***
      * Batch flush memory data to disk.
      *
-     * @param msgFileStore
-     * @param strBuffer
-     * @return
-     * @throws IOException
+     * @param msgFileStore    the file storage
+     * @param strBuffer       the message buffer
+     * @throws IOException    the exception while process
      */
-    public boolean batchFlush(MsgFileStore msgFileStore,
-                              final StringBuilder strBuffer) throws Throwable {
+    public void batchFlush(MsgFileStore msgFileStore,
+                           StringBuilder strBuffer) throws Throwable {
         if (this.curMessageCount.get() == 0) {
-            return true;
+            return;
         }
         ByteBuffer tmpIndexBuffer = this.cachedIndexSegment.asReadOnlyBuffer();
         final ByteBuffer tmpDataReadBuf = 
this.cacheDataSegment.asReadOnlyBuffer();
@@ -274,9 +302,9 @@ public class MsgMemStore implements Closeable {
         tmpDataReadBuf.flip();
         long startTime = System.currentTimeMillis();
         msgFileStore.batchAppendMsg(strBuffer, curMessageCount.get(),
-            cacheIndexOffset.get(), tmpIndexBuffer, cacheDataOffset.get(), 
tmpDataReadBuf);
+            cacheIndexOffset.get(), tmpIndexBuffer, cacheDataOffset.get(),
+                tmpDataReadBuf, leftAppendTime.get(), rightAppendTime.get());
         BrokerMetricsHolder.updSyncDataDurations(System.currentTimeMillis() - 
startTime);
-        return true;
     }
 
     public int getCurMsgCount() {
@@ -316,6 +344,27 @@ public class MsgMemStore implements Closeable {
         return this.writeIndexStartPos + this.cacheIndexOffset.get();
     }
 
+    public long getIndexStartWritePos() {
+        return writeIndexStartPos;
+    }
+
+    public long getLeftAppendTime() {
+        return leftAppendTime.get();
+    }
+
+    public long getRightAppendTime() {
+        return rightAppendTime.get();
+    }
+
+    public int isTimestampInHold(long timestamp) {
+        if (timestamp < this.leftAppendTime.get()) {
+            return -1;
+        } else if (timestamp > rightAppendTime.get()) {
+            return 1;
+        }
+        return 0;
+    }
+
     public void clear() {
         this.writeDataStartPos = -1;
         this.writeIndexStartPos = -1;
@@ -324,6 +373,8 @@ public class MsgMemStore implements Closeable {
         this.curMessageCount.set(0);
         this.queuesMap.clear();
         this.keysMap.clear();
+        this.leftAppendTime.set(System.currentTimeMillis());
+        this.rightAppendTime.set(System.currentTimeMillis());
     }
 
     @Override
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/StoreRepairAdmin.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/StoreRepairAdmin.java
index 14e7715..82f78ba 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/StoreRepairAdmin.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/StoreRepairAdmin.java
@@ -379,7 +379,7 @@ public class StoreRepairAdmin {
                                 curPartSeg =
                                         new FileSegment(queueOffset, newFile, 
SegmentType.INDEX);
                             }
-                            curPartSeg.append(indexBuffer);
+                            curPartSeg.append(indexBuffer, timeRecv, timeRecv);
                             gQueueOffset += 
DataStoreUtils.STORE_INDEX_HEAD_LEN;
                             if (curPartSeg.getCachedSize() >= 
maxIndexSegmentSize) {
                                 curPartSeg.flush(true);
diff --git 
a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/FileSegmentListTest.java
 
b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/FileSegmentListTest.java
index 38d575c..0c38660 100644
--- 
a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/FileSegmentListTest.java
+++ 
b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/FileSegmentListTest.java
@@ -43,7 +43,8 @@ public class FileSegmentListTest {
             Segment fileSegment = fileSegmentList.last();
             String data = "abc";
             // append data to last FileSegment.
-            fileSegment.append(ByteBuffer.wrap(data.getBytes()));
+            long appendTime = System.currentTimeMillis();
+            fileSegment.append(ByteBuffer.wrap(data.getBytes()), appendTime, 
appendTime);
             fileSegment.flush(true);
             // get view
             Segment[] segmentList = fileSegmentList.getView();
@@ -70,7 +71,8 @@ public class FileSegmentListTest {
             Segment fileSegment = new FileSegment(100L, file, true, 
SegmentType.DATA);
             String data = "abc";
             // append data to last FileSegment.
-            fileSegment.append(ByteBuffer.wrap(data.getBytes()));
+            long appendTime = System.currentTimeMillis();
+            fileSegment.append(ByteBuffer.wrap(data.getBytes()), appendTime, 
appendTime);
             fileSegment.flush(true);
             fileSegmentList.append(fileSegment);
             Segment[] segmentList = fileSegmentList.getView();
diff --git 
a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/FileSegmentTest.java
 
b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/FileSegmentTest.java
index ca7c972..959fd83 100644
--- 
a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/FileSegmentTest.java
+++ 
b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/FileSegmentTest.java
@@ -40,8 +40,9 @@ public class FileSegmentTest {
             byte[] bytes = data.getBytes();
             ByteBuffer buf = ByteBuffer.wrap(bytes);
             // append data to FileSegment.
-            fileSegment.append(buf);
-            fileSegment.append(buf);
+            long appendTime = System.currentTimeMillis();
+            fileSegment.append(buf, appendTime, appendTime);
+            fileSegment.append(buf, appendTime, appendTime);
         } catch (IOException e) {
             e.printStackTrace();
         } finally {
@@ -64,7 +65,8 @@ public class FileSegmentTest {
             byte[] bytes = data.getBytes();
             ByteBuffer buf = ByteBuffer.wrap(bytes);
             // append data to fileSegment.
-            long offset = fileSegment.append(buf);
+            long appendTime = System.currentTimeMillis();
+            long offset = fileSegment.append(buf, appendTime, appendTime);
             int limit = 1000;
             // get view of fileSegment.
             ByteBuffer readBuffer = ByteBuffer.allocate(limit);
diff --git 
a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStoreTest.java
 
b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStoreTest.java
index 4bccc36..c08ab34 100644
--- 
a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStoreTest.java
+++ 
b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStoreTest.java
@@ -53,6 +53,6 @@ public class MsgMemStoreTest {
         msgMemStore.appendMsg(msgMemStatisInfo, 0, 0,
                 System.currentTimeMillis(), 3, bf, appendResult);
         // get messages
-        GetCacheMsgResult getCacheMsgResult = msgMemStore.getMessages(0, 2, 
1024, 1000, 0, false, false, null);
+        GetCacheMsgResult getCacheMsgResult = msgMemStore.getMessages(0, 2, 
1024, 1000, 0, false, false, null, 0);
     }
 }

Reply via email to