This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new cc5ed977507 Pipe: refined key set of tags for pipe metrics & fixed hit
rate in PipeWALInsertNodeCacheMetrics always being `1.0` (#11484)
cc5ed977507 is described below
commit cc5ed977507c3eb0aa5d51cbeadd44c71cf9744f
Author: V_Galaxy <[email protected]>
AuthorDate: Tue Nov 7 14:19:20 2023 +0800
Pipe: refined key set of tags for pipe metrics & fixed hit rate in
PipeWALInsertNodeCacheMetrics always being `1.0` (#11484)
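To provide more detailed legend information in Grafana and to enable
aggregation by pipe name or data region, this PR changes the tag key sets
used by the pipe metrics:
- before: `{name (TaskID)}`
- after this PR: `{Name (PipeName / AttributeSortedString), CreationTime,
Index, DataRegion}`
Extractor and processor metrics are tagged with the pipe name, data region
and creation time; connector metrics are tagged with the attribute-sorted
string, connector index and creation time.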
Additionally, it fixes the issue where the hit rate reported by
PipeWALInsertNodeCacheMetrics was always `1.0`, by enabling statistics
recording (`recordStats()`) when the WALInsertNodeCache is built.
------
* chore: refined set of tags for pipe metrics
* fix: enable lruCache stats
---
.../pipe/extractor/IoTDBDataRegionExtractor.java | 21 +++-
.../iotdb/db/pipe/metric/PipeConnectorMetrics.java | 86 +++++++++++++---
.../iotdb/db/pipe/metric/PipeExtractorMetrics.java | 114 +++++++++++++++++----
.../iotdb/db/pipe/metric/PipeProcessorMetrics.java | 86 +++++++++++++---
.../db/pipe/task/stage/PipeTaskProcessorStage.java | 3 +
.../iotdb/db/pipe/task/subtask/PipeSubtask.java | 10 +-
.../subtask/connector/PipeConnectorSubtask.java | 43 +++++---
.../connector/PipeConnectorSubtaskManager.java | 10 +-
.../subtask/processor/PipeProcessorSubtask.java | 21 +++-
.../dataregion/wal/utils/WALInsertNodeCache.java | 1 +
.../PipeConnectorSubtaskExecutorTest.java | 3 +
.../PipeProcessorSubtaskExecutorTest.java | 3 +
.../iotdb/commons/service/metric/enums/Tag.java | 4 +-
13 files changed, 336 insertions(+), 69 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
index 15f6696118d..b5c6aea4f15 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
@@ -72,7 +72,10 @@ public class IoTDBDataRegionExtractor implements
PipeExtractor {
private PipeHistoricalDataRegionExtractor historicalExtractor;
private PipeRealtimeDataRegionExtractor realtimeExtractor;
+ // Record these variables to provide corresponding value to tag key of
monitoring metrics
private String taskID;
+ private String pipeName;
+ private long creationTime;
private int dataRegionId;
public IoTDBDataRegionExtractor() {
@@ -182,8 +185,8 @@ public class IoTDBDataRegionExtractor implements
PipeExtractor {
throws Exception {
dataRegionId =
((PipeTaskExtractorRuntimeEnvironment)
configuration.getRuntimeEnvironment()).getRegionId();
- String pipeName = configuration.getRuntimeEnvironment().getPipeName();
- long creationTime =
configuration.getRuntimeEnvironment().getCreationTime();
+ pipeName = configuration.getRuntimeEnvironment().getPipeName();
+ creationTime = configuration.getRuntimeEnvironment().getCreationTime();
taskID = pipeName + "_" + dataRegionId + "_" + creationTime;
historicalExtractor.customize(parameters, configuration);
@@ -282,10 +285,24 @@ public class IoTDBDataRegionExtractor implements
PipeExtractor {
PipeExtractorMetrics.getInstance().deregister(taskID);
}
+ //////////////////////////// APIs provided for metric framework
////////////////////////////
+
public String getTaskID() {
return taskID;
}
+ public String getPipeName() {
+ return pipeName;
+ }
+
+ public int getDataRegionId() {
+ return dataRegionId;
+ }
+
+ public long getCreationTime() {
+ return creationTime;
+ }
+
public int getHistoricalTsFileInsertionEventCount() {
return hasBeenStarted.get() ? historicalExtractor.getPendingQueueSize() :
0;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeConnectorMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeConnectorMetrics.java
index f2dde61092c..fff2bcae55a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeConnectorMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeConnectorMetrics.java
@@ -70,51 +70,79 @@ public class PipeConnectorMetrics implements IMetricSet {
}
private void createAutoGauge(String taskID) {
+ PipeConnectorSubtask connector = connectorMap.get(taskID);
+ // pending event count
metricService.createAutoGauge(
Metric.UNTRANSFERRED_TABLET_COUNT.toString(),
MetricLevel.IMPORTANT,
- connectorMap.get(taskID),
+ connector,
PipeConnectorSubtask::getTabletInsertionEventCount,
Tag.NAME.toString(),
- taskID);
+ connector.getAttributeSortedString(),
+ Tag.INDEX.toString(),
+ String.valueOf(connector.getConnectorIndex()),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(connector.getCreationTime()));
metricService.createAutoGauge(
Metric.UNTRANSFERRED_TSFILE_COUNT.toString(),
MetricLevel.IMPORTANT,
- connectorMap.get(taskID),
+ connector,
PipeConnectorSubtask::getTsFileInsertionEventCount,
Tag.NAME.toString(),
- taskID);
+ connector.getAttributeSortedString(),
+ Tag.INDEX.toString(),
+ String.valueOf(connector.getConnectorIndex()),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(connector.getCreationTime()));
metricService.createAutoGauge(
Metric.UNTRANSFERRED_HEARTBEAT_COUNT.toString(),
MetricLevel.IMPORTANT,
- connectorMap.get(taskID),
+ connector,
PipeConnectorSubtask::getPipeHeartbeatEventCount,
Tag.NAME.toString(),
- taskID);
+ connector.getAttributeSortedString(),
+ Tag.INDEX.toString(),
+ String.valueOf(connector.getConnectorIndex()),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(connector.getCreationTime()));
}
private void createRate(String taskID) {
+ PipeConnectorSubtask connector = connectorMap.get(taskID);
+ // transfer event rate
tabletRateMap.put(
taskID,
metricService.getOrCreateRate(
Metric.PIPE_CONNECTOR_TABLET_TRANSFER.toString(),
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
- taskID));
+ connector.getAttributeSortedString(),
+ Tag.INDEX.toString(),
+ String.valueOf(connector.getConnectorIndex()),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(connector.getCreationTime())));
tsFileRateMap.put(
taskID,
metricService.getOrCreateRate(
Metric.PIPE_CONNECTOR_TSFILE_TRANSFER.toString(),
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
- taskID));
+ connector.getAttributeSortedString(),
+ Tag.INDEX.toString(),
+ String.valueOf(connector.getConnectorIndex()),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(connector.getCreationTime())));
pipeHeartbeatRateMap.put(
taskID,
metricService.getOrCreateRate(
Metric.PIPE_CONNECTOR_HEARTBEAT_TRANSFER.toString(),
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
- taskID));
+ connector.getAttributeSortedString(),
+ Tag.INDEX.toString(),
+ String.valueOf(connector.getConnectorIndex()),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(connector.getCreationTime())));
}
@Override
@@ -134,39 +162,67 @@ public class PipeConnectorMetrics implements IMetricSet {
}
private void removeAutoGauge(String taskID) {
+ PipeConnectorSubtask connector = connectorMap.get(taskID);
+ // pending event count
metricService.remove(
MetricType.AUTO_GAUGE,
Metric.UNTRANSFERRED_TABLET_COUNT.toString(),
Tag.NAME.toString(),
- taskID);
+ connector.getAttributeSortedString(),
+ Tag.INDEX.toString(),
+ String.valueOf(connector.getConnectorIndex()),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(connector.getCreationTime()));
metricService.remove(
MetricType.AUTO_GAUGE,
Metric.UNTRANSFERRED_TSFILE_COUNT.toString(),
Tag.NAME.toString(),
- taskID);
+ connector.getAttributeSortedString(),
+ Tag.INDEX.toString(),
+ String.valueOf(connector.getConnectorIndex()),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(connector.getCreationTime()));
metricService.remove(
MetricType.AUTO_GAUGE,
Metric.UNTRANSFERRED_HEARTBEAT_COUNT.toString(),
Tag.NAME.toString(),
- taskID);
+ connector.getAttributeSortedString(),
+ Tag.INDEX.toString(),
+ String.valueOf(connector.getConnectorIndex()),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(connector.getCreationTime()));
}
private void removeRate(String taskID) {
+ PipeConnectorSubtask connector = connectorMap.get(taskID);
+ // transfer event rate
metricService.remove(
MetricType.RATE,
Metric.PIPE_CONNECTOR_TABLET_TRANSFER.toString(),
Tag.NAME.toString(),
- taskID);
+ connector.getAttributeSortedString(),
+ Tag.INDEX.toString(),
+ String.valueOf(connector.getConnectorIndex()),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(connector.getCreationTime()));
metricService.remove(
MetricType.RATE,
Metric.PIPE_CONNECTOR_TSFILE_TRANSFER.toString(),
Tag.NAME.toString(),
- taskID);
+ connector.getAttributeSortedString(),
+ Tag.INDEX.toString(),
+ String.valueOf(connector.getConnectorIndex()),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(connector.getCreationTime()));
metricService.remove(
MetricType.RATE,
Metric.PIPE_CONNECTOR_HEARTBEAT_TRANSFER.toString(),
Tag.NAME.toString(),
- taskID);
+ connector.getAttributeSortedString(),
+ Tag.INDEX.toString(),
+ String.valueOf(connector.getConnectorIndex()),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(connector.getCreationTime()));
tabletRateMap.remove(taskID);
tsFileRateMap.remove(taskID);
pipeHeartbeatRateMap.remove(taskID);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeExtractorMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeExtractorMetrics.java
index b7f5738b152..f56672c7d6d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeExtractorMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeExtractorMetrics.java
@@ -75,69 +75,106 @@ public class PipeExtractorMetrics implements IMetricSet {
}
private void createAutoGauge(String taskID) {
+ IoTDBDataRegionExtractor extractor = extractorMap.get(taskID);
// pending event count
metricService.createAutoGauge(
Metric.UNPROCESSED_HISTORICAL_TSFILE_COUNT.toString(),
MetricLevel.IMPORTANT,
- extractorMap.get(taskID),
+ extractor,
IoTDBDataRegionExtractor::getHistoricalTsFileInsertionEventCount,
Tag.NAME.toString(),
- taskID);
+ extractor.getPipeName(),
+ Tag.REGION.toString(),
+ String.valueOf(extractor.getDataRegionId()),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(extractor.getCreationTime()));
metricService.createAutoGauge(
Metric.UNPROCESSED_REALTIME_TSFILE_COUNT.toString(),
MetricLevel.IMPORTANT,
- extractorMap.get(taskID),
+ extractor,
IoTDBDataRegionExtractor::getRealtimeTsFileInsertionEventCount,
Tag.NAME.toString(),
- taskID);
+ extractor.getPipeName(),
+ Tag.REGION.toString(),
+ String.valueOf(extractor.getDataRegionId()),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(extractor.getCreationTime()));
metricService.createAutoGauge(
Metric.UNPROCESSED_TABLET_COUNT.toString(),
MetricLevel.IMPORTANT,
- extractorMap.get(taskID),
+ extractor,
IoTDBDataRegionExtractor::getTabletInsertionEventCount,
Tag.NAME.toString(),
- taskID);
+ extractor.getPipeName(),
+ Tag.REGION.toString(),
+ String.valueOf(extractor.getDataRegionId()),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(extractor.getCreationTime()));
metricService.createAutoGauge(
Metric.UNPROCESSED_HEARTBEAT_COUNT.toString(),
MetricLevel.IMPORTANT,
- extractorMap.get(taskID),
+ extractor,
IoTDBDataRegionExtractor::getPipeHeartbeatEventCount,
Tag.NAME.toString(),
- taskID);
+ extractor.getPipeName(),
+ Tag.REGION.toString(),
+ String.valueOf(extractor.getDataRegionId()),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(extractor.getCreationTime()));
}
private void createRate(String taskID) {
+ IoTDBDataRegionExtractor extractor = extractorMap.get(taskID);
+ // supply event rate
tabletRateMap.put(
taskID,
metricService.getOrCreateRate(
Metric.PIPE_EXTRACTOR_TABLET_SUPPLY.toString(),
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
- taskID));
+ extractor.getPipeName(),
+ Tag.REGION.toString(),
+ String.valueOf(extractor.getDataRegionId()),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(extractor.getCreationTime())));
tsFileRateMap.put(
taskID,
metricService.getOrCreateRate(
Metric.PIPE_EXTRACTOR_TSFILE_SUPPLY.toString(),
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
- taskID));
+ extractor.getPipeName(),
+ Tag.REGION.toString(),
+ String.valueOf(extractor.getDataRegionId()),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(extractor.getCreationTime())));
pipeHeartbeatRateMap.put(
taskID,
metricService.getOrCreateRate(
Metric.PIPE_EXTRACTOR_HEARTBEAT_SUPPLY.toString(),
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
- taskID));
+ extractor.getPipeName(),
+ Tag.REGION.toString(),
+ String.valueOf(extractor.getDataRegionId()),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(extractor.getCreationTime())));
}
private void createGauge(String taskID) {
+ IoTDBDataRegionExtractor extractor = extractorMap.get(taskID);
+ // tsfile epoch state
recentProcessedTsFileEpochStateMap.put(
taskID,
metricService.getOrCreateGauge(
Metric.PIPE_EXTRACTOR_TSFILE_EPOCH_STATE.toString(),
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
- taskID));
+ extractor.getPipeName(),
+ Tag.REGION.toString(),
+ String.valueOf(extractor.getDataRegionId()),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(extractor.getCreationTime())));
}
@Override
@@ -158,56 +195,93 @@ public class PipeExtractorMetrics implements IMetricSet {
}
private void removeAutoGauge(String taskID) {
+ IoTDBDataRegionExtractor extractor = extractorMap.get(taskID);
// pending event count
metricService.remove(
MetricType.AUTO_GAUGE,
Metric.UNPROCESSED_HISTORICAL_TSFILE_COUNT.toString(),
Tag.NAME.toString(),
- taskID);
+ extractor.getPipeName(),
+ Tag.REGION.toString(),
+ String.valueOf(extractor.getDataRegionId()),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(extractor.getCreationTime()));
metricService.remove(
MetricType.AUTO_GAUGE,
Metric.UNPROCESSED_REALTIME_TSFILE_COUNT.toString(),
Tag.NAME.toString(),
- taskID);
+ extractor.getPipeName(),
+ Tag.REGION.toString(),
+ String.valueOf(extractor.getDataRegionId()),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(extractor.getCreationTime()));
metricService.remove(
MetricType.AUTO_GAUGE,
Metric.UNPROCESSED_TABLET_COUNT.toString(),
Tag.NAME.toString(),
- taskID);
+ extractor.getPipeName(),
+ Tag.REGION.toString(),
+ String.valueOf(extractor.getDataRegionId()),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(extractor.getCreationTime()));
metricService.remove(
MetricType.AUTO_GAUGE,
Metric.UNPROCESSED_HEARTBEAT_COUNT.toString(),
Tag.NAME.toString(),
- taskID);
+ extractor.getPipeName(),
+ Tag.REGION.toString(),
+ String.valueOf(extractor.getDataRegionId()),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(extractor.getCreationTime()));
}
private void removeRate(String taskID) {
+ IoTDBDataRegionExtractor extractor = extractorMap.get(taskID);
+ // supply event rate
metricService.remove(
MetricType.RATE,
Metric.PIPE_EXTRACTOR_TABLET_SUPPLY.toString(),
Tag.NAME.toString(),
- taskID);
+ extractor.getPipeName(),
+ Tag.REGION.toString(),
+ String.valueOf(extractor.getDataRegionId()),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(extractor.getCreationTime()));
metricService.remove(
MetricType.RATE,
Metric.PIPE_EXTRACTOR_TSFILE_SUPPLY.toString(),
Tag.NAME.toString(),
- taskID);
+ extractor.getPipeName(),
+ Tag.REGION.toString(),
+ String.valueOf(extractor.getDataRegionId()),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(extractor.getCreationTime()));
metricService.remove(
MetricType.RATE,
Metric.PIPE_EXTRACTOR_HEARTBEAT_SUPPLY.toString(),
Tag.NAME.toString(),
- taskID);
+ extractor.getPipeName(),
+ Tag.REGION.toString(),
+ String.valueOf(extractor.getDataRegionId()),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(extractor.getCreationTime()));
tabletRateMap.remove(taskID);
tsFileRateMap.remove(taskID);
pipeHeartbeatRateMap.remove(taskID);
}
private void removeGauge(String taskID) {
+ IoTDBDataRegionExtractor extractor = extractorMap.get(taskID);
+ // tsfile epoch state
metricService.remove(
MetricType.GAUGE,
Metric.PIPE_EXTRACTOR_TSFILE_EPOCH_STATE.toString(),
Tag.NAME.toString(),
- taskID);
+ extractor.getPipeName(),
+ Tag.REGION.toString(),
+ String.valueOf(extractor.getDataRegionId()),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(extractor.getCreationTime()));
}
//////////////////////////// register & deregister (pipe integration)
////////////////////////////
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java
index 7ca6be329a7..291c3529d8b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java
@@ -70,51 +70,79 @@ public class PipeProcessorMetrics implements IMetricSet {
}
private void createAutoGauge(String taskID) {
+ PipeProcessorSubtask processor = processorMap.get(taskID);
+ // pending event count
metricService.createAutoGauge(
Metric.BUFFERED_TABLET_COUNT.toString(),
MetricLevel.IMPORTANT,
- processorMap.get(taskID),
+ processor,
PipeProcessorSubtask::getTabletInsertionEventCount,
Tag.NAME.toString(),
- taskID);
+ processor.getPipeName(),
+ Tag.REGION.toString(),
+ String.valueOf(processor.getDataRegionId()),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(processor.getCreationTime()));
metricService.createAutoGauge(
Metric.BUFFERED_TSFILE_COUNT.toString(),
MetricLevel.IMPORTANT,
- processorMap.get(taskID),
+ processor,
PipeProcessorSubtask::getTsFileInsertionEventCount,
Tag.NAME.toString(),
- taskID);
+ processor.getPipeName(),
+ Tag.REGION.toString(),
+ String.valueOf(processor.getDataRegionId()),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(processor.getCreationTime()));
metricService.createAutoGauge(
Metric.BUFFERED_HEARTBEAT_COUNT.toString(),
MetricLevel.IMPORTANT,
- processorMap.get(taskID),
+ processor,
PipeProcessorSubtask::getPipeHeartbeatEventCount,
Tag.NAME.toString(),
- taskID);
+ processor.getPipeName(),
+ Tag.REGION.toString(),
+ String.valueOf(processor.getDataRegionId()),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(processor.getCreationTime()));
}
private void createRate(String taskID) {
+ PipeProcessorSubtask processor = processorMap.get(taskID);
+ // process event rate
tabletRateMap.put(
taskID,
metricService.getOrCreateRate(
Metric.PIPE_PROCESSOR_TABLET_PROCESS.toString(),
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
- taskID));
+ processor.getPipeName(),
+ Tag.REGION.toString(),
+ String.valueOf(processor.getDataRegionId()),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(processor.getCreationTime())));
tsFileRateMap.put(
taskID,
metricService.getOrCreateRate(
Metric.PIPE_PROCESSOR_TSFILE_PROCESS.toString(),
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
- taskID));
+ processor.getPipeName(),
+ Tag.REGION.toString(),
+ String.valueOf(processor.getDataRegionId()),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(processor.getCreationTime())));
pipeHeartbeatRateMap.put(
taskID,
metricService.getOrCreateRate(
Metric.PIPE_PROCESSOR_HEARTBEAT_PROCESS.toString(),
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
- taskID));
+ processor.getPipeName(),
+ Tag.REGION.toString(),
+ String.valueOf(processor.getDataRegionId()),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(processor.getCreationTime())));
}
@Override
@@ -134,39 +162,67 @@ public class PipeProcessorMetrics implements IMetricSet {
}
private void removeAutoGauge(String taskID) {
+ PipeProcessorSubtask processor = processorMap.get(taskID);
+ // pending event count
metricService.remove(
MetricType.AUTO_GAUGE,
Metric.BUFFERED_TABLET_COUNT.toString(),
Tag.NAME.toString(),
- taskID);
+ processor.getPipeName(),
+ Tag.REGION.toString(),
+ String.valueOf(processor.getDataRegionId()),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(processor.getCreationTime()));
metricService.remove(
MetricType.AUTO_GAUGE,
Metric.BUFFERED_TSFILE_COUNT.toString(),
Tag.NAME.toString(),
- taskID);
+ processor.getPipeName(),
+ Tag.REGION.toString(),
+ String.valueOf(processor.getDataRegionId()),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(processor.getCreationTime()));
metricService.remove(
MetricType.AUTO_GAUGE,
Metric.BUFFERED_HEARTBEAT_COUNT.toString(),
Tag.NAME.toString(),
- taskID);
+ processor.getPipeName(),
+ Tag.REGION.toString(),
+ String.valueOf(processor.getDataRegionId()),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(processor.getCreationTime()));
}
private void removeRate(String taskID) {
+ PipeProcessorSubtask processor = processorMap.get(taskID);
+ // process event rate
metricService.remove(
MetricType.RATE,
Metric.PIPE_PROCESSOR_TABLET_PROCESS.toString(),
Tag.NAME.toString(),
- taskID);
+ processor.getPipeName(),
+ Tag.REGION.toString(),
+ String.valueOf(processor.getDataRegionId()),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(processor.getCreationTime()));
metricService.remove(
MetricType.RATE,
Metric.PIPE_PROCESSOR_TSFILE_PROCESS.toString(),
Tag.NAME.toString(),
- taskID);
+ processor.getPipeName(),
+ Tag.REGION.toString(),
+ String.valueOf(processor.getDataRegionId()),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(processor.getCreationTime()));
metricService.remove(
MetricType.RATE,
Metric.PIPE_PROCESSOR_HEARTBEAT_PROCESS.toString(),
Tag.NAME.toString(),
- taskID);
+ processor.getPipeName(),
+ Tag.REGION.toString(),
+ String.valueOf(processor.getDataRegionId()),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(processor.getCreationTime()));
tabletRateMap.remove(taskID);
tsFileRateMap.remove(taskID);
pipeHeartbeatRateMap.remove(taskID);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index 4835810dd17..66d48365b04 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -96,6 +96,9 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
this.pipeProcessorSubtask =
new PipeProcessorSubtask(
taskId,
+ creationTime,
+ pipeName,
+ dataRegionId.getId(),
pipeExtractorInputEventSupplier,
pipeProcessor,
pipeConnectorOutputEventCollector);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
index d4e35c35200..4614a25d8c5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
@@ -46,6 +46,9 @@ public abstract class PipeSubtask
// Used for identifying the subtask
protected final String taskID;
+ // Record these variables to provide corresponding value to tag key of
monitoring metrics
+ protected long creationTime;
+
// For thread pool to execute subtasks
protected ListeningExecutorService subtaskWorkerThreadPoolExecutor;
@@ -59,9 +62,10 @@ public abstract class PipeSubtask
protected final AtomicInteger retryCount = new AtomicInteger(0);
protected Event lastEvent;
- protected PipeSubtask(String taskID) {
+ protected PipeSubtask(String taskID, long creationTime) {
super();
this.taskID = taskID;
+ this.creationTime = creationTime;
}
public abstract void bindExecutors(
@@ -237,6 +241,10 @@ public abstract class PipeSubtask
return taskID;
}
+ public long getCreationTime() {
+ return creationTime;
+ }
+
public int getRetryCount() {
return retryCount.get();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
index 05119db0ca2..283a3f71510 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
@@ -59,23 +59,20 @@ public class PipeConnectorSubtask extends PipeSubtask {
protected final DecoratingLock callbackDecoratingLock = new DecoratingLock();
protected ExecutorService subtaskCallbackListeningExecutor;
- public Integer getTsFileInsertionEventCount() {
- return inputPendingQueue.getTsFileInsertionEventCount();
- }
-
- public Integer getTabletInsertionEventCount() {
- return inputPendingQueue.getTabletInsertionEventCount();
- }
-
- public Integer getPipeHeartbeatEventCount() {
- return inputPendingQueue.getPipeHeartbeatEventCount();
- }
+ // Record these variables to provide corresponding value to tag key of
monitoring metrics
+ private final String attributeSortedString;
+ private final int connectorIndex;
public PipeConnectorSubtask(
String taskID,
+ long creationTime,
+ String attributeSortedString,
+ int connectorIndex,
BoundedBlockingPendingQueue<Event> inputPendingQueue,
PipeConnector outputPipeConnector) {
- super(taskID);
+ super(taskID, creationTime);
+ this.attributeSortedString = attributeSortedString;
+ this.connectorIndex = connectorIndex;
this.inputPendingQueue = inputPendingQueue;
this.outputPipeConnector = outputPipeConnector;
PipeConnectorMetrics.getInstance().register(this);
@@ -292,4 +289,26 @@ public class PipeConnectorSubtask extends PipeSubtask {
super.close();
}
}
+
+ //////////////////////////// APIs provided for metric framework
////////////////////////////
+
+ public String getAttributeSortedString() {
+ return attributeSortedString;
+ }
+
+ public int getConnectorIndex() {
+ return connectorIndex;
+ }
+
+ public Integer getTsFileInsertionEventCount() {
+ return inputPendingQueue.getTsFileInsertionEventCount();
+ }
+
+ public Integer getTabletInsertionEventCount() {
+ return inputPendingQueue.getTabletInsertionEventCount();
+ }
+
+ public Integer getPipeHeartbeatEventCount() {
+ return inputPendingQueue.getPipeHeartbeatEventCount();
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
index 1c07e78f029..76fa00cd652 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
@@ -86,7 +86,7 @@ public class PipeConnectorSubtaskManager {
new BoundedBlockingPendingQueue<>(
PipeConfig.getInstance().getPipeConnectorPendingQueueSize());
- for (int i = 0; i < connectorNum; i++) {
+ for (int connectorIndex = 0; connectorIndex < connectorNum;
connectorIndex++) {
final PipeConnector pipeConnector =
CONNECTOR_CONSTRUCTORS
.getOrDefault(
@@ -110,7 +110,13 @@ public class PipeConnectorSubtaskManager {
final PipeConnectorSubtask pipeConnectorSubtask =
new PipeConnectorSubtask(
String.format(
- "%s_%s_%s", attributeSortedString,
pipeRuntimeEnvironment.getCreationTime(), i),
+ "%s_%s_%s",
+ attributeSortedString,
+ pipeRuntimeEnvironment.getCreationTime(),
+ connectorIndex),
+ pipeRuntimeEnvironment.getCreationTime(),
+ attributeSortedString,
+ connectorIndex,
pendingQueue,
pipeConnector);
final PipeConnectorSubtaskLifeCycle pipeConnectorSubtaskLifeCycle =
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
index d20d0731b7b..35fdac641fa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
@@ -49,12 +49,21 @@ public class PipeProcessorSubtask extends PipeSubtask {
private final PipeProcessor pipeProcessor;
private final PipeEventCollector outputEventCollector;
+ // Record these variables to provide corresponding value to tag key of
monitoring metrics
+ private final String pipeName;
+ private final int dataRegionId;
+
public PipeProcessorSubtask(
String taskID,
+ long creationTime,
+ String pipeName,
+ int dataRegionId,
EventSupplier inputEventSupplier,
PipeProcessor pipeProcessor,
PipeEventCollector outputEventCollector) {
- super(taskID);
+ super(taskID, creationTime);
+ this.pipeName = pipeName;
+ this.dataRegionId = dataRegionId;
this.inputEventSupplier = inputEventSupplier;
this.pipeProcessor = pipeProcessor;
this.outputEventCollector = outputEventCollector;
@@ -180,6 +189,16 @@ public class PipeProcessorSubtask extends PipeSubtask {
return taskID.hashCode();
}
+ //////////////////////////// APIs provided for metric framework
////////////////////////////
+
+ public String getPipeName() {
+ return pipeName;
+ }
+
+ public int getDataRegionId() {
+ return dataRegionId;
+ }
+
public int getTabletInsertionEventCount() {
return outputEventCollector.getTabletInsertionEventCount();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
index da651cf4322..ecb4bb3483a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
@@ -83,6 +83,7 @@ public class WALInsertNodeCache {
.weigher(
(Weigher<WALEntryPosition, Pair<ByteBuffer, InsertNode>>)
(position, pair) -> position.getSize())
+ .recordStats()
.build(new WALInsertNodeCacheLoader());
PipeWALInsertNodeCacheMetrics.getInstance().register(this, dataRegionId);
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/execution/PipeConnectorSubtaskExecutorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/execution/PipeConnectorSubtaskExecutorTest.java
index dbc66e9a099..9dc36a14667 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/execution/PipeConnectorSubtaskExecutorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/execution/PipeConnectorSubtaskExecutorTest.java
@@ -39,6 +39,9 @@ public class PipeConnectorSubtaskExecutorTest extends
PipeSubtaskExecutorTest {
Mockito.spy(
new PipeConnectorSubtask(
"PipeConnectorSubtaskExecutorTest",
+ System.currentTimeMillis(),
+ "TestAttributeSortedString",
+ 0,
mock(BoundedBlockingPendingQueue.class),
mock(PipeConnector.class)));
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/execution/PipeProcessorSubtaskExecutorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/execution/PipeProcessorSubtaskExecutorTest.java
index eea84c9bb6d..8a8be617152 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/execution/PipeProcessorSubtaskExecutorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/execution/PipeProcessorSubtaskExecutorTest.java
@@ -40,6 +40,9 @@ public class PipeProcessorSubtaskExecutorTest extends
PipeSubtaskExecutorTest {
Mockito.spy(
new PipeProcessorSubtask(
"PipeProcessorSubtaskExecutorTest",
+ System.currentTimeMillis(),
+ "TestPipe",
+ 0,
mock(EventSupplier.class),
mock(PipeProcessor.class),
mock(PipeEventCollector.class)));
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Tag.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Tag.java
index 786d5e2a68c..f3062737d92 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Tag.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Tag.java
@@ -28,7 +28,9 @@ public enum Tag {
FROM("from"),
STAGE("stage"),
OPERATION("operation"),
- INTERFACE("interface");
+ INTERFACE("interface"),
+ CREATION_TIME("creation_time"),
+ INDEX("index");
final String value;