This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 938861c [INLONG-2505][TubeMQ] Add MsgStoreStatsHolder class (#2506)
938861c is described below
commit 938861c80ab52e83c4df980895dca2b139514796
Author: gosonzhang <[email protected]>
AuthorDate: Tue Feb 15 21:58:24 2022 +0800
[INLONG-2505][TubeMQ] Add MsgStoreStatsHolder class (#2506)
---
.../server/broker/msgstore/MessageStore.java | 34 +-
.../broker/msgstore/MessageStoreManager.java | 3 +-
.../server/broker/msgstore/disk/MsgFileStore.java | 12 +-
.../server/broker/msgstore/mem/MsgMemStore.java | 6 +-
.../server/broker/stats/FileStoreStatsHolder.java | 423 --------------
.../server/broker/stats/MemStoreStatsHolder.java | 411 --------------
.../server/broker/stats/MsgStoreStatsHolder.java | 615 +++++++++++++++++++++
.../server/broker/stats/ServiceStatsHolder.java | 71 ++-
.../server/broker/stats/TrafficStatsService.java | 6 +-
.../server/broker/web/BrokerAdminServlet.java | 60 +-
.../server/common/webbase/WebCallStatsHolder.java | 69 ++-
.../broker/msgstore/mem/MsgMemStoreTest.java | 6 +-
.../broker/stats/FileStoreStatsHolderTest.java | 128 -----
.../broker/stats/MemStoreStatsHolderTest.java | 111 ----
.../broker/stats/MsgStoreStatsHolderTest.java | 206 +++++++
15 files changed, 949 insertions(+), 1212 deletions(-)
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java
index 2bf8fa9..b4d4e20 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java
@@ -45,8 +45,7 @@ import
org.apache.inlong.tubemq.server.broker.msgstore.disk.Segment;
import org.apache.inlong.tubemq.server.broker.msgstore.mem.GetCacheMsgResult;
import org.apache.inlong.tubemq.server.broker.msgstore.mem.MsgMemStore;
import org.apache.inlong.tubemq.server.broker.nodeinfo.ConsumerNodeInfo;
-import org.apache.inlong.tubemq.server.broker.stats.FileStoreStatsHolder;
-import org.apache.inlong.tubemq.server.broker.stats.MemStoreStatsHolder;
+import org.apache.inlong.tubemq.server.broker.stats.MsgStoreStatsHolder;
import org.apache.inlong.tubemq.server.broker.stats.TrafficInfo;
import org.apache.inlong.tubemq.server.broker.utils.DataStoreUtils;
import org.apache.inlong.tubemq.server.common.utils.AppendResult;
@@ -71,8 +70,7 @@ public class MessageStore implements Closeable {
private final String primStorePath;
private final AtomicLong lastMemFlushTime = new AtomicLong(0);
private final MessageStoreManager msgStoreMgr;
- private final MemStoreStatsHolder memStoreStatsHolder = new
MemStoreStatsHolder();
- private final FileStoreStatsHolder fileStoreStatsHolder = new
FileStoreStatsHolder();
+ private final MsgStoreStatsHolder msgStoreStatsHolder = new
MsgStoreStatsHolder();
private final MsgFileStore msgFileStore;
private final ReentrantReadWriteLock writeCacheMutex = new
ReentrantReadWriteLock();
private final Condition flushWriteCacheCondition =
writeCacheMutex.writeLock().newCondition();
@@ -417,7 +415,7 @@ public class MessageStore implements Closeable {
do {
this.writeCacheMutex.readLock().lock();
try {
- if (this.msgMemStore.appendMsg(memStoreStatsHolder,
+ if (this.msgMemStore.appendMsg(msgStoreStatsHolder,
partitionId, msgTypeCode, receivedTime,
msgBufLen, buffer, appendResult)) {
return true;
@@ -432,20 +430,16 @@ public class MessageStore implements Closeable {
}
ThreadUtils.sleep(waitRetryMs);
} while (count-- >= 0);
- memStoreStatsHolder.addMsgWriteFail();
+ msgStoreStatsHolder.addMsgWriteCacheFail();
return false;
}
- public void getMemStoreStatsInfo(boolean needRefresh, StringBuilder
strBuff) {
- memStoreStatsHolder.getAllMemStatsInfo(needRefresh, strBuff);
+ public void getMsgStoreStatsInfo(boolean needRefresh, StringBuilder
strBuff) {
+ msgStoreStatsHolder.getMsgStoreStatsInfo(needRefresh, strBuff);
}
- public void getCurFileStoreStatsInfo(boolean needRefresh, StringBuilder
strBuff) {
- fileStoreStatsHolder.getAllFileStatsInfo(needRefresh, strBuff);
- }
-
- public FileStoreStatsHolder getFileStoreStatsHolder() {
- return this.fileStoreStatsHolder;
+ public MsgStoreStatsHolder getMsgStoreStatsHolder() {
+ return this.msgStoreStatsHolder;
}
/**
@@ -517,10 +511,9 @@ public class MessageStore implements Closeable {
/**
* Flush memory store to file.
*
- * @param checkTime the check time
* @throws IOException the exception during processing
*/
- public void flushMemCacheData(long checkTime) throws IOException {
+ public void flushMemCacheData() throws IOException {
if (this.closed.get()) {
throw new IllegalStateException(new StringBuilder(512)
.append("[Data Store] Closed MessageStore for storeKey ")
@@ -530,7 +523,6 @@ public class MessageStore implements Closeable {
&& (System.currentTimeMillis() - this.lastMemFlushTime.get())
>= this.writeCacheFlushIntvl) {
triggerFlushAndAddMsg(-1, 0, 0, 0, false, null, true, null);
}
- memStoreStatsHolder.chkStatsExpired(checkTime);
}
@Override
@@ -722,13 +714,13 @@ public class MessageStore implements Closeable {
} catch (Throwable e) {
logger.error("[Data Store] Error during flush", e);
} finally {
- memStoreStatsHolder.addFlushTime(
+ msgStoreStatsHolder.addCacheFlushTime(
(System.currentTimeMillis() - startTime),
isTimeTrigger);
}
}
});
} else {
- memStoreStatsHolder.addCachePending();
+ msgStoreStatsHolder.addCachePending();
}
long startTime = System.currentTimeMillis();
long timeoutNs = TimeUnit.MILLISECONDS.toNanos(100);
@@ -743,7 +735,7 @@ public class MessageStore implements Closeable {
}
}
if (needAdd) {
- return msgMemStore.appendMsg(memStoreStatsHolder,
+ return msgMemStore.appendMsg(msgStoreStatsHolder,
partitionId, keyCode, receivedTime,
entryLength, entry, appendResult);
}
@@ -818,7 +810,7 @@ public class MessageStore implements Closeable {
isFlushOngoing.set(true);
writeCacheMutex.writeLock().unlock();
if (isRealloc) {
- memStoreStatsHolder.addCacheReAlloc();
+ msgStoreStatsHolder.addCacheReAlloc();
logger.info(strBuffer.append("[Data Store] Found
").append(getStoreKey())
.append(" Cache capacity change, new MemSize=")
.append(writeCacheMaxSize).append(", new CacheCnt=")
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
index 29c529d..6f10324 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
@@ -818,7 +818,6 @@ public class MessageStoreManager implements StoreService {
@Override
public void run() {
- long checkTime = System.currentTimeMillis();
StringBuilder sBuilder = new StringBuilder(256);
for (Map<Integer, MessageStore> storeMap : dataStores.values()) {
if (storeMap == null || storeMap.isEmpty()) {
@@ -829,7 +828,7 @@ public class MessageStoreManager implements StoreService {
continue;
}
try {
- msgStore.flushMemCacheData(checkTime);
+ msgStore.flushMemCacheData();
} catch (final Throwable e) {
logger.error(sBuilder.append("[Store Manager] Try to
flush ")
.append(msgStore.getStoreKey())
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
index efd6b27..a91b729 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java
@@ -35,7 +35,7 @@ import
org.apache.inlong.tubemq.corebase.protobuf.generated.ClientBroker;
import org.apache.inlong.tubemq.corebase.utils.ServiceStatusHolder;
import org.apache.inlong.tubemq.server.broker.BrokerConfig;
import org.apache.inlong.tubemq.server.broker.msgstore.MessageStore;
-import org.apache.inlong.tubemq.server.broker.stats.FileStoreStatsHolder;
+import org.apache.inlong.tubemq.server.broker.stats.MsgStoreStatsHolder;
import org.apache.inlong.tubemq.server.broker.stats.ServiceStatsHolder;
import org.apache.inlong.tubemq.server.broker.stats.TrafficInfo;
import org.apache.inlong.tubemq.server.broker.utils.DataStoreUtils;
@@ -69,7 +69,7 @@ public class MsgFileStore implements Closeable {
private final AtomicLong lastMetaFlushTime = new AtomicLong(0);
private final BrokerConfig tubeConfig;
// file store stats holder
- private final FileStoreStatsHolder fileStatsHolder;
+ private final MsgStoreStatsHolder msgStoreStatsHolder;
// lock used for append message to storage
private final ReentrantLock writeLock = new ReentrantLock();
private final ByteBuffer byteBufferIndex =
@@ -98,7 +98,7 @@ public class MsgFileStore implements Closeable {
final StringBuilder sBuilder = new StringBuilder(512);
this.tubeConfig = tubeConfig;
this.messageStore = messageStore;
- this.fileStatsHolder = messageStore.getFileStoreStatsHolder();
+ this.msgStoreStatsHolder = messageStore.getMsgStoreStatsHolder();
this.storeKey = messageStore.getStoreKey();
this.dataDir = new File(sBuilder.append(baseStorePath)
.append(File.separator).append(this.storeKey).toString());
@@ -229,7 +229,7 @@ public class MsgFileStore implements Closeable {
} finally {
this.writeLock.unlock();
// add statistics.
- fileStatsHolder.addFileFlushStatsInfo(msgCnt, indexSize, dataSize,
+ msgStoreStatsHolder.addFileFlushStatsInfo(msgCnt, indexSize,
dataSize,
flushedMsgCnt, flushedDataSize, isDataSegFlushed,
isIndexSegFlushed,
isMsgDataFlushed, isMsgCntFlushed, isMsgTimeFlushed,
isForceMetadata);
}
@@ -523,11 +523,11 @@ public class MsgFileStore implements Closeable {
}
} finally {
this.writeLock.unlock();
- fileStatsHolder.addTimeoutFlush(flushedMsgCnt,
+ msgStoreStatsHolder.addFileTimeoutFlushStats(flushedMsgCnt,
flushedDataSize, forceMetadata);
}
}
- fileStatsHolder.chkStatsExpired(checkTimestamp);
+ msgStoreStatsHolder.chkStatsExpired(checkTimestamp);
}
public long getDataSizeInBytes() {
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java
index ae86780..63b1c93 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java
@@ -32,7 +32,7 @@ import org.apache.inlong.tubemq.corebase.TErrCodeConstants;
import org.apache.inlong.tubemq.server.broker.BrokerConfig;
import org.apache.inlong.tubemq.server.broker.metadata.ClusterConfigHolder;
import org.apache.inlong.tubemq.server.broker.msgstore.disk.MsgFileStore;
-import org.apache.inlong.tubemq.server.broker.stats.MemStoreStatsHolder;
+import org.apache.inlong.tubemq.server.broker.stats.MsgStoreStatsHolder;
import org.apache.inlong.tubemq.server.broker.stats.ServiceStatsHolder;
import org.apache.inlong.tubemq.server.broker.utils.DataStoreUtils;
import org.apache.inlong.tubemq.server.common.utils.AppendResult;
@@ -111,7 +111,7 @@ public class MsgMemStore implements Closeable {
*
* @return the process result
*/
- public boolean appendMsg(MemStoreStatsHolder memStatsHolder,
+ public boolean appendMsg(MsgStoreStatsHolder memStatsHolder,
int partitionId, int keyCode,
long timeRecv, int dataEntryLength,
ByteBuffer dataEntry, AppendResult appendResult) {
@@ -156,7 +156,7 @@ public class MsgMemStore implements Closeable {
} finally {
this.writeLock.unlock();
if (isAppended) {
- memStatsHolder.addAppendedMsgSize(dataEntryLength);
+ memStatsHolder.addCacheMsgSize(dataEntryLength);
} else {
memStatsHolder.addCacheFullType(fullDataSize, fullIndexSize,
fullCount);
}
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/FileStoreStatsHolder.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/FileStoreStatsHolder.java
deleted file mode 100644
index a50468b..0000000
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/FileStoreStatsHolder.java
+++ /dev/null
@@ -1,423 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.tubemq.server.broker.stats;
-
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.inlong.tubemq.corebase.metric.impl.LongStatsCounter;
-import org.apache.inlong.tubemq.corebase.metric.impl.SimpleHistogram;
-import org.apache.inlong.tubemq.corebase.metric.impl.SinceTime;
-import org.apache.inlong.tubemq.corebase.utils.DateTimeConvertUtils;
-import org.apache.inlong.tubemq.server.common.TServerConstants;
-
-/**
- * FileStoreStatsHolder, The statistics set related to the file store.
- *
- * Through this class, we complete the collection of metric data that occurs in
- * the memory cache operation, including the number of messages produced, size,
- * number of times of flushing, distribution of flushing time,
- * and triggering conditions for flushing, etc.
- *
- * This part supports index comparison output before and after data collection.
- */
-public class FileStoreStatsHolder {
- // Switchable statistic items
- private final FileStatsItemSet[] fileStatsSets = new FileStatsItemSet[2];
- // Current writable index
- private final AtomicInteger writableIndex = new AtomicInteger(0);
- // Last query time
- private final AtomicLong lstQueryTime = new AtomicLong(0);
- // Last snapshot time
- private final AtomicLong lstSnapshotTime = new AtomicLong(0);
- // whether the statistic is closed
- private volatile boolean isClosed;
-
- public FileStoreStatsHolder() {
- this.isClosed = true;
- this.fileStatsSets[0] = new FileStatsItemSet();
- this.fileStatsSets[1] = new FileStatsItemSet();
- this.lstQueryTime.set(System.currentTimeMillis());
- this.lstSnapshotTime.set(System.currentTimeMillis());
- }
-
- /**
- * Add message store statistics information.
- *
- * @param msgCnt the message count written
- * @param msgIndexSize the message index size written
- * @param msgDataSize the message data size written
- * @param flushedMsgCnt the flushed message count
- * @param flushedDataSize the flushed message size
- * @param isDataSegFlush whether the data segment flushed
- * @param isIndexSegFlush whether the index segment flushed
- * @param isDataSizeFull whether the cached data is full
- * @param isMsgCntFull whether the cached message count is full
- * @param isCacheTimeFull whether the cached time is full
- * @param isForceMetadata whether force push metadata
- */
- public void addFileFlushStatsInfo(int msgCnt, int msgIndexSize, int
msgDataSize,
- long flushedMsgCnt, long flushedDataSize,
- boolean isDataSegFlush, boolean
isIndexSegFlush,
- boolean isDataSizeFull, boolean
isMsgCntFull,
- boolean isCacheTimeFull, boolean
isForceMetadata) {
- if (isClosed) {
- return;
- }
- FileStatsItemSet tmStatsSet = fileStatsSets[getIndex()];
- tmStatsSet.accumMsgCnt.addValue(msgCnt);
- tmStatsSet.accumMsgIndexSize.addValue(msgIndexSize);
- tmStatsSet.accumMsgDataSize.addValue(msgDataSize);
- if (flushedDataSize > 0) {
- tmStatsSet.flushedDataSizeStats.update(flushedDataSize);
- }
- if (flushedMsgCnt > 0) {
- tmStatsSet.flushedMsgCntStats.update(flushedMsgCnt);
- }
- if (isDataSegFlush) {
- tmStatsSet.dataSegAddCnt.incValue();
- }
- if (isIndexSegFlush) {
- tmStatsSet.indexSegAddCnt.incValue();
- }
- if (isDataSizeFull) {
- tmStatsSet.dataSizeFullCnt.incValue();
- }
- if (isMsgCntFull) {
- tmStatsSet.msgCountFullCnt.incValue();
- }
- if (isCacheTimeFull) {
- tmStatsSet.cachedTimeFullCnt.incValue();
- }
- if (isForceMetadata) {
- tmStatsSet.metaFlushCnt.incValue();
- }
- }
-
- /**
- * Add flush time timeout statistic.
- *
- * @param flushedMsgCnt the flushed message count
- * @param flushedDataSize the flushed message size
- * @param isForceMetadata whether force push metadata
- */
- public void addTimeoutFlush(long flushedMsgCnt,
- long flushedDataSize,
- boolean isForceMetadata) {
- if (isClosed) {
- return;
- }
- FileStatsItemSet tmStatsSet = fileStatsSets[getIndex()];
- tmStatsSet.cachedTimeFullCnt.incValue();
- if (flushedDataSize > 0) {
- tmStatsSet.flushedDataSizeStats.update(flushedDataSize);
- }
- if (flushedMsgCnt > 0) {
- tmStatsSet.flushedMsgCntStats.update(flushedMsgCnt);
- }
- if (isForceMetadata) {
- tmStatsSet.metaFlushCnt.incValue();
- }
- }
-
- /**
- * Check whether has exceeded the maximum self-statistics period.
- *
- * @param checkTime the check time
- */
- public synchronized void chkStatsExpired(long checkTime) {
- if (!this.isClosed) {
- if ((checkTime - this.lstQueryTime.get())
- >= TServerConstants.CFG_STORE_STATS_MAX_REFRESH_DURATION) {
- this.isClosed = true;
- }
- }
- }
-
- /**
- * Get current writing statistics information.
- *
- * @param statsMap the return map information
- */
- public void getValue(Map<String, Long> statsMap) {
- enableStats();
- getStatsValue(true, fileStatsSets[getIndex()], statsMap);
- }
-
- /**
- * Get current writing statistics information.
- *
- * @param strBuff the return information in json format
- */
- public void getValue(StringBuilder strBuff) {
- enableStats();
- getStatsValue(true, fileStatsSets[getIndex()], strBuff);
- }
-
- /**
- * Snapshot and get current writing statistics information.
- *
- * @param statsMap the return map information
- */
- public void snapShort(Map<String, Long> statsMap) {
- enableStats();
- if (switchStatsSets()) {
- getStatsValue(false, fileStatsSets[getIndex(writableIndex.get() -
1)], statsMap);
- } else {
- getStatsValue(true, fileStatsSets[getIndex()], statsMap);
- }
- }
-
- /**
- * Snapshot and get current writing statistics information.
- *
- * @param strBuff the return information in json format
- */
- public void snapShort(StringBuilder strBuff) {
- this.enableStats();
- if (switchStatsSets()) {
- getStatsValue(false, fileStatsSets[getIndex(writableIndex.get() -
1)], strBuff);
- } else {
- getStatsValue(true, fileStatsSets[getIndex()], strBuff);
- }
- }
-
- /**
- * Get current file store statistics information.
- * Contains the data results of the current statistics and the previous
snapshot
- *
- * @param isSwitch whether to switch the writing statistics block
- * @param strBuff the return information
- */
- public synchronized void getAllFileStatsInfo(boolean isSwitch,
StringBuilder strBuff) {
- this.enableStats();
- strBuff.append("[");
- getStatsValue(false, fileStatsSets[getIndex(writableIndex.get() - 1)],
strBuff);
- strBuff.append(",");
- getStatsValue(true, fileStatsSets[getIndex()], strBuff);
- strBuff.append("]");
- if (isSwitch) {
- switchStatsSets();
- }
- }
-
- /**
- * Set statistic status
- *
- */
- private synchronized void enableStats() {
- this.lstQueryTime.set(System.currentTimeMillis());
- if (this.isClosed) {
- this.isClosed = false;
- }
- }
-
- /**
- * Check and switch statistic sets
- *
- * @return whether the statistic sets has been switched
- */
- private boolean switchStatsSets() {
- long curSwitchTime = lstSnapshotTime.get();
- // Avoid frequent snapshots
- if ((System.currentTimeMillis() - curSwitchTime)
- >= TServerConstants.MIN_SNAPSHOT_PERIOD_MS) {
- if (lstSnapshotTime.compareAndSet(curSwitchTime,
System.currentTimeMillis())) {
- fileStatsSets[getIndex(writableIndex.get() - 1)].clear();
- int befIndex = writableIndex.getAndIncrement();
-
fileStatsSets[getIndex(befIndex)].setSnapshotTime(lstSnapshotTime.get());
- return true;
- }
- }
- return false;
- }
-
- /**
- * Get current writable block index.
- *
- * @return the writable block index
- */
- private int getIndex() {
- return getIndex(writableIndex.get());
- }
-
- /**
- * Gets the metric block index based on the specified value.
- *
- * @param origIndex the specified value
- * @return the metric block index
- */
- private int getIndex(int origIndex) {
- return Math.abs(origIndex % 2);
- }
-
- /**
- * Read metric block data into map.
- *
- * @param isWriting the metric block is writing
- * @param statsSet the metric block need to read
- * @param statsMap the read result
- */
- private void getStatsValue(boolean isWriting,
- FileStatsItemSet statsSet,
- Map<String, Long> statsMap) {
- statsMap.put(statsSet.resetTime.getFullName(),
- statsSet.resetTime.getSinceTime());
- statsMap.put(statsSet.accumMsgCnt.getFullName(),
- statsSet.accumMsgCnt.getValue());
- statsMap.put(statsSet.accumMsgDataSize.getFullName(),
- statsSet.accumMsgDataSize.getValue());
- statsMap.put(statsSet.accumMsgIndexSize.getFullName(),
- statsSet.accumMsgIndexSize.getValue());
- statsSet.flushedDataSizeStats.getValue(statsMap, false);
- statsSet.flushedMsgCntStats.getValue(statsMap, false);
- statsMap.put(statsSet.dataSegAddCnt.getFullName(),
- statsSet.dataSegAddCnt.getValue());
- statsMap.put(statsSet.indexSegAddCnt.getFullName(),
- statsSet.indexSegAddCnt.getValue());
- statsMap.put(statsSet.metaFlushCnt.getFullName(),
- statsSet.metaFlushCnt.getValue());
- statsMap.put(statsSet.dataSizeFullCnt.getFullName(),
- statsSet.dataSizeFullCnt.getValue());
- statsMap.put(statsSet.msgCountFullCnt.getFullName(),
- statsSet.msgCountFullCnt.getValue());
- statsMap.put(statsSet.cachedTimeFullCnt.getFullName(),
- statsSet.cachedTimeFullCnt.getValue());
- if (isWriting) {
- statsMap.put(statsSet.snapShotTime.getFullName(),
- System.currentTimeMillis());
- } else {
- statsMap.put(statsSet.snapShotTime.getFullName(),
- statsSet.snapShotTime.getSinceTime());
- }
- }
-
- /**
- * Read metric block data into string format.
- *
- * @param isWriting the metric block is writing
- * @param statsSet the metric block need to read
- * @param strBuff the return buffer
- */
- private static void getStatsValue(boolean isWriting,
- FileStatsItemSet statsSet,
- StringBuilder strBuff) {
- strBuff.append("{\"").append(statsSet.resetTime.getFullName())
- .append("\":\"").append(statsSet.resetTime.getStrSinceTime())
- .append("\",\"").append(statsSet.accumMsgCnt.getFullName())
- .append("\":").append(statsSet.accumMsgCnt.getValue())
- .append(",\"").append(statsSet.accumMsgDataSize.getFullName())
- .append("\":").append(statsSet.accumMsgDataSize.getValue())
- .append(",\"").append(statsSet.accumMsgIndexSize.getFullName())
- .append("\":").append(statsSet.accumMsgIndexSize.getValue())
- .append(",");
- statsSet.flushedDataSizeStats.getValue(strBuff, false);
- strBuff.append(",");
- statsSet.flushedMsgCntStats.getValue(strBuff, false);
- strBuff.append(",\"").append(statsSet.dataSegAddCnt.getFullName())
- .append("\":").append(statsSet.dataSegAddCnt.getValue())
- .append(",\"").append(statsSet.indexSegAddCnt.getFullName())
- .append("\":").append(statsSet.indexSegAddCnt.getValue())
- .append(",\"").append(statsSet.metaFlushCnt.getFullName())
- .append("\":").append(statsSet.metaFlushCnt.getValue())
- .append(",\"").append(statsSet.dataSizeFullCnt.getFullName())
- .append("\":").append(statsSet.dataSizeFullCnt.getValue())
- .append(",\"").append(statsSet.msgCountFullCnt.getFullName())
- .append("\":").append(statsSet.msgCountFullCnt.getValue())
- .append(",\"").append(statsSet.cachedTimeFullCnt.getFullName())
- .append("\":").append(statsSet.cachedTimeFullCnt.getValue())
- .append(",\"").append(statsSet.snapShotTime.getFullName())
- .append("\":\"");
- if (isWriting) {
-
strBuff.append(DateTimeConvertUtils.ms2yyyyMMddHHmmss(System.currentTimeMillis()));
- } else {
- strBuff.append(statsSet.snapShotTime.getStrSinceTime());
- }
- strBuff.append("\"}");
- }
-
- /**
- * FileStatsItemSet, a collection of statistical metrics related to file
storage
- *
- */
- private static class FileStatsItemSet {
- // The reset time of file statistics set
- protected final SinceTime resetTime =
- new SinceTime("reset_time", null);
- // The accumulate message count statistics
- protected final LongStatsCounter accumMsgCnt =
- new LongStatsCounter("total_msg_cnt", null);
- // The accumulate message data size statistics
- protected final LongStatsCounter accumMsgDataSize =
- new LongStatsCounter("total_data_size", null);
- // The accumulate message index statistics
- protected final LongStatsCounter accumMsgIndexSize =
- new LongStatsCounter("total_index_size", null);
- // The data flushed statistics
- protected final SimpleHistogram flushedDataSizeStats =
- new SimpleHistogram("flushed_data_size", null);
- // The message count flushed statistics
- protected final SimpleHistogram flushedMsgCntStats =
- new SimpleHistogram("flushed_msg_cnt", null);
- // The new data segment statistics
- protected final LongStatsCounter dataSegAddCnt =
- new LongStatsCounter("data_seg_cnt", null);
- // The new index segment full statistics
- protected final LongStatsCounter indexSegAddCnt =
- new LongStatsCounter("index_seg_cnt", null);
- // The flush count statistics of file meta-data
- protected final LongStatsCounter metaFlushCnt =
- new LongStatsCounter("meta_flush_cnt", null);
- // The cached message data full statistics
- protected final LongStatsCounter dataSizeFullCnt =
- new LongStatsCounter("data_size_full", null);
- // The cached message count full statistics
- protected final LongStatsCounter msgCountFullCnt =
- new LongStatsCounter("msg_count_full", null);
- // The cache timeout refresh amount statistics
- protected final LongStatsCounter cachedTimeFullCnt =
- new LongStatsCounter("cache_time_full", null);
- // The snapshot time of file statistics set
- protected final SinceTime snapShotTime =
- new SinceTime("end_time", null);
-
- public FileStatsItemSet() {
- clear();
- }
-
- public void setSnapshotTime(long snapshotTime) {
- this.snapShotTime.reset(snapshotTime);
- }
-
- public void clear() {
- this.snapShotTime.reset();
- this.accumMsgCnt.clear();
- this.accumMsgDataSize.clear();
- this.flushedDataSizeStats.clear();
- this.accumMsgIndexSize.clear();
- this.flushedMsgCntStats.clear();
- this.dataSegAddCnt.clear();
- this.indexSegAddCnt.clear();
- this.dataSizeFullCnt.clear();
- this.metaFlushCnt.clear();
- this.msgCountFullCnt.clear();
- this.cachedTimeFullCnt.clear();
- this.resetTime.reset();
- }
- }
-}
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/MemStoreStatsHolder.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/MemStoreStatsHolder.java
deleted file mode 100644
index 7a5abef..0000000
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/MemStoreStatsHolder.java
+++ /dev/null
@@ -1,411 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.tubemq.server.broker.stats;
-
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.inlong.tubemq.corebase.metric.impl.ESTHistogram;
-import org.apache.inlong.tubemq.corebase.metric.impl.LongStatsCounter;
-import org.apache.inlong.tubemq.corebase.metric.impl.SimpleHistogram;
-import org.apache.inlong.tubemq.corebase.metric.impl.SinceTime;
-import org.apache.inlong.tubemq.corebase.utils.DateTimeConvertUtils;
-import org.apache.inlong.tubemq.server.common.TServerConstants;
-
-/**
- * MemStoreStatsHolder, The statistics set related to the memory cache.
- *
- * Through this file storage statistics class, it mainly counts
- * the number of messages refreshed each time, the total data size, the total
index size,
- * the total number of times the data file is full, the number of times the
index file is full,
- * as well as the conditions that trigger the flush, the data write
information such as
- * the number of refreshes that enter the timeout, etc.
- *
- * This part supports index comparison output before and after data collection.
- */
-public class MemStoreStatsHolder {
- // Switchable statistic items
- private final MemStatsItemSet[] memStatsSets = new MemStatsItemSet[2];
- // Current writable index
- private final AtomicInteger writableIndex = new AtomicInteger(0);
- // Last query time
- private final AtomicLong lstQueryTime = new AtomicLong(0);
- // Last snapshot time
- private final AtomicLong lstSnapshotTime = new AtomicLong(0);
- // whether the statistic is closed
- private volatile boolean isClosed;
-
- public MemStoreStatsHolder() {
- this.isClosed = true;
- this.memStatsSets[0] = new MemStatsItemSet();
- this.memStatsSets[1] = new MemStatsItemSet();
- this.lstQueryTime.set(System.currentTimeMillis());
- this.lstSnapshotTime.set(System.currentTimeMillis());
- }
-
- /**
- * Add appended message size statistic.
- *
- * @param msgSize the message size
- */
- public void addAppendedMsgSize(int msgSize) {
- if (isClosed) {
- return;
- }
- memStatsSets[getIndex()].msgRcvStats.update(msgSize);
- }
-
- /**
- * Add write message failure count statistics.
- */
- public void addMsgWriteFail() {
- if (isClosed) {
- return;
- }
- memStatsSets[getIndex()].writeFailCnt.incValue();
- }
-
- /**
- * Add cache pending count statistics.
- */
- public void addCachePending() {
- if (isClosed) {
- return;
- }
- memStatsSets[getIndex()].flushPendingCnt.incValue();
- }
-
- /**
- * Add cache re-alloc count statistics.
- */
- public void addCacheReAlloc() {
- if (isClosed) {
- return;
- }
- memStatsSets[getIndex()].cacheReAllocCnt.incValue();
- }
-
- /**
- * Add flush trigger type statistics.
- *
- * @param isDataSizeFull whether the cached data is full
- * @param isIndexSizeFull whether the cached index is full
- * @param isMsgCntFull whether the cached message count is full
- */
- public void addCacheFullType(boolean isDataSizeFull,
- boolean isIndexSizeFull,
- boolean isMsgCntFull) {
- if (isClosed) {
- return;
- }
- if (isDataSizeFull) {
- memStatsSets[getIndex()].dataSizeFullCnt.incValue();
- }
- if (isIndexSizeFull) {
- memStatsSets[getIndex()].indexSizeFullCnt.incValue();
- }
- if (isMsgCntFull) {
- memStatsSets[getIndex()].msgCountFullCnt.incValue();
- }
- }
-
- /**
- * Add flush time statistic.
- *
- * @param flushTime the flush time
- * @param isTimeoutFlush whether is timeout flush
- */
- public void addFlushTime(long flushTime, boolean isTimeoutFlush) {
- if (isClosed) {
- return;
- }
- memStatsSets[getIndex()].cacheSyncStats.update(flushTime);
- if (isTimeoutFlush) {
- memStatsSets[getIndex()].cachedTimeFullCnt.incValue();
- }
- }
-
- /**
- * Check whether has exceeded the maximum self-statistics period.
- *
- * @param checkTime the check time
- */
- public synchronized void chkStatsExpired(long checkTime) {
- if (!this.isClosed) {
- if ((checkTime - this.lstQueryTime.get())
- >= TServerConstants.CFG_STORE_STATS_MAX_REFRESH_DURATION) {
- this.isClosed = true;
- }
- }
- }
-
- /**
- * Get current writing statistics information.
- *
- * @param statsMap the return map information
- */
- public void getValue(Map<String, Long> statsMap) {
- enableStats();
- getStatsValue(true, memStatsSets[getIndex()], statsMap);
- }
-
- /**
- * Get current writing statistics information.
- *
- * @param strBuff the return information in json format
- */
- public void getValue(StringBuilder strBuff) {
- enableStats();
- getStatsValue(true, memStatsSets[getIndex()], strBuff);
- }
-
- /**
- * Snapshot and get current writing statistics information.
- *
- * @param statsMap the return map information
- */
- public void snapShort(Map<String, Long> statsMap) {
- enableStats();
- if (switchStatsSets()) {
- getStatsValue(false, memStatsSets[getIndex(writableIndex.get() -
1)], statsMap);
- } else {
- getStatsValue(true, memStatsSets[getIndex()], statsMap);
- }
- }
-
- /**
- * Snapshot and get current writing statistics information.
- *
- * @param strBuff the return information in json format
- */
- public void snapShort(StringBuilder strBuff) {
- this.enableStats();
- if (switchStatsSets()) {
- getStatsValue(false, memStatsSets[getIndex(writableIndex.get() -
1)], strBuff);
- } else {
- getStatsValue(true, memStatsSets[getIndex()], strBuff);
- }
- }
-
- /**
- * Get current memory store statistics information. Contains the data
results of
- * the current statistics and the previous snapshot
- *
- * @param isSwitch whether to switch the writing statistics block
- * @param strBuff the return information
- */
- public synchronized void getAllMemStatsInfo(boolean isSwitch,
StringBuilder strBuff) {
- this.enableStats();
- strBuff.append("[");
- getStatsValue(false, memStatsSets[getIndex(writableIndex.get() - 1)],
strBuff);
- strBuff.append(",");
- getStatsValue(true, memStatsSets[getIndex()], strBuff);
- strBuff.append("]");
- if (isSwitch) {
- switchStatsSets();
- }
- }
-
- /**
- * Set statistic status
- *
- */
- private synchronized void enableStats() {
- this.lstQueryTime.set(System.currentTimeMillis());
- if (this.isClosed) {
- this.isClosed = false;
- }
- }
-
- /**
- * Check and switch statistic sets
- *
- * @return whether the statistic sets has been switched
- */
- private boolean switchStatsSets() {
- long curSwitchTime = lstSnapshotTime.get();
- // Avoid frequent snapshots
- if ((System.currentTimeMillis() - curSwitchTime)
- >= TServerConstants.MIN_SNAPSHOT_PERIOD_MS) {
- if (lstSnapshotTime.compareAndSet(curSwitchTime,
System.currentTimeMillis())) {
- memStatsSets[getIndex(writableIndex.get() - 1)].clear();
- int befIndex = writableIndex.getAndIncrement();
-
memStatsSets[getIndex(befIndex)].setSnapshotTime(lstSnapshotTime.get());
- return true;
- }
- }
- return false;
- }
-
- /**
- * Get current writable block index.
- *
- * @return the writable block index
- */
- private int getIndex() {
- return getIndex(writableIndex.get());
- }
-
- /**
- * Gets the metric block index based on the specified value.
- *
- * @param origIndex the specified value
- * @return the metric block index
- */
- private int getIndex(int origIndex) {
- return Math.abs(origIndex % 2);
- }
-
- /**
- * Read metric block data into map.
- *
- * @param isWriting the metric block is writing
- * @param statsSet the metric block need to read
- * @param statsMap the read result
- */
- private void getStatsValue(boolean isWriting,
- MemStatsItemSet statsSet,
- Map<String, Long> statsMap) {
- statsMap.put(statsSet.resetTime.getFullName(),
- statsSet.resetTime.getSinceTime());
- statsSet.msgRcvStats.getValue(statsMap, false);
- statsMap.put(statsSet.writeFailCnt.getFullName(),
- statsSet.writeFailCnt.getValue());
- statsMap.put(statsSet.dataSizeFullCnt.getFullName(),
- statsSet.dataSizeFullCnt.getValue());
- statsMap.put(statsSet.indexSizeFullCnt.getFullName(),
- statsSet.indexSizeFullCnt.getValue());
- statsMap.put(statsSet.msgCountFullCnt.getFullName(),
- statsSet.msgCountFullCnt.getValue());
- statsMap.put(statsSet.cachedTimeFullCnt.getFullName(),
- statsSet.cachedTimeFullCnt.getValue());
- statsMap.put(statsSet.flushPendingCnt.getFullName(),
- statsSet.flushPendingCnt.getValue());
- statsMap.put(statsSet.cacheReAllocCnt.getFullName(),
- statsSet.cacheReAllocCnt.getValue());
- statsSet.cacheSyncStats.getValue(statsMap, false);
- if (isWriting) {
- statsMap.put(statsSet.snapShotTime.getFullName(),
- System.currentTimeMillis());
- } else {
- statsMap.put(statsSet.snapShotTime.getFullName(),
- statsSet.snapShotTime.getSinceTime());
- }
- }
-
- /**
- * Read metric block data into string format.
- *
- * @param isWriting the metric block is writing
- * @param statsSet the metric block need to read
- * @param strBuff the return buffer
- */
- private static void getStatsValue(boolean isWriting,
- MemStatsItemSet statsSet,
- StringBuilder strBuff) {
- strBuff.append("{\"").append(statsSet.resetTime.getFullName())
- .append("\":\"").append(statsSet.resetTime.getStrSinceTime())
- .append("\",");
- statsSet.msgRcvStats.getValue(strBuff, false);
- strBuff.append(",\"").append(statsSet.writeFailCnt.getFullName())
- .append("\":").append(statsSet.writeFailCnt.getValue())
- .append(",\"").append(statsSet.dataSizeFullCnt.getFullName())
- .append("\":").append(statsSet.dataSizeFullCnt.getValue())
- .append(",\"").append(statsSet.msgCountFullCnt.getFullName())
- .append("\":").append(statsSet.msgCountFullCnt.getValue())
- .append(",\"").append(statsSet.cachedTimeFullCnt.getFullName())
- .append("\":").append(statsSet.cachedTimeFullCnt.getValue())
- .append(",\"").append(statsSet.flushPendingCnt.getFullName())
- .append("\":").append(statsSet.flushPendingCnt.getValue())
- .append(",\"").append(statsSet.cacheReAllocCnt.getFullName())
- .append("\":").append(statsSet.cacheReAllocCnt.getValue())
- .append(",\"").append(statsSet.dataSizeFullCnt.getFullName())
- .append("\":").append(statsSet.dataSizeFullCnt.getValue())
- .append(",");
- statsSet.cacheSyncStats.getValue(strBuff, false);
-
strBuff.append(",\"").append(statsSet.snapShotTime.getFullName()).append("\":\"");
- if (isWriting) {
-
strBuff.append(DateTimeConvertUtils.ms2yyyyMMddHHmmss(System.currentTimeMillis()));
- } else {
- strBuff.append(statsSet.snapShotTime.getStrSinceTime());
- }
- strBuff.append("\"}");
- }
-
- /**
- * MemStatsItemSet, Memory cache related statistics set
- *
- */
- private static class MemStatsItemSet {
- // The reset time of memory statistics set
- protected final SinceTime resetTime =
- new SinceTime("reset_time", null);
- // The status of messages received
- protected final SimpleHistogram msgRcvStats =
- new SimpleHistogram("msg_in", null);
- // The count of message append failures
- protected final LongStatsCounter writeFailCnt =
- new LongStatsCounter("msg_append_fail", null);
- // The cached message data full statistics
- protected final LongStatsCounter dataSizeFullCnt =
- new LongStatsCounter("data_size_full", null);
- // The cached message index full statistics
- protected final LongStatsCounter indexSizeFullCnt =
- new LongStatsCounter("index_size_full", null);
- // The cached message count full statistics
- protected final LongStatsCounter msgCountFullCnt =
- new LongStatsCounter("msg_count_full", null);
- // The cache timeout refresh amount statistics
- protected final LongStatsCounter cachedTimeFullCnt =
- new LongStatsCounter("cache_time_full", null);
- // The pending count for cache flush operations
- protected final LongStatsCounter flushPendingCnt =
- new LongStatsCounter("flush_pending", null);
- // The cache re-alloc count
- protected final LongStatsCounter cacheReAllocCnt =
- new LongStatsCounter("cache_realloc", null);
- // The cache persistence duration statistics
- protected final ESTHistogram cacheSyncStats =
- new ESTHistogram("cache_flush_dlt", null);
- // The snapshot time of memory statistics set
- protected final SinceTime snapShotTime =
- new SinceTime("end_time", null);
-
- public MemStatsItemSet() {
- clear();
- }
-
- public void setSnapshotTime(long snapshotTime) {
- this.snapShotTime.reset(snapshotTime);
- }
-
- public void clear() {
- this.snapShotTime.reset();
- this.msgRcvStats.clear();
- this.writeFailCnt.clear();
- this.dataSizeFullCnt.clear();
- this.indexSizeFullCnt.clear();
- this.msgCountFullCnt.clear();
- this.flushPendingCnt.clear();
- this.cacheReAllocCnt.clear();
- this.cachedTimeFullCnt.clear();
- this.cacheSyncStats.clear();
- this.resetTime.reset();
- }
- }
-}
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/MsgStoreStatsHolder.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/MsgStoreStatsHolder.java
new file mode 100644
index 0000000..ff88df7
--- /dev/null
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/MsgStoreStatsHolder.java
@@ -0,0 +1,615 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.broker.stats;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.inlong.tubemq.corebase.metric.impl.ESTHistogram;
+import org.apache.inlong.tubemq.corebase.metric.impl.LongStatsCounter;
+import org.apache.inlong.tubemq.corebase.metric.impl.SimpleHistogram;
+import org.apache.inlong.tubemq.corebase.metric.impl.SinceTime;
+import org.apache.inlong.tubemq.corebase.utils.DateTimeConvertUtils;
+import org.apache.inlong.tubemq.server.common.TServerConstants;
+
+/**
+ * MsgStoreStatsHolder, The statistics set related to the message store,
+ * include cache and file store statistics.
+ *
+ * Through this file storage statistics class, it mainly counts
+ * the number of messages refreshed each time, the total data size, the total
index size,
+ * the total number of times the data file is full, the number of times the
index file is full,
+ * as well as the conditions that trigger the flush, the data write
information such as
+ * the number of refreshes that enter the timeout, etc.
+ *
+ * This part supports index comparison output before and after data collection.
+ */
+public class MsgStoreStatsHolder {
+ // Switchable statistic items
+ private final MsgStoreStatsItemSet[] msgStoreStatsSets = new
MsgStoreStatsItemSet[2];
+ // Current writable index
+ private final AtomicInteger writableIndex = new AtomicInteger(0);
+ // Last query time
+ private final AtomicLong lstQueryTime = new AtomicLong(0);
+ // Last snapshot time
+ private final AtomicLong lstSnapshotTime = new AtomicLong(0);
+ // Whether closed for unused for a long time
+ private volatile boolean isClosed;
+ // whether the statistic is manual closed
+ private volatile boolean isManualClosed = false;
+
+ public MsgStoreStatsHolder() {
+ this.isClosed = true;
+ this.msgStoreStatsSets[0] = new MsgStoreStatsItemSet();
+ this.msgStoreStatsSets[1] = new MsgStoreStatsItemSet();
+ this.lstQueryTime.set(System.currentTimeMillis());
+ this.lstSnapshotTime.set(System.currentTimeMillis());
+ }
+
+ /**
+ * Add appended message size statistic.
+ *
+ * @param msgSize the message size
+ */
+ public void addCacheMsgSize(int msgSize) {
+ if (isClosed) {
+ return;
+ }
+ msgStoreStatsSets[getIndex()].cacheMsgRcvStats.update(msgSize);
+ }
+
+ /**
+ * Add write message failure count statistics.
+ */
+ public void addMsgWriteCacheFail() {
+ if (isClosed) {
+ return;
+ }
+ msgStoreStatsSets[getIndex()].cacheWriteFailCnt.incValue();
+ }
+
+ /**
+ * Add cache pending count statistics.
+ */
+ public void addCachePending() {
+ if (isClosed) {
+ return;
+ }
+ msgStoreStatsSets[getIndex()].cacheFlushPendingCnt.incValue();
+ }
+
+ /**
+ * Add cache re-alloc count statistics.
+ */
+ public void addCacheReAlloc() {
+ if (isClosed) {
+ return;
+ }
+ msgStoreStatsSets[getIndex()].cacheReAllocCnt.incValue();
+ }
+
+ /**
+ * Add flush trigger type statistics.
+ *
+ * @param isDataSizeFull whether the cached data is full
+ * @param isIndexSizeFull whether the cached index is full
+ * @param isMsgCntFull whether the cached message count is full
+ */
+ public void addCacheFullType(boolean isDataSizeFull,
+ boolean isIndexSizeFull,
+ boolean isMsgCntFull) {
+ if (isClosed) {
+ return;
+ }
+ if (isDataSizeFull) {
+ msgStoreStatsSets[getIndex()].cacheDataSizeFullCnt.incValue();
+ }
+ if (isIndexSizeFull) {
+ msgStoreStatsSets[getIndex()].cacheIndexSizeFullCnt.incValue();
+ }
+ if (isMsgCntFull) {
+ msgStoreStatsSets[getIndex()].cacheMsgCountFullCnt.incValue();
+ }
+ }
+
+ /**
+ * Add flush time statistic.
+ *
+ * @param flushTime the flush time
+ * @param isTimeoutFlush whether is timeout flush
+ */
+ public void addCacheFlushTime(long flushTime, boolean isTimeoutFlush) {
+ if (isClosed) {
+ return;
+ }
+ msgStoreStatsSets[getIndex()].cacheSyncStats.update(flushTime);
+ if (isTimeoutFlush) {
+ msgStoreStatsSets[getIndex()].cacheTimeFullCnt.incValue();
+ }
+ }
+
+ /**
+ * Add message store statistics information.
+ *
+ * @param msgCnt the message count written
+ * @param msgIndexSize the message index size written
+ * @param msgDataSize the message data size written
+ * @param flushedMsgCnt the flushed message count
+ * @param flushedDataSize the flushed message size
+ * @param isDataSegFlush whether the data segment flushed
+ * @param isIndexSegFlush whether the index segment flushed
+ * @param isDataSizeFull whether the cached data is full
+ * @param isMsgCntFull whether the cached message count is full
+ * @param isCacheTimeFull whether the cached time is full
+ * @param isForceMetadata whether force push metadata
+ */
+ public void addFileFlushStatsInfo(int msgCnt, int msgIndexSize, int
msgDataSize,
+ long flushedMsgCnt, long flushedDataSize,
+ boolean isDataSegFlush, boolean
isIndexSegFlush,
+ boolean isDataSizeFull, boolean
isMsgCntFull,
+ boolean isCacheTimeFull, boolean
isForceMetadata) {
+ if (isClosed) {
+ return;
+ }
+ MsgStoreStatsItemSet tmStatsSet = msgStoreStatsSets[getIndex()];
+ tmStatsSet.fileAccumMsgCnt.addValue(msgCnt);
+ tmStatsSet.fileAccumMsgIndexSize.addValue(msgIndexSize);
+ tmStatsSet.fileAccumMsgDataSize.addValue(msgDataSize);
+ if (flushedDataSize > 0) {
+ tmStatsSet.fileFlushedDataSize.update(flushedDataSize);
+ }
+ if (flushedMsgCnt > 0) {
+ tmStatsSet.fileFlushedMsgCnt.update(flushedMsgCnt);
+ }
+ if (isDataSegFlush) {
+ tmStatsSet.fileDataSegAddCnt.incValue();
+ }
+ if (isIndexSegFlush) {
+ tmStatsSet.fileIndexSegAddCnt.incValue();
+ }
+ if (isDataSizeFull) {
+ tmStatsSet.fileDataSizeFullCnt.incValue();
+ }
+ if (isMsgCntFull) {
+ tmStatsSet.fileMsgCountFullCnt.incValue();
+ }
+ if (isCacheTimeFull) {
+ tmStatsSet.fileCachedTimeFullCnt.incValue();
+ }
+ if (isForceMetadata) {
+ tmStatsSet.fileMetaFlushCnt.incValue();
+ }
+ }
+
+ /**
+ * Add flush time timeout statistic.
+ *
+ * @param flushedMsgCnt the flushed message count
+ * @param flushedDataSize the flushed message size
+ * @param isForceMetadata whether force push metadata
+ */
+ public void addFileTimeoutFlushStats(long flushedMsgCnt,
+ long flushedDataSize,
+ boolean isForceMetadata) {
+ if (isClosed) {
+ return;
+ }
+ MsgStoreStatsItemSet tmStatsSet = msgStoreStatsSets[getIndex()];
+ tmStatsSet.fileCachedTimeFullCnt.incValue();
+ if (flushedDataSize > 0) {
+ tmStatsSet.fileFlushedDataSize.update(flushedDataSize);
+ }
+ if (flushedMsgCnt > 0) {
+ tmStatsSet.fileFlushedMsgCnt.update(flushedMsgCnt);
+ }
+ if (isForceMetadata) {
+ tmStatsSet.fileMetaFlushCnt.incValue();
+ }
+ }
+
+ /**
+ * Check whether has exceeded the maximum self-statistics period.
+ *
+ * @param checkTime the check time
+ */
+ public synchronized void chkStatsExpired(long checkTime) {
+ if (!this.isClosed) {
+ if ((checkTime - this.lstQueryTime.get())
+ >= TServerConstants.CFG_STORE_STATS_MAX_REFRESH_DURATION) {
+ this.isClosed = true;
+ }
+ }
+ }
+
+ /**
+ * Set manually the statistic status.
+ *
+ * @param enableStats enable or disable the statistic.
+ */
+ public synchronized void setStatsStatus(boolean enableStats) {
+ if (enableStats) {
+ this.isManualClosed = false;
+ } else {
+ this.isManualClosed = true;
+ this.isClosed = true;
+ }
+ }
+
+ /**
+ * Query whether the statistic is closed.
+ *
+ * @return the statistic status
+ */
+ public boolean isStatsClosed() {
+ return (this.isManualClosed || this.isClosed);
+ }
+
+ /**
+ * Get current writing statistics information.
+ *
+ * @param statsMap the return map information
+ */
+ public void getValue(Map<String, Long> statsMap) {
+ activeStatsBaseCall();
+ getStatsValue(true, msgStoreStatsSets[getIndex()], statsMap);
+ }
+
+ /**
+ * Get current writing statistics information.
+ *
+ * @param strBuff the return information in json format
+ */
+ public void getValue(StringBuilder strBuff) {
+ activeStatsBaseCall();
+ getStatsValue(true, msgStoreStatsSets[getIndex()], strBuff);
+ }
+
+ /**
+ * Snapshot and get current writing statistics information.
+ *
+ * @param statsMap the return map information
+ */
+ public void snapShort(Map<String, Long> statsMap) {
+ activeStatsBaseCall();
+ if (switchStatsSets()) {
+ getStatsValue(false,
+ msgStoreStatsSets[getIndex(writableIndex.get() - 1)],
statsMap);
+ } else {
+ getStatsValue(true, msgStoreStatsSets[getIndex()], statsMap);
+ }
+ }
+
+ /**
+ * Snapshot and get current writing statistics information.
+ *
+ * @param strBuff the return information in json format
+ */
+ public void snapShort(StringBuilder strBuff) {
+ this.activeStatsBaseCall();
+ if (switchStatsSets()) {
+ getStatsValue(false,
+ msgStoreStatsSets[getIndex(writableIndex.get() - 1)],
strBuff);
+ } else {
+ getStatsValue(true, msgStoreStatsSets[getIndex()], strBuff);
+ }
+ }
+
+ /**
+ * Get current message store statistics information. Contains the data
results of
+ * the current statistics and the previous snapshot
+ *
+ * @param isSwitch whether to switch the writing statistics block
+ * @param strBuff the return information
+ */
+ public synchronized void getMsgStoreStatsInfo(boolean isSwitch,
StringBuilder strBuff) {
+ this.activeStatsBaseCall();
+ strBuff.append("[");
+ getStatsValue(false,
+ msgStoreStatsSets[getIndex(writableIndex.get() - 1)], strBuff);
+ strBuff.append(",");
+ getStatsValue(true, msgStoreStatsSets[getIndex()], strBuff);
+ strBuff.append("]");
+ if (isSwitch) {
+ switchStatsSets();
+ }
+ }
+
+ /**
+ * Active statistic status based on api call
+ *
+ */
+ private void activeStatsBaseCall() {
+ if (isManualClosed) {
+ return;
+ }
+ this.lstQueryTime.set(System.currentTimeMillis());
+ if (this.isClosed) {
+ this.isClosed = false;
+ }
+ }
+
+ /**
+ * Check and switch statistic sets
+ *
+ * @return whether the statistic sets has been switched
+ */
+ private boolean switchStatsSets() {
+ long curSwitchTime = lstSnapshotTime.get();
+ // Avoid frequent snapshots
+ if ((System.currentTimeMillis() - curSwitchTime)
+ >= TServerConstants.MIN_SNAPSHOT_PERIOD_MS) {
+ if (lstSnapshotTime.compareAndSet(curSwitchTime,
System.currentTimeMillis())) {
+ msgStoreStatsSets[getIndex(writableIndex.get() - 1)].clear();
+ msgStoreStatsSets[getIndex(writableIndex.getAndIncrement())]
+ .setSnapshotTime(lstSnapshotTime.get());
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Get current writable block index.
+ *
+ * @return the writable block index
+ */
+ private int getIndex() {
+ return getIndex(writableIndex.get());
+ }
+
+ /**
+ * Gets the metric block index based on the specified value.
+ *
+ * @param origIndex the specified value
+ * @return the metric block index
+ */
+ private int getIndex(int origIndex) {
+ return Math.abs(origIndex % 2);
+ }
+
+ /**
+ * Read metric block data into map.
+ *
+ * @param isWriting the metric block is writing
+ * @param statsSet the metric block need to read
+ * @param statsMap the read result
+ */
+ private void getStatsValue(boolean isWriting,
+ MsgStoreStatsItemSet statsSet,
+ Map<String, Long> statsMap) {
+ statsMap.put(statsSet.resetTime.getFullName(),
+ statsSet.resetTime.getSinceTime());
+ statsMap.put("isClosed", (isStatsClosed() ? 1L : 0L));
+ // for memory store
+ statsSet.cacheMsgRcvStats.getValue(statsMap, false);
+ statsMap.put(statsSet.cacheWriteFailCnt.getFullName(),
+ statsSet.cacheWriteFailCnt.getValue());
+ statsMap.put(statsSet.cacheDataSizeFullCnt.getFullName(),
+ statsSet.cacheDataSizeFullCnt.getValue());
+ statsMap.put(statsSet.cacheIndexSizeFullCnt.getFullName(),
+ statsSet.cacheIndexSizeFullCnt.getValue());
+ statsMap.put(statsSet.cacheMsgCountFullCnt.getFullName(),
+ statsSet.cacheMsgCountFullCnt.getValue());
+ statsMap.put(statsSet.cacheTimeFullCnt.getFullName(),
+ statsSet.cacheTimeFullCnt.getValue());
+ statsMap.put(statsSet.cacheFlushPendingCnt.getFullName(),
+ statsSet.cacheFlushPendingCnt.getValue());
+ statsMap.put(statsSet.cacheReAllocCnt.getFullName(),
+ statsSet.cacheReAllocCnt.getValue());
+ statsSet.cacheSyncStats.getValue(statsMap, false);
+ // for file store
+ statsMap.put(statsSet.fileAccumMsgCnt.getFullName(),
+ statsSet.fileAccumMsgCnt.getValue());
+ statsMap.put(statsSet.fileAccumMsgDataSize.getFullName(),
+ statsSet.fileAccumMsgDataSize.getValue());
+ statsMap.put(statsSet.fileAccumMsgIndexSize.getFullName(),
+ statsSet.fileAccumMsgIndexSize.getValue());
+ statsSet.fileFlushedDataSize.getValue(statsMap, false);
+ statsSet.fileFlushedMsgCnt.getValue(statsMap, false);
+ statsMap.put(statsSet.fileDataSegAddCnt.getFullName(),
+ statsSet.fileDataSegAddCnt.getValue());
+ statsMap.put(statsSet.fileIndexSegAddCnt.getFullName(),
+ statsSet.fileIndexSegAddCnt.getValue());
+ statsMap.put(statsSet.fileMetaFlushCnt.getFullName(),
+ statsSet.fileMetaFlushCnt.getValue());
+ statsMap.put(statsSet.fileDataSizeFullCnt.getFullName(),
+ statsSet.fileDataSizeFullCnt.getValue());
+ statsMap.put(statsSet.fileMsgCountFullCnt.getFullName(),
+ statsSet.fileMsgCountFullCnt.getValue());
+ statsMap.put(statsSet.fileCachedTimeFullCnt.getFullName(),
+ statsSet.fileCachedTimeFullCnt.getValue());
+ if (isWriting) {
+ statsMap.put(statsSet.snapShotTime.getFullName(),
+ System.currentTimeMillis());
+ } else {
+ statsMap.put(statsSet.snapShotTime.getFullName(),
+ statsSet.snapShotTime.getSinceTime());
+ }
+ }
+
+ /**
+ * Read metric block data into string format.
+ *
+ * @param isWriting the metric block is writing
+ * @param statsSet the metric block need to read
+ * @param strBuff the return buffer
+ */
+ private void getStatsValue(boolean isWriting,
+ MsgStoreStatsItemSet statsSet,
+ StringBuilder strBuff) {
+ strBuff.append("{\"").append(statsSet.resetTime.getFullName())
+ .append("\":\"").append(statsSet.resetTime.getStrSinceTime())
+
.append("\",\"isClosed\":").append(isStatsClosed()).append(",");
+ statsSet.cacheMsgRcvStats.getValue(strBuff, false);
+ strBuff.append(",\"").append(statsSet.cacheWriteFailCnt.getFullName())
+ .append("\":").append(statsSet.cacheWriteFailCnt.getValue())
+
.append(",\"").append(statsSet.cacheDataSizeFullCnt.getFullName())
+ .append("\":").append(statsSet.cacheDataSizeFullCnt.getValue())
+
.append(",\"").append(statsSet.cacheMsgCountFullCnt.getFullName())
+ .append("\":").append(statsSet.cacheMsgCountFullCnt.getValue())
+ .append(",\"").append(statsSet.cacheTimeFullCnt.getFullName())
+ .append("\":").append(statsSet.cacheTimeFullCnt.getValue())
+
.append(",\"").append(statsSet.cacheFlushPendingCnt.getFullName())
+ .append("\":").append(statsSet.cacheFlushPendingCnt.getValue())
+ .append(",\"").append(statsSet.cacheReAllocCnt.getFullName())
+ .append("\":").append(statsSet.cacheReAllocCnt.getValue())
+
.append(",\"").append(statsSet.cacheDataSizeFullCnt.getFullName())
+ .append("\":").append(statsSet.cacheDataSizeFullCnt.getValue())
+ .append(",");
+ statsSet.cacheSyncStats.getValue(strBuff, false);
+ strBuff.append(",\"").append(statsSet.fileAccumMsgCnt.getFullName())
+ .append("\":").append(statsSet.fileAccumMsgCnt.getValue())
+
.append(",\"").append(statsSet.fileAccumMsgDataSize.getFullName())
+ .append("\":").append(statsSet.fileAccumMsgDataSize.getValue())
+
.append(",\"").append(statsSet.fileAccumMsgIndexSize.getFullName())
+
.append("\":").append(statsSet.fileAccumMsgIndexSize.getValue())
+ .append(",");
+ statsSet.fileFlushedDataSize.getValue(strBuff, false);
+ strBuff.append(",");
+ statsSet.fileFlushedMsgCnt.getValue(strBuff, false);
+ strBuff.append(",\"").append(statsSet.fileDataSegAddCnt.getFullName())
+ .append("\":").append(statsSet.fileDataSegAddCnt.getValue())
+
.append(",\"").append(statsSet.fileIndexSegAddCnt.getFullName())
+ .append("\":").append(statsSet.fileIndexSegAddCnt.getValue())
+ .append(",\"").append(statsSet.fileMetaFlushCnt.getFullName())
+ .append("\":").append(statsSet.fileMetaFlushCnt.getValue())
+
.append(",\"").append(statsSet.fileDataSizeFullCnt.getFullName())
+ .append("\":").append(statsSet.fileDataSizeFullCnt.getValue())
+
.append(",\"").append(statsSet.fileMsgCountFullCnt.getFullName())
+ .append("\":").append(statsSet.fileMsgCountFullCnt.getValue())
+
.append(",\"").append(statsSet.fileCachedTimeFullCnt.getFullName())
+
.append("\":").append(statsSet.fileCachedTimeFullCnt.getValue())
+ .append(",\"").append(statsSet.snapShotTime.getFullName())
+ .append("\":\"");
+ if (isWriting) {
+
strBuff.append(DateTimeConvertUtils.ms2yyyyMMddHHmmss(System.currentTimeMillis()));
+ } else {
+ strBuff.append(statsSet.snapShotTime.getStrSinceTime());
+ }
+ strBuff.append("\"}");
+ }
+
+ /**
+ * MsgStoreStatsItemSet, Message store cache related statistics set
+ *
+ */
+ private static class MsgStoreStatsItemSet {
+ // The reset time of statistics set
+ protected final SinceTime resetTime =
+ new SinceTime("reset_time", null);
+ // The status of messages received by cache
+ protected final SimpleHistogram cacheMsgRcvStats =
+ new SimpleHistogram("cache_msg_in", null);
+ // The count of message append cache failures
+ protected final LongStatsCounter cacheWriteFailCnt =
+ new LongStatsCounter("cache_append_fail", null);
+ // The cached message data full statistics
+ protected final LongStatsCounter cacheDataSizeFullCnt =
+ new LongStatsCounter("cache_data_full", null);
+ // The cached message index full statistics
+ protected final LongStatsCounter cacheIndexSizeFullCnt =
+ new LongStatsCounter("cache_index_full", null);
+ // The cached message count full statistics
+ protected final LongStatsCounter cacheMsgCountFullCnt =
+ new LongStatsCounter("cache_count_full", null);
+ // The cache timeout refresh amount statistics
+ protected final LongStatsCounter cacheTimeFullCnt =
+ new LongStatsCounter("cache_time_full", null);
+ // The pending count for cache flush operations
+ protected final LongStatsCounter cacheFlushPendingCnt =
+ new LongStatsCounter("cache_flush_pending", null);
+ // The cache re-alloc count
+ protected final LongStatsCounter cacheReAllocCnt =
+ new LongStatsCounter("cache_realloc", null);
+ // The cache persistence duration statistics
+ protected final ESTHistogram cacheSyncStats =
+ new ESTHistogram("cache_flush_dlt", null);
+ // for file store
+ // The accumulate message count statistics
+ protected final LongStatsCounter fileAccumMsgCnt =
+ new LongStatsCounter("file_total_msg_cnt", null);
+ // The accumulate message data size statistics
+ protected final LongStatsCounter fileAccumMsgDataSize =
+ new LongStatsCounter("file_total_data_size", null);
+ // The accumulate message index statistics
+ protected final LongStatsCounter fileAccumMsgIndexSize =
+ new LongStatsCounter("file_total_index_size", null);
+ // The data flushed statistics
+ protected final SimpleHistogram fileFlushedDataSize =
+ new SimpleHistogram("file_flushed_data", null);
+ // The message count flushed statistics
+ protected final SimpleHistogram fileFlushedMsgCnt =
+ new SimpleHistogram("file_flushed_msg", null);
+ // The new data segment statistics
+ protected final LongStatsCounter fileDataSegAddCnt =
+ new LongStatsCounter("file_data_seg", null);
+ // The new index segment full statistics
+ protected final LongStatsCounter fileIndexSegAddCnt =
+ new LongStatsCounter("file_index_seg", null);
+ // The flush count statistics of file meta-data
+ protected final LongStatsCounter fileMetaFlushCnt =
+ new LongStatsCounter("file_meta_flush", null);
+ // The cached message data full statistics
+ protected final LongStatsCounter fileDataSizeFullCnt =
+ new LongStatsCounter("file_data_full", null);
+ // The cached message count full statistics
+ protected final LongStatsCounter fileMsgCountFullCnt =
+ new LongStatsCounter("file_count_full", null);
+ // The cache timeout refresh amount statistics
+ protected final LongStatsCounter fileCachedTimeFullCnt =
+ new LongStatsCounter("file_time_full", null);
+ // The snapshot time of statistics set
+ protected final SinceTime snapShotTime =
+ new SinceTime("end_time", null);
+
+ public MsgStoreStatsItemSet() {
+ clear();
+ }
+
+ public void setSnapshotTime(long snapshotTime) {
+ this.snapShotTime.reset(snapshotTime);
+ }
+
+ public void clear() {
+ this.snapShotTime.reset();
+ // for file metric items
+ this.fileAccumMsgCnt.clear();
+ this.fileAccumMsgDataSize.clear();
+ this.fileFlushedDataSize.clear();
+ this.fileAccumMsgIndexSize.clear();
+ this.fileFlushedMsgCnt.clear();
+ this.fileDataSegAddCnt.clear();
+ this.fileIndexSegAddCnt.clear();
+ this.fileDataSizeFullCnt.clear();
+ this.fileMetaFlushCnt.clear();
+ this.fileMsgCountFullCnt.clear();
+ this.fileCachedTimeFullCnt.clear();
+ // for cache metric items
+ this.cacheMsgRcvStats.clear();
+ this.cacheWriteFailCnt.clear();
+ this.cacheDataSizeFullCnt.clear();
+ this.cacheIndexSizeFullCnt.clear();
+ this.cacheMsgCountFullCnt.clear();
+ this.cacheFlushPendingCnt.clear();
+ this.cacheReAllocCnt.clear();
+ this.cacheTimeFullCnt.clear();
+ this.cacheSyncStats.clear();
+ this.resetTime.reset();
+ }
+ }
+}
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/ServiceStatsHolder.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/ServiceStatsHolder.java
index 7b398dc..217a14a 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/ServiceStatsHolder.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/ServiceStatsHolder.java
@@ -43,6 +43,8 @@ public class ServiceStatsHolder {
private static final AtomicInteger writableIndex = new AtomicInteger(0);
// Last snapshot time
private static final AtomicLong lstSnapshotTime = new AtomicLong(0);
+ // whether the DiskSync statistic is closed
+ private static volatile boolean diskSyncClosed = false;
// Initial service statistic set
static {
@@ -60,34 +62,39 @@ public class ServiceStatsHolder {
}
public static void snapShort(Map<String, Long> statsMap) {
- long curSnapshotTime = lstSnapshotTime.get();
- // Avoid frequent snapshots
- if ((System.currentTimeMillis() - curSnapshotTime)
- >= TServerConstants.MIN_SNAPSHOT_PERIOD_MS) {
- if (lstSnapshotTime.compareAndSet(curSnapshotTime,
System.currentTimeMillis())) {
- int befIndex = writableIndex.getAndIncrement();
- switchableSets[getIndex()].resetSinceTime();
- getStatsValue(switchableSets[getIndex(befIndex)], true,
statsMap);
- return;
- }
+ if (switchWritingStatsUnit()) {
+ getStatsValue(switchableSets[getIndex(writableIndex.get() - 1)],
true, statsMap);
+ } else {
+ getValue(statsMap);
}
- getValue(statsMap);
}
public static void snapShort(StringBuilder strBuff) {
- long curSnapshotTime = lstSnapshotTime.get();
- // Avoid frequent snapshots
- if ((System.currentTimeMillis() - curSnapshotTime)
- >= TServerConstants.MIN_SNAPSHOT_PERIOD_MS) {
- if (lstSnapshotTime.compareAndSet(curSnapshotTime,
System.currentTimeMillis())) {
- int befIndex = writableIndex.getAndIncrement();
- switchableSets[getIndex()].resetSinceTime();
- getStatsValue(switchableSets[getIndex(befIndex)], true,
strBuff);
- return;
- }
+ if (switchWritingStatsUnit()) {
+ getStatsValue(switchableSets[getIndex(writableIndex.get() - 1)],
true, strBuff);
+ } else {
+ getValue(strBuff);
}
- getValue(strBuff);
}
+
+ /**
+ * Set manually the DiskSync statistic status.
+ *
+ * @param enableStats enable or disable the statistic.
+ */
+ public static synchronized void setDiskSyncStatsStatus(boolean
enableStats) {
+ ServiceStatsHolder.diskSyncClosed = !enableStats;
+ }
+
+ /**
+ * Query whether the statistic is closed.
+ *
+ * @return the statistic status
+ */
+ public static boolean isDiskSyncStatsClosed() {
+ return ServiceStatsHolder.diskSyncClosed;
+ }
+
// metric set operate APIs end
// metric item operate APIs begin
@@ -119,6 +126,9 @@ public class ServiceStatsHolder {
}
public static void updDiskSyncDataDlt(long dltTime) {
+ if (diskSyncClosed) {
+ return;
+ }
switchableSets[getIndex()].fileSyncDltStats.update(dltTime);
}
@@ -128,11 +138,25 @@ public class ServiceStatsHolder {
// metric set operate APIs end
// private functions
+ private static boolean switchWritingStatsUnit() {
+ long curSnapshotTime = lstSnapshotTime.get();
+ // Avoid frequent snapshots
+ if ((System.currentTimeMillis() - curSnapshotTime)
+ >= TServerConstants.MIN_SNAPSHOT_PERIOD_MS) {
+ if (lstSnapshotTime.compareAndSet(curSnapshotTime,
System.currentTimeMillis())) {
+
switchableSets[getIndex(writableIndex.incrementAndGet())].resetSinceTime();
+ return true;
+ }
+ }
+ return false;
+ }
+
private static void getStatsValue(ServiceStatsSet statsSet,
boolean resetValue,
Map<String, Long> statsMap) {
statsMap.put(statsSet.lstResetTime.getFullName(),
statsSet.lstResetTime.getSinceTime());
+ statsMap.put("isDiskSyncClosed", (diskSyncClosed ? 1L : 0L));
if (resetValue) {
statsSet.fileSyncDltStats.snapShort(statsMap, false);
statsMap.put(statsSet.fileIOExcStats.getFullName(),
@@ -171,7 +195,8 @@ public class ServiceStatsHolder {
StringBuilder strBuff) {
strBuff.append("{\"").append(statsSet.lstResetTime.getFullName())
.append("\":\"").append(statsSet.lstResetTime.getStrSinceTime())
- .append("\",");
+ .append("\",\"isDiskSyncClosed\":").append(diskSyncClosed)
+ .append(",");
if (resetValue) {
statsSet.fileSyncDltStats.snapShort(strBuff, false);
strBuff.append(",\"").append(statsSet.fileIOExcStats.getFullName())
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsService.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsService.java
index 04de7aa..0aa8ea2 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsService.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/TrafficStatsService.java
@@ -90,10 +90,8 @@ public class TrafficStatsService extends
AbstractDaemonService implements Traffi
return;
}
// Output remain information
- int index = writableIndex.get();
- for (int i = 0; i < switchableUnits.length; i++) {
- output2file(++index);
- }
+ output2file(writableIndex.get() - 1);
+ output2file(writableIndex.get());
}
@Override
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java
index e3e7be6..b82220e 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java
@@ -301,67 +301,19 @@ public class BrokerAdminServlet extends
AbstractWebHandler {
sBuilder.append("],\"totalCnt\":").append(recordId).append("}");
}
- /**
+ /***
* Get memory store status info.
*
* @param req request
- * @param sBuffer process result
+ * @param sBuilder process result
*/
public void adminGetMemStoreStatisInfo(HttpServletRequest req,
- StringBuilder sBuffer) {
- ProcessResult result = new ProcessResult();
- if (!WebParameterUtils.getStringParamValue(req,
- WebFieldDef.COMPSTOPICNAME, false, null, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
- return;
- }
- Set<String> topicNameSet = (Set<String>) result.getRetData();
- if (!WebParameterUtils.getBooleanParamValue(req,
- WebFieldDef.NEEDREFRESH, false, false, sBuffer, result)) {
- WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
- return;
- }
- boolean requireRefresh = (boolean) result.getRetData();
-
sBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Success!\",\"detail\":[");
- Map<String, ConcurrentHashMap<Integer, MessageStore>>
messageTopicStores =
- broker.getStoreManager().getMessageStores();
- int index = 0;
- int recordId = 0;
- for (Map.Entry<String, ConcurrentHashMap<Integer, MessageStore>> entry
: messageTopicStores.entrySet()) {
- if (TStringUtils.isBlank(entry.getKey())
- || (!topicNameSet.isEmpty() &&
!topicNameSet.contains(entry.getKey()))) {
- continue;
- }
- String topicName = entry.getKey();
- if (recordId++ > 0) {
- sBuffer.append(",");
- }
- index = 0;
-
sBuffer.append("{\"topicName\":\"").append(topicName).append("\",\"storeStatsInfo\":[");
- ConcurrentHashMap<Integer, MessageStore> partStoreMap =
entry.getValue();
- if (partStoreMap != null) {
- for (Entry<Integer, MessageStore> subEntry :
partStoreMap.entrySet()) {
- MessageStore msgStore = subEntry.getValue();
- if (msgStore == null) {
- continue;
- }
- if (index++ > 0) {
- sBuffer.append(",");
- }
- sBuffer.append("{\"storeId\":").append(subEntry.getKey())
- .append(",\"memStats\":");
- msgStore.getMemStoreStatsInfo(requireRefresh, sBuffer);
- sBuffer.append(",\"fileStats\":");
- msgStore.getCurFileStoreStatsInfo(requireRefresh, sBuffer);
- sBuffer.append("}");
- }
- }
- sBuffer.append("]}");
- }
- sBuffer.append("],\"totalCount\":").append(recordId).append("}");
+ StringBuilder sBuilder) {
+ sBuilder.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
+ .append("The method is deprecated, please use
admin_get_msgstore_stats\"}");
}
- /**
+ /***
* Manual set offset.
*
* @param req request
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/webbase/WebCallStatsHolder.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/webbase/WebCallStatsHolder.java
index a8ade6e..f47beb7 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/webbase/WebCallStatsHolder.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/webbase/WebCallStatsHolder.java
@@ -40,6 +40,8 @@ public class WebCallStatsHolder {
private static final AtomicInteger writableIndex = new AtomicInteger(0);
// Last snapshot time
private static final AtomicLong lstSnapshotTime = new AtomicLong(0);
+ // whether the statistic is manual closed
+ private static volatile boolean isManualClosed = false;
// Initial service statistic set
static {
@@ -57,38 +59,45 @@ public class WebCallStatsHolder {
}
public static void snapShort(Map<String, Long> statsMap) {
- long curSnapshotTime = lstSnapshotTime.get();
- // Avoid frequent snapshots
- if ((System.currentTimeMillis() - curSnapshotTime)
- >= TServerConstants.MIN_SNAPSHOT_PERIOD_MS) {
- if (lstSnapshotTime.compareAndSet(curSnapshotTime,
System.currentTimeMillis())) {
- int befIndex = writableIndex.getAndIncrement();
- switchableSets[getIndex()].resetSinceTime();
- getStatsValue(switchableSets[getIndex(befIndex)], true,
statsMap);
- return;
- }
+ if (switchWritingStatsUnit()) {
+ getStatsValue(switchableSets[getIndex(writableIndex.get() - 1)],
true, statsMap);
+ } else {
+ getValue(statsMap);
}
- getValue(statsMap);
}
public static void snapShort(StringBuilder strBuff) {
- long curSnapshotTime = lstSnapshotTime.get();
- // Avoid frequent snapshots
- if ((System.currentTimeMillis() - curSnapshotTime)
- >= TServerConstants.MIN_SNAPSHOT_PERIOD_MS) {
- if (lstSnapshotTime.compareAndSet(curSnapshotTime,
System.currentTimeMillis())) {
- int befIndex = writableIndex.getAndIncrement();
- switchableSets[getIndex()].resetSinceTime();
- getStatsValue(switchableSets[getIndex(befIndex)], true,
strBuff);
- return;
- }
+ if (switchWritingStatsUnit()) {
+ getStatsValue(switchableSets[getIndex(writableIndex.get() - 1)],
true, strBuff);
+ } else {
+ getValue(strBuff);
}
- getValue(strBuff);
+ }
+
+ /**
+ * Set manually the statistic status.
+ *
+ * @param enableStats enable or disable the statistic.
+ */
+ public static synchronized void setStatsStatus(boolean enableStats) {
+ WebCallStatsHolder.isManualClosed = !enableStats;
+ }
+
+ /**
+ * Query whether the statistic is closed.
+ *
+ * @return the statistic status
+ */
+ public static boolean isStatsClosed() {
+ return WebCallStatsHolder.isManualClosed;
}
// metric set operate APIs end
// metric item operate APIs begin
public static void addMethodCall(String method, long callDlt) {
+ if (isManualClosed) {
+ return;
+ }
method = (method == null) ? "NULL" : method;
WebCallStatsItemSet webCallStatsSet = switchableSets[getIndex()];
webCallStatsSet.totalCallStats.update(callDlt);
@@ -105,11 +114,25 @@ public class WebCallStatsHolder {
// metric set operate APIs end
// private functions
+ private static boolean switchWritingStatsUnit() {
+ long curSnapshotTime = lstSnapshotTime.get();
+ // Avoid frequent snapshots
+ if ((System.currentTimeMillis() - curSnapshotTime)
+ >= TServerConstants.MIN_SNAPSHOT_PERIOD_MS) {
+ if (lstSnapshotTime.compareAndSet(curSnapshotTime,
System.currentTimeMillis())) {
+
switchableSets[getIndex(writableIndex.incrementAndGet())].resetSinceTime();
+ return true;
+ }
+ }
+ return false;
+ }
+
private static void getStatsValue(WebCallStatsItemSet statsSet,
boolean resetValue,
Map<String, Long> statsMap) {
statsMap.put(statsSet.lstResetTime.getFullName(),
statsSet.lstResetTime.getSinceTime());
+ statsMap.put("isClosed", (isManualClosed ? 1L : 0L));
if (resetValue) {
statsSet.totalCallStats.snapShort(statsMap, false);
for (SimpleHistogram itemStats : statsSet.methodStatsMap.values())
{
@@ -128,7 +151,7 @@ public class WebCallStatsHolder {
StringBuilder strBuff) {
strBuff.append("{\"").append(statsSet.lstResetTime.getFullName())
.append("\":\"").append(statsSet.lstResetTime.getStrSinceTime())
- .append("\",");
+ .append("\",\"isClosed\":").append(isManualClosed).append(",");
int totalcnt = 0;
if (resetValue) {
statsSet.totalCallStats.snapShort(strBuff, false);
diff --git
a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStoreTest.java
b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStoreTest.java
index 207e3d7..ca93202 100644
---
a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStoreTest.java
+++
b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStoreTest.java
@@ -18,7 +18,7 @@
package org.apache.inlong.tubemq.server.broker.msgstore.mem;
import java.nio.ByteBuffer;
-import org.apache.inlong.tubemq.server.broker.stats.MemStoreStatsHolder;
+import org.apache.inlong.tubemq.server.broker.stats.MsgStoreStatsHolder;
import org.apache.inlong.tubemq.server.common.utils.AppendResult;
import org.junit.Test;
@@ -33,7 +33,7 @@ public class MsgMemStoreTest {
int maxMsgCount = 10000;
MsgMemStore msgMemStore = new MsgMemStore(maxCacheSize, maxMsgCount,
null);
- MemStoreStatsHolder memStatsHolder = new MemStoreStatsHolder();
+ MsgStoreStatsHolder memStatsHolder = new MsgStoreStatsHolder();
ByteBuffer bf = ByteBuffer.allocate(1024);
bf.put("abc".getBytes());
AppendResult appendResult = new AppendResult();
@@ -47,7 +47,7 @@ public class MsgMemStoreTest {
int maxCacheSize = 2 * 1024 * 1024;
int maxMsgCount = 10000;
MsgMemStore msgMemStore = new MsgMemStore(maxCacheSize, maxMsgCount,
null);
- MemStoreStatsHolder memStatsHolder = new MemStoreStatsHolder();
+ MsgStoreStatsHolder memStatsHolder = new MsgStoreStatsHolder();
ByteBuffer bf = ByteBuffer.allocate(1024);
bf.put("abc".getBytes());
AppendResult appendResult = new AppendResult();
diff --git
a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/FileStoreStatsHolderTest.java
b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/FileStoreStatsHolderTest.java
deleted file mode 100644
index 0449d1b..0000000
---
a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/FileStoreStatsHolderTest.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.tubemq.server.broker.stats;
-
-import java.util.LinkedHashMap;
-import java.util.Map;
-import org.apache.inlong.tubemq.corebase.TBaseConstants;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * FileStoreStatsHolder test.
- */
-public class FileStoreStatsHolderTest {
-
- @Test
- public void testFileStoreStatsHolder() {
- FileStoreStatsHolder fileStatsHolder = new FileStoreStatsHolder();
- // case 1, not started
- fileStatsHolder.addFileFlushStatsInfo(2, 30, 500,
- 0, 0, true, true,
- true, true, true, true);
- fileStatsHolder.addTimeoutFlush(1, 500, false);
- Map<String, Long> retMap = new LinkedHashMap<>();
- fileStatsHolder.getValue(retMap);
- Assert.assertNotNull(retMap.get("reset_time"));
- Assert.assertEquals(0, retMap.get("total_msg_cnt").longValue());
- Assert.assertEquals(0, retMap.get("total_data_size").longValue());
- Assert.assertEquals(0, retMap.get("total_index_size").longValue());
- Assert.assertEquals(0,
retMap.get("flushed_data_size_count").longValue());
- Assert.assertEquals(Long.MIN_VALUE,
retMap.get("flushed_data_size_max").longValue());
- Assert.assertEquals(Long.MAX_VALUE,
retMap.get("flushed_data_size_min").longValue());
- Assert.assertEquals(0,
retMap.get("flushed_msg_cnt_count").longValue());
- Assert.assertEquals(Long.MIN_VALUE,
retMap.get("flushed_msg_cnt_max").longValue());
- Assert.assertEquals(Long.MAX_VALUE,
retMap.get("flushed_msg_cnt_min").longValue());
- Assert.assertEquals(0, retMap.get("data_seg_cnt").longValue());
- Assert.assertEquals(0, retMap.get("index_seg_cnt").longValue());
- Assert.assertEquals(0, retMap.get("data_size_full").longValue());
- Assert.assertEquals(0, retMap.get("meta_flush_cnt").longValue());
- Assert.assertEquals(0, retMap.get("msg_count_full").longValue());
- Assert.assertEquals(0, retMap.get("cache_time_full").longValue());
- Assert.assertNotNull(retMap.get("end_time"));
- retMap.clear();
- // get content by StringBuilder
- StringBuilder strBuff = new
StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE);
- fileStatsHolder.getValue(strBuff);
- // System.out.println(strBuff.toString());
- strBuff.delete(0, strBuff.length());
- // test timeout
- fileStatsHolder.addTimeoutFlush(1, 1, true);
- fileStatsHolder.getValue(retMap);
- Assert.assertNotNull(retMap.get("reset_time"));
- Assert.assertEquals(0, retMap.get("total_msg_cnt").longValue());
- Assert.assertEquals(0, retMap.get("total_data_size").longValue());
- Assert.assertEquals(0, retMap.get("total_index_size").longValue());
- Assert.assertEquals(1,
retMap.get("flushed_data_size_count").longValue());
- Assert.assertEquals(1,
retMap.get("flushed_data_size_max").longValue());
- Assert.assertEquals(1,
retMap.get("flushed_data_size_min").longValue());
- Assert.assertEquals(1,
retMap.get("flushed_msg_cnt_count").longValue());
- Assert.assertEquals(1, retMap.get("flushed_msg_cnt_max").longValue());
- Assert.assertEquals(1, retMap.get("flushed_msg_cnt_min").longValue());
- Assert.assertEquals(0, retMap.get("data_seg_cnt").longValue());
- Assert.assertEquals(0, retMap.get("index_seg_cnt").longValue());
- Assert.assertEquals(1, retMap.get("meta_flush_cnt").longValue());
- Assert.assertEquals(0, retMap.get("data_size_full").longValue());
- Assert.assertEquals(0, retMap.get("msg_count_full").longValue());
- Assert.assertEquals(1, retMap.get("cache_time_full").longValue());
- Assert.assertNotNull(retMap.get("end_time"));
- retMap.clear();
- // get value when started
- fileStatsHolder.addFileFlushStatsInfo(1, 1, 1,
- 1, 1, true, false,
- false, false, false, false);
- fileStatsHolder.addFileFlushStatsInfo(6, 6, 6,
- 6, 6, false, false,
- false, false, false, true);
- fileStatsHolder.addFileFlushStatsInfo(2, 2, 2,
- 2, 2, false, true,
- false, false, false, false);
- fileStatsHolder.addFileFlushStatsInfo(5, 5, 5,
- 5, 5, false, false,
- false, false, true, false);
- fileStatsHolder.addFileFlushStatsInfo(4, 4, 4,
- 4, 4, false, false,
- false, true, false, false);
- fileStatsHolder.addFileFlushStatsInfo(3, 3, 3,
- 3, 3, false, false,
- true, false, false, false);
- fileStatsHolder.snapShort(retMap);
- Assert.assertNotNull(retMap.get("reset_time"));
- Assert.assertEquals(21, retMap.get("total_msg_cnt").longValue());
- Assert.assertEquals(21, retMap.get("total_data_size").longValue());
- Assert.assertEquals(21, retMap.get("total_index_size").longValue());
- Assert.assertEquals(7,
retMap.get("flushed_data_size_count").longValue());
- Assert.assertEquals(6,
retMap.get("flushed_data_size_max").longValue());
- Assert.assertEquals(1,
retMap.get("flushed_data_size_min").longValue());
- Assert.assertEquals(7,
retMap.get("flushed_msg_cnt_count").longValue());
- Assert.assertEquals(6, retMap.get("flushed_msg_cnt_max").longValue());
- Assert.assertEquals(1, retMap.get("flushed_msg_cnt_min").longValue());
- Assert.assertEquals(1, retMap.get("data_seg_cnt").longValue());
- Assert.assertEquals(1, retMap.get("index_seg_cnt").longValue());
- Assert.assertEquals(1, retMap.get("data_size_full").longValue());
- Assert.assertEquals(2, retMap.get("meta_flush_cnt").longValue());
- Assert.assertEquals(1, retMap.get("msg_count_full").longValue());
- Assert.assertEquals(2, retMap.get("cache_time_full").longValue());
- Assert.assertNotNull(retMap.get("end_time"));
- retMap.clear();
- fileStatsHolder.getAllFileStatsInfo(true, strBuff);
- // System.out.println(strBuff.toString());
- strBuff.delete(0, strBuff.length());
-
- }
-}
diff --git
a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/MemStoreStatsHolderTest.java
b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/MemStoreStatsHolderTest.java
deleted file mode 100644
index ad06f7a..0000000
---
a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/MemStoreStatsHolderTest.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.tubemq.server.broker.stats;
-
-import java.util.LinkedHashMap;
-import java.util.Map;
-import org.apache.inlong.tubemq.corebase.TBaseConstants;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * MemStoreStatsHolder test.
- */
-public class MemStoreStatsHolderTest {
-
- @Test
- public void testMemStoreStatsHolder() {
- MemStoreStatsHolder memStatsHolder = new MemStoreStatsHolder();
- // case 1, not started
- memStatsHolder.addAppendedMsgSize(50);
- memStatsHolder.addCacheFullType(true, false, false);
- memStatsHolder.addCacheFullType(false, true, false);
- memStatsHolder.addCacheFullType(false, false, true);
- memStatsHolder.addFlushTime(50, false);
- memStatsHolder.addFlushTime(10, true);
- memStatsHolder.addMsgWriteFail();
- Map<String, Long> retMap = new LinkedHashMap<>();
- memStatsHolder.getValue(retMap);
- Assert.assertNotNull(retMap.get("reset_time"));
- Assert.assertEquals(0, retMap.get("msg_in_count").longValue());
- Assert.assertEquals(Long.MIN_VALUE,
retMap.get("msg_in_max").longValue());
- Assert.assertEquals(Long.MAX_VALUE,
retMap.get("msg_in_min").longValue());
- Assert.assertEquals(0, retMap.get("msg_append_fail").longValue());
- Assert.assertEquals(0, retMap.get("data_size_full").longValue());
- Assert.assertEquals(0, retMap.get("index_size_full").longValue());
- Assert.assertEquals(0, retMap.get("msg_count_full").longValue());
- Assert.assertEquals(0, retMap.get("cache_time_full").longValue());
- Assert.assertEquals(0, retMap.get("flush_pending").longValue());
- Assert.assertEquals(0, retMap.get("cache_realloc").longValue());
- Assert.assertEquals(0,
retMap.get("cache_flush_dlt_count").longValue());
- Assert.assertEquals(Long.MIN_VALUE,
retMap.get("cache_flush_dlt_max").longValue());
- Assert.assertEquals(Long.MAX_VALUE,
retMap.get("cache_flush_dlt_min").longValue());
- Assert.assertNotNull(retMap.get("end_time"));
- retMap.clear();
- // get content by StringBuilder
- StringBuilder strBuff = new
StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE);
- memStatsHolder.getValue(strBuff);
- // System.out.println(strBuff.toString());
- strBuff.delete(0, strBuff.length());
- memStatsHolder.getAllMemStatsInfo(true, strBuff);
- // System.out.println("getAllMemStatsInfo : " + strBuff);
- strBuff.delete(0, strBuff.length());
- // case 2 started
- memStatsHolder.addAppendedMsgSize(50);
- memStatsHolder.addAppendedMsgSize(500);
- memStatsHolder.addAppendedMsgSize(5);
- memStatsHolder.addCacheFullType(true, false, false);
- memStatsHolder.addCacheFullType(false, true, false);
- memStatsHolder.addCacheFullType(false, false, true);
- memStatsHolder.addFlushTime(50, false);
- memStatsHolder.addFlushTime(10, true);
- memStatsHolder.addFlushTime(100, true);
- memStatsHolder.addFlushTime(1, false);
- memStatsHolder.addMsgWriteFail();
- memStatsHolder.addMsgWriteFail();
- memStatsHolder.addCacheReAlloc();
- memStatsHolder.addCacheReAlloc();
- memStatsHolder.addCachePending();
- memStatsHolder.addCachePending();
- memStatsHolder.addCachePending();
- memStatsHolder.getValue(retMap);
- Assert.assertNotNull(retMap.get("reset_time"));
- Assert.assertEquals(3, retMap.get("msg_in_count").longValue());
- Assert.assertEquals(500, retMap.get("msg_in_max").longValue());
- Assert.assertEquals(5, retMap.get("msg_in_min").longValue());
- Assert.assertEquals(2, retMap.get("msg_append_fail").longValue());
- Assert.assertEquals(1, retMap.get("data_size_full").longValue());
- Assert.assertEquals(1, retMap.get("index_size_full").longValue());
- Assert.assertEquals(1, retMap.get("msg_count_full").longValue());
- Assert.assertEquals(2, retMap.get("cache_time_full").longValue());
- Assert.assertEquals(3, retMap.get("flush_pending").longValue());
- Assert.assertEquals(2, retMap.get("cache_realloc").longValue());
- Assert.assertEquals(4,
retMap.get("cache_flush_dlt_count").longValue());
- Assert.assertEquals(100,
retMap.get("cache_flush_dlt_max").longValue());
- Assert.assertEquals(1, retMap.get("cache_flush_dlt_min").longValue());
- Assert.assertEquals(1,
retMap.get("cache_flush_dlt_cell_0t2").longValue());
- Assert.assertEquals(1,
retMap.get("cache_flush_dlt_cell_8t16").longValue());
- Assert.assertEquals(1,
retMap.get("cache_flush_dlt_cell_32t64").longValue());
- Assert.assertEquals(1,
retMap.get("cache_flush_dlt_cell_64t128").longValue());
- Assert.assertNotNull(retMap.get("end_time"));
- memStatsHolder.getAllMemStatsInfo(false, strBuff);
- // System.out.println("\n the second is : " + strBuff.toString());
- strBuff.delete(0, strBuff.length());
-
- }
-}
diff --git
a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/MsgStoreStatsHolderTest.java
b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/MsgStoreStatsHolderTest.java
new file mode 100644
index 0000000..3d41289
--- /dev/null
+++
b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/broker/stats/MsgStoreStatsHolderTest.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.broker.stats;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import org.apache.inlong.tubemq.corebase.TBaseConstants;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * MsgStoreStatsHolder test.
+ */
+public class MsgStoreStatsHolderTest {
+
+ @Test
+ public void testMemPartStats() {
+ MsgStoreStatsHolder msgStoreStatsHolder = new MsgStoreStatsHolder();
+ // case 1, not started
+ msgStoreStatsHolder.addCacheMsgSize(50);
+ msgStoreStatsHolder.addCacheFullType(true, false, false);
+ msgStoreStatsHolder.addCacheFullType(false, true, false);
+ msgStoreStatsHolder.addCacheFullType(false, false, true);
+ msgStoreStatsHolder.addCacheFlushTime(50, false);
+ msgStoreStatsHolder.addCacheFlushTime(10, true);
+ msgStoreStatsHolder.addMsgWriteCacheFail();
+ Map<String, Long> retMap = new LinkedHashMap<>();
+ msgStoreStatsHolder.getValue(retMap);
+ Assert.assertNotNull(retMap.get("reset_time"));
+ Assert.assertEquals(0, retMap.get("cache_msg_in_count").longValue());
+ Assert.assertEquals(Long.MIN_VALUE,
retMap.get("cache_msg_in_max").longValue());
+ Assert.assertEquals(Long.MAX_VALUE,
retMap.get("cache_msg_in_min").longValue());
+ Assert.assertEquals(0, retMap.get("cache_append_fail").longValue());
+ Assert.assertEquals(0, retMap.get("cache_data_full").longValue());
+ Assert.assertEquals(0, retMap.get("cache_index_full").longValue());
+ Assert.assertEquals(0, retMap.get("cache_count_full").longValue());
+ Assert.assertEquals(0, retMap.get("cache_time_full").longValue());
+ Assert.assertEquals(0, retMap.get("cache_flush_pending").longValue());
+ Assert.assertEquals(0, retMap.get("cache_realloc").longValue());
+ Assert.assertEquals(0,
retMap.get("cache_flush_dlt_count").longValue());
+ Assert.assertEquals(Long.MIN_VALUE,
retMap.get("cache_flush_dlt_max").longValue());
+ Assert.assertEquals(Long.MAX_VALUE,
retMap.get("cache_flush_dlt_min").longValue());
+ Assert.assertNotNull(retMap.get("end_time"));
+ retMap.clear();
+ // get content by StringBuilder
+ StringBuilder strBuff = new
StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE);
+ msgStoreStatsHolder.getValue(strBuff);
+ // System.out.println(strBuff.toString());
+ strBuff.delete(0, strBuff.length());
+ msgStoreStatsHolder.getMsgStoreStatsInfo(true, strBuff);
+ // System.out.println("getAllMemStatsInfo : " + strBuff);
+ strBuff.delete(0, strBuff.length());
+ // case 2 started
+ msgStoreStatsHolder.addCacheMsgSize(50);
+ msgStoreStatsHolder.addCacheMsgSize(500);
+ msgStoreStatsHolder.addCacheMsgSize(5);
+ msgStoreStatsHolder.addCacheFullType(true, false, false);
+ msgStoreStatsHolder.addCacheFullType(false, true, false);
+ msgStoreStatsHolder.addCacheFullType(false, false, true);
+ msgStoreStatsHolder.addCacheFlushTime(50, false);
+ msgStoreStatsHolder.addCacheFlushTime(10, true);
+ msgStoreStatsHolder.addCacheFlushTime(100, true);
+ msgStoreStatsHolder.addCacheFlushTime(1, false);
+ msgStoreStatsHolder.addMsgWriteCacheFail();
+ msgStoreStatsHolder.addMsgWriteCacheFail();
+ msgStoreStatsHolder.addCacheReAlloc();
+ msgStoreStatsHolder.addCacheReAlloc();
+ msgStoreStatsHolder.addCachePending();
+ msgStoreStatsHolder.addCachePending();
+ msgStoreStatsHolder.addCachePending();
+ msgStoreStatsHolder.getValue(retMap);
+ Assert.assertNotNull(retMap.get("reset_time"));
+ Assert.assertEquals(3, retMap.get("cache_msg_in_count").longValue());
+ Assert.assertEquals(500, retMap.get("cache_msg_in_max").longValue());
+ Assert.assertEquals(5, retMap.get("cache_msg_in_min").longValue());
+ Assert.assertEquals(2, retMap.get("cache_append_fail").longValue());
+ Assert.assertEquals(1, retMap.get("cache_data_full").longValue());
+ Assert.assertEquals(1, retMap.get("cache_index_full").longValue());
+ Assert.assertEquals(1, retMap.get("cache_count_full").longValue());
+ Assert.assertEquals(2, retMap.get("cache_time_full").longValue());
+ Assert.assertEquals(3, retMap.get("cache_flush_pending").longValue());
+ Assert.assertEquals(2, retMap.get("cache_realloc").longValue());
+ Assert.assertEquals(4,
retMap.get("cache_flush_dlt_count").longValue());
+ Assert.assertEquals(100,
retMap.get("cache_flush_dlt_max").longValue());
+ Assert.assertEquals(1, retMap.get("cache_flush_dlt_min").longValue());
+ Assert.assertEquals(1,
retMap.get("cache_flush_dlt_cell_0t2").longValue());
+ Assert.assertEquals(1,
retMap.get("cache_flush_dlt_cell_8t16").longValue());
+ Assert.assertEquals(1,
retMap.get("cache_flush_dlt_cell_32t64").longValue());
+ Assert.assertEquals(1,
retMap.get("cache_flush_dlt_cell_64t128").longValue());
+ Assert.assertNotNull(retMap.get("end_time"));
+ msgStoreStatsHolder.getMsgStoreStatsInfo(false, strBuff);
+ // System.out.println("\n the second is : " + strBuff.toString());
+ strBuff.delete(0, strBuff.length());
+ }
+
+ @Test
+ public void testFilePartStats() {
+ MsgStoreStatsHolder msgStoreStatsHolder = new MsgStoreStatsHolder();
+ // case 1, not started
+ msgStoreStatsHolder.addFileFlushStatsInfo(2, 30, 500,
+ 0, 0, true, true,
+ true, true, true, true);
+ msgStoreStatsHolder.addFileTimeoutFlushStats(1, 500, false);
+ Map<String, Long> retMap = new LinkedHashMap<>();
+ msgStoreStatsHolder.getValue(retMap);
+ Assert.assertNotNull(retMap.get("reset_time"));
+ Assert.assertEquals(0, retMap.get("file_total_msg_cnt").longValue());
+ Assert.assertEquals(0, retMap.get("file_total_data_size").longValue());
+ Assert.assertEquals(0,
retMap.get("file_total_index_size").longValue());
+ Assert.assertEquals(0,
retMap.get("file_flushed_data_count").longValue());
+ Assert.assertEquals(Long.MIN_VALUE,
retMap.get("file_flushed_data_max").longValue());
+ Assert.assertEquals(Long.MAX_VALUE,
retMap.get("file_flushed_data_min").longValue());
+ Assert.assertEquals(0,
retMap.get("file_flushed_msg_count").longValue());
+ Assert.assertEquals(Long.MIN_VALUE,
retMap.get("file_flushed_msg_max").longValue());
+ Assert.assertEquals(Long.MAX_VALUE,
retMap.get("file_flushed_msg_min").longValue());
+ Assert.assertEquals(0, retMap.get("file_index_seg").longValue());
+ Assert.assertEquals(0, retMap.get("file_meta_flush").longValue());
+ Assert.assertEquals(0, retMap.get("file_data_full").longValue());
+ Assert.assertEquals(0, retMap.get("file_count_full").longValue());
+ Assert.assertEquals(0, retMap.get("file_time_full").longValue());
+ Assert.assertNotNull(retMap.get("end_time"));
+ retMap.clear();
+ // get content by StringBuilder
+ StringBuilder strBuff = new
StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE);
+ msgStoreStatsHolder.getValue(strBuff);
+ // System.out.println(strBuff.toString());
+ strBuff.delete(0, strBuff.length());
+ // test timeout
+ msgStoreStatsHolder.addFileTimeoutFlushStats(1, 1, true);
+ msgStoreStatsHolder.getValue(retMap);
+ Assert.assertNotNull(retMap.get("reset_time"));
+ Assert.assertEquals(0, retMap.get("file_total_msg_cnt").longValue());
+ Assert.assertEquals(0, retMap.get("file_total_data_size").longValue());
+ Assert.assertEquals(0,
retMap.get("file_total_index_size").longValue());
+ Assert.assertEquals(1,
retMap.get("file_flushed_data_count").longValue());
+ Assert.assertEquals(1,
retMap.get("file_flushed_data_max").longValue());
+ Assert.assertEquals(1,
retMap.get("file_flushed_data_min").longValue());
+ Assert.assertEquals(1,
retMap.get("file_flushed_msg_count").longValue());
+ Assert.assertEquals(1, retMap.get("file_flushed_msg_max").longValue());
+ Assert.assertEquals(1, retMap.get("file_flushed_msg_min").longValue());
+ Assert.assertEquals(0, retMap.get("file_data_seg").longValue());
+ Assert.assertEquals(0, retMap.get("file_index_seg").longValue());
+ Assert.assertEquals(1, retMap.get("file_meta_flush").longValue());
+ Assert.assertEquals(0, retMap.get("file_data_full").longValue());
+ Assert.assertEquals(0, retMap.get("file_count_full").longValue());
+ Assert.assertEquals(1, retMap.get("file_time_full").longValue());
+ Assert.assertNotNull(retMap.get("end_time"));
+ retMap.clear();
+ // get value when started
+ msgStoreStatsHolder.addFileFlushStatsInfo(1, 1, 1,
+ 1, 1, true, false,
+ false, false, false, false);
+ msgStoreStatsHolder.addFileFlushStatsInfo(6, 6, 6,
+ 6, 6, false, false,
+ false, false, false, true);
+ msgStoreStatsHolder.addFileFlushStatsInfo(2, 2, 2,
+ 2, 2, false, true,
+ false, false, false, false);
+ msgStoreStatsHolder.addFileFlushStatsInfo(5, 5, 5,
+ 5, 5, false, false,
+ false, false, true, false);
+ msgStoreStatsHolder.addFileFlushStatsInfo(4, 4, 4,
+ 4, 4, false, false,
+ false, true, false, false);
+ msgStoreStatsHolder.addFileFlushStatsInfo(3, 3, 3,
+ 3, 3, false, false,
+ true, false, false, false);
+ msgStoreStatsHolder.snapShort(retMap);
+ Assert.assertNotNull(retMap.get("reset_time"));
+ Assert.assertEquals(21, retMap.get("file_total_msg_cnt").longValue());
+ Assert.assertEquals(21,
retMap.get("file_total_data_size").longValue());
+ Assert.assertEquals(21,
retMap.get("file_total_index_size").longValue());
+ Assert.assertEquals(7,
retMap.get("file_flushed_data_count").longValue());
+ Assert.assertEquals(6,
retMap.get("file_flushed_data_max").longValue());
+ Assert.assertEquals(1,
retMap.get("file_flushed_data_min").longValue());
+ Assert.assertEquals(7,
retMap.get("file_flushed_msg_count").longValue());
+ Assert.assertEquals(6, retMap.get("file_flushed_msg_max").longValue());
+ Assert.assertEquals(1, retMap.get("file_flushed_msg_min").longValue());
+ Assert.assertEquals(1, retMap.get("file_data_seg").longValue());
+ Assert.assertEquals(1, retMap.get("file_index_seg").longValue());
+ Assert.assertEquals(1, retMap.get("file_data_full").longValue());
+ Assert.assertEquals(2, retMap.get("file_meta_flush").longValue());
+ Assert.assertEquals(1, retMap.get("file_count_full").longValue());
+ Assert.assertEquals(2, retMap.get("file_time_full").longValue());
+ Assert.assertNotNull(retMap.get("end_time"));
+ retMap.clear();
+ msgStoreStatsHolder.getMsgStoreStatsInfo(true, strBuff);
+ // System.out.println(strBuff.toString());
+ strBuff.delete(0, strBuff.length());
+ }
+}