This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new aeeb705f2 [INLONG-6997][TubeMQ] Add sendMessage and getMessage latency statistics (#7013)
aeeb705f2 is described below

commit aeeb705f2e1b5d8cc60822e9a6d6813e5ca059a3
Author: Goson Zhang <[email protected]>
AuthorDate: Thu Dec 22 10:06:13 2022 +0800

    [INLONG-6997][TubeMQ] Add sendMessage and getMessage latency statistics (#7013)
---
 .../tubemq/client/common/ClientStatsInfo.java      |   2 +-
 .../tubemq/corebase/metric/impl/ESTHistogram.java  | 259 ++++++++++++++-------
 .../tubemq/corebase/metric/HistogramTest.java      |   8 +-
 .../tubemq/server/broker/BrokerServiceServer.java  |  14 +-
 .../server/broker/stats/BrokerSrvStatsHolder.java  | 120 +++++++---
 .../server/broker/web/BrokerAdminServlet.java      |   2 +-
 6 files changed, 287 insertions(+), 118 deletions(-)

diff --git 
a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/ClientStatsInfo.java
 
b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/ClientStatsInfo.java
index e6a942bab..c721dec6c 100644
--- 
a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/ClientStatsInfo.java
+++ 
b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/ClientStatsInfo.java
@@ -241,7 +241,7 @@ public class ClientStatsInfo {
                     strBuff.append(",");
                     statsSet.csmLatencyStats.snapShort(strBuff, false);
                     strBuff.append(",");
-                    statsSet.confirmDltStats.getValue(strBuff, false);
+                    statsSet.confirmDltStats.snapShort(strBuff, false);
                 }
             } else {
                 statsSet.msgCallDltStats.getValue(strBuff, false);
diff --git 
a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/impl/ESTHistogram.java
 
b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/impl/ESTHistogram.java
index c0839af94..06a4135f8 100644
--- 
a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/impl/ESTHistogram.java
+++ 
b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/impl/ESTHistogram.java
@@ -41,6 +41,12 @@ public class ESTHistogram extends BaseMetric implements 
Histogram {
     // Natural logarithm of 2
     private static final double LOG2_VALUE = Math.log(2);
     // Simple statistic items
+    private final String p99FullKey;
+    private final String p99ShortKey = "P99";
+    private final String p999FullKey;
+    private final String p999ShortKey = "P999";
+    private final String p9999FullKey;
+    private final String p9999ShortKey = "P9999";
     private final LongStatsCounter count;
     private final LongMinGauge min;
     private final LongMaxGauge max;
@@ -55,6 +61,9 @@ public class ESTHistogram extends BaseMetric implements 
Histogram {
      */
     public ESTHistogram(String metricName, String prefix) {
         super(metricName, prefix);
+        this.p99FullKey = getFullName() + "_" + p99ShortKey;
+        this.p999FullKey = getFullName() + "_" + p999ShortKey;
+        this.p9999FullKey = getFullName() + "_" + p9999ShortKey;
         this.count = new LongStatsCounter("count", getFullName());
         this.min = new LongMinGauge("min", getFullName());
         this.max = new LongMaxGauge("max", getFullName());
@@ -92,117 +101,199 @@ public class ESTHistogram extends BaseMetric implements 
Histogram {
 
     @Override
     public void getValue(Map<String, Long> keyValMap, boolean includeZero) {
-        keyValMap.put(this.count.getFullName(), this.count.getValue());
-        keyValMap.put(this.min.getFullName(), this.min.getValue());
-        keyValMap.put(this.max.getFullName(), this.max.getValue());
-        if (includeZero) {
-            for (int i = 0; i < NUM_BUCKETS; i++) {
-                keyValMap.put(this.buckets[i].getFullName(), 
this.buckets[i].getValue());
-            }
-        } else {
-            long tmpValue;
-            for (int i = 0; i < NUM_BUCKETS; i++) {
-                tmpValue = this.buckets[i].getValue();
-                if (tmpValue > 0) {
-                    keyValMap.put(this.buckets[i].getFullName(), tmpValue);
-                }
-            }
-        }
+        getValue2Map(keyValMap, false, includeZero);
     }
 
     @Override
     public void getValue(StringBuilder strBuff, boolean includeZero) {
-        strBuff.append("\"").append(getFullName()).append("\":")
-                .append("{\"").append(this.count.getShortName()).append("\":")
-                .append(this.count.getValue()).append(",\"")
-                .append(this.min.getShortName()).append("\":")
-                .append(this.min.getValue()).append(",\"")
-                .append(this.max.getShortName()).append("\":")
-                .append(this.max.getValue()).append(",\"cells\":{");
-        int count = 0;
-        if (includeZero) {
-            for (int i = 0; i < NUM_BUCKETS; i++) {
-                if (count++ > 0) {
-                    strBuff.append(",");
-                }
-                
strBuff.append("\"").append(this.buckets[i].getShortName()).append("\":")
-                        .append(this.buckets[i].getValue());
-            }
-        } else {
-            long tmpValue;
-            for (int i = 0; i < NUM_BUCKETS; i++) {
-                tmpValue = this.buckets[i].getValue();
-                if (tmpValue > 0) {
-                    if (count++ > 0) {
-                        strBuff.append(",");
-                    }
-                    
strBuff.append("\"").append(this.buckets[i].getShortName()).append("\":").append(tmpValue);
-                }
-            }
-        }
-        strBuff.append("}}");
+        getValue2StrBuff(strBuff, false, includeZero);
     }
 
     @Override
     public void snapShort(Map<String, Long> keyValMap, boolean includeZero) {
-        keyValMap.put(this.count.getFullName(), this.count.getAndResetValue());
-        keyValMap.put(this.min.getFullName(), this.min.getAndResetValue());
-        keyValMap.put(this.max.getFullName(), this.max.getAndResetValue());
-        if (includeZero) {
-            for (int i = 0; i < NUM_BUCKETS; i++) {
-                keyValMap.put(this.buckets[i].getFullName(), 
this.buckets[i].getAndResetValue());
-            }
+        getValue2Map(keyValMap, true, includeZero);
+    }
+
+    @Override
+    public void snapShort(StringBuilder strBuff, boolean includeZero) {
+        getValue2StrBuff(strBuff, true, includeZero);
+    }
+
+    @Override
+    public void clear() {
+        this.count.clear();
+        this.min.clear();
+        this.max.clear();
+        for (int i = 0; i < NUM_BUCKETS; i++) {
+            this.buckets[i].clear();
+        }
+    }
+
+    private void getValue2Map(Map<String, Long> keyValMap,
+            boolean snapShot, boolean includeZero) {
+        long p99Cnt = 0L;
+        long p999Cnt = 0L;
+        long p9999Cnt = 0L;
+        int maxValIndex = -1;
+        int match99Index = 0;
+        int match999Index = 0;
+        int match9999Index = 0;
+        long curCnt;
+        long maxValue;
+        long minValue;
+        if (snapShot) {
+            curCnt = this.count.getAndResetValue();
+            maxValue = this.max.getAndResetValue();
+            minValue = this.min.getAndResetValue();
         } else {
-            long tmpValue;
-            for (int i = 0; i < NUM_BUCKETS; i++) {
-                tmpValue = this.buckets[i].getAndResetValue();
-                if (tmpValue > 0) {
-                    keyValMap.put(this.buckets[i].getFullName(), tmpValue);
+            curCnt = this.count.getValue();
+            maxValue = this.max.getValue();
+            minValue = this.min.getValue();
+        }
+        if (curCnt > 0) {
+            p99Cnt = (long) (curCnt * 0.99);
+            p999Cnt = (long) (curCnt * 0.999);
+            p9999Cnt = (long) (curCnt * 0.9999);
+        }
+        // put key and value
+        keyValMap.put(this.count.getFullName(), curCnt);
+        keyValMap.put(this.min.getFullName(), minValue);
+        keyValMap.put(this.max.getFullName(), maxValue);
+        long bucketItemVal;
+        for (int i = 0; i < NUM_BUCKETS; i++) {
+            bucketItemVal = snapShot
+                    ? this.buckets[i].getAndResetValue()
+                    : this.buckets[i].getValue();
+            if (bucketItemVal == 0) {
+                if (includeZero) {
+                    keyValMap.put(this.buckets[i].getFullName(), 
bucketItemVal);
+                }
+            } else {
+                keyValMap.put(this.buckets[i].getFullName(), bucketItemVal);
+                maxValIndex = i;
+                // calculate p99 value
+                if (p99Cnt > 0) {
+                    p99Cnt -= bucketItemVal;
+                    if (p99Cnt <= 0) {
+                        match99Index = i;
+                    }
+                }
+                // calculate p999 value
+                if (p999Cnt > 0) {
+                    p999Cnt -= bucketItemVal;
+                    if (p999Cnt <= 0) {
+                        match999Index = i;
+                    }
+                }
+                // calculate p9999 value
+                if (p9999Cnt > 0) {
+                    p9999Cnt -= bucketItemVal;
+                    if (p9999Cnt <= 0) {
+                        match9999Index = i;
+                    }
                 }
             }
         }
+        keyValMap.put(p99FullKey,
+                getPxValue(match99Index, maxValIndex, maxValue));
+        keyValMap.put(p999FullKey,
+                getPxValue(match999Index, maxValIndex, maxValue));
+        keyValMap.put(p9999FullKey,
+                getPxValue(match9999Index, maxValIndex, maxValue));
     }
 
-    @Override
-    public void snapShort(StringBuilder strBuff, boolean includeZero) {
+    private void getValue2StrBuff(StringBuilder strBuff,
+            boolean snapShot, boolean includeZero) {
+        long p99Cnt = 0L;
+        long p999Cnt = 0L;
+        long p9999Cnt = 0L;
+        int maxValIndex = -1;
+        int match99Index = 0;
+        int match999Index = 0;
+        int match9999Index = 0;
+        long curCnt;
+        long maxValue;
+        long minValue;
+        if (snapShot) {
+            curCnt = this.count.getAndResetValue();
+            maxValue = this.max.getAndResetValue();
+            minValue = this.min.getAndResetValue();
+        } else {
+            curCnt = this.count.getValue();
+            maxValue = this.max.getValue();
+            minValue = this.min.getValue();
+        }
+        if (curCnt > 0) {
+            p99Cnt = (long) (curCnt * 0.99);
+            p999Cnt = (long) (curCnt * 0.999);
+            p9999Cnt = (long) (curCnt * 0.9999);
+        }
+        // put key and value
         strBuff.append("\"").append(getFullName()).append("\":")
-                .append("{\"").append(this.count.getShortName()).append("\":")
-                .append(this.count.getAndResetValue()).append(",\"")
+                .append("{\"").append(this.count.getShortName())
+                .append("\":").append(curCnt).append(",\"")
                 .append(this.min.getShortName()).append("\":")
-                .append(this.min.getAndResetValue()).append(",\"")
+                .append(minValue).append(",\"")
                 .append(this.max.getShortName()).append("\":")
-                .append(this.max.getAndResetValue()).append(",\"cells\":{");
+                .append(maxValue).append(",\"cells\":{");
         int count = 0;
-        if (includeZero) {
-            for (int i = 0; i < NUM_BUCKETS; i++) {
+        long bucketItemVal;
+        for (int i = 0; i < NUM_BUCKETS; i++) {
+            bucketItemVal = snapShot
+                    ? this.buckets[i].getAndResetValue()
+                    : this.buckets[i].getValue();
+            if (bucketItemVal == 0) {
+                if (includeZero) {
+                    if (count++ > 0) {
+                        strBuff.append(",");
+                    }
+                    strBuff.append("\"").append(this.buckets[i].getShortName())
+                            .append("\":").append(bucketItemVal);
+                }
+            } else {
                 if (count++ > 0) {
                     strBuff.append(",");
                 }
-                
strBuff.append("\"").append(this.buckets[i].getShortName()).append("\":")
-                        .append(this.buckets[i].getAndResetValue());
-            }
-        } else {
-            long tmpValue;
-            for (int i = 0; i < NUM_BUCKETS; i++) {
-                tmpValue = this.buckets[i].getAndResetValue();
-                if (tmpValue > 0) {
-                    if (count++ > 0) {
-                        strBuff.append(",");
+                strBuff.append("\"").append(this.buckets[i].getShortName())
+                        .append("\":").append(bucketItemVal);
+                maxValIndex = i;
+                // calculate p99 value
+                if (p99Cnt > 0) {
+                    p99Cnt -= bucketItemVal;
+                    if (p99Cnt <= 0) {
+                        match99Index = i;
+                    }
+                }
+                // calculate p999 value
+                if (p999Cnt > 0) {
+                    p999Cnt -= bucketItemVal;
+                    if (p999Cnt <= 0) {
+                        match999Index = i;
+                    }
+                }
+                // calculate p9999 value
+                if (p9999Cnt > 0) {
+                    p9999Cnt -= bucketItemVal;
+                    if (p9999Cnt <= 0) {
+                        match9999Index = i;
                     }
-                    
strBuff.append("\"").append(this.buckets[i].getShortName()).append("\":").append(tmpValue);
                 }
             }
         }
-        strBuff.append("}}");
+        strBuff.append("},\"").append(p99ShortKey).append("\":")
+                .append(getPxValue(match99Index, maxValIndex, maxValue))
+                .append(",\"").append(p999ShortKey).append("\":")
+                .append(getPxValue(match999Index, maxValIndex, maxValue))
+                .append(",\"").append(p9999ShortKey).append("\":")
+                .append(getPxValue(match9999Index, maxValIndex, maxValue))
+                .append("}");
     }
 
-    @Override
-    public void clear() {
-        this.count.clear();
-        this.min.clear();
-        this.max.clear();
-        for (int i = 0; i < NUM_BUCKETS; i++) {
-            this.buckets[i].clear();
+    private long getPxValue(int endValIndex, int maxValIndex, long maxValue) {
+        if (endValIndex < maxValIndex) {
+            return (long) Math.pow(2, endValIndex + 1);
+        } else {
+            return (maxValue == Long.MIN_VALUE) ? 0 : maxValue;
         }
     }
 }
diff --git 
a/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corebase/metric/HistogramTest.java
 
b/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corebase/metric/HistogramTest.java
index a7ff30b45..43af3aca2 100644
--- 
a/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corebase/metric/HistogramTest.java
+++ 
b/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corebase/metric/HistogramTest.java
@@ -93,7 +93,8 @@ public class HistogramTest {
         estHistogram.getValue(strBuff, false);
         String result1 = "\"disk_dlt\":{\"count\":7,\"min\":-5,\"max\":131100,"
                 + 
"\"cells\":{\"cell_0t2\":1,\"cell_16t32\":1,\"cell_512t1024\":1"
-                + ",\"cell_65536t131072\":2,\"cell_131072tMax\":2}}";
+                + 
",\"cell_65536t131072\":2,\"cell_131072tMax\":2},\"P99\":131100"
+                + ",\"P999\":131100,\"P9999\":131100}";
         Assert.assertEquals(result1, strBuff.toString());
         strBuff.delete(0, strBuff.length());
         // test for map
@@ -122,8 +123,9 @@ public class HistogramTest {
         tmpMap.clear();
         // test get value by strBuff
         estHistogram.getValue(strBuff, false);
-        String result2 =
-                
"\"disk_dlt\":{\"count\":2,\"min\":1,\"max\":100,\"cells\":{\"cell_0t2\":1,\"cell_64t128\":1}}";
+        String result2 = "\"disk_dlt\":{\"count\":2,\"min\":1,\"max\":100,"
+                + "\"cells\":{\"cell_0t2\":1,\"cell_64t128\":1},"
+                + "\"P99\":2,\"P999\":2,\"P9999\":2}";
         Assert.assertEquals(result2, strBuff.toString());
         strBuff.delete(0, strBuff.length());
         // test clear()
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
index e87a70ec0..658277519 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
@@ -278,6 +278,7 @@ public class BrokerServiceServer implements 
BrokerReadService, BrokerWriteServic
     public GetMessageResponseB2C getMessagesC2B(GetMessageRequestC2B request,
             final String rmtAddress,
             boolean overtls) throws Throwable {
+        final long startTime = System.currentTimeMillis();
         final GetMessageResponseB2C.Builder builder =
                 GetMessageResponseB2C.newBuilder();
         builder.setSuccess(false);
@@ -379,9 +380,9 @@ public class BrokerServiceServer implements 
BrokerReadService, BrokerWriteServic
                             request.getLastPackConsumed(), 
request.getManualCommitOffset(),
                             clientId, this.tubeConfig.getHostName(), 
rmtAddrInfo, isEscFlowCtrl, strBuffer);
             if (msgResult.isSuccess) {
-                consumerNodeInfo.setLastProcInfo(System.currentTimeMillis(),
-                        msgResult.lastRdDataOffset,
-                        msgResult.totalMsgSize);
+                long endTime = System.currentTimeMillis();
+                consumerNodeInfo.setLastProcInfo(endTime,
+                        msgResult.lastRdDataOffset, msgResult.totalMsgSize);
                 getCounterGroup.add(msgResult.tmpCounters);
                 AuditUtils.addConsumeRecord(msgResult.tmpCounters);
                 builder.setEscFlowCtrl(false);
@@ -393,6 +394,7 @@ public class BrokerServiceServer implements 
BrokerReadService, BrokerWriteServic
                 builder.setErrMsg("OK!");
                 builder.addAllMessages(msgResult.transferedMessageList);
                 builder.setMaxOffset(msgResult.getMaxOffset());
+                BrokerSrvStatsHolder.updGetMsgLatency(endTime - startTime);
                 return builder.build();
             } else {
                 builder.setErrCode(msgResult.getRetCode());
@@ -506,6 +508,7 @@ public class BrokerServiceServer implements 
BrokerReadService, BrokerWriteServic
             int msgCount, final Set<String> filterCondSet,
             final StringBuilder sb) throws Exception {
         MessageStore dataStore = null;
+        final long startTime = System.currentTimeMillis();
         if (!this.started.get()
                 || ServiceStatusHolder.isReadServiceStop()) {
             sb.append("{\"result\":false,\"errCode\":")
@@ -547,6 +550,7 @@ public class BrokerServiceServer implements 
BrokerReadService, BrokerWriteServic
             }
             GetMessageResult getMessageResult =
                     storeManager.getMessages(dataStore, topicName, 
partitionId, msgCount, filterCondSet);
+            BrokerSrvStatsHolder.updGetMsgLatency(System.currentTimeMillis() - 
startTime);
             if ((getMessageResult.transferedMessageList == null)
                     || (getMessageResult.transferedMessageList.isEmpty())) {
                 sb.append("{\"result\":false,\"errCode\":401,\"errMsg\":\"")
@@ -596,6 +600,7 @@ public class BrokerServiceServer implements 
BrokerReadService, BrokerWriteServic
             final String rmtAddress,
             boolean overtls) throws Throwable {
         ProcessResult result = new ProcessResult();
+        final long startTime = System.currentTimeMillis();
         final StringBuilder strBuffer = new StringBuilder(512);
         SendMessageResponseB2P.Builder builder = 
SendMessageResponseB2P.newBuilder();
         builder.setSuccess(false);
@@ -689,6 +694,7 @@ public class BrokerServiceServer implements 
BrokerReadService, BrokerWriteServic
                 builder.setMessageId(appendResult.getMsgId());
                 builder.setAppendTime(appendResult.getAppendTime());
                 builder.setAppendOffset(appendResult.getAppendIndexOffset());
+                
BrokerSrvStatsHolder.updSendMsgLatency(System.currentTimeMillis() - startTime);
                 return builder.build();
             } else {
                 builder.setErrCode(TErrCodeConstants.SERVER_RECEIVE_OVERFLOW);
@@ -1211,6 +1217,7 @@ public class BrokerServiceServer implements 
BrokerReadService, BrokerWriteServic
     public CommitOffsetResponseB2C consumerCommitC2B(CommitOffsetRequestC2B 
request,
             final String rmtAddress,
             boolean overtls) throws Throwable {
+        final long startTime = System.currentTimeMillis();
         final CommitOffsetResponseB2C.Builder builder = 
CommitOffsetResponseB2C.newBuilder();
         builder.setSuccess(false);
         builder.setCurrOffset(-1);
@@ -1301,6 +1308,7 @@ public class BrokerServiceServer implements 
BrokerReadService, BrokerWriteServic
                     .append(consumerNodeInfo.getConsumerId())
                     .append(", partition is : ").append(partStr).toString());
         }
+        BrokerSrvStatsHolder.updConfirmLatency(System.currentTimeMillis() - 
startTime);
         return builder.build();
     }
 
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/BrokerSrvStatsHolder.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/BrokerSrvStatsHolder.java
index 7a5bfd745..ee00ca7e3 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/BrokerSrvStatsHolder.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/BrokerSrvStatsHolder.java
@@ -44,8 +44,8 @@ public class BrokerSrvStatsHolder {
     private static final AtomicInteger writableIndex = new AtomicInteger(0);
     // Last snapshot time
     private static final AtomicLong lstSnapshotTime = new AtomicLong(0);
-    // whether the DiskSync statistic is closed
-    private static volatile boolean diskSyncClosed = false;
+    // whether the detail statistic is closed
+    private static volatile boolean detailStatsClosed = false;
 
     // Initial service statistic set
     static {
@@ -79,21 +79,21 @@ public class BrokerSrvStatsHolder {
     }
 
     /**
-     * Set manually the DiskSync statistic status.
+     * Set manually the detail statistic status.
      *
      * @param enableStats  enable or disable the statistic.
      */
-    public static synchronized void setDiskSyncStatsStatus(boolean 
enableStats) {
-        BrokerSrvStatsHolder.diskSyncClosed = !enableStats;
+    public static synchronized void setDetailStatsStatus(boolean enableStats) {
+        BrokerSrvStatsHolder.detailStatsClosed = !enableStats;
     }
 
     /**
-     * Query whether the statistic is closed.
+     * Query whether the detail statistic is closed.
      *
      * @return the statistic status
      */
-    public static boolean isDiskSyncStatsClosed() {
-        return BrokerSrvStatsHolder.diskSyncClosed;
+    public static boolean isDetailStatsClosed() {
+        return BrokerSrvStatsHolder.detailStatsClosed;
     }
 
     // metric set operate APIs end
@@ -127,7 +127,7 @@ public class BrokerSrvStatsHolder {
     }
 
     public static void updDiskSyncDataDlt(long dltTime) {
-        if (diskSyncClosed) {
+        if (detailStatsClosed) {
             return;
         }
         switchableSets[getIndex()].fileSyncDltStats.update(dltTime);
@@ -136,6 +136,35 @@ public class BrokerSrvStatsHolder {
     public static void updZKSyncDataDlt(long dltTime) {
         switchableSets[getIndex()].zkSyncDltStats.update(dltTime);
     }
+
+    public static void updSendMsgLatency(long dltTime) {
+        if (detailStatsClosed) {
+            return;
+        }
+        switchableSets[getIndex()].msgPubLatencyStats.update(dltTime);
+    }
+
+    public static void incSendMsgOverFlowCnt() {
+        if (detailStatsClosed) {
+            return;
+        }
+        switchableSets[getIndex()].errPubOverFlowStats.incValue();
+    }
+
+    public static void updGetMsgLatency(long dltTime) {
+        if (detailStatsClosed) {
+            return;
+        }
+        switchableSets[getIndex()].msgSubLatencyStats.update(dltTime);
+    }
+
+    public static void updConfirmLatency(long dltTime) {
+        if (detailStatsClosed) {
+            return;
+        }
+        switchableSets[getIndex()].msgConfirmLatencyStats.update(dltTime);
+    }
+
     // metric set operate APIs end
 
     // private functions
@@ -156,12 +185,10 @@ public class BrokerSrvStatsHolder {
             Map<String, Long> statsMap) {
         statsMap.put(statsSet.lstResetTime.getFullName(),
                 statsSet.lstResetTime.getSinceTime());
-        statsMap.put("isDiskSyncClosed", (diskSyncClosed ? 1L : 0L));
+        statsMap.put("detailStatsClosed", (detailStatsClosed ? 1L : 0L));
         if (resetValue) {
-            statsSet.fileSyncDltStats.snapShort(statsMap, false);
             statsMap.put(statsSet.fileIOExcStats.getFullName(),
                     statsSet.fileIOExcStats.getAndResetValue());
-            statsSet.zkSyncDltStats.snapShort(statsMap, false);
             statsMap.put(statsSet.zkExcStats.getFullName(),
                     statsSet.zkExcStats.getAndResetValue());
             statsMap.put(statsSet.brokerTimeoutStats.getFullName(),
@@ -172,11 +199,16 @@ public class BrokerSrvStatsHolder {
                     csmOnlineCnt.getAndResetValue());
             statsMap.put(statsSet.csmTimeoutStats.getFullName(),
                     statsSet.csmTimeoutStats.getAndResetValue());
+            statsMap.put(statsSet.errPubOverFlowStats.getFullName(),
+                    statsSet.errPubOverFlowStats.getAndResetValue());
+            statsSet.fileSyncDltStats.snapShort(statsMap, false);
+            statsSet.zkSyncDltStats.snapShort(statsMap, false);
+            statsSet.msgPubLatencyStats.snapShort(statsMap, false);
+            statsSet.msgSubLatencyStats.snapShort(statsMap, false);
+            statsSet.msgConfirmLatencyStats.snapShort(statsMap, false);
         } else {
-            statsSet.fileSyncDltStats.getValue(statsMap, false);
             statsMap.put(statsSet.fileIOExcStats.getFullName(),
                     statsSet.fileIOExcStats.getValue());
-            statsSet.zkSyncDltStats.getValue(statsMap, false);
             statsMap.put(statsSet.zkExcStats.getFullName(),
                     statsSet.zkExcStats.getValue());
             statsMap.put(statsSet.brokerTimeoutStats.getFullName(),
@@ -187,6 +219,13 @@ public class BrokerSrvStatsHolder {
                     csmOnlineCnt.getValue());
             statsMap.put(statsSet.csmTimeoutStats.getFullName(),
                     statsSet.csmTimeoutStats.getValue());
+            statsMap.put(statsSet.errPubOverFlowStats.getFullName(),
+                    statsSet.errPubOverFlowStats.getValue());
+            statsSet.fileSyncDltStats.getValue(statsMap, false);
+            statsSet.zkSyncDltStats.getValue(statsMap, false);
+            statsSet.msgPubLatencyStats.getValue(statsMap, false);
+            statsSet.msgSubLatencyStats.getValue(statsMap, false);
+            statsSet.msgConfirmLatencyStats.getValue(statsMap, false);
         }
     }
 
@@ -195,15 +234,11 @@ public class BrokerSrvStatsHolder {
             StringBuilder strBuff) {
         strBuff.append("{\"").append(statsSet.lstResetTime.getFullName())
                 
.append("\":\"").append(statsSet.lstResetTime.getStrSinceTime())
-                .append("\",\"isDiskSyncClosed\":").append(diskSyncClosed)
-                .append(",");
+                .append("\",\"detailStatsClosed\":").append(detailStatsClosed);
         if (resetValue) {
-            statsSet.fileSyncDltStats.snapShort(strBuff, false);
             strBuff.append(",\"").append(statsSet.fileIOExcStats.getFullName())
                     
.append("\":").append(statsSet.fileIOExcStats.getAndResetValue())
-                    .append(",");
-            statsSet.zkSyncDltStats.snapShort(strBuff, false);
-            strBuff.append(",\"").append(statsSet.zkExcStats.getFullName())
+                    .append(",\"").append(statsSet.zkExcStats.getFullName())
                     
.append("\":").append(statsSet.zkExcStats.getAndResetValue())
                     
.append(",\"").append(statsSet.brokerTimeoutStats.getFullName())
                     
.append("\":").append(statsSet.brokerTimeoutStats.getAndResetValue())
@@ -213,14 +248,23 @@ public class BrokerSrvStatsHolder {
                     .append("\":").append(csmOnlineCnt.getAndResetValue())
                     
.append(",\"").append(statsSet.csmTimeoutStats.getFullName())
                     
.append("\":").append(statsSet.csmTimeoutStats.getAndResetValue())
-                    .append("}");
-        } else {
+                    
.append(",\"").append(statsSet.errPubOverFlowStats.getFullName())
+                    
.append("\":").append(statsSet.errPubOverFlowStats.getAndResetValue())
+                    .append(",");
             statsSet.fileSyncDltStats.snapShort(strBuff, false);
+            strBuff.append(",");
+            statsSet.zkSyncDltStats.snapShort(strBuff, false);
+            strBuff.append(",");
+            statsSet.msgPubLatencyStats.snapShort(strBuff, false);
+            strBuff.append(",");
+            statsSet.msgSubLatencyStats.snapShort(strBuff, false);
+            strBuff.append(",");
+            statsSet.msgConfirmLatencyStats.snapShort(strBuff, false);
+            strBuff.append("}");
+        } else {
             strBuff.append(",\"").append(statsSet.fileIOExcStats.getFullName())
                     .append("\":").append(statsSet.fileIOExcStats.getValue())
-                    .append(",");
-            statsSet.zkSyncDltStats.snapShort(strBuff, false);
-            strBuff.append(",\"").append(statsSet.zkExcStats.getFullName())
+                    .append(",\"").append(statsSet.zkExcStats.getFullName())
                     .append("\":").append(statsSet.zkExcStats.getValue())
                     
.append(",\"").append(statsSet.brokerTimeoutStats.getFullName())
                     
.append("\":").append(statsSet.brokerTimeoutStats.getValue())
@@ -230,7 +274,19 @@ public class BrokerSrvStatsHolder {
                     .append("\":").append(csmOnlineCnt.getValue())
                     
.append(",\"").append(statsSet.csmTimeoutStats.getFullName())
                     .append("\":").append(statsSet.csmTimeoutStats.getValue())
-                    .append("}");
+                    
.append(",\"").append(statsSet.errPubOverFlowStats.getFullName())
+                    
.append("\":").append(statsSet.errPubOverFlowStats.getValue())
+                    .append(",");
+            statsSet.fileSyncDltStats.getValue(strBuff, false);
+            strBuff.append(",");
+            statsSet.zkSyncDltStats.getValue(strBuff, false);
+            strBuff.append(",");
+            statsSet.msgPubLatencyStats.getValue(strBuff, false);
+            strBuff.append(",");
+            statsSet.msgSubLatencyStats.getValue(strBuff, false);
+            strBuff.append(",");
+            statsSet.msgConfirmLatencyStats.getValue(strBuff, false);
+            strBuff.append("}");
         }
     }
 
@@ -282,6 +338,18 @@ public class BrokerSrvStatsHolder {
         // Consumer 2 Broker status statistics
         protected final LongStatsCounter csmTimeoutStats =
                 new LongStatsCounter("consume_timeout_cnt", null);
+        // sendMessage process latency statistics
+        protected final ESTHistogram msgPubLatencyStats =
+                new ESTHistogram("msg_put_dlt", null);
+        // error sendMessage response distribution statistics
+        protected final LongStatsCounter errPubOverFlowStats =
+                new LongStatsCounter("msg_put_overflow", null);
+        // getMessage process latency statistics
+        protected final ESTHistogram msgSubLatencyStats =
+                new ESTHistogram("msg_get_dlt", null);
+        // confirm process latency statistics
+        protected final ESTHistogram msgConfirmLatencyStats =
+                new ESTHistogram("msg_confirm_dlt", null);
 
         public ServiceStatsSet() {
             resetSinceTime();
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java
index c7a39244d..3e723c541 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java
@@ -1328,7 +1328,7 @@ public class BrokerAdminServlet extends 
AbstractWebHandler {
         }
         if (inMetricType == BrokerStatsType.SERVICESTATUS
                 || inMetricType == BrokerStatsType.ALL) {
-            BrokerSrvStatsHolder.setDiskSyncStatsStatus(enable);
+            BrokerSrvStatsHolder.setDetailStatsStatus(enable);
         }
         if (inMetricType == BrokerStatsType.MSGSTORE
                 || inMetricType == BrokerStatsType.ALL) {

Reply via email to