This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 0ac0fb57e8 [IOTDB-5502] Source counters for construct batches in
IoTConsensus (#9023)
0ac0fb57e8 is described below
commit 0ac0fb57e867f4fbfee8b8fd8e54e64103f020f8
Author: Xiangpeng Hu <[email protected]>
AuthorDate: Sat Feb 11 20:11:27 2023 -0800
[IOTDB-5502] Source counters for construct batches in IoTConsensus (#9023)
---
.../consensus/iot/IoTConsensusServerImpl.java | 8 ++++
.../consensus/iot/IoTConsensusServerMetrics.java | 44 ++++++++++++++++++++++
.../iotdb/consensus/iot/logdispatcher/Batch.java | 9 +++++
.../consensus/iot/logdispatcher/LogDispatcher.java | 15 ++++++++
docs/UserGuide/Monitor-Alert/Metric-Tool.md | 2 +
docs/zh/UserGuide/Monitor-Alert/Metric-Tool.md | 28 +++++++-------
6 files changed, 93 insertions(+), 13 deletions(-)
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index e24478179a..e903289177 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -726,6 +726,14 @@ public class IoTConsensusServerImpl {
return config;
}
+ public long getLogEntriesFromWAL() {
+ return logDispatcher.getLogEntriesFromWAL();
+ }
+
+ public long getLogEntriesFromQueue() {
+ return logDispatcher.getLogEntriesFromQueue();
+ }
+
public boolean needBlockWrite() {
return reader.getTotalSize() >
config.getReplication().getWalThrottleThreshold();
}
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerMetrics.java
b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerMetrics.java
index 1e36e20b70..82814d071f 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerMetrics.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerMetrics.java
@@ -60,6 +60,30 @@ public class IoTConsensusServerMetrics implements IMetricSet
{
impl.getThisNode().getGroupId().toString(),
Tag.TYPE.toString(),
"safeIndex");
+ MetricService.getInstance()
+ .createAutoGauge(
+ Metric.IOT_CONSENSUS.toString(),
+ MetricLevel.IMPORTANT,
+ impl,
+ IoTConsensusServerImpl::getLogEntriesFromWAL,
+ Tag.NAME.toString(),
+ "ioTConsensusServerImpl",
+ Tag.REGION.toString(),
+ impl.getThisNode().getGroupId().toString(),
+ Tag.TYPE.toString(),
+ "LogEntriesFromWAL");
+ MetricService.getInstance()
+ .createAutoGauge(
+ Metric.IOT_CONSENSUS.toString(),
+ MetricLevel.IMPORTANT,
+ impl,
+ IoTConsensusServerImpl::getLogEntriesFromQueue,
+ Tag.NAME.toString(),
+ "ioTConsensusServerImpl",
+ Tag.REGION.toString(),
+ impl.getThisNode().getGroupId().toString(),
+ Tag.TYPE.toString(),
+ "LogEntriesFromQueue");
}
@Override
@@ -84,5 +108,25 @@ public class IoTConsensusServerMetrics implements
IMetricSet {
impl.getThisNode().getGroupId().toString(),
Tag.TYPE.toString(),
"safeIndex");
+ MetricService.getInstance()
+ .remove(
+ MetricType.AUTO_GAUGE,
+ Metric.IOT_CONSENSUS.toString(),
+ Tag.NAME.toString(),
+ "ioTConsensusServerImpl",
+ Tag.REGION.toString(),
+ impl.getThisNode().getGroupId().toString(),
+ Tag.TYPE.toString(),
+ "LogEntriesFromWAL");
+ MetricService.getInstance()
+ .remove(
+ MetricType.AUTO_GAUGE,
+ Metric.IOT_CONSENSUS.toString(),
+ Tag.NAME.toString(),
+ "ioTConsensusServerImpl",
+ Tag.REGION.toString(),
+ impl.getThisNode().getGroupId().toString(),
+ Tag.TYPE.toString(),
+ "LogEntriesFromQueue");
}
}
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/Batch.java
b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/Batch.java
index 731e00951b..7a556c85a0 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/Batch.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/Batch.java
@@ -35,6 +35,8 @@ public class Batch {
private final List<TLogEntry> logEntries = new ArrayList<>();
+ private long logEntriesNumFromWAL = 0L;
+
private long serializedSize;
// indicates whether this batch has been successfully synchronized to
another node
private boolean synced;
@@ -55,6 +57,9 @@ public class Batch {
public void addTLogEntry(TLogEntry entry) {
logEntries.add(entry);
+ if (entry.fromWAL) {
+ logEntriesNumFromWAL++;
+ }
// TODO Maybe we need to add in additional fields for more accurate
calculations
serializedSize +=
entry.getData() == null ? 0 :
entry.getData().stream().mapToInt(Buffer::capacity).sum();
@@ -93,6 +98,10 @@ public class Batch {
return serializedSize;
}
+ public long getLogEntriesNumFromWAL() {
+ return logEntriesNumFromWAL;
+ }
+
@Override
public String toString() {
return "Batch{"
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index 883952275a..cfc480a8cb 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -50,6 +50,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
/** Manage all asynchronous replication threads and corresponding async
clients */
@@ -65,6 +66,9 @@ public class LogDispatcher {
private boolean stopped = false;
+ private final AtomicLong logEntriesFromWAL = new AtomicLong(0);
+ private final AtomicLong logEntriesFromQueue = new AtomicLong(0);
+
public LogDispatcher(
IoTConsensusServerImpl impl,
IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager)
{
@@ -171,6 +175,14 @@ public class LogDispatcher {
}
}
+ public long getLogEntriesFromWAL() {
+ return logEntriesFromWAL.get();
+ }
+
+ public long getLogEntriesFromQueue() {
+ return logEntriesFromQueue.get();
+ }
+
public class LogDispatcherThread implements Runnable {
private static final long PENDING_REQUEST_TAKING_TIME_OUT_IN_SEC = 10;
@@ -317,6 +329,9 @@ public class LogDispatcher {
.update((System.currentTimeMillis() - startTime) /
batch.getLogEntries().size());
// we may block here if the synchronization pipeline is full
syncStatus.addNextBatch(batch);
+ logEntriesFromWAL.addAndGet(batch.getLogEntriesNumFromWAL());
+ logEntriesFromQueue.addAndGet(
+ batch.getLogEntries().size() - batch.getLogEntriesNumFromWAL());
// sends batch asynchronously and migrates the retry logic into the
callback handler
sendBatchAsync(batch, new DispatchLogHandler(this, batch));
}
diff --git a/docs/UserGuide/Monitor-Alert/Metric-Tool.md
b/docs/UserGuide/Monitor-Alert/Metric-Tool.md
index d1fa1c5ae6..f800f84e08 100644
--- a/docs/UserGuide/Monitor-Alert/Metric-Tool.md
+++ b/docs/UserGuide/Monitor-Alert/Metric-Tool.md
@@ -171,6 +171,8 @@ carefully evaluated. The current Core-level metrics are as
follows:
| mutli_leader | name="logDispatcher-{{IP}}:{{Port}}", region="{{region}}",
type="cachedRequestInMemoryQueue" | AutoGauge | The size of cache requests of
synchronization thread in replica group |
| mutli_leader | name="IoTConsensusServerImpl", region="{{region}}",
type="searchIndex" | AutoGauge | The write process of
main process in replica group |
| mutli_leader | name="IoTConsensusServerImpl", region="{{region}}",
type="safeIndex" | AutoGauge | The sync index of
replica group |
+| iot_consensus | name="ioTConsensusServerImpl", region="{{region}}",
type="LogEntriesFromWAL" | AutoGauge | The cumulative number of log entries
in constructed batches that came from the WAL |
+| iot_consensus | name="ioTConsensusServerImpl", region="{{region}}",
type="LogEntriesFromQueue" | AutoGauge | The cumulative number of log entries
in constructed batches that came from the queue |
| stage | name="iot_consensus", region="{{region}}",
type="getStateMachineLock" | Histogram | The time
consumed to get statemachine lock in main process |
| stage | name="iot_consensus", region="{{region}}",
type="checkingBeforeWrite" | Histogram | The time
consumed to precheck before write in main process |
| stage | name="iot_consensus", region="{{region}}",
type="writeStateMachine" | Histogram | The time
consumed to write statemachine in main process |
diff --git a/docs/zh/UserGuide/Monitor-Alert/Metric-Tool.md
b/docs/zh/UserGuide/Monitor-Alert/Metric-Tool.md
index 2934caf054..03190d0d7f 100644
--- a/docs/zh/UserGuide/Monitor-Alert/Metric-Tool.md
+++ b/docs/zh/UserGuide/Monitor-Alert/Metric-Tool.md
@@ -145,19 +145,21 @@ Core 级别的监控指标在系统运行中默认开启,每一个 Core 级别
#### 4.2.3. IoT共识协议统计
-| Metric | Tags
| Type | Description |
-|---------------|----------------------------------------------------------------------------------------------|-----------|------------------|
-| iot_consensus | name="logDispatcher-{{IP}}:{{Port}}", region="{{region}}",
type="currentSyncIndex" | AutoGauge | 副本组同步线程的当前同步进度 |
-| iot_consensus | name="logDispatcher-{{IP}}:{{Port}}", region="{{region}}",
type="cachedRequestInMemoryQueue" | AutoGauge | 副本组同步线程缓存队列请求总大小 |
-| iot_consensus | name="IoTConsensusServerImpl", region="{{region}}",
type="searchIndex" | AutoGauge | 副本组主流程写入进度 |
-| iot_consensus | name="IoTConsensusServerImpl", region="{{region}}",
type="safeIndex" | AutoGauge | 副本组同步进度 |
-| stage | name="iot_consensus", region="{{region}}",
type="getStateMachineLock" | Histogram | 主流程获取状态机锁耗时
|
-| stage | name="iot_consensus", region="{{region}}",
type="checkingBeforeWrite" | Histogram | 主流程写入状态机检查耗时
|
-| stage | name="iot_consensus", region="{{region}}",
type="writeStateMachine" | Histogram | 主流程写入状态机耗时
|
-| stage | name="iot_consensus", region="{{region}}",
type="offerRequestToQueue" | Histogram | 主流程尝试添加队列耗时
|
-| stage | name="iot_consensus", region="{{region}}",
type="consensusWrite" | Histogram | 主流程全写入耗时
|
-| stage | name="iot_consensus", region="{{region}}",
type="constructBatch" | Histogram | 同步线程构造 Batch 耗时
|
-| stage | name="iot_consensus", region="{{region}}",
type="syncLogTimePerRequest" | Histogram | 异步回调流程同步日志耗时
|
+| Metric | Tags
| Type | Description |
+|---------------|----------------------------------------------------------------------------------------------|-----------|----------------------|
+| iot_consensus | name="logDispatcher-{{IP}}:{{Port}}", region="{{region}}",
type="currentSyncIndex" | AutoGauge | 副本组同步线程的当前同步进度 |
+| iot_consensus | name="logDispatcher-{{IP}}:{{Port}}", region="{{region}}",
type="cachedRequestInMemoryQueue" | AutoGauge | 副本组同步线程缓存队列请求总大小 |
+| iot_consensus | name="IoTConsensusServerImpl", region="{{region}}",
type="searchIndex" | AutoGauge | 副本组主流程写入进度 |
+| iot_consensus | name="IoTConsensusServerImpl", region="{{region}}",
type="safeIndex" | AutoGauge | 副本组同步进度 |
+| iot_consensus | name="ioTConsensusServerImpl", region="{{region}}",
type="LogEntriesFromWAL" | AutoGauge | 副本组同步线程构造的 Batch 中来自 WAL 的日志项累计数量 |
+| iot_consensus | name="ioTConsensusServerImpl", region="{{region}}",
type="LogEntriesFromQueue" | AutoGauge | 副本组同步线程构造的 Batch 中来自队列的日志项累计数量 |
+| stage | name="iot_consensus", region="{{region}}",
type="getStateMachineLock" | Histogram | 主流程获取状态机锁耗时
|
+| stage | name="iot_consensus", region="{{region}}",
type="checkingBeforeWrite" | Histogram | 主流程写入状态机检查耗时
|
+| stage | name="iot_consensus", region="{{region}}",
type="writeStateMachine" | Histogram | 主流程写入状态机耗时
|
+| stage | name="iot_consensus", region="{{region}}",
type="offerRequestToQueue" | Histogram | 主流程尝试添加队列耗时
|
+| stage | name="iot_consensus", region="{{region}}",
type="consensusWrite" | Histogram | 主流程全写入耗时
|
+| stage | name="iot_consensus", region="{{region}}",
type="constructBatch" | Histogram | 同步线程构造 Batch 耗时
|
+| stage | name="iot_consensus", region="{{region}}",
type="syncLogTimePerRequest" | Histogram | 异步回调流程同步日志耗时
|
#### 4.2.4. 缓存统计