This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 8179015e8c1 [To dev/1.3] Pipe: Add InsertNode & tsFile transmission
time metrics (#15668) (#15670)
8179015e8c1 is described below
commit 8179015e8c1f012e0a63357a2dc58f72ec86ced4
Author: Caideyipi <[email protected]>
AuthorDate: Sat Jun 7 17:02:51 2025 +0800
[To dev/1.3] Pipe: Add InsertNode & tsFile transmission time metrics
(#15668) (#15670)
* Pipe: Add InsertNode transmission time
* Pipe: Add InsertNode transmission time
* update
* update
* fix
* fix
* fix
Co-authored-by: Zhenyu Luo <[email protected]>
---
.../tablet/PipeInsertNodeTabletInsertionEvent.java | 5 +-
.../common/tsfile/PipeTsFileInsertionEvent.java | 4 +-
.../PipeDataNodeRemainingEventAndTimeMetrics.java | 83 +++++++++++++++++++---
.../PipeDataNodeRemainingEventAndTimeOperator.java | 21 ++++++
.../iotdb/commons/service/metric/enums/Metric.java | 3 +
5 files changed, 104 insertions(+), 12 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 5ead2164dbf..e7814916e5d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -87,6 +87,8 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
private ProgressIndex progressIndex;
+ private long extractTime = 0;
+
public PipeInsertNodeTabletInsertionEvent(
final WALEntryHandler walEntryHandler,
final PartialPath devicePath,
@@ -154,6 +156,7 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
@Override
public boolean internallyIncreaseResourceReferenceCount(final String
holderMessage) {
+ extractTime = System.nanoTime();
try {
PipeDataNodeResourceManager.wal().pin(walEntryHandler);
if (Objects.nonNull(pipeName)) {
@@ -194,7 +197,7 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
if (Objects.nonNull(pipeName)) {
PipeDataNodeAgent.task().decreaseFloatingMemoryUsageInByte(pipeName,
ramBytesUsed());
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
- .decreaseInsertNodeEventCount(pipeName, creationTime);
+ .decreaseInsertNodeEventCount(pipeName, creationTime,
System.nanoTime() - extractTime);
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index a2843392636..3aa9b4a6b13 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -67,6 +67,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
private final TsFileResource resource;
private File tsFile;
+ private long extractTime = 0;
// This is true iff the modFile exists and should be transferred
private boolean isWithMod;
@@ -249,6 +250,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
@Override
public boolean internallyIncreaseResourceReferenceCount(final String
holderMessage) {
+ extractTime = System.nanoTime();
try {
tsFile =
PipeDataNodeResourceManager.tsfile().increaseFileReference(tsFile, true,
resource);
if (isWithMod) {
@@ -289,7 +291,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
} finally {
if (Objects.nonNull(pipeName)) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
- .decreaseTsFileEventCount(pipeName, creationTime);
+ .decreaseTsFileEventCount(pipeName, creationTime,
System.nanoTime() - extractTime);
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeMetrics.java
index 045f0201fcc..354a980edfd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeMetrics.java
@@ -26,7 +26,9 @@ import org.apache.iotdb.commons.service.metric.enums.Tag;
import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
import
org.apache.iotdb.db.pipe.extractor.schemaregion.IoTDBSchemaRegionExtractor;
import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.impl.DoNothingMetricManager;
import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Histogram;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.metrics.utils.MetricType;
@@ -38,6 +40,7 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
public class PipeDataNodeRemainingEventAndTimeMetrics implements IMetricSet {
@@ -50,11 +53,28 @@ public class PipeDataNodeRemainingEventAndTimeMetrics
implements IMetricSet {
private final Map<String, PipeDataNodeRemainingEventAndTimeOperator>
remainingEventAndTimeOperatorMap = new ConcurrentHashMap<>();
+ private static Histogram PIPE_DATANODE_INSERTNODE_TRANSFER_TIME_HISTOGRAM =
+ DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
+ private static Histogram PIPE_DATANODE_EVENT_TRANSFER_TIME_HISTOGRAM =
+ DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
+
//////////////////////////// bindTo & unbindFrom (metric framework)
////////////////////////////
@Override
public void bindTo(final AbstractMetricService metricService) {
this.metricService = metricService;
+ PIPE_DATANODE_INSERTNODE_TRANSFER_TIME_HISTOGRAM =
+ metricService.getOrCreateHistogram(
+ Metric.PIPE_DATANODE_EVENT_TRANSFER.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ "insert_node");
+ PIPE_DATANODE_EVENT_TRANSFER_TIME_HISTOGRAM =
+ metricService.getOrCreateHistogram(
+ Metric.PIPE_DATANODE_EVENT_TRANSFER.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ "tsfile");
ImmutableSet.copyOf(remainingEventAndTimeOperatorMap.keySet()).forEach(this::createMetrics);
}
@@ -83,6 +103,20 @@ public class PipeDataNodeRemainingEventAndTimeMetrics
implements IMetricSet {
operator.getPipeName(),
Tag.CREATION_TIME.toString(),
String.valueOf(operator.getCreationTime()));
+
+ operator.setInsertNodeTransferTimer(
+ metricService.getOrCreateTimer(
+ Metric.PIPE_INSERT_NODE_EVENT_TRANSFER_TIME.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ operator.getPipeName()));
+
+ operator.setTsFileTransferTimer(
+ metricService.getOrCreateTimer(
+ Metric.PIPE_TSFILE_EVENT_TRANSFER_TIME.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ operator.getPipeName()));
}
public boolean mayRemainingInsertEventExceedLimit(final String pipeID) {
@@ -117,6 +151,17 @@ public class PipeDataNodeRemainingEventAndTimeMetrics
implements IMetricSet {
LOGGER.warn(
"Failed to unbind from pipe remaining event and time metrics,
RemainingEventAndTimeOperator map not empty");
}
+ metricService.remove(
+ MetricType.HISTOGRAM,
+ Metric.PIPE_DATANODE_EVENT_TRANSFER.toString(),
+ Tag.NAME.toString(),
+ "insert_node");
+
+ metricService.remove(
+ MetricType.HISTOGRAM,
+ Metric.PIPE_DATANODE_EVENT_TRANSFER.toString(),
+ Tag.NAME.toString(),
+ "tsfile");
}
private void removeMetrics(final String pipeID) {
@@ -140,6 +185,16 @@ public class PipeDataNodeRemainingEventAndTimeMetrics
implements IMetricSet {
operator.getPipeName(),
Tag.CREATION_TIME.toString(),
String.valueOf(operator.getCreationTime()));
+ metricService.remove(
+ MetricType.TIMER,
+ Metric.PIPE_INSERT_NODE_EVENT_TRANSFER_TIME.toString(),
+ Tag.NAME.toString(),
+ operator.getPipeName());
+ metricService.remove(
+ MetricType.TIMER,
+ Metric.PIPE_TSFILE_EVENT_TRANSFER_TIME.toString(),
+ Tag.NAME.toString(),
+ operator.getPipeName());
remainingEventAndTimeOperatorMap.remove(pipeID);
}
@@ -181,12 +236,16 @@ public class PipeDataNodeRemainingEventAndTimeMetrics
implements IMetricSet {
.increaseInsertNodeEventCount();
}
- public void decreaseInsertNodeEventCount(final String pipeName, final long
creationTime) {
- remainingEventAndTimeOperatorMap
- .computeIfAbsent(
+ public void decreaseInsertNodeEventCount(
+ final String pipeName, final long creationTime, final long transferTime)
{
+ PipeDataNodeRemainingEventAndTimeOperator operator =
+ remainingEventAndTimeOperatorMap.computeIfAbsent(
pipeName + "_" + creationTime,
- k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName,
creationTime))
- .decreaseInsertNodeEventCount();
+ k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName,
creationTime));
+ operator.decreaseInsertNodeEventCount();
+
+ operator.getInsertNodeTransferTimer().update(transferTime,
TimeUnit.NANOSECONDS);
+ PIPE_DATANODE_INSERTNODE_TRANSFER_TIME_HISTOGRAM.update(transferTime);
}
public void increaseRawTabletEventCount(final String pipeName, final long
creationTime) {
@@ -213,12 +272,16 @@ public class PipeDataNodeRemainingEventAndTimeMetrics
implements IMetricSet {
.increaseTsFileEventCount();
}
- public void decreaseTsFileEventCount(final String pipeName, final long
creationTime) {
- remainingEventAndTimeOperatorMap
- .computeIfAbsent(
+ public void decreaseTsFileEventCount(
+ final String pipeName, final long creationTime, final long transferTime)
{
+ final PipeDataNodeRemainingEventAndTimeOperator operator =
+ remainingEventAndTimeOperatorMap.computeIfAbsent(
pipeName + "_" + creationTime,
- k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName,
creationTime))
- .decreaseTsFileEventCount();
+ k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName,
creationTime));
+
+ operator.decreaseTsFileEventCount();
+ operator.getTsFileTransferTimer().update(transferTime,
TimeUnit.NANOSECONDS);
+ PIPE_DATANODE_EVENT_TRANSFER_TIME_HISTOGRAM.update(transferTime);
}
public void increaseHeartbeatEventCount(final String pipeName, final long
creationTime) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
index 7aee55df429..2de7e0053f8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
@@ -25,6 +25,8 @@ import
org.apache.iotdb.commons.pipe.metric.PipeRemainingOperator;
import
org.apache.iotdb.db.pipe.extractor.schemaregion.IoTDBSchemaRegionExtractor;
import org.apache.iotdb.metrics.core.IoTDBMetricManager;
import org.apache.iotdb.metrics.core.type.IoTDBHistogram;
+import org.apache.iotdb.metrics.impl.DoNothingMetricManager;
+import org.apache.iotdb.metrics.type.Timer;
import org.apache.iotdb.pipe.api.event.Event;
import com.codahale.metrics.Clock;
@@ -54,6 +56,9 @@ class PipeDataNodeRemainingEventAndTimeOperator extends
PipeRemainingOperator {
private final IoTDBHistogram collectInvocationHistogram =
(IoTDBHistogram) IoTDBMetricManager.getInstance().createHistogram();
+ private Timer insertNodeTransferTimer =
DoNothingMetricManager.DO_NOTHING_TIMER;
+ private Timer tsfileTransferTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+
private volatile long lastInsertNodeEventCountSmoothingTime = Long.MIN_VALUE;
private final Meter insertNodeEventCountMeter =
new Meter(new ExponentialMovingAverages(), Clock.defaultClock());
@@ -230,6 +235,22 @@ class PipeDataNodeRemainingEventAndTimeOperator extends
PipeRemainingOperator {
collectInvocationHistogram.update(Math.max(collectInvocationCount, 1));
}
+ public void setInsertNodeTransferTimer(Timer insertNodeTransferTimer) {
+ this.insertNodeTransferTimer = insertNodeTransferTimer;
+ }
+
+ public Timer getInsertNodeTransferTimer() {
+ return insertNodeTransferTimer;
+ }
+
+ public void setTsFileTransferTimer(Timer tsFileTransferTimer) {
+ this.tsfileTransferTimer = tsFileTransferTimer;
+ }
+
+ public Timer getTsFileTransferTimer() {
+ return tsfileTransferTimer;
+ }
+
//////////////////////////// Switch ////////////////////////////
// Thread-safe & Idempotent
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index a0f131eefec..145b4adaf94 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -174,6 +174,9 @@ public enum Metric {
PIPE_CONNECTOR_SCHEMA_TRANSFER("pipe_connector_schema_transfer"),
PIPE_DATANODE_REMAINING_EVENT_COUNT("pipe_datanode_remaining_event_count"),
PIPE_DATANODE_REMAINING_TIME("pipe_datanode_remaining_time"),
+ PIPE_INSERT_NODE_EVENT_TRANSFER_TIME("pipe_insert_node_event_transfer_time"),
+ PIPE_TSFILE_EVENT_TRANSFER_TIME("pipe_tsfile_event_transfer_time"),
+ PIPE_DATANODE_EVENT_TRANSFER("pipe_datanode_event_transfer"),
PIPE_CONFIG_LINKED_QUEUE_SIZE("pipe_config_linked_queue_size"),
UNTRANSFERRED_CONFIG_COUNT("untransferred_config_count"),
PIPE_CONNECTOR_CONFIG_TRANSFER("pipe_connector_config_transfer"),