This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 938861c  [INLONG-2505][TubeMQ] Add MsgStoreStatsHolder class (#2506)
938861c is described below

commit 938861c80ab52e83c4df980895dca2b139514796
Author: gosonzhang <[email protected]>
AuthorDate: Tue Feb 15 21:58:24 2022 +0800

    [INLONG-2505][TubeMQ] Add MsgStoreStatsHolder class (#2506)
---
 .../server/broker/msgstore/MessageStore.java       |  34 +-
 .../broker/msgstore/MessageStoreManager.java       |   3 +-
 .../server/broker/msgstore/disk/MsgFileStore.java  |  12 +-
 .../server/broker/msgstore/mem/MsgMemStore.java    |   6 +-
 .../server/broker/stats/FileStoreStatsHolder.java  | 423 --------------
 .../server/broker/stats/MemStoreStatsHolder.java   | 411 --------------
 .../server/broker/stats/MsgStoreStatsHolder.java   | 615 +++++++++++++++++++++
 .../server/broker/stats/ServiceStatsHolder.java    |  71 ++-
 .../server/broker/stats/TrafficStatsService.java   |   6 +-
 .../server/broker/web/BrokerAdminServlet.java      |  60 +-
 .../server/common/webbase/WebCallStatsHolder.java  |  69 ++-
 .../broker/msgstore/mem/MsgMemStoreTest.java       |   6 +-
 .../broker/stats/FileStoreStatsHolderTest.java     | 128 -----
 .../broker/stats/MemStoreStatsHolderTest.java      | 111 ----
 .../broker/stats/MsgStoreStatsHolderTest.java      | 206 +++++++
 15 files changed, 949 insertions(+), 1212 deletions(-)

diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java
index 2bf8fa9..b4d4e20 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java
@@ -45,8 +45,7 @@ import org.apache.inlong.tubemq.server.broker.msgstore.disk.Segment;
 import org.apache.inlong.tubemq.server.broker.msgstore.mem.GetCacheMsgResult;
 import org.apache.inlong.tubemq.server.broker.msgstore.mem.MsgMemStore;
 import org.apache.inlong.tubemq.server.broker.nodeinfo.ConsumerNodeInfo;
-import org.apache.inlong.tubemq.server.broker.stats.FileStoreStatsHolder;
-import org.apache.inlong.tubemq.server.broker.stats.MemStoreStatsHolder;
+import org.apache.inlong.tubemq.server.broker.stats.MsgStoreStatsHolder;
 import org.apache.inlong.tubemq.server.broker.stats.TrafficInfo;
 import org.apache.inlong.tubemq.server.broker.utils.DataStoreUtils;
 import org.apache.inlong.tubemq.server.common.utils.AppendResult;
@@ -71,8 +70,7 @@ public class MessageStore implements Closeable {
     private final String primStorePath;
     private final AtomicLong lastMemFlushTime = new AtomicLong(0);
     private final MessageStoreManager msgStoreMgr;
-    private final MemStoreStatsHolder memStoreStatsHolder = new MemStoreStatsHolder();
-    private final FileStoreStatsHolder fileStoreStatsHolder = new FileStoreStatsHolder();
+    private final MsgStoreStatsHolder msgStoreStatsHolder = new MsgStoreStatsHolder();
     private final MsgFileStore msgFileStore;
     private final ReentrantReadWriteLock writeCacheMutex = new ReentrantReadWriteLock();
     private final Condition flushWriteCacheCondition = writeCacheMutex.writeLock().newCondition();
@@ -417,7 +415,7 @@ public class MessageStore implements Closeable {
         do {
             this.writeCacheMutex.readLock().lock();
             try {
-                if (this.msgMemStore.appendMsg(memStoreStatsHolder,
+                if (this.msgMemStore.appendMsg(msgStoreStatsHolder,
                         partitionId, msgTypeCode, receivedTime,
                         msgBufLen, buffer, appendResult)) {
                     return true;
@@ -432,20 +430,16 @@ public class MessageStore implements Closeable {
             }
             ThreadUtils.sleep(waitRetryMs);
         } while (count-- >= 0);
-        memStoreStatsHolder.addMsgWriteFail();
+        msgStoreStatsHolder.addMsgWriteCacheFail();
         return false;
     }
 
-    public void getMemStoreStatsInfo(boolean needRefresh, StringBuilder 
strBuff) {
-        memStoreStatsHolder.getAllMemStatsInfo(needRefresh, strBuff);
+    public void getMsgStoreStatsInfo(boolean needRefresh, StringBuilder 
strBuff) {
+        msgStoreStatsHolder.getMsgStoreStatsInfo(needRefresh, strBuff);
     }
 
-    public void getCurFileStoreStatsInfo(boolean needRefresh, StringBuilder 
strBuff) {
-        fileStoreStatsHolder.getAllFileStatsInfo(needRefresh, strBuff);
-    }
-
-    public FileStoreStatsHolder getFileStoreStatsHolder() {
-        return this.fileStoreStatsHolder;
+    public MsgStoreStatsHolder getMsgStoreStatsHolder() {
+        return this.msgStoreStatsHolder;
     }
 
     /**
@@ -517,10 +511,9 @@ public class MessageStore implements Closeable {
     /**
      * Flush memory store to file.
      *
-     * @param checkTime   the check time
      * @throws IOException the exception during processing
      */
-    public void flushMemCacheData(long checkTime) throws IOException {
+    public void flushMemCacheData() throws IOException {
         if (this.closed.get()) {
             throw new IllegalStateException(new StringBuilder(512)
                     .append("[Data Store] Closed MessageStore for storeKey ")
@@ -530,7 +523,6 @@ public class MessageStore implements Closeable {
                 && (System.currentTimeMillis() - this.lastMemFlushTime.get()) 
>= this.writeCacheFlushIntvl) {
             triggerFlushAndAddMsg(-1, 0, 0, 0, false, null, true, null);
         }
-        memStoreStatsHolder.chkStatsExpired(checkTime);
     }
 
     @Override
@@ -722,13 +714,13 @@ public class MessageStore implements Closeable {
                         } catch (Throwable e) {
                             logger.error("[Data Store] Error during flush", e);
                         } finally {
-                            memStoreStatsHolder.addFlushTime(
+                            msgStoreStatsHolder.addCacheFlushTime(
                                     (System.currentTimeMillis() - startTime), 
isTimeTrigger);
                         }
                     }
                 });
             } else {
-                memStoreStatsHolder.addCachePending();
+                msgStoreStatsHolder.addCachePending();
             }
             long startTime = System.currentTimeMillis();
             long timeoutNs = TimeUnit.MILLISECONDS.toNanos(100);
@@ -743,7 +735,7 @@ public class MessageStore implements Closeable {
                 }
             }
             if (needAdd) {
-                return msgMemStore.appendMsg(memStoreStatsHolder,
+                return msgMemStore.appendMsg(msgStoreStatsHolder,
                         partitionId, keyCode, receivedTime,
                         entryLength, entry, appendResult);
             }
@@ -818,7 +810,7 @@ public class MessageStore implements Closeable {
             isFlushOngoing.set(true);
             writeCacheMutex.writeLock().unlock();
             if (isRealloc) {
-                memStoreStatsHolder.addCacheReAlloc();
+                msgStoreStatsHolder.addCacheReAlloc();
                 logger.info(strBuffer.append("[Data Store] Found 
").append(getStoreKey())
                         .append(" Cache capacity change, new MemSize=")
                         .append(writeCacheMaxSize).append(", new CacheCnt=")
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
index 29c529d..6f10324 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
@@ -818,7 +818,6 @@ public class MessageStoreManager implements StoreService {
 
         @Override
         public void run() {
-            long checkTime = System.currentTimeMillis();
             StringBuilder sBuilder = new StringBuilder(256);
             for (Map<Integer, MessageStore> storeMap : dataStores.values()) {
                 if (storeMap == null || storeMap.isEmpty()) {
@@ -829,7 +828,7 @@ public class MessageStoreManager implements StoreService {
                         continue;
                     }
                     try {
-                        msgStore.flushMemCacheData(checkTime);
+                        msgStore.flushMemCacheData();
                     } catch (final Throwable e) {
                         logger.error(sBuilder.append("[Store Manager] Try to 
flush ")
                                 .append(msgStore.getStoreKey())
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
index efd6b27..a91b729 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
@@ -35,7 +35,7 @@ import 
org.apache.inlong.tubemq.corebase.protobuf.generated.ClientBroker;
 import org.apache.inlong.tubemq.corebase.utils.ServiceStatusHolder;
 import org.apache.inlong.tubemq.server.broker.BrokerConfig;
 import org.apache.inlong.tubemq.server.broker.msgstore.MessageStore;
-import org.apache.inlong.tubemq.server.broker.stats.FileStoreStatsHolder;
+import org.apache.inlong.tubemq.server.broker.stats.MsgStoreStatsHolder;
 import org.apache.inlong.tubemq.server.broker.stats.ServiceStatsHolder;
 import org.apache.inlong.tubemq.server.broker.stats.TrafficInfo;
 import org.apache.inlong.tubemq.server.broker.utils.DataStoreUtils;
@@ -69,7 +69,7 @@ public class MsgFileStore implements Closeable {
     private final AtomicLong lastMetaFlushTime = new AtomicLong(0);
     private final BrokerConfig tubeConfig;
     // file store stats holder
-    private final FileStoreStatsHolder fileStatsHolder;
+    private final MsgStoreStatsHolder msgStoreStatsHolder;
     // lock used for append message to storage
     private final ReentrantLock writeLock = new ReentrantLock();
     private final ByteBuffer byteBufferIndex =
@@ -98,7 +98,7 @@ public class MsgFileStore implements Closeable {
         final StringBuilder sBuilder = new StringBuilder(512);
         this.tubeConfig = tubeConfig;
         this.messageStore = messageStore;
-        this.fileStatsHolder = messageStore.getFileStoreStatsHolder();
+        this.msgStoreStatsHolder = messageStore.getMsgStoreStatsHolder();
         this.storeKey = messageStore.getStoreKey();
         this.dataDir = new File(sBuilder.append(baseStorePath)
                 .append(File.separator).append(this.storeKey).toString());
@@ -229,7 +229,7 @@ public class MsgFileStore implements Closeable {
         } finally {
             this.writeLock.unlock();
             // add statistics.
-            fileStatsHolder.addFileFlushStatsInfo(msgCnt, indexSize, dataSize,
+            msgStoreStatsHolder.addFileFlushStatsInfo(msgCnt, indexSize, 
dataSize,
                     flushedMsgCnt, flushedDataSize, isDataSegFlushed, 
isIndexSegFlushed,
                     isMsgDataFlushed, isMsgCntFlushed, isMsgTimeFlushed, 
isForceMetadata);
         }
@@ -523,11 +523,11 @@ public class MsgFileStore implements Closeable {
                 }
             } finally {
                 this.writeLock.unlock();
-                fileStatsHolder.addTimeoutFlush(flushedMsgCnt,
+                msgStoreStatsHolder.addFileTimeoutFlushStats(flushedMsgCnt,
                         flushedDataSize, forceMetadata);
             }
         }
-        fileStatsHolder.chkStatsExpired(checkTimestamp);
+        msgStoreStatsHolder.chkStatsExpired(checkTimestamp);
     }
 
     public long getDataSizeInBytes() {
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java
index ae86780..63b1c93 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java
@@ -32,7 +32,7 @@ import org.apache.inlong.tubemq.corebase.TErrCodeConstants;
 import org.apache.inlong.tubemq.server.broker.BrokerConfig;
 import org.apache.inlong.tubemq.server.broker.metadata.ClusterConfigHolder;
 import org.apache.inlong.tubemq.server.broker.msgstore.disk.MsgFileStore;
-import org.apache.inlong.tubemq.server.broker.stats.MemStoreStatsHolder;
+import org.apache.inlong.tubemq.server.broker.stats.MsgStoreStatsHolder;
 import org.apache.inlong.tubemq.server.broker.stats.ServiceStatsHolder;
 import org.apache.inlong.tubemq.server.broker.utils.DataStoreUtils;
 import org.apache.inlong.tubemq.server.common.utils.AppendResult;
@@ -111,7 +111,7 @@ public class MsgMemStore implements Closeable {
      *
      * @return    the process result
      */
-    public boolean appendMsg(MemStoreStatsHolder memStatsHolder,
+    public boolean appendMsg(MsgStoreStatsHolder memStatsHolder,
                              int partitionId, int keyCode,
                              long timeRecv, int dataEntryLength,
                              ByteBuffer dataEntry, AppendResult appendResult) {
@@ -156,7 +156,7 @@ public class MsgMemStore implements Closeable {
         } finally {
             this.writeLock.unlock();
             if (isAppended) {
-                memStatsHolder.addAppendedMsgSize(dataEntryLength);
+                memStatsHolder.addCacheMsgSize(dataEntryLength);
             } else {
                 memStatsHolder.addCacheFullType(fullDataSize, fullIndexSize, 
fullCount);
             }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/FileStoreStatsHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/FileStoreStatsHolder.java
deleted file mode 100644
index a50468b..0000000
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/FileStoreStatsHolder.java
+++ /dev/null
@@ -1,423 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.tubemq.server.broker.stats;
-
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.inlong.tubemq.corebase.metric.impl.LongStatsCounter;
-import org.apache.inlong.tubemq.corebase.metric.impl.SimpleHistogram;
-import org.apache.inlong.tubemq.corebase.metric.impl.SinceTime;
-import org.apache.inlong.tubemq.corebase.utils.DateTimeConvertUtils;
-import org.apache.inlong.tubemq.server.common.TServerConstants;
-
-/**
- * FileStoreStatsHolder, The statistics set related to the file store.
- *
- * Through this class, we complete the collection of metric data that occurs in
- * the memory cache operation, including the number of messages produced, size,
- * number of times of flushing, distribution of flushing time,
- * and triggering conditions for flushing, etc.
- *
- * This part supports index comparison output before and after data collection.
- */
-public class FileStoreStatsHolder {
-    // Switchable statistic items
-    private final FileStatsItemSet[] fileStatsSets = new FileStatsItemSet[2];
-    // Current writable index
-    private final AtomicInteger writableIndex = new AtomicInteger(0);
-    // Last query time
-    private final AtomicLong lstQueryTime = new AtomicLong(0);
-    // Last snapshot time
-    private final AtomicLong lstSnapshotTime = new AtomicLong(0);
-    // whether the statistic is closed
-    private volatile boolean isClosed;
-
-    public FileStoreStatsHolder() {
-        this.isClosed = true;
-        this.fileStatsSets[0] = new FileStatsItemSet();
-        this.fileStatsSets[1] = new FileStatsItemSet();
-        this.lstQueryTime.set(System.currentTimeMillis());
-        this.lstSnapshotTime.set(System.currentTimeMillis());
-    }
-
-    /**
-     * Add message store statistics information.
-     *
-     * @param msgCnt             the message count written
-     * @param msgIndexSize       the message index size written
-     * @param msgDataSize        the message data size written
-     * @param flushedMsgCnt      the flushed message count
-     * @param flushedDataSize    the flushed message size
-     * @param isDataSegFlush     whether the data segment flushed
-     * @param isIndexSegFlush    whether the index segment flushed
-     * @param isDataSizeFull     whether the cached data is full
-     * @param isMsgCntFull       whether the cached message count is full
-     * @param isCacheTimeFull    whether the cached time is full
-     * @param isForceMetadata    whether force push metadata
-     */
-    public void addFileFlushStatsInfo(int msgCnt, int msgIndexSize, int 
msgDataSize,
-                                      long flushedMsgCnt, long flushedDataSize,
-                                      boolean isDataSegFlush, boolean 
isIndexSegFlush,
-                                      boolean isDataSizeFull, boolean 
isMsgCntFull,
-                                      boolean isCacheTimeFull, boolean 
isForceMetadata) {
-        if (isClosed) {
-            return;
-        }
-        FileStatsItemSet tmStatsSet = fileStatsSets[getIndex()];
-        tmStatsSet.accumMsgCnt.addValue(msgCnt);
-        tmStatsSet.accumMsgIndexSize.addValue(msgIndexSize);
-        tmStatsSet.accumMsgDataSize.addValue(msgDataSize);
-        if (flushedDataSize > 0) {
-            tmStatsSet.flushedDataSizeStats.update(flushedDataSize);
-        }
-        if (flushedMsgCnt > 0) {
-            tmStatsSet.flushedMsgCntStats.update(flushedMsgCnt);
-        }
-        if (isDataSegFlush) {
-            tmStatsSet.dataSegAddCnt.incValue();
-        }
-        if (isIndexSegFlush) {
-            tmStatsSet.indexSegAddCnt.incValue();
-        }
-        if (isDataSizeFull) {
-            tmStatsSet.dataSizeFullCnt.incValue();
-        }
-        if (isMsgCntFull) {
-            tmStatsSet.msgCountFullCnt.incValue();
-        }
-        if (isCacheTimeFull) {
-            tmStatsSet.cachedTimeFullCnt.incValue();
-        }
-        if (isForceMetadata) {
-            tmStatsSet.metaFlushCnt.incValue();
-        }
-    }
-
-    /**
-     * Add flush time timeout statistic.
-     *
-     * @param flushedMsgCnt      the flushed message count
-     * @param flushedDataSize    the flushed message size
-     * @param isForceMetadata    whether force push metadata
-     */
-    public void addTimeoutFlush(long flushedMsgCnt,
-                                long flushedDataSize,
-                                boolean isForceMetadata) {
-        if (isClosed) {
-            return;
-        }
-        FileStatsItemSet tmStatsSet = fileStatsSets[getIndex()];
-        tmStatsSet.cachedTimeFullCnt.incValue();
-        if (flushedDataSize > 0) {
-            tmStatsSet.flushedDataSizeStats.update(flushedDataSize);
-        }
-        if (flushedMsgCnt > 0) {
-            tmStatsSet.flushedMsgCntStats.update(flushedMsgCnt);
-        }
-        if (isForceMetadata) {
-            tmStatsSet.metaFlushCnt.incValue();
-        }
-    }
-
-    /**
-     * Check whether has exceeded the maximum self-statistics period.
-     *
-     * @param checkTime   the check time
-     */
-    public synchronized void chkStatsExpired(long checkTime) {
-        if (!this.isClosed) {
-            if ((checkTime - this.lstQueryTime.get())
-                    >= TServerConstants.CFG_STORE_STATS_MAX_REFRESH_DURATION) {
-                this.isClosed = true;
-            }
-        }
-    }
-
-    /**
-     * Get current writing statistics information.
-     *
-     * @param statsMap  the return map information
-     */
-    public void getValue(Map<String, Long> statsMap) {
-        enableStats();
-        getStatsValue(true, fileStatsSets[getIndex()], statsMap);
-    }
-
-    /**
-     * Get current writing statistics information.
-     *
-     * @param strBuff  the return information in json format
-     */
-    public void getValue(StringBuilder strBuff) {
-        enableStats();
-        getStatsValue(true, fileStatsSets[getIndex()], strBuff);
-    }
-
-    /**
-     * Snapshot and get current writing statistics information.
-     *
-     * @param statsMap  the return map information
-     */
-    public void snapShort(Map<String, Long> statsMap) {
-        enableStats();
-        if (switchStatsSets()) {
-            getStatsValue(false, fileStatsSets[getIndex(writableIndex.get() - 
1)], statsMap);
-        } else {
-            getStatsValue(true, fileStatsSets[getIndex()], statsMap);
-        }
-    }
-
-    /**
-     * Snapshot and get current writing statistics information.
-     *
-     * @param strBuff  the return information in json format
-     */
-    public void snapShort(StringBuilder strBuff) {
-        this.enableStats();
-        if (switchStatsSets()) {
-            getStatsValue(false, fileStatsSets[getIndex(writableIndex.get() - 
1)], strBuff);
-        } else {
-            getStatsValue(true, fileStatsSets[getIndex()], strBuff);
-        }
-    }
-
-    /**
-     * Get current file store statistics information.
-     * Contains the data results of the current statistics and the previous 
snapshot
-     *
-     * @param isSwitch  whether to switch the writing statistics block
-     * @param strBuff   the return information
-     */
-    public synchronized void getAllFileStatsInfo(boolean isSwitch, 
StringBuilder strBuff) {
-        this.enableStats();
-        strBuff.append("[");
-        getStatsValue(false, fileStatsSets[getIndex(writableIndex.get() - 1)], 
strBuff);
-        strBuff.append(",");
-        getStatsValue(true, fileStatsSets[getIndex()], strBuff);
-        strBuff.append("]");
-        if (isSwitch) {
-            switchStatsSets();
-        }
-    }
-
-    /**
-     * Set statistic status
-     *
-     */
-    private synchronized void enableStats() {
-        this.lstQueryTime.set(System.currentTimeMillis());
-        if (this.isClosed) {
-            this.isClosed = false;
-        }
-    }
-
-    /**
-     * Check and switch statistic sets
-     *
-     * @return  whether the statistic sets has been switched
-     */
-    private boolean switchStatsSets() {
-        long curSwitchTime = lstSnapshotTime.get();
-        // Avoid frequent snapshots
-        if ((System.currentTimeMillis() - curSwitchTime)
-                >= TServerConstants.MIN_SNAPSHOT_PERIOD_MS) {
-            if (lstSnapshotTime.compareAndSet(curSwitchTime, 
System.currentTimeMillis())) {
-                fileStatsSets[getIndex(writableIndex.get() - 1)].clear();
-                int befIndex = writableIndex.getAndIncrement();
-                
fileStatsSets[getIndex(befIndex)].setSnapshotTime(lstSnapshotTime.get());
-                return true;
-            }
-        }
-        return false;
-    }
-
-    /**
-     * Get current writable block index.
-     *
-     * @return the writable block index
-     */
-    private int getIndex() {
-        return getIndex(writableIndex.get());
-    }
-
-    /**
-     * Gets the metric block index based on the specified value.
-     *
-     * @param origIndex    the specified value
-     * @return the metric block index
-     */
-    private int getIndex(int origIndex) {
-        return Math.abs(origIndex % 2);
-    }
-
-    /**
-     * Read metric block data into map.
-     *
-     * @param isWriting    the metric block is writing
-     * @param statsSet     the metric block need to read
-     * @param statsMap     the read result
-     */
-    private void getStatsValue(boolean isWriting,
-                               FileStatsItemSet statsSet,
-                               Map<String, Long> statsMap) {
-        statsMap.put(statsSet.resetTime.getFullName(),
-                statsSet.resetTime.getSinceTime());
-        statsMap.put(statsSet.accumMsgCnt.getFullName(),
-                statsSet.accumMsgCnt.getValue());
-        statsMap.put(statsSet.accumMsgDataSize.getFullName(),
-                statsSet.accumMsgDataSize.getValue());
-        statsMap.put(statsSet.accumMsgIndexSize.getFullName(),
-                statsSet.accumMsgIndexSize.getValue());
-        statsSet.flushedDataSizeStats.getValue(statsMap, false);
-        statsSet.flushedMsgCntStats.getValue(statsMap, false);
-        statsMap.put(statsSet.dataSegAddCnt.getFullName(),
-                statsSet.dataSegAddCnt.getValue());
-        statsMap.put(statsSet.indexSegAddCnt.getFullName(),
-                statsSet.indexSegAddCnt.getValue());
-        statsMap.put(statsSet.metaFlushCnt.getFullName(),
-                statsSet.metaFlushCnt.getValue());
-        statsMap.put(statsSet.dataSizeFullCnt.getFullName(),
-                statsSet.dataSizeFullCnt.getValue());
-        statsMap.put(statsSet.msgCountFullCnt.getFullName(),
-                statsSet.msgCountFullCnt.getValue());
-        statsMap.put(statsSet.cachedTimeFullCnt.getFullName(),
-                statsSet.cachedTimeFullCnt.getValue());
-        if (isWriting) {
-            statsMap.put(statsSet.snapShotTime.getFullName(),
-                    System.currentTimeMillis());
-        } else {
-            statsMap.put(statsSet.snapShotTime.getFullName(),
-                    statsSet.snapShotTime.getSinceTime());
-        }
-    }
-
-    /**
-     * Read metric block data into string format.
-     *
-     * @param isWriting    the metric block is writing
-     * @param statsSet     the metric block need to read
-     * @param strBuff     the return buffer
-     */
-    private static void getStatsValue(boolean isWriting,
-                                      FileStatsItemSet statsSet,
-                                      StringBuilder strBuff) {
-        strBuff.append("{\"").append(statsSet.resetTime.getFullName())
-                .append("\":\"").append(statsSet.resetTime.getStrSinceTime())
-                .append("\",\"").append(statsSet.accumMsgCnt.getFullName())
-                .append("\":").append(statsSet.accumMsgCnt.getValue())
-                .append(",\"").append(statsSet.accumMsgDataSize.getFullName())
-                .append("\":").append(statsSet.accumMsgDataSize.getValue())
-                .append(",\"").append(statsSet.accumMsgIndexSize.getFullName())
-                .append("\":").append(statsSet.accumMsgIndexSize.getValue())
-                .append(",");
-        statsSet.flushedDataSizeStats.getValue(strBuff, false);
-        strBuff.append(",");
-        statsSet.flushedMsgCntStats.getValue(strBuff, false);
-        strBuff.append(",\"").append(statsSet.dataSegAddCnt.getFullName())
-                .append("\":").append(statsSet.dataSegAddCnt.getValue())
-                .append(",\"").append(statsSet.indexSegAddCnt.getFullName())
-                .append("\":").append(statsSet.indexSegAddCnt.getValue())
-                .append(",\"").append(statsSet.metaFlushCnt.getFullName())
-                .append("\":").append(statsSet.metaFlushCnt.getValue())
-                .append(",\"").append(statsSet.dataSizeFullCnt.getFullName())
-                .append("\":").append(statsSet.dataSizeFullCnt.getValue())
-                .append(",\"").append(statsSet.msgCountFullCnt.getFullName())
-                .append("\":").append(statsSet.msgCountFullCnt.getValue())
-                .append(",\"").append(statsSet.cachedTimeFullCnt.getFullName())
-                .append("\":").append(statsSet.cachedTimeFullCnt.getValue())
-                .append(",\"").append(statsSet.snapShotTime.getFullName())
-                .append("\":\"");
-        if (isWriting) {
-            
strBuff.append(DateTimeConvertUtils.ms2yyyyMMddHHmmss(System.currentTimeMillis()));
-        } else {
-            strBuff.append(statsSet.snapShotTime.getStrSinceTime());
-        }
-        strBuff.append("\"}");
-    }
-
-    /**
-     * FileStatsItemSet, a collection of statistical metrics related to file 
storage
-     *
-     */
-    private static class FileStatsItemSet {
-        // The reset time of file statistics set
-        protected final SinceTime resetTime =
-                new SinceTime("reset_time", null);
-        // The accumulate message count statistics
-        protected final LongStatsCounter accumMsgCnt =
-                new LongStatsCounter("total_msg_cnt", null);
-        // The accumulate message data size statistics
-        protected final LongStatsCounter accumMsgDataSize =
-                new LongStatsCounter("total_data_size", null);
-        // The accumulate message index statistics
-        protected final LongStatsCounter accumMsgIndexSize =
-                new LongStatsCounter("total_index_size", null);
-        // The data flushed statistics
-        protected final SimpleHistogram flushedDataSizeStats =
-                new SimpleHistogram("flushed_data_size", null);
-        // The message count flushed statistics
-        protected final SimpleHistogram flushedMsgCntStats =
-                new SimpleHistogram("flushed_msg_cnt", null);
-        // The new data segment statistics
-        protected final LongStatsCounter dataSegAddCnt =
-                new LongStatsCounter("data_seg_cnt", null);
-        // The new index segment full statistics
-        protected final LongStatsCounter indexSegAddCnt =
-                new LongStatsCounter("index_seg_cnt", null);
-        // The flush count statistics of file meta-data
-        protected final LongStatsCounter metaFlushCnt =
-                new LongStatsCounter("meta_flush_cnt", null);
-        // The cached message data full statistics
-        protected final LongStatsCounter dataSizeFullCnt =
-                new LongStatsCounter("data_size_full", null);
-        // The cached message count full statistics
-        protected final LongStatsCounter msgCountFullCnt =
-                new LongStatsCounter("msg_count_full", null);
-        // The cache timeout refresh amount statistics
-        protected final LongStatsCounter cachedTimeFullCnt =
-                new LongStatsCounter("cache_time_full", null);
-        // The snapshot time of file statistics set
-        protected final SinceTime snapShotTime =
-                new SinceTime("end_time", null);
-
-        public FileStatsItemSet() {
-            clear();
-        }
-
-        public void setSnapshotTime(long snapshotTime) {
-            this.snapShotTime.reset(snapshotTime);
-        }
-
-        public void clear() {
-            this.snapShotTime.reset();
-            this.accumMsgCnt.clear();
-            this.accumMsgDataSize.clear();
-            this.flushedDataSizeStats.clear();
-            this.accumMsgIndexSize.clear();
-            this.flushedMsgCntStats.clear();
-            this.dataSegAddCnt.clear();
-            this.indexSegAddCnt.clear();
-            this.dataSizeFullCnt.clear();
-            this.metaFlushCnt.clear();
-            this.msgCountFullCnt.clear();
-            this.cachedTimeFullCnt.clear();
-            this.resetTime.reset();
-        }
-    }
-}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/MemStoreStatsHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/MemStoreStatsHolder.java
deleted file mode 100644
index 7a5abef..0000000
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/MemStoreStatsHolder.java
+++ /dev/null
@@ -1,411 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.tubemq.server.broker.stats;
-
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.inlong.tubemq.corebase.metric.impl.ESTHistogram;
-import org.apache.inlong.tubemq.corebase.metric.impl.LongStatsCounter;
-import org.apache.inlong.tubemq.corebase.metric.impl.SimpleHistogram;
-import org.apache.inlong.tubemq.corebase.metric.impl.SinceTime;
-import org.apache.inlong.tubemq.corebase.utils.DateTimeConvertUtils;
-import org.apache.inlong.tubemq.server.common.TServerConstants;
-
-/**
- * MemStoreStatsHolder, The statistics set related to the memory cache.
- *
- * Through this file storage statistics class, it mainly counts
- * the number of messages refreshed each time, the total data size, the total 
index size,
- * the total number of times the data file is full, the number of times the 
index file is full,
- * as well as the conditions that trigger the flush, the data write 
information such as
- * the number of refreshes that enter the timeout, etc.
- *
- * This part supports index comparison output before and after data collection.
- */
-public class MemStoreStatsHolder {
-    // Switchable statistic items
-    private final MemStatsItemSet[] memStatsSets = new MemStatsItemSet[2];
-    // Current writable index
-    private final AtomicInteger writableIndex = new AtomicInteger(0);
-    // Last query time
-    private final AtomicLong lstQueryTime = new AtomicLong(0);
-    // Last snapshot time
-    private final AtomicLong lstSnapshotTime = new AtomicLong(0);
-    // whether the statistic is closed
-    private volatile boolean isClosed;
-
-    public MemStoreStatsHolder() {
-        this.isClosed = true;
-        this.memStatsSets[0] = new MemStatsItemSet();
-        this.memStatsSets[1] = new MemStatsItemSet();
-        this.lstQueryTime.set(System.currentTimeMillis());
-        this.lstSnapshotTime.set(System.currentTimeMillis());
-    }
-
-    /**
-     * Add appended message size statistic.
-     *
-     * @param msgSize   the message size
-     */
-    public void addAppendedMsgSize(int msgSize) {
-        if (isClosed) {
-            return;
-        }
-        memStatsSets[getIndex()].msgRcvStats.update(msgSize);
-    }
-
-    /**
-     * Add write message failure count statistics.
-     */
-    public void addMsgWriteFail() {
-        if (isClosed) {
-            return;
-        }
-        memStatsSets[getIndex()].writeFailCnt.incValue();
-    }
-
-    /**
-     * Add cache pending count statistics.
-     */
-    public void addCachePending() {
-        if (isClosed) {
-            return;
-        }
-        memStatsSets[getIndex()].flushPendingCnt.incValue();
-    }
-
-    /**
-     * Add cache re-alloc count statistics.
-     */
-    public void addCacheReAlloc() {
-        if (isClosed) {
-            return;
-        }
-        memStatsSets[getIndex()].cacheReAllocCnt.incValue();
-    }
-
-    /**
-     * Add flush trigger type statistics.
-     *
-     * @param isDataSizeFull     whether the cached data is full
-     * @param isIndexSizeFull    whether the cached index is full
-     * @param isMsgCntFull       whether the cached message count is full
-     */
-    public void addCacheFullType(boolean isDataSizeFull,
-                                 boolean isIndexSizeFull,
-                                 boolean isMsgCntFull) {
-        if (isClosed) {
-            return;
-        }
-        if (isDataSizeFull) {
-            memStatsSets[getIndex()].dataSizeFullCnt.incValue();
-        }
-        if (isIndexSizeFull) {
-            memStatsSets[getIndex()].indexSizeFullCnt.incValue();
-        }
-        if (isMsgCntFull) {
-            memStatsSets[getIndex()].msgCountFullCnt.incValue();
-        }
-    }
-
-    /**
-     * Add flush time statistic.
-     *
-     * @param flushTime          the flush time
-     * @param isTimeoutFlush     whether is timeout flush
-     */
-    public void addFlushTime(long flushTime, boolean isTimeoutFlush) {
-        if (isClosed) {
-            return;
-        }
-        memStatsSets[getIndex()].cacheSyncStats.update(flushTime);
-        if (isTimeoutFlush) {
-            memStatsSets[getIndex()].cachedTimeFullCnt.incValue();
-        }
-    }
-
-    /**
-     * Check whether has exceeded the maximum self-statistics period.
-     *
-     * @param checkTime   the check time
-     */
-    public synchronized void chkStatsExpired(long checkTime) {
-        if (!this.isClosed) {
-            if ((checkTime - this.lstQueryTime.get())
-                    >= TServerConstants.CFG_STORE_STATS_MAX_REFRESH_DURATION) {
-                this.isClosed = true;
-            }
-        }
-    }
-
-    /**
-     * Get current writing statistics information.
-     *
-     * @param statsMap  the return map information
-     */
-    public void getValue(Map<String, Long> statsMap) {
-        enableStats();
-        getStatsValue(true, memStatsSets[getIndex()], statsMap);
-    }
-
-    /**
-     * Get current writing statistics information.
-     *
-     * @param strBuff  the return information in json format
-     */
-    public void getValue(StringBuilder strBuff) {
-        enableStats();
-        getStatsValue(true, memStatsSets[getIndex()], strBuff);
-    }
-
-    /**
-     * Snapshot and get current writing statistics information.
-     *
-     * @param statsMap  the return map information
-     */
-    public void snapShort(Map<String, Long> statsMap) {
-        enableStats();
-        if (switchStatsSets()) {
-            getStatsValue(false, memStatsSets[getIndex(writableIndex.get() - 
1)], statsMap);
-        } else {
-            getStatsValue(true, memStatsSets[getIndex()], statsMap);
-        }
-    }
-
-    /**
-     * Snapshot and get current writing statistics information.
-     *
-     * @param strBuff  the return information in json format
-     */
-    public void snapShort(StringBuilder strBuff) {
-        this.enableStats();
-        if (switchStatsSets()) {
-            getStatsValue(false, memStatsSets[getIndex(writableIndex.get() - 
1)], strBuff);
-        } else {
-            getStatsValue(true, memStatsSets[getIndex()], strBuff);
-        }
-    }
-
-    /**
-     * Get current memory store statistics information. Contains the data 
results of
-     * the current statistics and the previous snapshot
-     *
-     * @param isSwitch  whether to switch the writing statistics block
-     * @param strBuff   the return information
-     */
-    public synchronized void getAllMemStatsInfo(boolean isSwitch, 
StringBuilder strBuff) {
-        this.enableStats();
-        strBuff.append("[");
-        getStatsValue(false, memStatsSets[getIndex(writableIndex.get() - 1)], 
strBuff);
-        strBuff.append(",");
-        getStatsValue(true, memStatsSets[getIndex()], strBuff);
-        strBuff.append("]");
-        if (isSwitch) {
-            switchStatsSets();
-        }
-    }
-
-    /**
-     * Set statistic status
-     *
-     */
-    private synchronized void enableStats() {
-        this.lstQueryTime.set(System.currentTimeMillis());
-        if (this.isClosed) {
-            this.isClosed = false;
-        }
-    }
-
-    /**
-     * Check and switch statistic sets
-     *
-     * @return  whether the statistic sets has been switched
-     */
-    private boolean switchStatsSets() {
-        long curSwitchTime = lstSnapshotTime.get();
-        // Avoid frequent snapshots
-        if ((System.currentTimeMillis() - curSwitchTime)
-                >= TServerConstants.MIN_SNAPSHOT_PERIOD_MS) {
-            if (lstSnapshotTime.compareAndSet(curSwitchTime, 
System.currentTimeMillis())) {
-                memStatsSets[getIndex(writableIndex.get() - 1)].clear();
-                int befIndex = writableIndex.getAndIncrement();
-                
memStatsSets[getIndex(befIndex)].setSnapshotTime(lstSnapshotTime.get());
-                return true;
-            }
-        }
-        return false;
-    }
-
-    /**
-     * Get current writable block index.
-     *
-     * @return the writable block index
-     */
-    private int getIndex() {
-        return getIndex(writableIndex.get());
-    }
-
-    /**
-     * Gets the metric block index based on the specified value.
-     *
-     * @param origIndex    the specified value
-     * @return the metric block index
-     */
-    private int getIndex(int origIndex) {
-        return Math.abs(origIndex % 2);
-    }
-
-    /**
-     * Read metric block data into map.
-     *
-     * @param isWriting    the metric block is writing
-     * @param statsSet     the metric block need to read
-     * @param statsMap     the read result
-     */
-    private void getStatsValue(boolean isWriting,
-                               MemStatsItemSet statsSet,
-                               Map<String, Long> statsMap) {
-        statsMap.put(statsSet.resetTime.getFullName(),
-                statsSet.resetTime.getSinceTime());
-        statsSet.msgRcvStats.getValue(statsMap, false);
-        statsMap.put(statsSet.writeFailCnt.getFullName(),
-                statsSet.writeFailCnt.getValue());
-        statsMap.put(statsSet.dataSizeFullCnt.getFullName(),
-                statsSet.dataSizeFullCnt.getValue());
-        statsMap.put(statsSet.indexSizeFullCnt.getFullName(),
-                statsSet.indexSizeFullCnt.getValue());
-        statsMap.put(statsSet.msgCountFullCnt.getFullName(),
-                statsSet.msgCountFullCnt.getValue());
-        statsMap.put(statsSet.cachedTimeFullCnt.getFullName(),
-                statsSet.cachedTimeFullCnt.getValue());
-        statsMap.put(statsSet.flushPendingCnt.getFullName(),
-                statsSet.flushPendingCnt.getValue());
-        statsMap.put(statsSet.cacheReAllocCnt.getFullName(),
-                statsSet.cacheReAllocCnt.getValue());
-        statsSet.cacheSyncStats.getValue(statsMap, false);
-        if (isWriting) {
-            statsMap.put(statsSet.snapShotTime.getFullName(),
-                    System.currentTimeMillis());
-        } else {
-            statsMap.put(statsSet.snapShotTime.getFullName(),
-                    statsSet.snapShotTime.getSinceTime());
-        }
-    }
-
-    /**
-     * Read metric block data into string format.
-     *
-     * @param isWriting    the metric block is writing
-     * @param statsSet     the metric block need to read
-     * @param strBuff     the return buffer
-     */
-    private static void getStatsValue(boolean isWriting,
-                                      MemStatsItemSet statsSet,
-                                      StringBuilder strBuff) {
-        strBuff.append("{\"").append(statsSet.resetTime.getFullName())
-                .append("\":\"").append(statsSet.resetTime.getStrSinceTime())
-                .append("\",");
-        statsSet.msgRcvStats.getValue(strBuff, false);
-        strBuff.append(",\"").append(statsSet.writeFailCnt.getFullName())
-                .append("\":").append(statsSet.writeFailCnt.getValue())
-                .append(",\"").append(statsSet.dataSizeFullCnt.getFullName())
-                .append("\":").append(statsSet.dataSizeFullCnt.getValue())
-                .append(",\"").append(statsSet.msgCountFullCnt.getFullName())
-                .append("\":").append(statsSet.msgCountFullCnt.getValue())
-                .append(",\"").append(statsSet.cachedTimeFullCnt.getFullName())
-                .append("\":").append(statsSet.cachedTimeFullCnt.getValue())
-                .append(",\"").append(statsSet.flushPendingCnt.getFullName())
-                .append("\":").append(statsSet.flushPendingCnt.getValue())
-                .append(",\"").append(statsSet.cacheReAllocCnt.getFullName())
-                .append("\":").append(statsSet.cacheReAllocCnt.getValue())
-                .append(",\"").append(statsSet.dataSizeFullCnt.getFullName())
-                .append("\":").append(statsSet.dataSizeFullCnt.getValue())
-                .append(",");
-        statsSet.cacheSyncStats.getValue(strBuff, false);
-        
strBuff.append(",\"").append(statsSet.snapShotTime.getFullName()).append("\":\"");
-        if (isWriting) {
-            
strBuff.append(DateTimeConvertUtils.ms2yyyyMMddHHmmss(System.currentTimeMillis()));
-        } else {
-            strBuff.append(statsSet.snapShotTime.getStrSinceTime());
-        }
-        strBuff.append("\"}");
-    }
-
-    /**
-     * MemStatsItemSet, Memory cache related statistics set
-     *
-     */
-    private static class MemStatsItemSet {
-        // The reset time of memory statistics set
-        protected final SinceTime resetTime =
-                new SinceTime("reset_time", null);
-        // The status of messages received
-        protected final SimpleHistogram msgRcvStats =
-                new SimpleHistogram("msg_in", null);
-        // The count of message append failures
-        protected final LongStatsCounter writeFailCnt =
-                new LongStatsCounter("msg_append_fail", null);
-        // The cached message data full statistics
-        protected final LongStatsCounter dataSizeFullCnt =
-                new LongStatsCounter("data_size_full", null);
-        // The cached message index full statistics
-        protected final LongStatsCounter indexSizeFullCnt =
-                new LongStatsCounter("index_size_full", null);
-        // The cached message count full statistics
-        protected final LongStatsCounter msgCountFullCnt =
-                new LongStatsCounter("msg_count_full", null);
-        // The cache timeout refresh amount statistics
-        protected final LongStatsCounter cachedTimeFullCnt =
-                new LongStatsCounter("cache_time_full", null);
-        // The pending count for cache flush operations
-        protected final LongStatsCounter flushPendingCnt =
-                new LongStatsCounter("flush_pending", null);
-        // The cache re-alloc count
-        protected final LongStatsCounter cacheReAllocCnt =
-                new LongStatsCounter("cache_realloc", null);
-        // The cache persistence duration statistics
-        protected final ESTHistogram cacheSyncStats =
-                new ESTHistogram("cache_flush_dlt", null);
-        // The snapshot time of memory statistics set
-        protected final SinceTime snapShotTime =
-                new SinceTime("end_time", null);
-
-        public MemStatsItemSet() {
-            clear();
-        }
-
-        public void setSnapshotTime(long snapshotTime) {
-            this.snapShotTime.reset(snapshotTime);
-        }
-
-        public void clear() {
-            this.snapShotTime.reset();
-            this.msgRcvStats.clear();
-            this.writeFailCnt.clear();
-            this.dataSizeFullCnt.clear();
-            this.indexSizeFullCnt.clear();
-            this.msgCountFullCnt.clear();
-            this.flushPendingCnt.clear();
-            this.cacheReAllocCnt.clear();
-            this.cachedTimeFullCnt.clear();
-            this.cacheSyncStats.clear();
-            this.resetTime.reset();
-        }
-    }
-}
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/MsgStoreStatsHolder.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/MsgStoreStatsHolder.java
new file mode 100644
index 0000000..ff88df7
--- /dev/null
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/MsgStoreStatsHolder.java
@@ -0,0 +1,615 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.broker.stats;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.inlong.tubemq.corebase.metric.impl.ESTHistogram;
+import org.apache.inlong.tubemq.corebase.metric.impl.LongStatsCounter;
+import org.apache.inlong.tubemq.corebase.metric.impl.SimpleHistogram;
+import org.apache.inlong.tubemq.corebase.metric.impl.SinceTime;
+import org.apache.inlong.tubemq.corebase.utils.DateTimeConvertUtils;
+import org.apache.inlong.tubemq.server.common.TServerConstants;
+
+/**
+ * MsgStoreStatsHolder, The statistics set related to the message store,
+ * include cache and file store statistics.
+ *
+ * Through this file storage statistics class, it mainly counts
+ * the number of messages refreshed each time, the total data size, the total index size,
+ * the total number of times the data file is full, the number of times the index file is full,
+ * as well as the conditions that trigger the flush, the data write information such as
+ * the number of refreshes that enter the timeout, etc.
+ *
+ * This part supports index comparison output before and after data collection.
+ */
+public class MsgStoreStatsHolder {
+    // Switchable statistic items
+    private final MsgStoreStatsItemSet[] msgStoreStatsSets = new 
MsgStoreStatsItemSet[2];
+    // Current writable index
+    private final AtomicInteger writableIndex = new AtomicInteger(0);
+    // Last query time
+    private final AtomicLong lstQueryTime = new AtomicLong(0);
+    // Last snapshot time
+    private final AtomicLong lstSnapshotTime = new AtomicLong(0);
+    // Whether statistics are closed because they have not been queried for a long time
+    private volatile boolean isClosed;
+    // Whether the statistics have been manually closed
+    private volatile boolean isManualClosed = false;
+
+    public MsgStoreStatsHolder() {
+        this.isClosed = true;
+        this.msgStoreStatsSets[0] = new MsgStoreStatsItemSet();
+        this.msgStoreStatsSets[1] = new MsgStoreStatsItemSet();
+        this.lstQueryTime.set(System.currentTimeMillis());
+        this.lstSnapshotTime.set(System.currentTimeMillis());
+    }
+
+    /**
+     * Add appended message size statistic.
+     *
+     * @param msgSize   the message size
+     */
+    public void addCacheMsgSize(int msgSize) {
+        if (isClosed) {
+            return;
+        }
+        msgStoreStatsSets[getIndex()].cacheMsgRcvStats.update(msgSize);
+    }
+
+    /**
+     * Add write message failure count statistics.
+     */
+    public void addMsgWriteCacheFail() {
+        if (isClosed) {
+            return;
+        }
+        msgStoreStatsSets[getIndex()].cacheWriteFailCnt.incValue();
+    }
+
+    /**
+     * Add cache pending count statistics.
+     */
+    public void addCachePending() {
+        if (isClosed) {
+            return;
+        }
+        msgStoreStatsSets[getIndex()].cacheFlushPendingCnt.incValue();
+    }
+
+    /**
+     * Add cache re-alloc count statistics.
+     */
+    public void addCacheReAlloc() {
+        if (isClosed) {
+            return;
+        }
+        msgStoreStatsSets[getIndex()].cacheReAllocCnt.incValue();
+    }
+
+    /**
+     * Add flush trigger type statistics.
+     *
+     * @param isDataSizeFull     whether the cached data is full
+     * @param isIndexSizeFull    whether the cached index is full
+     * @param isMsgCntFull       whether the cached message count is full
+     */
+    public void addCacheFullType(boolean isDataSizeFull,
+                                 boolean isIndexSizeFull,
+                                 boolean isMsgCntFull) {
+        if (isClosed) {
+            return;
+        }
+        if (isDataSizeFull) {
+            msgStoreStatsSets[getIndex()].cacheDataSizeFullCnt.incValue();
+        }
+        if (isIndexSizeFull) {
+            msgStoreStatsSets[getIndex()].cacheIndexSizeFullCnt.incValue();
+        }
+        if (isMsgCntFull) {
+            msgStoreStatsSets[getIndex()].cacheMsgCountFullCnt.incValue();
+        }
+    }
+
+    /**
+     * Add flush time statistic.
+     *
+     * @param flushTime          the flush time
+     * @param isTimeoutFlush     whether is timeout flush
+     */
+    public void addCacheFlushTime(long flushTime, boolean isTimeoutFlush) {
+        if (isClosed) {
+            return;
+        }
+        msgStoreStatsSets[getIndex()].cacheSyncStats.update(flushTime);
+        if (isTimeoutFlush) {
+            msgStoreStatsSets[getIndex()].cacheTimeFullCnt.incValue();
+        }
+    }
+
+    /**
+     * Add message store statistics information.
+     *
+     * @param msgCnt             the message count written
+     * @param msgIndexSize       the message index size written
+     * @param msgDataSize        the message data size written
+     * @param flushedMsgCnt      the flushed message count
+     * @param flushedDataSize    the flushed message size
+     * @param isDataSegFlush     whether the data segment flushed
+     * @param isIndexSegFlush    whether the index segment flushed
+     * @param isDataSizeFull     whether the cached data is full
+     * @param isMsgCntFull       whether the cached message count is full
+     * @param isCacheTimeFull    whether the cached time is full
+     * @param isForceMetadata    whether force push metadata
+     */
+    public void addFileFlushStatsInfo(int msgCnt, int msgIndexSize, int 
msgDataSize,
+                                      long flushedMsgCnt, long flushedDataSize,
+                                      boolean isDataSegFlush, boolean 
isIndexSegFlush,
+                                      boolean isDataSizeFull, boolean 
isMsgCntFull,
+                                      boolean isCacheTimeFull, boolean 
isForceMetadata) {
+        if (isClosed) {
+            return;
+        }
+        MsgStoreStatsItemSet tmStatsSet = msgStoreStatsSets[getIndex()];
+        tmStatsSet.fileAccumMsgCnt.addValue(msgCnt);
+        tmStatsSet.fileAccumMsgIndexSize.addValue(msgIndexSize);
+        tmStatsSet.fileAccumMsgDataSize.addValue(msgDataSize);
+        if (flushedDataSize > 0) {
+            tmStatsSet.fileFlushedDataSize.update(flushedDataSize);
+        }
+        if (flushedMsgCnt > 0) {
+            tmStatsSet.fileFlushedMsgCnt.update(flushedMsgCnt);
+        }
+        if (isDataSegFlush) {
+            tmStatsSet.fileDataSegAddCnt.incValue();
+        }
+        if (isIndexSegFlush) {
+            tmStatsSet.fileIndexSegAddCnt.incValue();
+        }
+        if (isDataSizeFull) {
+            tmStatsSet.fileDataSizeFullCnt.incValue();
+        }
+        if (isMsgCntFull) {
+            tmStatsSet.fileMsgCountFullCnt.incValue();
+        }
+        if (isCacheTimeFull) {
+            tmStatsSet.fileCachedTimeFullCnt.incValue();
+        }
+        if (isForceMetadata) {
+            tmStatsSet.fileMetaFlushCnt.incValue();
+        }
+    }
+
+    /**
+     * Add flush time timeout statistic.
+     *
+     * @param flushedMsgCnt      the flushed message count
+     * @param flushedDataSize    the flushed message size
+     * @param isForceMetadata    whether force push metadata
+     */
+    public void addFileTimeoutFlushStats(long flushedMsgCnt,
+                                         long flushedDataSize,
+                                         boolean isForceMetadata) {
+        if (isClosed) {
+            return;
+        }
+        MsgStoreStatsItemSet tmStatsSet = msgStoreStatsSets[getIndex()];
+        tmStatsSet.fileCachedTimeFullCnt.incValue();
+        if (flushedDataSize > 0) {
+            tmStatsSet.fileFlushedDataSize.update(flushedDataSize);
+        }
+        if (flushedMsgCnt > 0) {
+            tmStatsSet.fileFlushedMsgCnt.update(flushedMsgCnt);
+        }
+        if (isForceMetadata) {
+            tmStatsSet.fileMetaFlushCnt.incValue();
+        }
+    }
+
+    /**
+     * Check whether has exceeded the maximum self-statistics period.
+     *
+     * @param checkTime   the check time
+     */
+    public synchronized void chkStatsExpired(long checkTime) {
+        if (!this.isClosed) {
+            if ((checkTime - this.lstQueryTime.get())
+                    >= TServerConstants.CFG_STORE_STATS_MAX_REFRESH_DURATION) {
+                this.isClosed = true;
+            }
+        }
+    }
+
+    /**
+     * Set manually the statistic status.
+     *
+     * @param enableStats  enable or disable the statistic.
+     */
+    public synchronized void setStatsStatus(boolean enableStats) {
+        if (enableStats) {
+            this.isManualClosed = false;
+        } else {
+            this.isManualClosed = true;
+            this.isClosed = true;
+        }
+    }
+
+    /**
+     * Query whether the statistic is closed.
+     *
+     * @return the statistic status
+     */
+    public boolean isStatsClosed() {
+        return (this.isManualClosed || this.isClosed);
+    }
+
+    /**
+     * Get current writing statistics information.
+     *
+     * @param statsMap  the return map information
+     */
+    public void getValue(Map<String, Long> statsMap) {
+        activeStatsBaseCall();
+        getStatsValue(true, msgStoreStatsSets[getIndex()], statsMap);
+    }
+
+    /**
+     * Get current writing statistics information.
+     *
+     * @param strBuff  the return information in json format
+     */
+    public void getValue(StringBuilder strBuff) {
+        activeStatsBaseCall();
+        getStatsValue(true, msgStoreStatsSets[getIndex()], strBuff);
+    }
+
+    /**
+     * Snapshot and get current writing statistics information.
+     *
+     * @param statsMap  the return map information
+     */
+    public void snapShort(Map<String, Long> statsMap) {
+        activeStatsBaseCall();
+        if (switchStatsSets()) {
+            getStatsValue(false,
+                    msgStoreStatsSets[getIndex(writableIndex.get() - 1)], 
statsMap);
+        } else {
+            getStatsValue(true, msgStoreStatsSets[getIndex()], statsMap);
+        }
+    }
+
+    /**
+     * Snapshot and get current writing statistics information.
+     *
+     * @param strBuff  the return information in json format
+     */
+    public void snapShort(StringBuilder strBuff) {
+        this.activeStatsBaseCall();
+        if (switchStatsSets()) {
+            getStatsValue(false,
+                    msgStoreStatsSets[getIndex(writableIndex.get() - 1)], 
strBuff);
+        } else {
+            getStatsValue(true, msgStoreStatsSets[getIndex()], strBuff);
+        }
+    }
+
+    /**
+     * Get current message store statistics information. Contains the data 
results of
+     * the current statistics and the previous snapshot
+     *
+     * @param isSwitch  whether to switch the writing statistics block
+     * @param strBuff   the return information
+     */
+    public synchronized void getMsgStoreStatsInfo(boolean isSwitch, 
StringBuilder strBuff) {
+        this.activeStatsBaseCall();
+        strBuff.append("[");
+        getStatsValue(false,
+                msgStoreStatsSets[getIndex(writableIndex.get() - 1)], strBuff);
+        strBuff.append(",");
+        getStatsValue(true, msgStoreStatsSets[getIndex()], strBuff);
+        strBuff.append("]");
+        if (isSwitch) {
+            switchStatsSets();
+        }
+    }
+
+    /**
+     * Activate the statistics on an API query unless they are manually closed.
+     * Synchronized so it cannot race with setStatsStatus() and reopen the holder.
+     */
+    private synchronized void activeStatsBaseCall() {
+        if (isManualClosed) {
+            return;
+        }
+        this.lstQueryTime.set(System.currentTimeMillis());
+        if (this.isClosed) {
+            this.isClosed = false;
+        }
+    }
+
+    /**
+     * Check and switch statistic sets
+     *
+     * @return  whether the statistic sets has been switched
+     */
+    private boolean switchStatsSets() {
+        long curSwitchTime = lstSnapshotTime.get();
+        // Avoid frequent snapshots
+        if ((System.currentTimeMillis() - curSwitchTime)
+                >= TServerConstants.MIN_SNAPSHOT_PERIOD_MS) {
+            if (lstSnapshotTime.compareAndSet(curSwitchTime, 
System.currentTimeMillis())) {
+                msgStoreStatsSets[getIndex(writableIndex.get() - 1)].clear();
+                msgStoreStatsSets[getIndex(writableIndex.getAndIncrement())]
+                        .setSnapshotTime(lstSnapshotTime.get());
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Get current writable block index.
+     *
+     * @return the writable block index
+     */
+    private int getIndex() {
+        return getIndex(writableIndex.get());
+    }
+
+    /**
+     * Gets the metric block index based on the specified value.
+     *
+     * @param origIndex    the specified value
+     * @return the metric block index
+     */
+    private int getIndex(int origIndex) {
+        return Math.abs(origIndex % 2);
+    }
+
+    /**
+     * Copy the values of one metric block into a map.
+     *
+     * Counter items are stored under their full names; the histogram items
+     * are handed the map through their own getValue(statsMap, false) call.
+     *
+     * @param isWriting    whether the metric block is the one being written; if true
+     *                     the current system time is stored as the snapshot time,
+     *                     otherwise the time recorded in the block is stored
+     * @param statsSet     the metric block to read
+     * @param statsMap     the map that receives the values
+     */
+    private void getStatsValue(boolean isWriting,
+                               MsgStoreStatsItemSet statsSet,
+                               Map<String, Long> statsMap) {
+        final SinceTime resetMark = statsSet.resetTime;
+        statsMap.put(resetMark.getFullName(), resetMark.getSinceTime());
+        statsMap.put("isClosed", isStatsClosed() ? 1L : 0L);
+        // for memory store
+        statsSet.cacheMsgRcvStats.getValue(statsMap, false);
+        final LongStatsCounter[] cacheCounters = {
+                statsSet.cacheWriteFailCnt,
+                statsSet.cacheDataSizeFullCnt,
+                statsSet.cacheIndexSizeFullCnt,
+                statsSet.cacheMsgCountFullCnt,
+                statsSet.cacheTimeFullCnt,
+                statsSet.cacheFlushPendingCnt,
+                statsSet.cacheReAllocCnt
+        };
+        for (LongStatsCounter counter : cacheCounters) {
+            statsMap.put(counter.getFullName(), counter.getValue());
+        }
+        statsSet.cacheSyncStats.getValue(statsMap, false);
+        // for file store
+        final LongStatsCounter[] fileAccumCounters = {
+                statsSet.fileAccumMsgCnt,
+                statsSet.fileAccumMsgDataSize,
+                statsSet.fileAccumMsgIndexSize
+        };
+        for (LongStatsCounter counter : fileAccumCounters) {
+            statsMap.put(counter.getFullName(), counter.getValue());
+        }
+        statsSet.fileFlushedDataSize.getValue(statsMap, false);
+        statsSet.fileFlushedMsgCnt.getValue(statsMap, false);
+        final LongStatsCounter[] fileEventCounters = {
+                statsSet.fileDataSegAddCnt,
+                statsSet.fileIndexSegAddCnt,
+                statsSet.fileMetaFlushCnt,
+                statsSet.fileDataSizeFullCnt,
+                statsSet.fileMsgCountFullCnt,
+                statsSet.fileCachedTimeFullCnt
+        };
+        for (LongStatsCounter counter : fileEventCounters) {
+            statsMap.put(counter.getFullName(), counter.getValue());
+        }
+        // snapshot time: "now" for the writing block, the recorded time otherwise
+        final SinceTime endMark = statsSet.snapShotTime;
+        if (isWriting) {
+            statsMap.put(endMark.getFullName(), System.currentTimeMillis());
+        } else {
+            statsMap.put(endMark.getFullName(), endMark.getSinceTime());
+        }
+    }
+
+    /**
+     * Read metric block data into a JSON-format string.
+     *
+     * The items are appended in the same order in which the map-based
+     * overload stores them, and each item is appended exactly once.
+     *
+     * @param isWriting    whether the metric block is the one being written; if true
+     *                     the current system time is output as the snapshot time,
+     *                     otherwise the time recorded in the block is output
+     * @param statsSet     the metric block to read
+     * @param strBuff      the buffer that receives the result
+     */
+    private void getStatsValue(boolean isWriting,
+                               MsgStoreStatsItemSet statsSet,
+                               StringBuilder strBuff) {
+        strBuff.append("{\"").append(statsSet.resetTime.getFullName())
+                .append("\":\"").append(statsSet.resetTime.getStrSinceTime())
+                .append("\",\"isClosed\":").append(isStatsClosed()).append(",");
+        // for memory store
+        statsSet.cacheMsgRcvStats.getValue(strBuff, false);
+        // Every cache counter is output exactly once: repeating a name would put a
+        // duplicate key into the JSON object and leave another counter unreported.
+        strBuff.append(",\"").append(statsSet.cacheWriteFailCnt.getFullName())
+                .append("\":").append(statsSet.cacheWriteFailCnt.getValue())
+                .append(",\"").append(statsSet.cacheDataSizeFullCnt.getFullName())
+                .append("\":").append(statsSet.cacheDataSizeFullCnt.getValue())
+                .append(",\"").append(statsSet.cacheIndexSizeFullCnt.getFullName())
+                .append("\":").append(statsSet.cacheIndexSizeFullCnt.getValue())
+                .append(",\"").append(statsSet.cacheMsgCountFullCnt.getFullName())
+                .append("\":").append(statsSet.cacheMsgCountFullCnt.getValue())
+                .append(",\"").append(statsSet.cacheTimeFullCnt.getFullName())
+                .append("\":").append(statsSet.cacheTimeFullCnt.getValue())
+                .append(",\"").append(statsSet.cacheFlushPendingCnt.getFullName())
+                .append("\":").append(statsSet.cacheFlushPendingCnt.getValue())
+                .append(",\"").append(statsSet.cacheReAllocCnt.getFullName())
+                .append("\":").append(statsSet.cacheReAllocCnt.getValue())
+                .append(",");
+        statsSet.cacheSyncStats.getValue(strBuff, false);
+        // for file store
+        strBuff.append(",\"").append(statsSet.fileAccumMsgCnt.getFullName())
+                .append("\":").append(statsSet.fileAccumMsgCnt.getValue())
+                .append(",\"").append(statsSet.fileAccumMsgDataSize.getFullName())
+                .append("\":").append(statsSet.fileAccumMsgDataSize.getValue())
+                .append(",\"").append(statsSet.fileAccumMsgIndexSize.getFullName())
+                .append("\":").append(statsSet.fileAccumMsgIndexSize.getValue())
+                .append(",");
+        statsSet.fileFlushedDataSize.getValue(strBuff, false);
+        strBuff.append(",");
+        statsSet.fileFlushedMsgCnt.getValue(strBuff, false);
+        strBuff.append(",\"").append(statsSet.fileDataSegAddCnt.getFullName())
+                .append("\":").append(statsSet.fileDataSegAddCnt.getValue())
+                .append(",\"").append(statsSet.fileIndexSegAddCnt.getFullName())
+                .append("\":").append(statsSet.fileIndexSegAddCnt.getValue())
+                .append(",\"").append(statsSet.fileMetaFlushCnt.getFullName())
+                .append("\":").append(statsSet.fileMetaFlushCnt.getValue())
+                .append(",\"").append(statsSet.fileDataSizeFullCnt.getFullName())
+                .append("\":").append(statsSet.fileDataSizeFullCnt.getValue())
+                .append(",\"").append(statsSet.fileMsgCountFullCnt.getFullName())
+                .append("\":").append(statsSet.fileMsgCountFullCnt.getValue())
+                .append(",\"").append(statsSet.fileCachedTimeFullCnt.getFullName())
+                .append("\":").append(statsSet.fileCachedTimeFullCnt.getValue())
+                .append(",\"").append(statsSet.snapShotTime.getFullName())
+                .append("\":\"");
+        // snapshot time: "now" for the writing block, the recorded time otherwise
+        if (isWriting) {
+            strBuff.append(DateTimeConvertUtils.ms2yyyyMMddHHmmss(System.currentTimeMillis()));
+        } else {
+            strBuff.append(statsSet.snapShotTime.getStrSinceTime());
+        }
+        strBuff.append("\"}");
+    }
+
+    /**
+     * MsgStoreStatsItemSet, the statistics items of one message store metric block,
+     * holding the memory cache items and the file store items together.
+     */
+    private static class MsgStoreStatsItemSet {
+        // Reset time of this statistics set
+        protected final SinceTime resetTime =
+                new SinceTime("reset_time", null);
+        // for memory store
+        // Histogram of the messages received by the cache
+        protected final SimpleHistogram cacheMsgRcvStats =
+                new SimpleHistogram("cache_msg_in", null);
+        // Count of failed message appends to the cache
+        protected final LongStatsCounter cacheWriteFailCnt =
+                new LongStatsCounter("cache_append_fail", null);
+        // Count of cache-full events caused by the data size
+        protected final LongStatsCounter cacheDataSizeFullCnt =
+                new LongStatsCounter("cache_data_full", null);
+        // Count of cache-full events caused by the index size
+        protected final LongStatsCounter cacheIndexSizeFullCnt =
+                new LongStatsCounter("cache_index_full", null);
+        // Count of cache-full events caused by the message count
+        protected final LongStatsCounter cacheMsgCountFullCnt =
+                new LongStatsCounter("cache_count_full", null);
+        // Count of cache refreshes caused by timeout
+        protected final LongStatsCounter cacheTimeFullCnt =
+                new LongStatsCounter("cache_time_full", null);
+        // Count of pending cache flush operations
+        protected final LongStatsCounter cacheFlushPendingCnt =
+                new LongStatsCounter("cache_flush_pending", null);
+        // Count of cache re-allocations
+        protected final LongStatsCounter cacheReAllocCnt =
+                new LongStatsCounter("cache_realloc", null);
+        // Histogram of the cache persistence duration
+        protected final ESTHistogram cacheSyncStats =
+                new ESTHistogram("cache_flush_dlt", null);
+        // for file store
+        // Accumulated message count
+        protected final LongStatsCounter fileAccumMsgCnt =
+                new LongStatsCounter("file_total_msg_cnt", null);
+        // Accumulated message data size
+        protected final LongStatsCounter fileAccumMsgDataSize =
+                new LongStatsCounter("file_total_data_size", null);
+        // Accumulated message index size
+        protected final LongStatsCounter fileAccumMsgIndexSize =
+                new LongStatsCounter("file_total_index_size", null);
+        // Histogram of the flushed data size
+        protected final SimpleHistogram fileFlushedDataSize =
+                new SimpleHistogram("file_flushed_data", null);
+        // Histogram of the flushed message count
+        protected final SimpleHistogram fileFlushedMsgCnt =
+                new SimpleHistogram("file_flushed_msg", null);
+        // Count of added data segments
+        protected final LongStatsCounter fileDataSegAddCnt =
+                new LongStatsCounter("file_data_seg", null);
+        // Count of added index segments
+        protected final LongStatsCounter fileIndexSegAddCnt =
+                new LongStatsCounter("file_index_seg", null);
+        // Count of file meta-data flushes
+        protected final LongStatsCounter fileMetaFlushCnt =
+                new LongStatsCounter("file_meta_flush", null);
+        // Count of data-size-full events reported for the file store
+        protected final LongStatsCounter fileDataSizeFullCnt =
+                new LongStatsCounter("file_data_full", null);
+        // Count of message-count-full events reported for the file store
+        protected final LongStatsCounter fileMsgCountFullCnt =
+                new LongStatsCounter("file_count_full", null);
+        // Count of cached-time-full events reported for the file store
+        protected final LongStatsCounter fileCachedTimeFullCnt =
+                new LongStatsCounter("file_time_full", null);
+        // Snapshot (end) time of this statistics set
+        protected final SinceTime snapShotTime =
+                new SinceTime("end_time", null);
+
+        /**
+         * Create a statistics set with every item cleared.
+         */
+        public MsgStoreStatsItemSet() {
+            clear();
+        }
+
+        /**
+         * Record the snapshot time of this statistics set.
+         *
+         * @param snapshotTime  the time value handed to the snapshot time item
+         */
+        public void setSnapshotTime(long snapshotTime) {
+            snapShotTime.reset(snapshotTime);
+        }
+
+        /**
+         * Reset both time items and clear every counter and histogram.
+         */
+        public void clear() {
+            snapShotTime.reset();
+            // cache metric items, in declaration order
+            cacheMsgRcvStats.clear();
+            cacheWriteFailCnt.clear();
+            cacheDataSizeFullCnt.clear();
+            cacheIndexSizeFullCnt.clear();
+            cacheMsgCountFullCnt.clear();
+            cacheTimeFullCnt.clear();
+            cacheFlushPendingCnt.clear();
+            cacheReAllocCnt.clear();
+            cacheSyncStats.clear();
+            // file metric items, in declaration order
+            fileAccumMsgCnt.clear();
+            fileAccumMsgDataSize.clear();
+            fileAccumMsgIndexSize.clear();
+            fileFlushedDataSize.clear();
+            fileFlushedMsgCnt.clear();
+            fileDataSegAddCnt.clear();
+            fileIndexSegAddCnt.clear();
+            fileMetaFlushCnt.clear();
+            fileDataSizeFullCnt.clear();
+            fileMsgCountFullCnt.clear();
+            fileCachedTimeFullCnt.clear();
+            // the reset time is refreshed last
+            resetTime.reset();
+        }
+    }
+}
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/ServiceStatsHolder.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/ServiceStatsHolder.java
index 7b398dc..217a14a 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/ServiceStatsHolder.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/ServiceStatsHolder.java
@@ -43,6 +43,8 @@ public class ServiceStatsHolder {
     private static final AtomicInteger writableIndex = new AtomicInteger(0);
     // Last snapshot time
     private static final AtomicLong lstSnapshotTime = new AtomicLong(0);
+    // whether the DiskSync statistic is closed
+    private static volatile boolean diskSyncClosed = false;
 
     // Initial service statistic set
     static {
@@ -60,34 +62,39 @@ public class ServiceStatsHolder {
     }
 
     public static void snapShort(Map<String, Long> statsMap) {
-        long curSnapshotTime = lstSnapshotTime.get();
-        // Avoid frequent snapshots
-        if ((System.currentTimeMillis() - curSnapshotTime)
-                >= TServerConstants.MIN_SNAPSHOT_PERIOD_MS) {
-            if (lstSnapshotTime.compareAndSet(curSnapshotTime, 
System.currentTimeMillis())) {
-                int befIndex = writableIndex.getAndIncrement();
-                switchableSets[getIndex()].resetSinceTime();
-                getStatsValue(switchableSets[getIndex(befIndex)], true, 
statsMap);
-                return;
-            }
+        if (switchWritingStatsUnit()) {
+            getStatsValue(switchableSets[getIndex(writableIndex.get() - 1)], 
true, statsMap);
+        } else {
+            getValue(statsMap);
         }
-        getValue(statsMap);
     }
 
     public static void snapShort(StringBuilder strBuff) {
-        long curSnapshotTime = lstSnapshotTime.get();
-        // Avoid frequent snapshots
-        if ((System.currentTimeMillis() - curSnapshotTime)
-                >= TServerConstants.MIN_SNAPSHOT_PERIOD_MS) {
-            if (lstSnapshotTime.compareAndSet(curSnapshotTime, 
System.currentTimeMillis())) {
-                int befIndex = writableIndex.getAndIncrement();
-                switchableSets[getIndex()].resetSinceTime();
-                getStatsValue(switchableSets[getIndex(befIndex)], true, 
strBuff);
-                return;
-            }
+        if (switchWritingStatsUnit()) {
+            getStatsValue(switchableSets[getIndex(writableIndex.get() - 1)], 
true, strBuff);
+        } else {
+            getValue(strBuff);
         }
-        getValue(strBuff);
     }
+
+    /**
+     * Manually enable or disable the DiskSync statistic.
+     *
+     * @param enableStats  true to enable the statistic, false to close it.
+     */
+    public static synchronized void setDiskSyncStatsStatus(boolean enableStats) {
+        diskSyncClosed = !enableStats;
+    }
+
+    /**
+     * Query whether the DiskSync statistic is closed.
+     *
+     * @return true if the statistic is closed, false if it is enabled
+     */
+    public static boolean isDiskSyncStatsClosed() {
+        return diskSyncClosed;
+    }
+
     // metric set operate APIs end
 
     // metric item operate APIs begin
@@ -119,6 +126,9 @@ public class ServiceStatsHolder {
     }
 
     public static void updDiskSyncDataDlt(long dltTime) {
+        if (diskSyncClosed) {
+            return;
+        }
         switchableSets[getIndex()].fileSyncDltStats.update(dltTime);
     }
 
@@ -128,11 +138,25 @@ public class ServiceStatsHolder {
     // metric set operate APIs end
 
     // private functions
+    /**
+     * Try to advance the writable index to the next statistic set.
+     *
+     * The index is advanced, and the since-time of the newly selected set is
+     * reset, only when at least MIN_SNAPSHOT_PERIOD_MS has passed since the
+     * last recorded snapshot time and this call wins the compare-and-set
+     * on that time.
+     *
+     * @return true if this call advanced the writable index, false otherwise
+     */
+    private static boolean switchWritingStatsUnit() {
+        final long lastSnapshotMs = lstSnapshotTime.get();
+        // Avoid frequent snapshots
+        if ((System.currentTimeMillis() - lastSnapshotMs)
+                < TServerConstants.MIN_SNAPSHOT_PERIOD_MS) {
+            return false;
+        }
+        // Only the call that updates the snapshot time performs the switch
+        if (!lstSnapshotTime.compareAndSet(lastSnapshotMs, System.currentTimeMillis())) {
+            return false;
+        }
+        switchableSets[getIndex(writableIndex.incrementAndGet())].resetSinceTime();
+        return true;
+    }
+
     private static void getStatsValue(ServiceStatsSet statsSet,
                                       boolean resetValue,
                                       Map<String, Long> statsMap) {
         statsMap.put(statsSet.lstResetTime.getFullName(),
                 statsSet.lstResetTime.getSinceTime());
+        statsMap.put("isDiskSyncClosed", (diskSyncClosed ? 1L : 0L));
         if (resetValue) {
             statsSet.fileSyncDltStats.snapShort(statsMap, false);
             statsMap.put(statsSet.fileIOExcStats.getFullName(),
@@ -171,7 +195,8 @@ public class ServiceStatsHolder {
                                       StringBuilder strBuff) {
         strBuff.append("{\"").append(statsSet.lstResetTime.getFullName())
                 
.append("\":\"").append(statsSet.lstResetTime.getStrSinceTime())
-                .append("\",");
+                .append("\",\"isDiskSyncClosed\":").append(diskSyncClosed)
+                .append(",");
         if (resetValue) {
             statsSet.fileSyncDltStats.snapShort(strBuff, false);
             strBuff.append(",\"").append(statsSet.fileIOExcStats.getFullName())
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsService.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsService.java
index 04de7aa..0aa8ea2 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsService.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsService.java
@@ -90,10 +90,8 @@ public class TrafficStatsService extends 
AbstractDaemonService implements Traffi
             return;
         }
         // Output remain information
-        int index = writableIndex.get();
-        for (int i = 0; i < switchableUnits.length; i++) {
-            output2file(++index);
-        }
+        output2file(writableIndex.get() - 1);
+        output2file(writableIndex.get());
     }
 
     @Override
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java
index e3e7be6..b82220e 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java
@@ -301,67 +301,19 @@ public class BrokerAdminServlet extends 
AbstractWebHandler {
         sBuilder.append("],\"totalCnt\":").append(recordId).append("}");
     }
 
-    /**
+    /***
      * Get memory store status info.
      *
      * @param req      request
-     * @param sBuffer  process result
+     * @param sBuilder  process result
      */
     public void adminGetMemStoreStatisInfo(HttpServletRequest req,
-                                           StringBuilder sBuffer) {
-        ProcessResult result = new ProcessResult();
-        if (!WebParameterUtils.getStringParamValue(req,
-                WebFieldDef.COMPSTOPICNAME, false, null, sBuffer, result)) {
-            WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
-            return;
-        }
-        Set<String> topicNameSet = (Set<String>) result.getRetData();
-        if (!WebParameterUtils.getBooleanParamValue(req,
-                WebFieldDef.NEEDREFRESH, false, false, sBuffer, result)) {
-            WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
-            return;
-        }
-        boolean requireRefresh = (boolean) result.getRetData();
-        
sBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Success!\",\"detail\":[");
-        Map<String, ConcurrentHashMap<Integer, MessageStore>> 
messageTopicStores =
-                broker.getStoreManager().getMessageStores();
-        int index = 0;
-        int recordId = 0;
-        for (Map.Entry<String, ConcurrentHashMap<Integer, MessageStore>> entry 
: messageTopicStores.entrySet()) {
-            if (TStringUtils.isBlank(entry.getKey())
-                    || (!topicNameSet.isEmpty() && 
!topicNameSet.contains(entry.getKey()))) {
-                continue;
-            }
-            String topicName = entry.getKey();
-            if (recordId++ > 0) {
-                sBuffer.append(",");
-            }
-            index = 0;
-            
sBuffer.append("{\"topicName\":\"").append(topicName).append("\",\"storeStatsInfo\":[");
-            ConcurrentHashMap<Integer, MessageStore> partStoreMap = 
entry.getValue();
-            if (partStoreMap != null) {
-                for (Entry<Integer, MessageStore> subEntry : 
partStoreMap.entrySet()) {
-                    MessageStore msgStore = subEntry.getValue();
-                    if (msgStore == null) {
-                        continue;
-                    }
-                    if (index++ > 0) {
-                        sBuffer.append(",");
-                    }
-                    sBuffer.append("{\"storeId\":").append(subEntry.getKey())
-                            .append(",\"memStats\":");
-                    msgStore.getMemStoreStatsInfo(requireRefresh, sBuffer);
-                    sBuffer.append(",\"fileStats\":");
-                    msgStore.getCurFileStoreStatsInfo(requireRefresh, sBuffer);
-                    sBuffer.append("}");
-                }
-            }
-            sBuffer.append("]}");
-        }
-        sBuffer.append("],\"totalCount\":").append(recordId).append("}");
+                                           StringBuilder sBuilder) {
+        sBuilder.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
+                .append("The method is deprecated, please use 
admin_get_msgstore_stats\"}");
     }
 
-    /**
+    /***
      * Manual set offset.
      *
      * @param req      request
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/webbase/WebCallStatsHolder.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/webbase/WebCallStatsHolder.java
index a8ade6e..f47beb7 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/webbase/WebCallStatsHolder.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/webbase/WebCallStatsHolder.java
@@ -40,6 +40,8 @@ public class WebCallStatsHolder {
     private static final AtomicInteger writableIndex = new AtomicInteger(0);
     // Last snapshot time
     private static final AtomicLong lstSnapshotTime = new AtomicLong(0);
+    // whether the statistic is manual closed
+    private static volatile boolean isManualClosed = false;
 
     // Initial service statistic set
     static {
@@ -57,38 +59,45 @@ public class WebCallStatsHolder {
     }
 
     public static void snapShort(Map<String, Long> statsMap) {
-        long curSnapshotTime = lstSnapshotTime.get();
-        // Avoid frequent snapshots
-        if ((System.currentTimeMillis() - curSnapshotTime)
-                >= TServerConstants.MIN_SNAPSHOT_PERIOD_MS) {
-            if (lstSnapshotTime.compareAndSet(curSnapshotTime, 
System.currentTimeMillis())) {
-                int befIndex = writableIndex.getAndIncrement();
-                switchableSets[getIndex()].resetSinceTime();
-                getStatsValue(switchableSets[getIndex(befIndex)], true, 
statsMap);
-                return;
-            }
+        if (switchWritingStatsUnit()) {
+            getStatsValue(switchableSets[getIndex(writableIndex.get() - 1)], 
true, statsMap);
+        } else {
+            getValue(statsMap);
         }
-        getValue(statsMap);
     }
 
     public static void snapShort(StringBuilder strBuff) {
-        long curSnapshotTime = lstSnapshotTime.get();
-        // Avoid frequent snapshots
-        if ((System.currentTimeMillis() - curSnapshotTime)
-                >= TServerConstants.MIN_SNAPSHOT_PERIOD_MS) {
-            if (lstSnapshotTime.compareAndSet(curSnapshotTime, 
System.currentTimeMillis())) {
-                int befIndex = writableIndex.getAndIncrement();
-                switchableSets[getIndex()].resetSinceTime();
-                getStatsValue(switchableSets[getIndex(befIndex)], true, 
strBuff);
-                return;
-            }
+        if (switchWritingStatsUnit()) {
+            getStatsValue(switchableSets[getIndex(writableIndex.get() - 1)], 
true, strBuff);
+        } else {
+            getValue(strBuff);
         }
-        getValue(strBuff);
+    }
+
+    /**
+     * Manually enable or disable the statistic.
+     *
+     * @param enableStats  true to enable the statistic, false to close it.
+     */
+    public static synchronized void setStatsStatus(boolean enableStats) {
+        isManualClosed = !enableStats;
+    }
+
+    /**
+     * Query whether the statistic is manually closed.
+     *
+     * @return true if the statistic is closed, false if it is enabled
+     */
+    public static boolean isStatsClosed() {
+        return isManualClosed;
     }
     // metric set operate APIs end
 
     // metric item operate APIs begin
     public static void addMethodCall(String method, long callDlt) {
+        if (isManualClosed) {
+            return;
+        }
         method = (method == null) ? "NULL" : method;
         WebCallStatsItemSet webCallStatsSet = switchableSets[getIndex()];
         webCallStatsSet.totalCallStats.update(callDlt);
@@ -105,11 +114,25 @@ public class WebCallStatsHolder {
     // metric set operate APIs end
 
     // private functions
+    /**
+     * Try to advance the writable index to the next statistic set.
+     *
+     * The index is advanced, and the since-time of the newly selected set is
+     * reset, only when at least MIN_SNAPSHOT_PERIOD_MS has passed since the
+     * last recorded snapshot time and this call wins the compare-and-set
+     * on that time.
+     *
+     * @return true if this call advanced the writable index, false otherwise
+     */
+    private static boolean switchWritingStatsUnit() {
+        final long lastSnapshotMs = lstSnapshotTime.get();
+        // Avoid frequent snapshots
+        if ((System.currentTimeMillis() - lastSnapshotMs)
+                < TServerConstants.MIN_SNAPSHOT_PERIOD_MS) {
+            return false;
+        }
+        // Only the call that updates the snapshot time performs the switch
+        if (!lstSnapshotTime.compareAndSet(lastSnapshotMs, System.currentTimeMillis())) {
+            return false;
+        }
+        switchableSets[getIndex(writableIndex.incrementAndGet())].resetSinceTime();
+        return true;
+    }
+
     private static void getStatsValue(WebCallStatsItemSet statsSet,
                                       boolean resetValue,
                                       Map<String, Long> statsMap) {
         statsMap.put(statsSet.lstResetTime.getFullName(),
                 statsSet.lstResetTime.getSinceTime());
+        statsMap.put("isClosed", (isManualClosed ? 1L : 0L));
         if (resetValue) {
             statsSet.totalCallStats.snapShort(statsMap, false);
             for (SimpleHistogram itemStats : statsSet.methodStatsMap.values()) 
{
@@ -128,7 +151,7 @@ public class WebCallStatsHolder {
                                       StringBuilder strBuff) {
         strBuff.append("{\"").append(statsSet.lstResetTime.getFullName())
                 
.append("\":\"").append(statsSet.lstResetTime.getStrSinceTime())
-                .append("\",");
+                .append("\",\"isClosed\":").append(isManualClosed).append(",");
         int totalcnt = 0;
         if (resetValue) {
             statsSet.totalCallStats.snapShort(strBuff, false);
diff --git 
a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStoreTest.java
 
b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStoreTest.java
index 207e3d7..ca93202 100644
--- 
a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStoreTest.java
+++ 
b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStoreTest.java
@@ -18,7 +18,7 @@
 package org.apache.inlong.tubemq.server.broker.msgstore.mem;
 
 import java.nio.ByteBuffer;
-import org.apache.inlong.tubemq.server.broker.stats.MemStoreStatsHolder;
+import org.apache.inlong.tubemq.server.broker.stats.MsgStoreStatsHolder;
 import org.apache.inlong.tubemq.server.common.utils.AppendResult;
 import org.junit.Test;
 
@@ -33,7 +33,7 @@ public class MsgMemStoreTest {
         int maxMsgCount = 10000;
 
         MsgMemStore msgMemStore = new MsgMemStore(maxCacheSize, maxMsgCount, 
null);
-        MemStoreStatsHolder memStatsHolder = new MemStoreStatsHolder();
+        MsgStoreStatsHolder memStatsHolder = new MsgStoreStatsHolder();
         ByteBuffer bf = ByteBuffer.allocate(1024);
         bf.put("abc".getBytes());
         AppendResult appendResult = new AppendResult();
@@ -47,7 +47,7 @@ public class MsgMemStoreTest {
         int maxCacheSize = 2 * 1024 * 1024;
         int maxMsgCount = 10000;
         MsgMemStore msgMemStore = new MsgMemStore(maxCacheSize, maxMsgCount, 
null);
-        MemStoreStatsHolder memStatsHolder = new MemStoreStatsHolder();
+        MsgStoreStatsHolder memStatsHolder = new MsgStoreStatsHolder();
         ByteBuffer bf = ByteBuffer.allocate(1024);
         bf.put("abc".getBytes());
         AppendResult appendResult = new AppendResult();
diff --git 
a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/FileStoreStatsHolderTest.java
 
b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/FileStoreStatsHolderTest.java
deleted file mode 100644
index 0449d1b..0000000
--- 
a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/FileStoreStatsHolderTest.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.tubemq.server.broker.stats;
-
-import java.util.LinkedHashMap;
-import java.util.Map;
-import org.apache.inlong.tubemq.corebase.TBaseConstants;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * FileStoreStatsHolder test.
- */
-public class FileStoreStatsHolderTest {
-
-    @Test
-    public void testFileStoreStatsHolder() {
-        FileStoreStatsHolder fileStatsHolder = new FileStoreStatsHolder();
-        // case 1, not started
-        fileStatsHolder.addFileFlushStatsInfo(2, 30, 500,
-                0, 0, true, true,
-                true, true, true, true);
-        fileStatsHolder.addTimeoutFlush(1, 500, false);
-        Map<String, Long> retMap = new LinkedHashMap<>();
-        fileStatsHolder.getValue(retMap);
-        Assert.assertNotNull(retMap.get("reset_time"));
-        Assert.assertEquals(0, retMap.get("total_msg_cnt").longValue());
-        Assert.assertEquals(0, retMap.get("total_data_size").longValue());
-        Assert.assertEquals(0, retMap.get("total_index_size").longValue());
-        Assert.assertEquals(0, 
retMap.get("flushed_data_size_count").longValue());
-        Assert.assertEquals(Long.MIN_VALUE, 
retMap.get("flushed_data_size_max").longValue());
-        Assert.assertEquals(Long.MAX_VALUE, 
retMap.get("flushed_data_size_min").longValue());
-        Assert.assertEquals(0, 
retMap.get("flushed_msg_cnt_count").longValue());
-        Assert.assertEquals(Long.MIN_VALUE, 
retMap.get("flushed_msg_cnt_max").longValue());
-        Assert.assertEquals(Long.MAX_VALUE, 
retMap.get("flushed_msg_cnt_min").longValue());
-        Assert.assertEquals(0, retMap.get("data_seg_cnt").longValue());
-        Assert.assertEquals(0, retMap.get("index_seg_cnt").longValue());
-        Assert.assertEquals(0, retMap.get("data_size_full").longValue());
-        Assert.assertEquals(0, retMap.get("meta_flush_cnt").longValue());
-        Assert.assertEquals(0, retMap.get("msg_count_full").longValue());
-        Assert.assertEquals(0, retMap.get("cache_time_full").longValue());
-        Assert.assertNotNull(retMap.get("end_time"));
-        retMap.clear();
-        // get content by StringBuilder
-        StringBuilder strBuff = new 
StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE);
-        fileStatsHolder.getValue(strBuff);
-        // System.out.println(strBuff.toString());
-        strBuff.delete(0, strBuff.length());
-        // test timeout
-        fileStatsHolder.addTimeoutFlush(1, 1, true);
-        fileStatsHolder.getValue(retMap);
-        Assert.assertNotNull(retMap.get("reset_time"));
-        Assert.assertEquals(0, retMap.get("total_msg_cnt").longValue());
-        Assert.assertEquals(0, retMap.get("total_data_size").longValue());
-        Assert.assertEquals(0, retMap.get("total_index_size").longValue());
-        Assert.assertEquals(1, 
retMap.get("flushed_data_size_count").longValue());
-        Assert.assertEquals(1, 
retMap.get("flushed_data_size_max").longValue());
-        Assert.assertEquals(1, 
retMap.get("flushed_data_size_min").longValue());
-        Assert.assertEquals(1, 
retMap.get("flushed_msg_cnt_count").longValue());
-        Assert.assertEquals(1, retMap.get("flushed_msg_cnt_max").longValue());
-        Assert.assertEquals(1, retMap.get("flushed_msg_cnt_min").longValue());
-        Assert.assertEquals(0, retMap.get("data_seg_cnt").longValue());
-        Assert.assertEquals(0, retMap.get("index_seg_cnt").longValue());
-        Assert.assertEquals(1, retMap.get("meta_flush_cnt").longValue());
-        Assert.assertEquals(0, retMap.get("data_size_full").longValue());
-        Assert.assertEquals(0, retMap.get("msg_count_full").longValue());
-        Assert.assertEquals(1, retMap.get("cache_time_full").longValue());
-        Assert.assertNotNull(retMap.get("end_time"));
-        retMap.clear();
-        // get value when started
-        fileStatsHolder.addFileFlushStatsInfo(1, 1, 1,
-                1, 1, true, false,
-                false, false, false, false);
-        fileStatsHolder.addFileFlushStatsInfo(6, 6, 6,
-                6, 6, false, false,
-                false, false, false, true);
-        fileStatsHolder.addFileFlushStatsInfo(2, 2, 2,
-                2, 2, false, true,
-                false, false, false, false);
-        fileStatsHolder.addFileFlushStatsInfo(5, 5, 5,
-                5, 5, false, false,
-                false, false, true, false);
-        fileStatsHolder.addFileFlushStatsInfo(4, 4, 4,
-                4, 4, false, false,
-                false, true, false, false);
-        fileStatsHolder.addFileFlushStatsInfo(3, 3, 3,
-                3, 3, false, false,
-                true, false, false, false);
-        fileStatsHolder.snapShort(retMap);
-        Assert.assertNotNull(retMap.get("reset_time"));
-        Assert.assertEquals(21, retMap.get("total_msg_cnt").longValue());
-        Assert.assertEquals(21, retMap.get("total_data_size").longValue());
-        Assert.assertEquals(21, retMap.get("total_index_size").longValue());
-        Assert.assertEquals(7, 
retMap.get("flushed_data_size_count").longValue());
-        Assert.assertEquals(6, 
retMap.get("flushed_data_size_max").longValue());
-        Assert.assertEquals(1, 
retMap.get("flushed_data_size_min").longValue());
-        Assert.assertEquals(7, 
retMap.get("flushed_msg_cnt_count").longValue());
-        Assert.assertEquals(6, retMap.get("flushed_msg_cnt_max").longValue());
-        Assert.assertEquals(1, retMap.get("flushed_msg_cnt_min").longValue());
-        Assert.assertEquals(1, retMap.get("data_seg_cnt").longValue());
-        Assert.assertEquals(1, retMap.get("index_seg_cnt").longValue());
-        Assert.assertEquals(1, retMap.get("data_size_full").longValue());
-        Assert.assertEquals(2, retMap.get("meta_flush_cnt").longValue());
-        Assert.assertEquals(1, retMap.get("msg_count_full").longValue());
-        Assert.assertEquals(2, retMap.get("cache_time_full").longValue());
-        Assert.assertNotNull(retMap.get("end_time"));
-        retMap.clear();
-        fileStatsHolder.getAllFileStatsInfo(true, strBuff);
-        // System.out.println(strBuff.toString());
-        strBuff.delete(0, strBuff.length());
-
-    }
-}
diff --git 
a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/MemStoreStatsHolderTest.java
 
b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/MemStoreStatsHolderTest.java
deleted file mode 100644
index ad06f7a..0000000
--- 
a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/MemStoreStatsHolderTest.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.tubemq.server.broker.stats;
-
-import java.util.LinkedHashMap;
-import java.util.Map;
-import org.apache.inlong.tubemq.corebase.TBaseConstants;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * MemStoreStatsHolder test.
- */
-public class MemStoreStatsHolderTest {
-
-    @Test
-    public void testMemStoreStatsHolder() {
-        MemStoreStatsHolder memStatsHolder = new MemStoreStatsHolder();
-        // case 1, not started
-        memStatsHolder.addAppendedMsgSize(50);
-        memStatsHolder.addCacheFullType(true, false, false);
-        memStatsHolder.addCacheFullType(false, true, false);
-        memStatsHolder.addCacheFullType(false, false, true);
-        memStatsHolder.addFlushTime(50, false);
-        memStatsHolder.addFlushTime(10, true);
-        memStatsHolder.addMsgWriteFail();
-        Map<String, Long> retMap = new LinkedHashMap<>();
-        memStatsHolder.getValue(retMap);
-        Assert.assertNotNull(retMap.get("reset_time"));
-        Assert.assertEquals(0, retMap.get("msg_in_count").longValue());
-        Assert.assertEquals(Long.MIN_VALUE, 
retMap.get("msg_in_max").longValue());
-        Assert.assertEquals(Long.MAX_VALUE, 
retMap.get("msg_in_min").longValue());
-        Assert.assertEquals(0, retMap.get("msg_append_fail").longValue());
-        Assert.assertEquals(0, retMap.get("data_size_full").longValue());
-        Assert.assertEquals(0, retMap.get("index_size_full").longValue());
-        Assert.assertEquals(0, retMap.get("msg_count_full").longValue());
-        Assert.assertEquals(0, retMap.get("cache_time_full").longValue());
-        Assert.assertEquals(0, retMap.get("flush_pending").longValue());
-        Assert.assertEquals(0, retMap.get("cache_realloc").longValue());
-        Assert.assertEquals(0, 
retMap.get("cache_flush_dlt_count").longValue());
-        Assert.assertEquals(Long.MIN_VALUE, 
retMap.get("cache_flush_dlt_max").longValue());
-        Assert.assertEquals(Long.MAX_VALUE, 
retMap.get("cache_flush_dlt_min").longValue());
-        Assert.assertNotNull(retMap.get("end_time"));
-        retMap.clear();
-        // get content by StringBuilder
-        StringBuilder strBuff = new 
StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE);
-        memStatsHolder.getValue(strBuff);
-        // System.out.println(strBuff.toString());
-        strBuff.delete(0, strBuff.length());
-        memStatsHolder.getAllMemStatsInfo(true, strBuff);
-        // System.out.println("getAllMemStatsInfo : " + strBuff);
-        strBuff.delete(0, strBuff.length());
-        // case 2 started
-        memStatsHolder.addAppendedMsgSize(50);
-        memStatsHolder.addAppendedMsgSize(500);
-        memStatsHolder.addAppendedMsgSize(5);
-        memStatsHolder.addCacheFullType(true, false, false);
-        memStatsHolder.addCacheFullType(false, true, false);
-        memStatsHolder.addCacheFullType(false, false, true);
-        memStatsHolder.addFlushTime(50, false);
-        memStatsHolder.addFlushTime(10, true);
-        memStatsHolder.addFlushTime(100, true);
-        memStatsHolder.addFlushTime(1, false);
-        memStatsHolder.addMsgWriteFail();
-        memStatsHolder.addMsgWriteFail();
-        memStatsHolder.addCacheReAlloc();
-        memStatsHolder.addCacheReAlloc();
-        memStatsHolder.addCachePending();
-        memStatsHolder.addCachePending();
-        memStatsHolder.addCachePending();
-        memStatsHolder.getValue(retMap);
-        Assert.assertNotNull(retMap.get("reset_time"));
-        Assert.assertEquals(3, retMap.get("msg_in_count").longValue());
-        Assert.assertEquals(500, retMap.get("msg_in_max").longValue());
-        Assert.assertEquals(5, retMap.get("msg_in_min").longValue());
-        Assert.assertEquals(2, retMap.get("msg_append_fail").longValue());
-        Assert.assertEquals(1, retMap.get("data_size_full").longValue());
-        Assert.assertEquals(1, retMap.get("index_size_full").longValue());
-        Assert.assertEquals(1, retMap.get("msg_count_full").longValue());
-        Assert.assertEquals(2, retMap.get("cache_time_full").longValue());
-        Assert.assertEquals(3, retMap.get("flush_pending").longValue());
-        Assert.assertEquals(2, retMap.get("cache_realloc").longValue());
-        Assert.assertEquals(4, 
retMap.get("cache_flush_dlt_count").longValue());
-        Assert.assertEquals(100, 
retMap.get("cache_flush_dlt_max").longValue());
-        Assert.assertEquals(1, retMap.get("cache_flush_dlt_min").longValue());
-        Assert.assertEquals(1, 
retMap.get("cache_flush_dlt_cell_0t2").longValue());
-        Assert.assertEquals(1, 
retMap.get("cache_flush_dlt_cell_8t16").longValue());
-        Assert.assertEquals(1, 
retMap.get("cache_flush_dlt_cell_32t64").longValue());
-        Assert.assertEquals(1, 
retMap.get("cache_flush_dlt_cell_64t128").longValue());
-        Assert.assertNotNull(retMap.get("end_time"));
-        memStatsHolder.getAllMemStatsInfo(false, strBuff);
-        // System.out.println("\n the second is : " + strBuff.toString());
-        strBuff.delete(0, strBuff.length());
-
-    }
-}
diff --git 
a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/MsgStoreStatsHolderTest.java
 
b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/MsgStoreStatsHolderTest.java
new file mode 100644
index 0000000..3d41289
--- /dev/null
+++ 
b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/MsgStoreStatsHolderTest.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.broker.stats;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import org.apache.inlong.tubemq.corebase.TBaseConstants;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit tests for the cache (memory) and file statistics of MsgStoreStatsHolder.
+ */
+public class MsgStoreStatsHolderTest {
+
+    @Test
+    public void testMemPartStats() {
+        MsgStoreStatsHolder msgStoreStatsHolder = new MsgStoreStatsHolder();
+        // case 1: statistics not started, so none of the updates below may be recorded
+        msgStoreStatsHolder.addCacheMsgSize(50);
+        msgStoreStatsHolder.addCacheFullType(true, false, false);
+        msgStoreStatsHolder.addCacheFullType(false, true, false);
+        msgStoreStatsHolder.addCacheFullType(false, false, true);
+        msgStoreStatsHolder.addCacheFlushTime(50, false);
+        msgStoreStatsHolder.addCacheFlushTime(10, true);
+        msgStoreStatsHolder.addMsgWriteCacheFail();
+        Map<String, Long> retMap = new LinkedHashMap<>();
+        msgStoreStatsHolder.getValue(retMap);
+        Assert.assertNotNull(retMap.get("reset_time"));
+        Assert.assertEquals(0, retMap.get("cache_msg_in_count").longValue());
+        Assert.assertEquals(Long.MIN_VALUE, 
retMap.get("cache_msg_in_max").longValue());
+        Assert.assertEquals(Long.MAX_VALUE, 
retMap.get("cache_msg_in_min").longValue());
+        Assert.assertEquals(0, retMap.get("cache_append_fail").longValue());
+        Assert.assertEquals(0, retMap.get("cache_data_full").longValue());
+        Assert.assertEquals(0, retMap.get("cache_index_full").longValue());
+        Assert.assertEquals(0, retMap.get("cache_count_full").longValue());
+        Assert.assertEquals(0, retMap.get("cache_time_full").longValue());
+        Assert.assertEquals(0, retMap.get("cache_flush_pending").longValue());
+        Assert.assertEquals(0, retMap.get("cache_realloc").longValue());
+        Assert.assertEquals(0, 
retMap.get("cache_flush_dlt_count").longValue());
+        Assert.assertEquals(Long.MIN_VALUE, 
retMap.get("cache_flush_dlt_max").longValue());
+        Assert.assertEquals(Long.MAX_VALUE, 
retMap.get("cache_flush_dlt_min").longValue());
+        Assert.assertNotNull(retMap.get("end_time"));
+        retMap.clear();
+        // get content by StringBuilder
+        StringBuilder strBuff = new 
StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE);
+        msgStoreStatsHolder.getValue(strBuff);
+        // System.out.println(strBuff.toString());
+        strBuff.delete(0, strBuff.length());
+        msgStoreStatsHolder.getMsgStoreStatsInfo(true, strBuff);
+        // System.out.println("getMsgStoreStatsInfo : " + strBuff);
+        strBuff.delete(0, strBuff.length());
+        // case 2: statistics started, so every update below must be recorded
+        msgStoreStatsHolder.addCacheMsgSize(50);
+        msgStoreStatsHolder.addCacheMsgSize(500);
+        msgStoreStatsHolder.addCacheMsgSize(5);
+        msgStoreStatsHolder.addCacheFullType(true, false, false);
+        msgStoreStatsHolder.addCacheFullType(false, true, false);
+        msgStoreStatsHolder.addCacheFullType(false, false, true);
+        msgStoreStatsHolder.addCacheFlushTime(50, false);
+        msgStoreStatsHolder.addCacheFlushTime(10, true);
+        msgStoreStatsHolder.addCacheFlushTime(100, true);
+        msgStoreStatsHolder.addCacheFlushTime(1, false);
+        msgStoreStatsHolder.addMsgWriteCacheFail();
+        msgStoreStatsHolder.addMsgWriteCacheFail();
+        msgStoreStatsHolder.addCacheReAlloc();
+        msgStoreStatsHolder.addCacheReAlloc();
+        msgStoreStatsHolder.addCachePending();
+        msgStoreStatsHolder.addCachePending();
+        msgStoreStatsHolder.addCachePending();
+        msgStoreStatsHolder.getValue(retMap);
+        Assert.assertNotNull(retMap.get("reset_time"));
+        Assert.assertEquals(3, retMap.get("cache_msg_in_count").longValue());
+        Assert.assertEquals(500, retMap.get("cache_msg_in_max").longValue());
+        Assert.assertEquals(5, retMap.get("cache_msg_in_min").longValue());
+        Assert.assertEquals(2, retMap.get("cache_append_fail").longValue());
+        Assert.assertEquals(1, retMap.get("cache_data_full").longValue());
+        Assert.assertEquals(1, retMap.get("cache_index_full").longValue());
+        Assert.assertEquals(1, retMap.get("cache_count_full").longValue());
+        Assert.assertEquals(2, retMap.get("cache_time_full").longValue());
+        Assert.assertEquals(3, retMap.get("cache_flush_pending").longValue());
+        Assert.assertEquals(2, retMap.get("cache_realloc").longValue());
+        Assert.assertEquals(4, 
retMap.get("cache_flush_dlt_count").longValue());
+        Assert.assertEquals(100, 
retMap.get("cache_flush_dlt_max").longValue());
+        Assert.assertEquals(1, retMap.get("cache_flush_dlt_min").longValue());
+        Assert.assertEquals(1, 
retMap.get("cache_flush_dlt_cell_0t2").longValue());
+        Assert.assertEquals(1, 
retMap.get("cache_flush_dlt_cell_8t16").longValue());
+        Assert.assertEquals(1, 
retMap.get("cache_flush_dlt_cell_32t64").longValue());
+        Assert.assertEquals(1, 
retMap.get("cache_flush_dlt_cell_64t128").longValue());
+        Assert.assertNotNull(retMap.get("end_time"));
+        msgStoreStatsHolder.getMsgStoreStatsInfo(false, strBuff);
+        // System.out.println("\n the second is : " + strBuff.toString());
+        strBuff.delete(0, strBuff.length());
+    }
+
+    @Test
+    public void testFilePartStats() {
+        MsgStoreStatsHolder msgStoreStatsHolder = new MsgStoreStatsHolder();
+        // case 1: statistics not started, so none of the updates below may be recorded
+        msgStoreStatsHolder.addFileFlushStatsInfo(2, 30, 500,
+                0, 0, true, true,
+                true, true, true, true);
+        msgStoreStatsHolder.addFileTimeoutFlushStats(1, 500, false);
+        Map<String, Long> retMap = new LinkedHashMap<>();
+        msgStoreStatsHolder.getValue(retMap);
+        Assert.assertNotNull(retMap.get("reset_time"));
+        Assert.assertEquals(0, retMap.get("file_total_msg_cnt").longValue());
+        Assert.assertEquals(0, retMap.get("file_total_data_size").longValue());
+        Assert.assertEquals(0, 
retMap.get("file_total_index_size").longValue());
+        Assert.assertEquals(0, 
retMap.get("file_flushed_data_count").longValue());
+        Assert.assertEquals(Long.MIN_VALUE, 
retMap.get("file_flushed_data_max").longValue());
+        Assert.assertEquals(Long.MAX_VALUE, 
retMap.get("file_flushed_data_min").longValue());
+        Assert.assertEquals(0, 
retMap.get("file_flushed_msg_count").longValue());
+        Assert.assertEquals(Long.MIN_VALUE, 
retMap.get("file_flushed_msg_max").longValue());
+        Assert.assertEquals(Long.MAX_VALUE, 
retMap.get("file_flushed_msg_min").longValue());
+        Assert.assertEquals(0, retMap.get("file_index_seg").longValue());
+        Assert.assertEquals(0, retMap.get("file_meta_flush").longValue());
+        Assert.assertEquals(0, retMap.get("file_data_full").longValue());
+        Assert.assertEquals(0, retMap.get("file_count_full").longValue());
+        Assert.assertEquals(0, retMap.get("file_time_full").longValue());
+        Assert.assertNotNull(retMap.get("end_time"));
+        retMap.clear();
+        // get content by StringBuilder
+        StringBuilder strBuff = new 
StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE);
+        msgStoreStatsHolder.getValue(strBuff);
+        // System.out.println(strBuff.toString());
+        strBuff.delete(0, strBuff.length());
+        // record one timeout flush via addFileTimeoutFlushStats and verify it is counted
+        msgStoreStatsHolder.addFileTimeoutFlushStats(1, 1, true);
+        msgStoreStatsHolder.getValue(retMap);
+        Assert.assertNotNull(retMap.get("reset_time"));
+        Assert.assertEquals(0, retMap.get("file_total_msg_cnt").longValue());
+        Assert.assertEquals(0, retMap.get("file_total_data_size").longValue());
+        Assert.assertEquals(0, 
retMap.get("file_total_index_size").longValue());
+        Assert.assertEquals(1, 
retMap.get("file_flushed_data_count").longValue());
+        Assert.assertEquals(1, 
retMap.get("file_flushed_data_max").longValue());
+        Assert.assertEquals(1, 
retMap.get("file_flushed_data_min").longValue());
+        Assert.assertEquals(1, 
retMap.get("file_flushed_msg_count").longValue());
+        Assert.assertEquals(1, retMap.get("file_flushed_msg_max").longValue());
+        Assert.assertEquals(1, retMap.get("file_flushed_msg_min").longValue());
+        Assert.assertEquals(0, retMap.get("file_data_seg").longValue());
+        Assert.assertEquals(0, retMap.get("file_index_seg").longValue());
+        Assert.assertEquals(1, retMap.get("file_meta_flush").longValue());
+        Assert.assertEquals(0, retMap.get("file_data_full").longValue());
+        Assert.assertEquals(0, retMap.get("file_count_full").longValue());
+        Assert.assertEquals(1, retMap.get("file_time_full").longValue());
+        Assert.assertNotNull(retMap.get("end_time"));
+        retMap.clear();
+        // record six flushes (values 1 to 6) while started, then verify via snapShort
+        msgStoreStatsHolder.addFileFlushStatsInfo(1, 1, 1,
+                1, 1, true, false,
+                false, false, false, false);
+        msgStoreStatsHolder.addFileFlushStatsInfo(6, 6, 6,
+                6, 6, false, false,
+                false, false, false, true);
+        msgStoreStatsHolder.addFileFlushStatsInfo(2, 2, 2,
+                2, 2, false, true,
+                false, false, false, false);
+        msgStoreStatsHolder.addFileFlushStatsInfo(5, 5, 5,
+                5, 5, false, false,
+                false, false, true, false);
+        msgStoreStatsHolder.addFileFlushStatsInfo(4, 4, 4,
+                4, 4, false, false,
+                false, true, false, false);
+        msgStoreStatsHolder.addFileFlushStatsInfo(3, 3, 3,
+                3, 3, false, false,
+                true, false, false, false);
+        msgStoreStatsHolder.snapShort(retMap);
+        Assert.assertNotNull(retMap.get("reset_time"));
+        Assert.assertEquals(21, retMap.get("file_total_msg_cnt").longValue());
+        Assert.assertEquals(21, 
retMap.get("file_total_data_size").longValue());
+        Assert.assertEquals(21, 
retMap.get("file_total_index_size").longValue());
+        Assert.assertEquals(7, 
retMap.get("file_flushed_data_count").longValue());
+        Assert.assertEquals(6, 
retMap.get("file_flushed_data_max").longValue());
+        Assert.assertEquals(1, 
retMap.get("file_flushed_data_min").longValue());
+        Assert.assertEquals(7, 
retMap.get("file_flushed_msg_count").longValue());
+        Assert.assertEquals(6, retMap.get("file_flushed_msg_max").longValue());
+        Assert.assertEquals(1, retMap.get("file_flushed_msg_min").longValue());
+        Assert.assertEquals(1, retMap.get("file_data_seg").longValue());
+        Assert.assertEquals(1, retMap.get("file_index_seg").longValue());
+        Assert.assertEquals(1, retMap.get("file_data_full").longValue());
+        Assert.assertEquals(2, retMap.get("file_meta_flush").longValue());
+        Assert.assertEquals(1, retMap.get("file_count_full").longValue());
+        Assert.assertEquals(2, retMap.get("file_time_full").longValue());
+        Assert.assertNotNull(retMap.get("end_time"));
+        retMap.clear();
+        msgStoreStatsHolder.getMsgStoreStatsInfo(true, strBuff);
+        // System.out.println(strBuff.toString());
+        strBuff.delete(0, strBuff.length());
+    }
+}

Reply via email to