This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ac0fb57e8 [IOTDB-5502] Source counters for construct batches in IoTConsensus (#9023)
0ac0fb57e8 is described below

commit 0ac0fb57e867f4fbfee8b8fd8e54e64103f020f8
Author: Xiangpeng Hu <[email protected]>
AuthorDate: Sat Feb 11 20:11:27 2023 -0800

    [IOTDB-5502] Source counters for construct batches in IoTConsensus (#9023)
---
 .../consensus/iot/IoTConsensusServerImpl.java      |  8 ++++
 .../consensus/iot/IoTConsensusServerMetrics.java   | 44 ++++++++++++++++++++++
 .../iotdb/consensus/iot/logdispatcher/Batch.java   |  9 +++++
 .../consensus/iot/logdispatcher/LogDispatcher.java | 15 ++++++++
 docs/UserGuide/Monitor-Alert/Metric-Tool.md        |  2 +
 docs/zh/UserGuide/Monitor-Alert/Metric-Tool.md     | 28 +++++++-------
 6 files changed, 93 insertions(+), 13 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index e24478179a..e903289177 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -726,6 +726,14 @@ public class IoTConsensusServerImpl {
     return config;
   }
 
+  public long getLogEntriesFromWAL() {
+    return logDispatcher.getLogEntriesFromWAL();
+  }
+
+  public long getLogEntriesFromQueue() {
+    return logDispatcher.getLogEntriesFromQueue();
+  }
+
   public boolean needBlockWrite() {
     return reader.getTotalSize() > config.getReplication().getWalThrottleThreshold();
   }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerMetrics.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerMetrics.java
index 1e36e20b70..82814d071f 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerMetrics.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerMetrics.java
@@ -60,6 +60,30 @@ public class IoTConsensusServerMetrics implements IMetricSet {
             impl.getThisNode().getGroupId().toString(),
             Tag.TYPE.toString(),
             "safeIndex");
+    MetricService.getInstance()
+        .createAutoGauge(
+            Metric.IOT_CONSENSUS.toString(),
+            MetricLevel.IMPORTANT,
+            impl,
+            IoTConsensusServerImpl::getLogEntriesFromWAL,
+            Tag.NAME.toString(),
+            "ioTConsensusServerImpl",
+            Tag.REGION.toString(),
+            impl.getThisNode().getGroupId().toString(),
+            Tag.TYPE.toString(),
+            "LogEntriesFromWAL");
+    MetricService.getInstance()
+        .createAutoGauge(
+            Metric.IOT_CONSENSUS.toString(),
+            MetricLevel.IMPORTANT,
+            impl,
+            IoTConsensusServerImpl::getLogEntriesFromQueue,
+            Tag.NAME.toString(),
+            "ioTConsensusServerImpl",
+            Tag.REGION.toString(),
+            impl.getThisNode().getGroupId().toString(),
+            Tag.TYPE.toString(),
+            "LogEntriesFromQueue");
   }
 
   @Override
@@ -84,5 +108,25 @@ public class IoTConsensusServerMetrics implements IMetricSet {
             impl.getThisNode().getGroupId().toString(),
             Tag.TYPE.toString(),
             "safeIndex");
+    MetricService.getInstance()
+        .remove(
+            MetricType.AUTO_GAUGE,
+            Metric.IOT_CONSENSUS.toString(),
+            Tag.NAME.toString(),
+            "ioTConsensusServerImpl",
+            Tag.REGION.toString(),
+            impl.getThisNode().getGroupId().toString(),
+            Tag.TYPE.toString(),
+            "LogEntriesFromWAL");
+    MetricService.getInstance()
+        .remove(
+            MetricType.AUTO_GAUGE,
+            Metric.IOT_CONSENSUS.toString(),
+            Tag.NAME.toString(),
+            "ioTConsensusServerImpl",
+            Tag.REGION.toString(),
+            impl.getThisNode().getGroupId().toString(),
+            Tag.TYPE.toString(),
+            "LogEntriesFromQueue");
   }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/Batch.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/Batch.java
