This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new aeeb705f2 [INLONG-6997][TubeMQ] Add sendMessage and getMessage latency
statistics (#7013)
aeeb705f2 is described below
commit aeeb705f2e1b5d8cc60822e9a6d6813e5ca059a3
Author: Goson Zhang <[email protected]>
AuthorDate: Thu Dec 22 10:06:13 2022 +0800
[INLONG-6997][TubeMQ] Add sendMessage and getMessage latency statistics
(#7013)
---
.../tubemq/client/common/ClientStatsInfo.java | 2 +-
.../tubemq/corebase/metric/impl/ESTHistogram.java | 259 ++++++++++++++-------
.../tubemq/corebase/metric/HistogramTest.java | 8 +-
.../tubemq/server/broker/BrokerServiceServer.java | 14 +-
.../server/broker/stats/BrokerSrvStatsHolder.java | 120 +++++++---
.../server/broker/web/BrokerAdminServlet.java | 2 +-
6 files changed, 287 insertions(+), 118 deletions(-)
diff --git
a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/ClientStatsInfo.java
b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/ClientStatsInfo.java
index e6a942bab..c721dec6c 100644
---
a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/ClientStatsInfo.java
+++
b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/ClientStatsInfo.java
@@ -241,7 +241,7 @@ public class ClientStatsInfo {
strBuff.append(",");
statsSet.csmLatencyStats.snapShort(strBuff, false);
strBuff.append(",");
- statsSet.confirmDltStats.getValue(strBuff, false);
+ statsSet.confirmDltStats.snapShort(strBuff, false);
}
} else {
statsSet.msgCallDltStats.getValue(strBuff, false);
diff --git
a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/impl/ESTHistogram.java
b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/impl/ESTHistogram.java
index c0839af94..06a4135f8 100644
---
a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/impl/ESTHistogram.java
+++
b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/impl/ESTHistogram.java
@@ -41,6 +41,12 @@ public class ESTHistogram extends BaseMetric implements
Histogram {
// Natural logarithm of 2
private static final double LOG2_VALUE = Math.log(2);
// Simple statistic items
+ private final String p99FullKey;
+ private final String p99ShortKey = "P99";
+ private final String p999FullKey;
+ private final String p999ShortKey = "P999";
+ private final String p9999FullKey;
+ private final String p9999ShortKey = "P9999";
private final LongStatsCounter count;
private final LongMinGauge min;
private final LongMaxGauge max;
@@ -55,6 +61,9 @@ public class ESTHistogram extends BaseMetric implements
Histogram {
*/
public ESTHistogram(String metricName, String prefix) {
super(metricName, prefix);
+ this.p99FullKey = getFullName() + "_" + p99ShortKey;
+ this.p999FullKey = getFullName() + "_" + p999ShortKey;
+ this.p9999FullKey = getFullName() + "_" + p9999ShortKey;
this.count = new LongStatsCounter("count", getFullName());
this.min = new LongMinGauge("min", getFullName());
this.max = new LongMaxGauge("max", getFullName());
@@ -92,117 +101,199 @@ public class ESTHistogram extends BaseMetric implements
Histogram {
@Override
public void getValue(Map<String, Long> keyValMap, boolean includeZero) {
- keyValMap.put(this.count.getFullName(), this.count.getValue());
- keyValMap.put(this.min.getFullName(), this.min.getValue());
- keyValMap.put(this.max.getFullName(), this.max.getValue());
- if (includeZero) {
- for (int i = 0; i < NUM_BUCKETS; i++) {
- keyValMap.put(this.buckets[i].getFullName(),
this.buckets[i].getValue());
- }
- } else {
- long tmpValue;
- for (int i = 0; i < NUM_BUCKETS; i++) {
- tmpValue = this.buckets[i].getValue();
- if (tmpValue > 0) {
- keyValMap.put(this.buckets[i].getFullName(), tmpValue);
- }
- }
- }
+ getValue2Map(keyValMap, false, includeZero);
}
@Override
public void getValue(StringBuilder strBuff, boolean includeZero) {
- strBuff.append("\"").append(getFullName()).append("\":")
- .append("{\"").append(this.count.getShortName()).append("\":")
- .append(this.count.getValue()).append(",\"")
- .append(this.min.getShortName()).append("\":")
- .append(this.min.getValue()).append(",\"")
- .append(this.max.getShortName()).append("\":")
- .append(this.max.getValue()).append(",\"cells\":{");
- int count = 0;
- if (includeZero) {
- for (int i = 0; i < NUM_BUCKETS; i++) {
- if (count++ > 0) {
- strBuff.append(",");
- }
-
strBuff.append("\"").append(this.buckets[i].getShortName()).append("\":")
- .append(this.buckets[i].getValue());
- }
- } else {
- long tmpValue;
- for (int i = 0; i < NUM_BUCKETS; i++) {
- tmpValue = this.buckets[i].getValue();
- if (tmpValue > 0) {
- if (count++ > 0) {
- strBuff.append(",");
- }
-
strBuff.append("\"").append(this.buckets[i].getShortName()).append("\":").append(tmpValue);
- }
- }
- }
- strBuff.append("}}");
+ getValue2StrBuff(strBuff, false, includeZero);
}
@Override
public void snapShort(Map<String, Long> keyValMap, boolean includeZero) {
- keyValMap.put(this.count.getFullName(), this.count.getAndResetValue());
- keyValMap.put(this.min.getFullName(), this.min.getAndResetValue());
- keyValMap.put(this.max.getFullName(), this.max.getAndResetValue());
- if (includeZero) {
- for (int i = 0; i < NUM_BUCKETS; i++) {
- keyValMap.put(this.buckets[i].getFullName(),
this.buckets[i].getAndResetValue());
- }
+ getValue2Map(keyValMap, true, includeZero);
+ }
+
+ @Override
+ public void snapShort(StringBuilder strBuff, boolean includeZero) {
+ getValue2StrBuff(strBuff, true, includeZero);
+ }
+
+ @Override
+ public void clear() {
+ this.count.clear();
+ this.min.clear();
+ this.max.clear();
+ for (int i = 0; i < NUM_BUCKETS; i++) {
+ this.buckets[i].clear();
+ }
+ }
+
+ private void getValue2Map(Map<String, Long> keyValMap,
+ boolean snapShot, boolean includeZero) {
+ long p99Cnt = 0L;
+ long p999Cnt = 0L;
+ long p9999Cnt = 0L;
+ int maxValIndex = -1;
+ int match99Index = 0;
+ int match999Index = 0;
+ int match9999Index = 0;
+ long curCnt;
+ long maxValue;
+ long minValue;
+ if (snapShot) {
+ curCnt = this.count.getAndResetValue();
+ maxValue = this.max.getAndResetValue();
+ minValue = this.min.getAndResetValue();
} else {
- long tmpValue;
- for (int i = 0; i < NUM_BUCKETS; i++) {
- tmpValue = this.buckets[i].getAndResetValue();
- if (tmpValue > 0) {
- keyValMap.put(this.buckets[i].getFullName(), tmpValue);
+ curCnt = this.count.getValue();
+ maxValue = this.max.getValue();
+ minValue = this.min.getValue();
+ }
+ if (curCnt > 0) {
+ p99Cnt = (long) (curCnt * 0.99);
+ p999Cnt = (long) (curCnt * 0.999);
+ p9999Cnt = (long) (curCnt * 0.9999);
+ }
+ // put key and value
+ keyValMap.put(this.count.getFullName(), curCnt);
+ keyValMap.put(this.min.getFullName(), minValue);
+ keyValMap.put(this.max.getFullName(), maxValue);
+ long bucketItemVal;
+ for (int i = 0; i < NUM_BUCKETS; i++) {
+ bucketItemVal = snapShot
+ ? this.buckets[i].getAndResetValue()
+ : this.buckets[i].getValue();
+ if (bucketItemVal == 0) {
+ if (includeZero) {
+ keyValMap.put(this.buckets[i].getFullName(),
bucketItemVal);
+ }
+ } else {
+ keyValMap.put(this.buckets[i].getFullName(), bucketItemVal);
+ maxValIndex = i;
+ // calculate p99 value
+ if (p99Cnt > 0) {
+ p99Cnt -= bucketItemVal;
+ if (p99Cnt <= 0) {
+ match99Index = i;
+ }
+ }
+ // calculate p999 value
+ if (p999Cnt > 0) {
+ p999Cnt -= bucketItemVal;
+ if (p999Cnt <= 0) {
+ match999Index = i;
+ }
+ }
+ // calculate p9999 value
+ if (p9999Cnt > 0) {
+ p9999Cnt -= bucketItemVal;
+ if (p9999Cnt <= 0) {
+ match9999Index = i;
+ }
}
}
}
+ keyValMap.put(p99FullKey,
+ getPxValue(match99Index, maxValIndex, maxValue));
+ keyValMap.put(p999FullKey,
+ getPxValue(match999Index, maxValIndex, maxValue));
+ keyValMap.put(p9999FullKey,
+ getPxValue(match9999Index, maxValIndex, maxValue));
}
- @Override
- public void snapShort(StringBuilder strBuff, boolean includeZero) {
+ private void getValue2StrBuff(StringBuilder strBuff,
+ boolean snapShot, boolean includeZero) {
+ long p99Cnt = 0L;
+ long p999Cnt = 0L;
+ long p9999Cnt = 0L;
+ int maxValIndex = -1;
+ int match99Index = 0;
+ int match999Index = 0;
+ int match9999Index = 0;
+ long curCnt;
+ long maxValue;
+ long minValue;
+ if (snapShot) {
+ curCnt = this.count.getAndResetValue();
+ maxValue = this.max.getAndResetValue();
+ minValue = this.min.getAndResetValue();
+ } else {
+ curCnt = this.count.getValue();
+ maxValue = this.max.getValue();
+ minValue = this.min.getValue();
+ }
+ if (curCnt > 0) {
+ p99Cnt = (long) (curCnt * 0.99);
+ p999Cnt = (long) (curCnt * 0.999);
+ p9999Cnt = (long) (curCnt * 0.9999);
+ }
+ // put key and value
strBuff.append("\"").append(getFullName()).append("\":")
- .append("{\"").append(this.count.getShortName()).append("\":")
- .append(this.count.getAndResetValue()).append(",\"")
+ .append("{\"").append(this.count.getShortName())
+ .append("\":").append(curCnt).append(",\"")
.append(this.min.getShortName()).append("\":")
- .append(this.min.getAndResetValue()).append(",\"")
+ .append(minValue).append(",\"")
.append(this.max.getShortName()).append("\":")
- .append(this.max.getAndResetValue()).append(",\"cells\":{");
+ .append(maxValue).append(",\"cells\":{");
int count = 0;
- if (includeZero) {
- for (int i = 0; i < NUM_BUCKETS; i++) {
+ long bucketItemVal;
+ for (int i = 0; i < NUM_BUCKETS; i++) {
+ bucketItemVal = snapShot
+ ? this.buckets[i].getAndResetValue()
+ : this.buckets[i].getValue();
+ if (bucketItemVal == 0) {
+ if (includeZero) {
+ if (count++ > 0) {
+ strBuff.append(",");
+ }
+ strBuff.append("\"").append(this.buckets[i].getShortName())
+ .append("\":").append(bucketItemVal);
+ }
+ } else {
if (count++ > 0) {
strBuff.append(",");
}
-
strBuff.append("\"").append(this.buckets[i].getShortName()).append("\":")
- .append(this.buckets[i].getAndResetValue());
- }
- } else {
- long tmpValue;
- for (int i = 0; i < NUM_BUCKETS; i++) {
- tmpValue = this.buckets[i].getAndResetValue();
- if (tmpValue > 0) {
- if (count++ > 0) {
- strBuff.append(",");
+ strBuff.append("\"").append(this.buckets[i].getShortName())
+ .append("\":").append(bucketItemVal);
+ maxValIndex = i;
+ // calculate p99 value
+ if (p99Cnt > 0) {
+ p99Cnt -= bucketItemVal;
+ if (p99Cnt <= 0) {
+ match99Index = i;
+ }
+ }
+ // calculate p999 value
+ if (p999Cnt > 0) {
+ p999Cnt -= bucketItemVal;
+ if (p999Cnt <= 0) {
+ match999Index = i;
+ }
+ }
+ // calculate p9999 value
+ if (p9999Cnt > 0) {
+ p9999Cnt -= bucketItemVal;
+ if (p9999Cnt <= 0) {
+ match9999Index = i;
}
-
strBuff.append("\"").append(this.buckets[i].getShortName()).append("\":").append(tmpValue);
}
}
}
- strBuff.append("}}");
+ strBuff.append("},\"").append(p99ShortKey).append("\":")
+ .append(getPxValue(match99Index, maxValIndex, maxValue))
+ .append(",\"").append(p999ShortKey).append("\":")
+ .append(getPxValue(match999Index, maxValIndex, maxValue))
+ .append(",\"").append(p9999ShortKey).append("\":")
+ .append(getPxValue(match9999Index, maxValIndex, maxValue))
+ .append("}");
}
- @Override
- public void clear() {
- this.count.clear();
- this.min.clear();
- this.max.clear();
- for (int i = 0; i < NUM_BUCKETS; i++) {
- this.buckets[i].clear();
+ private long getPxValue(int endValIndex, int maxValIndex, long maxValue) {
+ if (endValIndex < maxValIndex) {
+ return (long) Math.pow(2, endValIndex + 1);
+ } else {
+ return (maxValue == Long.MIN_VALUE) ? 0 : maxValue;
}
}
}
diff --git
a/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corebase/metric/HistogramTest.java
b/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corebase/metric/HistogramTest.java
index a7ff30b45..43af3aca2 100644
---
a/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corebase/metric/HistogramTest.java
+++
b/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corebase/metric/HistogramTest.java
@@ -93,7 +93,8 @@ public class HistogramTest {
estHistogram.getValue(strBuff, false);
String result1 = "\"disk_dlt\":{\"count\":7,\"min\":-5,\"max\":131100,"
+
"\"cells\":{\"cell_0t2\":1,\"cell_16t32\":1,\"cell_512t1024\":1"
- + ",\"cell_65536t131072\":2,\"cell_131072tMax\":2}}";
+ +
",\"cell_65536t131072\":2,\"cell_131072tMax\":2},\"P99\":131100"
+ + ",\"P999\":131100,\"P9999\":131100}";
Assert.assertEquals(result1, strBuff.toString());
strBuff.delete(0, strBuff.length());
// test for map
@@ -122,8 +123,9 @@ public class HistogramTest {
tmpMap.clear();
// test get value by strBuff
estHistogram.getValue(strBuff, false);
- String result2 =
-
"\"disk_dlt\":{\"count\":2,\"min\":1,\"max\":100,\"cells\":{\"cell_0t2\":1,\"cell_64t128\":1}}";
+ String result2 = "\"disk_dlt\":{\"count\":2,\"min\":1,\"max\":100,"
+ + "\"cells\":{\"cell_0t2\":1,\"cell_64t128\":1},"
+ + "\"P99\":2,\"P999\":2,\"P9999\":2}";
Assert.assertEquals(result2, strBuff.toString());
strBuff.delete(0, strBuff.length());
// test clear()
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
index e87a70ec0..658277519 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
@@ -278,6 +278,7 @@ public class BrokerServiceServer implements
BrokerReadService, BrokerWriteServic
public GetMessageResponseB2C getMessagesC2B(GetMessageRequestC2B request,
final String rmtAddress,
boolean overtls) throws Throwable {
+ final long startTime = System.currentTimeMillis();
final GetMessageResponseB2C.Builder builder =
GetMessageResponseB2C.newBuilder();
builder.setSuccess(false);
@@ -379,9 +380,9 @@ public class BrokerServiceServer implements
BrokerReadService, BrokerWriteServic
request.getLastPackConsumed(),
request.getManualCommitOffset(),
clientId, this.tubeConfig.getHostName(),
rmtAddrInfo, isEscFlowCtrl, strBuffer);
if (msgResult.isSuccess) {
- consumerNodeInfo.setLastProcInfo(System.currentTimeMillis(),
- msgResult.lastRdDataOffset,
- msgResult.totalMsgSize);
+ long endTime = System.currentTimeMillis();
+ consumerNodeInfo.setLastProcInfo(endTime,
+ msgResult.lastRdDataOffset, msgResult.totalMsgSize);
getCounterGroup.add(msgResult.tmpCounters);
AuditUtils.addConsumeRecord(msgResult.tmpCounters);
builder.setEscFlowCtrl(false);
@@ -393,6 +394,7 @@ public class BrokerServiceServer implements
BrokerReadService, BrokerWriteServic
builder.setErrMsg("OK!");
builder.addAllMessages(msgResult.transferedMessageList);
builder.setMaxOffset(msgResult.getMaxOffset());
+ BrokerSrvStatsHolder.updGetMsgLatency(endTime - startTime);
return builder.build();
} else {
builder.setErrCode(msgResult.getRetCode());
@@ -506,6 +508,7 @@ public class BrokerServiceServer implements
BrokerReadService, BrokerWriteServic
int msgCount, final Set<String> filterCondSet,
final StringBuilder sb) throws Exception {
MessageStore dataStore = null;
+ final long startTime = System.currentTimeMillis();
if (!this.started.get()
|| ServiceStatusHolder.isReadServiceStop()) {
sb.append("{\"result\":false,\"errCode\":")
@@ -547,6 +550,7 @@ public class BrokerServiceServer implements
BrokerReadService, BrokerWriteServic
}
GetMessageResult getMessageResult =
storeManager.getMessages(dataStore, topicName,
partitionId, msgCount, filterCondSet);
+ BrokerSrvStatsHolder.updGetMsgLatency(System.currentTimeMillis() -
startTime);
if ((getMessageResult.transferedMessageList == null)
|| (getMessageResult.transferedMessageList.isEmpty())) {
sb.append("{\"result\":false,\"errCode\":401,\"errMsg\":\"")
@@ -596,6 +600,7 @@ public class BrokerServiceServer implements
BrokerReadService, BrokerWriteServic
final String rmtAddress,
boolean overtls) throws Throwable {
ProcessResult result = new ProcessResult();
+ final long startTime = System.currentTimeMillis();
final StringBuilder strBuffer = new StringBuilder(512);
SendMessageResponseB2P.Builder builder =
SendMessageResponseB2P.newBuilder();
builder.setSuccess(false);
@@ -689,6 +694,7 @@ public class BrokerServiceServer implements
BrokerReadService, BrokerWriteServic
builder.setMessageId(appendResult.getMsgId());
builder.setAppendTime(appendResult.getAppendTime());
builder.setAppendOffset(appendResult.getAppendIndexOffset());
+
BrokerSrvStatsHolder.updSendMsgLatency(System.currentTimeMillis() - startTime);
return builder.build();
} else {
builder.setErrCode(TErrCodeConstants.SERVER_RECEIVE_OVERFLOW);
@@ -1211,6 +1217,7 @@ public class BrokerServiceServer implements
BrokerReadService, BrokerWriteServic
public CommitOffsetResponseB2C consumerCommitC2B(CommitOffsetRequestC2B
request,
final String rmtAddress,
boolean overtls) throws Throwable {
+ final long startTime = System.currentTimeMillis();
final CommitOffsetResponseB2C.Builder builder =
CommitOffsetResponseB2C.newBuilder();
builder.setSuccess(false);
builder.setCurrOffset(-1);
@@ -1301,6 +1308,7 @@ public class BrokerServiceServer implements
BrokerReadService, BrokerWriteServic
.append(consumerNodeInfo.getConsumerId())
.append(", partition is : ").append(partStr).toString());
}
+ BrokerSrvStatsHolder.updConfirmLatency(System.currentTimeMillis() -
startTime);
return builder.build();
}
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/BrokerSrvStatsHolder.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/BrokerSrvStatsHolder.java
index 7a5bfd745..ee00ca7e3 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/BrokerSrvStatsHolder.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/BrokerSrvStatsHolder.java
@@ -44,8 +44,8 @@ public class BrokerSrvStatsHolder {
private static final AtomicInteger writableIndex = new AtomicInteger(0);
// Last snapshot time
private static final AtomicLong lstSnapshotTime = new AtomicLong(0);
- // whether the DiskSync statistic is closed
- private static volatile boolean diskSyncClosed = false;
+ // whether the detail statistic is closed
+ private static volatile boolean detailStatsClosed = false;
// Initial service statistic set
static {
@@ -79,21 +79,21 @@ public class BrokerSrvStatsHolder {
}
/**
- * Set manually the DiskSync statistic status.
+ * Set manually the detail statistic status.
*
* @param enableStats enable or disable the statistic.
*/
- public static synchronized void setDiskSyncStatsStatus(boolean
enableStats) {
- BrokerSrvStatsHolder.diskSyncClosed = !enableStats;
+ public static synchronized void setDetailStatsStatus(boolean enableStats) {
+ BrokerSrvStatsHolder.detailStatsClosed = !enableStats;
}
/**
- * Query whether the statistic is closed.
+ * Query whether the detail statistic is closed.
*
* @return the statistic status
*/
- public static boolean isDiskSyncStatsClosed() {
- return BrokerSrvStatsHolder.diskSyncClosed;
+ public static boolean isDetailStatsClosed() {
+ return BrokerSrvStatsHolder.detailStatsClosed;
}
// metric set operate APIs end
@@ -127,7 +127,7 @@ public class BrokerSrvStatsHolder {
}
public static void updDiskSyncDataDlt(long dltTime) {
- if (diskSyncClosed) {
+ if (detailStatsClosed) {
return;
}
switchableSets[getIndex()].fileSyncDltStats.update(dltTime);
@@ -136,6 +136,35 @@ public class BrokerSrvStatsHolder {
public static void updZKSyncDataDlt(long dltTime) {
switchableSets[getIndex()].zkSyncDltStats.update(dltTime);
}
+
+ public static void updSendMsgLatency(long dltTime) {
+ if (detailStatsClosed) {
+ return;
+ }
+ switchableSets[getIndex()].msgPubLatencyStats.update(dltTime);
+ }
+
+ public static void incSendMsgOverFlowCnt() {
+ if (detailStatsClosed) {
+ return;
+ }
+ switchableSets[getIndex()].errPubOverFlowStats.incValue();
+ }
+
+ public static void updGetMsgLatency(long dltTime) {
+ if (detailStatsClosed) {
+ return;
+ }
+ switchableSets[getIndex()].msgSubLatencyStats.update(dltTime);
+ }
+
+ public static void updConfirmLatency(long dltTime) {
+ if (detailStatsClosed) {
+ return;
+ }
+ switchableSets[getIndex()].msgConfirmLatencyStats.update(dltTime);
+ }
+
// metric set operate APIs end
// private functions
@@ -156,12 +185,10 @@ public class BrokerSrvStatsHolder {
Map<String, Long> statsMap) {
statsMap.put(statsSet.lstResetTime.getFullName(),
statsSet.lstResetTime.getSinceTime());
- statsMap.put("isDiskSyncClosed", (diskSyncClosed ? 1L : 0L));
+ statsMap.put("detailStatsClosed", (detailStatsClosed ? 1L : 0L));
if (resetValue) {
- statsSet.fileSyncDltStats.snapShort(statsMap, false);
statsMap.put(statsSet.fileIOExcStats.getFullName(),
statsSet.fileIOExcStats.getAndResetValue());
- statsSet.zkSyncDltStats.snapShort(statsMap, false);
statsMap.put(statsSet.zkExcStats.getFullName(),
statsSet.zkExcStats.getAndResetValue());
statsMap.put(statsSet.brokerTimeoutStats.getFullName(),
@@ -172,11 +199,16 @@ public class BrokerSrvStatsHolder {
csmOnlineCnt.getAndResetValue());
statsMap.put(statsSet.csmTimeoutStats.getFullName(),
statsSet.csmTimeoutStats.getAndResetValue());
+ statsMap.put(statsSet.errPubOverFlowStats.getFullName(),
+ statsSet.errPubOverFlowStats.getAndResetValue());
+ statsSet.fileSyncDltStats.snapShort(statsMap, false);
+ statsSet.zkSyncDltStats.snapShort(statsMap, false);
+ statsSet.msgPubLatencyStats.snapShort(statsMap, false);
+ statsSet.msgSubLatencyStats.snapShort(statsMap, false);
+ statsSet.msgConfirmLatencyStats.snapShort(statsMap, false);
} else {
- statsSet.fileSyncDltStats.getValue(statsMap, false);
statsMap.put(statsSet.fileIOExcStats.getFullName(),
statsSet.fileIOExcStats.getValue());
- statsSet.zkSyncDltStats.getValue(statsMap, false);
statsMap.put(statsSet.zkExcStats.getFullName(),
statsSet.zkExcStats.getValue());
statsMap.put(statsSet.brokerTimeoutStats.getFullName(),
@@ -187,6 +219,13 @@ public class BrokerSrvStatsHolder {
csmOnlineCnt.getValue());
statsMap.put(statsSet.csmTimeoutStats.getFullName(),
statsSet.csmTimeoutStats.getValue());
+ statsMap.put(statsSet.errPubOverFlowStats.getFullName(),
+ statsSet.errPubOverFlowStats.getValue());
+ statsSet.fileSyncDltStats.getValue(statsMap, false);
+ statsSet.zkSyncDltStats.getValue(statsMap, false);
+ statsSet.msgPubLatencyStats.getValue(statsMap, false);
+ statsSet.msgSubLatencyStats.getValue(statsMap, false);
+ statsSet.msgConfirmLatencyStats.getValue(statsMap, false);
}
}
@@ -195,15 +234,11 @@ public class BrokerSrvStatsHolder {
StringBuilder strBuff) {
strBuff.append("{\"").append(statsSet.lstResetTime.getFullName())
.append("\":\"").append(statsSet.lstResetTime.getStrSinceTime())
- .append("\",\"isDiskSyncClosed\":").append(diskSyncClosed)
- .append(",");
+ .append("\",\"detailStatsClosed\":").append(detailStatsClosed);
if (resetValue) {
- statsSet.fileSyncDltStats.snapShort(strBuff, false);
strBuff.append(",\"").append(statsSet.fileIOExcStats.getFullName())
.append("\":").append(statsSet.fileIOExcStats.getAndResetValue())
- .append(",");
- statsSet.zkSyncDltStats.snapShort(strBuff, false);
- strBuff.append(",\"").append(statsSet.zkExcStats.getFullName())
+ .append(",\"").append(statsSet.zkExcStats.getFullName())
.append("\":").append(statsSet.zkExcStats.getAndResetValue())
.append(",\"").append(statsSet.brokerTimeoutStats.getFullName())
.append("\":").append(statsSet.brokerTimeoutStats.getAndResetValue())
@@ -213,14 +248,23 @@ public class BrokerSrvStatsHolder {
.append("\":").append(csmOnlineCnt.getAndResetValue())
.append(",\"").append(statsSet.csmTimeoutStats.getFullName())
.append("\":").append(statsSet.csmTimeoutStats.getAndResetValue())
- .append("}");
- } else {
+
.append(",\"").append(statsSet.errPubOverFlowStats.getFullName())
+
.append("\":").append(statsSet.errPubOverFlowStats.getAndResetValue())
+ .append(",");
statsSet.fileSyncDltStats.snapShort(strBuff, false);
+ strBuff.append(",");
+ statsSet.zkSyncDltStats.snapShort(strBuff, false);
+ strBuff.append(",");
+ statsSet.msgPubLatencyStats.snapShort(strBuff, false);
+ strBuff.append(",");
+ statsSet.msgSubLatencyStats.snapShort(strBuff, false);
+ strBuff.append(",");
+ statsSet.msgConfirmLatencyStats.snapShort(strBuff, false);
+ strBuff.append("}");
+ } else {
strBuff.append(",\"").append(statsSet.fileIOExcStats.getFullName())
.append("\":").append(statsSet.fileIOExcStats.getValue())
- .append(",");
- statsSet.zkSyncDltStats.snapShort(strBuff, false);
- strBuff.append(",\"").append(statsSet.zkExcStats.getFullName())
+ .append(",\"").append(statsSet.zkExcStats.getFullName())
.append("\":").append(statsSet.zkExcStats.getValue())
.append(",\"").append(statsSet.brokerTimeoutStats.getFullName())
.append("\":").append(statsSet.brokerTimeoutStats.getValue())
@@ -230,7 +274,19 @@ public class BrokerSrvStatsHolder {
.append("\":").append(csmOnlineCnt.getValue())
.append(",\"").append(statsSet.csmTimeoutStats.getFullName())
.append("\":").append(statsSet.csmTimeoutStats.getValue())
- .append("}");
+
.append(",\"").append(statsSet.errPubOverFlowStats.getFullName())
+
.append("\":").append(statsSet.errPubOverFlowStats.getValue())
+ .append(",");
+ statsSet.fileSyncDltStats.getValue(strBuff, false);
+ strBuff.append(",");
+ statsSet.zkSyncDltStats.getValue(strBuff, false);
+ strBuff.append(",");
+ statsSet.msgPubLatencyStats.getValue(strBuff, false);
+ strBuff.append(",");
+ statsSet.msgSubLatencyStats.getValue(strBuff, false);
+ strBuff.append(",");
+ statsSet.msgConfirmLatencyStats.getValue(strBuff, false);
+ strBuff.append("}");
}
}
@@ -282,6 +338,18 @@ public class BrokerSrvStatsHolder {
// Consumer 2 Broker status statistics
protected final LongStatsCounter csmTimeoutStats =
new LongStatsCounter("consume_timeout_cnt", null);
+ // sendMessage process latency statistics
+ protected final ESTHistogram msgPubLatencyStats =
+ new ESTHistogram("msg_put_dlt", null);
+ // error sendMessage response distribution statistics
+ protected final LongStatsCounter errPubOverFlowStats =
+ new LongStatsCounter("msg_put_overflow", null);
+ // getMessage process latency statistics
+ protected final ESTHistogram msgSubLatencyStats =
+ new ESTHistogram("msg_get_dlt", null);
+ // confirm process latency statistics
+ protected final ESTHistogram msgConfirmLatencyStats =
+ new ESTHistogram("msg_confirm_dlt", null);
public ServiceStatsSet() {
resetSinceTime();
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java
index c7a39244d..3e723c541 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java
@@ -1328,7 +1328,7 @@ public class BrokerAdminServlet extends
AbstractWebHandler {
}
if (inMetricType == BrokerStatsType.SERVICESTATUS
|| inMetricType == BrokerStatsType.ALL) {
- BrokerSrvStatsHolder.setDiskSyncStatsStatus(enable);
+ BrokerSrvStatsHolder.setDetailStatsStatus(enable);
}
if (inMetricType == BrokerStatsType.MSGSTORE
|| inMetricType == BrokerStatsType.ALL) {