index 731e00951b..7a556c85a0 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/Batch.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/Batch.java
@@ -35,6 +35,8 @@ public class Batch {
 
   private final List<TLogEntry> logEntries = new ArrayList<>();
 
+  private long logEntriesNumFromWAL = 0L;
+
   private long serializedSize;
   // indicates whether this batch has been successfully synchronized to another node
   private boolean synced;
@@ -55,6 +57,9 @@ public class Batch {
 
   public void addTLogEntry(TLogEntry entry) {
     logEntries.add(entry);
+    if (entry.fromWAL) {
+      logEntriesNumFromWAL++;
+    }
     // TODO Maybe we need to add in additional fields for more accurate calculations
     serializedSize +=
         entry.getData() == null ? 0 : entry.getData().stream().mapToInt(Buffer::capacity).sum();
@@ -93,6 +98,10 @@ public class Batch {
     return serializedSize;
   }
 
+  public long getLogEntriesNumFromWAL() {
+    return logEntriesNumFromWAL;
+  }
+
   @Override
   public String toString() {
     return "Batch{"
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index 883952275a..cfc480a8cb 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -50,6 +50,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
 /** Manage all asynchronous replication threads and corresponding async clients */
@@ -65,6 +66,9 @@ public class LogDispatcher {
 
   private boolean stopped = false;
 
+  private final AtomicLong logEntriesFromWAL = new AtomicLong(0);
+  private final AtomicLong logEntriesFromQueue = new AtomicLong(0);
+
   public LogDispatcher(
       IoTConsensusServerImpl impl,
       IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager) {
@@ -171,6 +175,14 @@ public class LogDispatcher {
     }
   }
 
+  public long getLogEntriesFromWAL() {
+    return logEntriesFromWAL.get();
+  }
+
+  public long getLogEntriesFromQueue() {
+    return logEntriesFromQueue.get();
+  }
+
   public class LogDispatcherThread implements Runnable {
 
     private static final long PENDING_REQUEST_TAKING_TIME_OUT_IN_SEC = 10;
@@ -317,6 +329,9 @@ public class LogDispatcher {
               .update((System.currentTimeMillis() - startTime) / batch.getLogEntries().size());
           // we may block here if the synchronization pipeline is full
           syncStatus.addNextBatch(batch);
+          logEntriesFromWAL.addAndGet(batch.getLogEntriesNumFromWAL());
+          logEntriesFromQueue.addAndGet(
+              batch.getLogEntries().size() - batch.getLogEntriesNumFromWAL());
           // sends batch asynchronously and migrates the retry logic into the callback handler
           sendBatchAsync(batch, new DispatchLogHandler(this, batch));
         }
diff --git a/docs/UserGuide/Monitor-Alert/Metric-Tool.md b/docs/UserGuide/Monitor-Alert/Metric-Tool.md
index d1fa1c5ae6..f800f84e08 100644
--- a/docs/UserGuide/Monitor-Alert/Metric-Tool.md
+++ b/docs/UserGuide/Monitor-Alert/Metric-Tool.md
@@ -171,6 +171,8 @@ carefully evaluated. The current Core-level metrics are as follows:
 | mutli_leader | name="logDispatcher-{{IP}}:{{Port}}", region="{{region}}", type="cachedRequestInMemoryQueue" | AutoGauge | The size of cache requests of synchronization thread in replica group |
 | mutli_leader | name="IoTConsensusServerImpl", region="{{region}}", type="searchIndex"                       | AutoGauge | The write process of main process in replica group                    |
 | mutli_leader | name="IoTConsensusServerImpl", region="{{region}}", type="safeIndex"                         | AutoGauge | The sync index of replica group                                       |
+| mutli_leader | name="IoTConsensusServerImpl", region="{{region}}", type="LogEntriesFromWAL"                 | AutoGauge | The number of logEntries from wal in Batch                            |
+| mutli_leader | name="IoTConsensusServerImpl", region="{{region}}", type="LogEntriesFromQueue"               | AutoGauge | The number of logEntries from queue in Batch                          |
 | stage        | name="iot_consensus", region="{{region}}", type="getStateMachineLock"                        | Histogram | The time consumed to get statemachine lock in main process            |
 | stage        | name="iot_consensus", region="{{region}}", type="checkingBeforeWrite"                        | Histogram | The time consumed to precheck before write in main process            |
 | stage        | name="iot_consensus", region="{{region}}", type="writeStateMachine"                          | Histogram | The time consumed to write statemachine in main process               |
diff --git a/docs/zh/UserGuide/Monitor-Alert/Metric-Tool.md b/docs/zh/UserGuide/Monitor-Alert/Metric-Tool.md
index 2934caf054..03190d0d7f 100644
--- a/docs/zh/UserGuide/Monitor-Alert/Metric-Tool.md
+++ b/docs/zh/UserGuide/Monitor-Alert/Metric-Tool.md
@@ -145,19 +145,21 @@ Core 级别的监控指标在系统运行中默认开启,每一个 Core 级别
 
 #### 4.2.3. IoT共识协议统计
 
-| Metric        | Tags                                                                                         | Type      | Description      |
-|---------------|----------------------------------------------------------------------------------------------|-----------|------------------|
-| iot_consensus | name="logDispatcher-{{IP}}:{{Port}}", region="{{region}}", type="currentSyncIndex"           | AutoGauge | 副本组同步线程的当前同步进度   |
-| iot_consensus | name="logDispatcher-{{IP}}:{{Port}}", region="{{region}}", type="cachedRequestInMemoryQueue" | AutoGauge | 副本组同步线程缓存队列请求总大小 |
-| iot_consensus | name="IoTConsensusServerImpl", region="{{region}}", type="searchIndex"                       | AutoGauge | 副本组主流程写入进度       |
-| iot_consensus | name="IoTConsensusServerImpl", region="{{region}}", type="safeIndex"                         | AutoGauge | 副本组同步进度          |
-| stage         | name="iot_consensus", region="{{region}}", type="getStateMachineLock"                        | Histogram | 主流程获取状态机锁耗时      |
-| stage         | name="iot_consensus", region="{{region}}", type="checkingBeforeWrite"                        | Histogram | 主流程写入状态机检查耗时     |
-| stage         | name="iot_consensus", region="{{region}}", type="writeStateMachine"                          | Histogram | 主流程写入状态机耗时       |
-| stage         | name="iot_consensus", region="{{region}}", type="offerRequestToQueue"                        | Histogram | 主流程尝试添加队列耗时      |
-| stage         | name="iot_consensus", region="{{region}}", type="consensusWrite"                             | Histogram | 主流程全写入耗时         |
-| stage         | name="iot_consensus", region="{{region}}", type="constructBatch"                             | Histogram | 同步线程构造 Batch 耗时  |
-| stage         | name="iot_consensus", region="{{region}}", type="syncLogTimePerRequest"                      | Histogram | 异步回调流程同步日志耗时     |
+| Metric        | Tags                                                                                         | Type      | Description          |
+|---------------|----------------------------------------------------------------------------------------------|-----------|----------------------|
+| iot_consensus | name="logDispatcher-{{IP}}:{{Port}}", region="{{region}}", type="currentSyncIndex"           | AutoGauge | 副本组同步线程的当前同步进度       |
+| iot_consensus | name="logDispatcher-{{IP}}:{{Port}}", region="{{region}}", type="cachedRequestInMemoryQueue" | AutoGauge | 副本组同步线程缓存队列请求总大小     |
+| iot_consensus | name="IoTConsensusServerImpl", region="{{region}}", type="searchIndex"                       | AutoGauge | 副本组主流程写入进度           |
+| iot_consensus | name="IoTConsensusServerImpl", region="{{region}}", type="safeIndex"                         | AutoGauge | 副本组同步进度              |
+| iot_consensus | name="IoTConsensusServerImpl", region="{{region}}", type="LogEntriesFromWAL"                 | AutoGauge | 副本组Batch中来自WAL的日志项数量 |
+| iot_consensus | name="IoTConsensusServerImpl", region="{{region}}", type="LogEntriesFromQueue"               | AutoGauge | 副本组Batch中来自队列的日志项数量  |
+| stage         | name="iot_consensus", region="{{region}}", type="getStateMachineLock"                        | Histogram | 主流程获取状态机锁耗时          |
+| stage         | name="iot_consensus", region="{{region}}", type="checkingBeforeWrite"                        | Histogram | 主流程写入状态机检查耗时         |
+| stage         | name="iot_consensus", region="{{region}}", type="writeStateMachine"                          | Histogram | 主流程写入状态机耗时           |
+| stage         | name="iot_consensus", region="{{region}}", type="offerRequestToQueue"                        | Histogram | 主流程尝试添加队列耗时          |
+| stage         | name="iot_consensus", region="{{region}}", type="consensusWrite"                             | Histogram | 主流程全写入耗时             |
+| stage         | name="iot_consensus", region="{{region}}", type="constructBatch"                             | Histogram | 同步线程构造 Batch 耗时      |
+| stage         | name="iot_consensus", region="{{region}}", type="syncLogTimePerRequest"                      | Histogram | 异步回调流程同步日志耗时         |
 
 #### 4.2.4. 缓存统计
 

Reply via email to