This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 623fa5bac5f Pipe: add ops/latency metrics for different types of pipe
operations on receivers (DN / CN) (#12927)
623fa5bac5f is described below
commit 623fa5bac5f4be73da4b8fec56499e7beed6d72e
Author: YC27 <[email protected]>
AuthorDate: Tue Jul 23 14:45:36 2024 +0800
Pipe: add ops/latency metrics for different types of pipe operations on
receivers (DN / CN) (#12927)
---
.../manager/pipe/metric/PipeConfigNodeMetrics.java | 2 +
.../pipe/metric/PipeConfigNodeReceiverMetrics.java | 169 +++++++++++
.../receiver/protocol/IoTDBConfigNodeReceiver.java | 44 ++-
.../iotdb/db/pipe/metric/PipeDataNodeMetrics.java | 2 +
.../pipe/metric/PipeDataNodeReceiverMetrics.java | 333 +++++++++++++++++++++
.../protocol/thrift/IoTDBDataNodeReceiver.java | 112 +++++--
.../iotdb/commons/service/metric/enums/Metric.java | 2 +
7 files changed, 623 insertions(+), 41 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeMetrics.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeMetrics.java
index b02679f147e..a7a41f4f359 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeMetrics.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeMetrics.java
@@ -42,6 +42,7 @@ public class PipeConfigNodeMetrics implements IMetricSet {
PipeConfigRegionConnectorMetrics.getInstance().bindTo(metricService);
PipeConfigNodeRemainingTimeMetrics.getInstance().bindTo(metricService);
PipeTemporaryMetaMetrics.getInstance().bindTo(metricService);
+ PipeConfigNodeReceiverMetrics.getInstance().bindTo(metricService);
}
@Override
@@ -53,5 +54,6 @@ public class PipeConfigNodeMetrics implements IMetricSet {
PipeConfigRegionConnectorMetrics.getInstance().unbindFrom(metricService);
PipeConfigNodeRemainingTimeMetrics.getInstance().unbindFrom(metricService);
PipeTemporaryMetaMetrics.getInstance().unbindFrom(metricService);
+ PipeConfigNodeReceiverMetrics.getInstance().unbindFrom(metricService);
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeReceiverMetrics.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeReceiverMetrics.java
new file mode 100644
index 00000000000..e0a30f42c2e
--- /dev/null
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeReceiverMetrics.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.pipe.metric;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.impl.DoNothingMetricManager;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Timer;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+/**
+ * Timers measuring how long the ConfigNode pipe receiver spends on each type of request.
+ *
+ * <p>Every timer field holds {@link DoNothingMetricManager#DO_NOTHING_TIMER} until {@link
+ * #bindTo(AbstractMetricService)} replaces it with a timer obtained from the metric service, and
+ * is reset to that placeholder again by {@link #unbindFrom(AbstractMetricService)}.
+ */
+public class PipeConfigNodeReceiverMetrics implements IMetricSet {
+
+  /** Value of the {@code Tag.NAME} tag shared by all timers of this metric set. */
+  private static final String RECEIVER = "pipeConfigNodeReceiver";
+
+  private static final PipeConfigNodeReceiverMetrics INSTANCE = new PipeConfigNodeReceiverMetrics();
+
+  private Timer handshakeConfigNodeV1Timer = DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer handshakeConfigNodeV2Timer = DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer transferConfigPlanTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer transferConfigSnapshotPieceTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer transferConfigSnapshotSealTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+
+  private PipeConfigNodeReceiverMetrics() {}
+
+  /** Returns the single shared instance of this metric set. */
+  public static PipeConfigNodeReceiverMetrics getInstance() {
+    return INSTANCE;
+  }
+
+  /** Feeds one duration, in nanoseconds, into the {@code handshakeConfigNodeV1} timer. */
+  public void recordHandshakeConfigNodeV1Timer(long costTimeInNanos) {
+    handshakeConfigNodeV1Timer.updateNanos(costTimeInNanos);
+  }
+
+  /** Feeds one duration, in nanoseconds, into the {@code handshakeConfigNodeV2} timer. */
+  public void recordHandshakeConfigNodeV2Timer(long costTimeInNanos) {
+    handshakeConfigNodeV2Timer.updateNanos(costTimeInNanos);
+  }
+
+  /** Feeds one duration, in nanoseconds, into the {@code transferConfigPlan} timer. */
+  public void recordTransferConfigPlanTimer(long costTimeInNanos) {
+    transferConfigPlanTimer.updateNanos(costTimeInNanos);
+  }
+
+  /** Feeds one duration, in nanoseconds, into the {@code transferConfigSnapshotPiece} timer. */
+  public void recordTransferConfigSnapshotPieceTimer(long costTimeInNanos) {
+    transferConfigSnapshotPieceTimer.updateNanos(costTimeInNanos);
+  }
+
+  /** Feeds one duration, in nanoseconds, into the {@code transferConfigSnapshotSeal} timer. */
+  public void recordTransferConfigSnapshotSealTimer(long costTimeInNanos) {
+    transferConfigSnapshotSealTimer.updateNanos(costTimeInNanos);
+  }
+
+  /**
+   * Obtains one timer per request type from {@code metricService} and stores it in the matching
+   * field.
+   */
+  @Override
+  public void bindTo(AbstractMetricService metricService) {
+    handshakeConfigNodeV1Timer = timerOf(metricService, "handshakeConfigNodeV1");
+    handshakeConfigNodeV2Timer = timerOf(metricService, "handshakeConfigNodeV2");
+    transferConfigPlanTimer = timerOf(metricService, "transferConfigPlan");
+    transferConfigSnapshotPieceTimer = timerOf(metricService, "transferConfigSnapshotPiece");
+    transferConfigSnapshotSealTimer = timerOf(metricService, "transferConfigSnapshotSeal");
+  }
+
+  /**
+   * Points every timer field back at the do-nothing timer, then asks {@code metricService} to
+   * remove each timer that {@link #bindTo(AbstractMetricService)} requested.
+   */
+  @Override
+  public void unbindFrom(AbstractMetricService metricService) {
+    final Timer placeholder = DoNothingMetricManager.DO_NOTHING_TIMER;
+    handshakeConfigNodeV1Timer = placeholder;
+    handshakeConfigNodeV2Timer = placeholder;
+    transferConfigPlanTimer = placeholder;
+    transferConfigSnapshotPieceTimer = placeholder;
+    transferConfigSnapshotSealTimer = placeholder;
+
+    final String[] typesInRemovalOrder = {
+      "handshakeConfigNodeV1",
+      "handshakeConfigNodeV2",
+      "transferConfigPlan",
+      "transferConfigSnapshotPiece",
+      "transferConfigSnapshotSeal"
+    };
+    for (final String type : typesInRemovalOrder) {
+      metricService.remove(
+          MetricType.TIMER,
+          Metric.PIPE_CONFIGNODE_RECEIVER.toString(),
+          Tag.NAME.toString(),
+          RECEIVER,
+          Tag.TYPE.toString(),
+          type);
+    }
+  }
+
+  /** Requests the receiver timer whose {@code Tag.TYPE} tag equals {@code type}. */
+  private static Timer timerOf(AbstractMetricService metricService, String type) {
+    return metricService.getOrCreateTimer(
+        Metric.PIPE_CONFIGNODE_RECEIVER.toString(),
+        MetricLevel.IMPORTANT,
+        Tag.NAME.toString(),
+        RECEIVER,
+        Tag.TYPE.toString(),
+        type);
+  }
+}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
index 7815adde258..711d7bdf26c 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
@@ -53,6 +53,7 @@ import
org.apache.iotdb.confignode.manager.pipe.connector.payload.PipeTransferCo
import
org.apache.iotdb.confignode.manager.pipe.connector.payload.PipeTransferConfigSnapshotSealReq;
import
org.apache.iotdb.confignode.manager.pipe.event.PipeConfigRegionSnapshotEvent;
import
org.apache.iotdb.confignode.manager.pipe.extractor.IoTDBConfigRegionExtractor;
+import
org.apache.iotdb.confignode.manager.pipe.metric.PipeConfigNodeReceiverMetrics;
import
org.apache.iotdb.confignode.manager.pipe.receiver.visitor.PipeConfigPhysicalPlanExceptionVisitor;
import
org.apache.iotdb.confignode.manager.pipe.receiver.visitor.PipeConfigPhysicalPlanTSStatusVisitor;
import org.apache.iotdb.confignode.persistence.schema.CNPhysicalPlanGenerator;
@@ -111,23 +112,44 @@ public class IoTDBConfigNodeReceiver extends
IoTDBFileReceiver {
.setMessage(
"The receiver ConfigNode has set up a new receiver and
the sender must re-send its handshake request."));
}
+ TPipeTransferResp resp;
+ long startTime = System.nanoTime();
switch (type) {
case HANDSHAKE_CONFIGNODE_V1:
- return handleTransferHandshakeV1(
-
PipeTransferConfigNodeHandshakeV1Req.fromTPipeTransferReq(req));
+ resp =
+ handleTransferHandshakeV1(
+
PipeTransferConfigNodeHandshakeV1Req.fromTPipeTransferReq(req));
+ PipeConfigNodeReceiverMetrics.getInstance()
+ .recordHandshakeConfigNodeV1Timer(System.nanoTime() -
startTime);
+ return resp;
case HANDSHAKE_CONFIGNODE_V2:
- return handleTransferHandshakeV2(
-
PipeTransferConfigNodeHandshakeV2Req.fromTPipeTransferReq(req));
+ resp =
+ handleTransferHandshakeV2(
+
PipeTransferConfigNodeHandshakeV2Req.fromTPipeTransferReq(req));
+ PipeConfigNodeReceiverMetrics.getInstance()
+ .recordHandshakeConfigNodeV2Timer(System.nanoTime() -
startTime);
+ return resp;
case TRANSFER_CONFIG_PLAN:
- return
handleTransferConfigPlan(PipeTransferConfigPlanReq.fromTPipeTransferReq(req));
+ resp =
handleTransferConfigPlan(PipeTransferConfigPlanReq.fromTPipeTransferReq(req));
+ PipeConfigNodeReceiverMetrics.getInstance()
+ .recordTransferConfigPlanTimer(System.nanoTime() - startTime);
+ return resp;
case TRANSFER_CONFIG_SNAPSHOT_PIECE:
- return handleTransferFilePiece(
- PipeTransferConfigSnapshotPieceReq.fromTPipeTransferReq(req),
- req instanceof AirGapPseudoTPipeTransferRequest,
- false);
+ resp =
+ handleTransferFilePiece(
+
PipeTransferConfigSnapshotPieceReq.fromTPipeTransferReq(req),
+ req instanceof AirGapPseudoTPipeTransferRequest,
+ false);
+ PipeConfigNodeReceiverMetrics.getInstance()
+ .recordTransferConfigSnapshotPieceTimer(System.nanoTime() -
startTime);
+ return resp;
case TRANSFER_CONFIG_SNAPSHOT_SEAL:
- return handleTransferFileSealV2(
- PipeTransferConfigSnapshotSealReq.fromTPipeTransferReq(req));
+ resp =
+ handleTransferFileSealV2(
+
PipeTransferConfigSnapshotSealReq.fromTPipeTransferReq(req));
+ PipeConfigNodeReceiverMetrics.getInstance()
+ .recordTransferConfigSnapshotSealTimer(System.nanoTime() -
startTime);
+ return resp;
case TRANSFER_COMPRESSED:
return
receive(PipeTransferCompressedReq.fromTPipeTransferReq(req));
default:
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeMetrics.java
index 2f39cc75c25..172653f3b01 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeMetrics.java
@@ -41,6 +41,7 @@ public class PipeDataNodeMetrics implements IMetricSet {
PipeSchemaRegionExtractorMetrics.getInstance().bindTo(metricService);
PipeSchemaRegionConnectorMetrics.getInstance().bindTo(metricService);
PipeDataNodeRemainingEventAndTimeMetrics.getInstance().bindTo(metricService);
+ PipeDataNodeReceiverMetrics.getInstance().bindTo(metricService);
}
@Override
@@ -57,6 +58,7 @@ public class PipeDataNodeMetrics implements IMetricSet {
PipeSchemaRegionExtractorMetrics.getInstance().unbindFrom(metricService);
PipeSchemaRegionConnectorMetrics.getInstance().unbindFrom(metricService);
PipeDataNodeRemainingEventAndTimeMetrics.getInstance().unbindFrom(metricService);
+ PipeDataNodeReceiverMetrics.getInstance().unbindFrom(metricService);
}
//////////////////////////// singleton ////////////////////////////
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeReceiverMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeReceiverMetrics.java
new file mode 100644
index 00000000000..eddad988041
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeReceiverMetrics.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.metric;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.impl.DoNothingMetricManager;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Timer;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+/**
+ * Timers measuring how long the DataNode pipe receiver spends on each type of request.
+ *
+ * <p>Every timer field holds {@link DoNothingMetricManager#DO_NOTHING_TIMER} until {@link
+ * #bindTo(AbstractMetricService)} replaces it with a timer obtained from the metric service, and
+ * is reset to that placeholder again by {@link #unbindFrom(AbstractMetricService)}.
+ */
+public class PipeDataNodeReceiverMetrics implements IMetricSet {
+
+  /** Value of the {@code Tag.NAME} tag shared by all timers of this metric set. */
+  private static final String RECEIVER = "pipeDataNodeReceiver";
+
+  // Values of the Tag.TYPE tag. Each one is declared exactly once so that the tag used to create
+  // a timer and the tag used to remove it can never drift apart.
+  private static final String HANDSHAKE_DATANODE_V1 = "handshakeDataNodeV1";
+  private static final String HANDSHAKE_DATANODE_V2 = "handshakeDataNodeV2";
+  private static final String TRANSFER_TABLET_INSERT_NODE = "transferTabletInsertNode";
+  private static final String TRANSFER_TABLET_RAW = "transferTabletRaw";
+  private static final String TRANSFER_TABLET_BINARY = "transferTabletBinary";
+  private static final String TRANSFER_TABLET_BATCH = "transferTabletBatch";
+  private static final String TRANSFER_TS_FILE_PIECE = "transferTsFilePiece";
+  private static final String TRANSFER_TS_FILE_SEAL = "transferTsFileSeal";
+  private static final String TRANSFER_TS_FILE_PIECE_WITH_MOD = "transferTsFilePieceWithMod";
+  private static final String TRANSFER_TS_FILE_SEAL_WITH_MOD = "transferTsFileSealWithMod";
+  private static final String TRANSFER_SCHEMA_PLAN = "transferSchemaPlan";
+  private static final String TRANSFER_SCHEMA_SNAPSHOT_PIECE = "transferSchemaSnapshotPiece";
+  private static final String TRANSFER_SCHEMA_SNAPSHOT_SEAL = "transferSchemaSnapshotSeal";
+
+  private static final PipeDataNodeReceiverMetrics INSTANCE = new PipeDataNodeReceiverMetrics();
+
+  private Timer handshakeDatanodeV1Timer = DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer handshakeDatanodeV2Timer = DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer transferTabletInsertNodeTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer transferTabletRawTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer transferTabletBinaryTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer transferTabletBatchTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer transferTsFilePieceTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer transferTsFileSealTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer transferTsFilePieceWithModTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer transferTsFileSealWithModTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer transferSchemaPlanTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer transferSchemaSnapshotPieceTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer transferSchemaSnapshotSealTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+
+  private PipeDataNodeReceiverMetrics() {}
+
+  /** Feeds one duration, in nanoseconds, into the {@code handshakeDataNodeV1} timer. */
+  public void recordHandshakeDatanodeV1Timer(long costTimeInNanos) {
+    handshakeDatanodeV1Timer.updateNanos(costTimeInNanos);
+  }
+
+  /** Feeds one duration, in nanoseconds, into the {@code handshakeDataNodeV2} timer. */
+  public void recordHandshakeDatanodeV2Timer(long costTimeInNanos) {
+    handshakeDatanodeV2Timer.updateNanos(costTimeInNanos);
+  }
+
+  /** Feeds one duration, in nanoseconds, into the {@code transferTabletInsertNode} timer. */
+  public void recordTransferTabletInsertNodeTimer(long costTimeInNanos) {
+    transferTabletInsertNodeTimer.updateNanos(costTimeInNanos);
+  }
+
+  /** Feeds one duration, in nanoseconds, into the {@code transferTabletRaw} timer. */
+  public void recordTransferTabletRawTimer(long costTimeInNanos) {
+    transferTabletRawTimer.updateNanos(costTimeInNanos);
+  }
+
+  /** Feeds one duration, in nanoseconds, into the {@code transferTabletBinary} timer. */
+  public void recordTransferTabletBinaryTimer(long costTimeInNanos) {
+    transferTabletBinaryTimer.updateNanos(costTimeInNanos);
+  }
+
+  /** Feeds one duration, in nanoseconds, into the {@code transferTabletBatch} timer. */
+  public void recordTransferTabletBatchTimer(long costTimeInNanos) {
+    transferTabletBatchTimer.updateNanos(costTimeInNanos);
+  }
+
+  /** Feeds one duration, in nanoseconds, into the {@code transferTsFilePiece} timer. */
+  public void recordTransferTsFilePieceTimer(long costTimeInNanos) {
+    transferTsFilePieceTimer.updateNanos(costTimeInNanos);
+  }
+
+  /** Feeds one duration, in nanoseconds, into the {@code transferTsFileSeal} timer. */
+  public void recordTransferTsFileSealTimer(long costTimeInNanos) {
+    transferTsFileSealTimer.updateNanos(costTimeInNanos);
+  }
+
+  /** Feeds one duration, in nanoseconds, into the {@code transferTsFilePieceWithMod} timer. */
+  public void recordTransferTsFilePieceWithModTimer(long costTimeInNanos) {
+    transferTsFilePieceWithModTimer.updateNanos(costTimeInNanos);
+  }
+
+  /** Feeds one duration, in nanoseconds, into the {@code transferTsFileSealWithMod} timer. */
+  public void recordTransferTsFileSealWithModTimer(long costTimeInNanos) {
+    transferTsFileSealWithModTimer.updateNanos(costTimeInNanos);
+  }
+
+  /** Feeds one duration, in nanoseconds, into the {@code transferSchemaPlan} timer. */
+  public void recordTransferSchemaPlanTimer(long costTimeInNanos) {
+    transferSchemaPlanTimer.updateNanos(costTimeInNanos);
+  }
+
+  /** Feeds one duration, in nanoseconds, into the {@code transferSchemaSnapshotPiece} timer. */
+  public void recordTransferSchemaSnapshotPieceTimer(long costTimeInNanos) {
+    transferSchemaSnapshotPieceTimer.updateNanos(costTimeInNanos);
+  }
+
+  /** Feeds one duration, in nanoseconds, into the {@code transferSchemaSnapshotSeal} timer. */
+  public void recordTransferSchemaSnapshotSealTimer(long costTimeInNanos) {
+    transferSchemaSnapshotSealTimer.updateNanos(costTimeInNanos);
+  }
+
+  /**
+   * Obtains one timer per request type from {@code metricService} and stores it in the matching
+   * field.
+   */
+  @Override
+  public void bindTo(AbstractMetricService metricService) {
+    bindToTimer(metricService);
+  }
+
+  private void bindToTimer(AbstractMetricService metricService) {
+    handshakeDatanodeV1Timer = createTimer(metricService, HANDSHAKE_DATANODE_V1);
+    handshakeDatanodeV2Timer = createTimer(metricService, HANDSHAKE_DATANODE_V2);
+    transferTabletInsertNodeTimer = createTimer(metricService, TRANSFER_TABLET_INSERT_NODE);
+    transferTabletRawTimer = createTimer(metricService, TRANSFER_TABLET_RAW);
+    transferTabletBinaryTimer = createTimer(metricService, TRANSFER_TABLET_BINARY);
+    transferTabletBatchTimer = createTimer(metricService, TRANSFER_TABLET_BATCH);
+    transferTsFilePieceTimer = createTimer(metricService, TRANSFER_TS_FILE_PIECE);
+    transferTsFileSealTimer = createTimer(metricService, TRANSFER_TS_FILE_SEAL);
+    transferTsFilePieceWithModTimer = createTimer(metricService, TRANSFER_TS_FILE_PIECE_WITH_MOD);
+    transferTsFileSealWithModTimer = createTimer(metricService, TRANSFER_TS_FILE_SEAL_WITH_MOD);
+    transferSchemaPlanTimer = createTimer(metricService, TRANSFER_SCHEMA_PLAN);
+    transferSchemaSnapshotPieceTimer = createTimer(metricService, TRANSFER_SCHEMA_SNAPSHOT_PIECE);
+    transferSchemaSnapshotSealTimer = createTimer(metricService, TRANSFER_SCHEMA_SNAPSHOT_SEAL);
+  }
+
+  /**
+   * Points every timer field back at the do-nothing timer, then asks {@code metricService} to
+   * remove each timer that {@link #bindTo(AbstractMetricService)} requested.
+   */
+  @Override
+  public void unbindFrom(AbstractMetricService metricService) {
+    unbind(metricService);
+  }
+
+  private void unbind(AbstractMetricService metricService) {
+    handshakeDatanodeV1Timer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    handshakeDatanodeV2Timer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    transferTabletInsertNodeTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    transferTabletRawTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    transferTabletBinaryTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    transferTabletBatchTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    transferTsFilePieceTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    transferTsFileSealTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    transferTsFilePieceWithModTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    transferTsFileSealWithModTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    transferSchemaPlanTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    transferSchemaSnapshotPieceTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    transferSchemaSnapshotSealTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+
+    // The handshake removals must name the same type tags as bindToTimer
+    // ("handshakeDataNodeV1/V2", capital N); a different spelling would not match those timers.
+    removeTimer(metricService, HANDSHAKE_DATANODE_V1);
+    removeTimer(metricService, HANDSHAKE_DATANODE_V2);
+    removeTimer(metricService, TRANSFER_TABLET_INSERT_NODE);
+    removeTimer(metricService, TRANSFER_TABLET_RAW);
+    removeTimer(metricService, TRANSFER_TABLET_BINARY);
+    removeTimer(metricService, TRANSFER_TABLET_BATCH);
+    removeTimer(metricService, TRANSFER_TS_FILE_PIECE);
+    removeTimer(metricService, TRANSFER_TS_FILE_SEAL);
+    removeTimer(metricService, TRANSFER_TS_FILE_PIECE_WITH_MOD);
+    removeTimer(metricService, TRANSFER_TS_FILE_SEAL_WITH_MOD);
+    removeTimer(metricService, TRANSFER_SCHEMA_PLAN);
+    removeTimer(metricService, TRANSFER_SCHEMA_SNAPSHOT_PIECE);
+    removeTimer(metricService, TRANSFER_SCHEMA_SNAPSHOT_SEAL);
+  }
+
+  /** Requests the receiver timer whose {@code Tag.TYPE} tag equals {@code type}. */
+  private static Timer createTimer(AbstractMetricService metricService, String type) {
+    return metricService.getOrCreateTimer(
+        Metric.PIPE_DATANODE_RECEIVER.toString(),
+        MetricLevel.IMPORTANT,
+        Tag.NAME.toString(),
+        RECEIVER,
+        Tag.TYPE.toString(),
+        type);
+  }
+
+  /** Asks the metric service to remove the receiver timer tagged with {@code type}. */
+  private static void removeTimer(AbstractMetricService metricService, String type) {
+    metricService.remove(
+        MetricType.TIMER,
+        Metric.PIPE_DATANODE_RECEIVER.toString(),
+        Tag.NAME.toString(),
+        RECEIVER,
+        Tag.TYPE.toString(),
+        type);
+  }
+
+  /** Returns the single shared instance of this metric set. */
+  public static PipeDataNodeReceiverMetrics getInstance() {
+    return INSTANCE;
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index da706539278..12916ba4506 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -49,6 +49,7 @@ import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransfer
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealWithModReq;
import
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionSnapshotEvent;
+import org.apache.iotdb.db.pipe.metric.PipeDataNodeReceiverMetrics;
import org.apache.iotdb.db.pipe.receiver.visitor.PipePlanToStatementVisitor;
import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementExceptionVisitor;
import
org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementPatternParseVisitor;
@@ -141,50 +142,99 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
@Override
public synchronized TPipeTransferResp receive(final TPipeTransferReq req) {
try {
+ long startTime = System.nanoTime();
final short rawRequestType = req.getType();
if (PipeRequestType.isValidatedRequestType(rawRequestType)) {
+ TPipeTransferResp resp;
switch (PipeRequestType.valueOf(rawRequestType)) {
case HANDSHAKE_DATANODE_V1:
- return handleTransferHandshakeV1(
- PipeTransferDataNodeHandshakeV1Req.fromTPipeTransferReq(req));
+ resp =
+ handleTransferHandshakeV1(
+
PipeTransferDataNodeHandshakeV1Req.fromTPipeTransferReq(req));
+ PipeDataNodeReceiverMetrics.getInstance()
+ .recordHandshakeDatanodeV1Timer(System.nanoTime() - startTime);
+ return resp;
case HANDSHAKE_DATANODE_V2:
- return handleTransferHandshakeV2(
- PipeTransferDataNodeHandshakeV2Req.fromTPipeTransferReq(req));
+ resp =
+ handleTransferHandshakeV2(
+
PipeTransferDataNodeHandshakeV2Req.fromTPipeTransferReq(req));
+ PipeDataNodeReceiverMetrics.getInstance()
+ .recordHandshakeDatanodeV2Timer(System.nanoTime() - startTime);
+ return resp;
case TRANSFER_TABLET_INSERT_NODE:
- return handleTransferTabletInsertNode(
- PipeTransferTabletInsertNodeReq.fromTPipeTransferReq(req));
+ resp =
+ handleTransferTabletInsertNode(
+ PipeTransferTabletInsertNodeReq.fromTPipeTransferReq(req));
+ PipeDataNodeReceiverMetrics.getInstance()
+ .recordTransferTabletInsertNodeTimer(System.nanoTime() -
startTime);
+ return resp;
case TRANSFER_TABLET_RAW:
- return
handleTransferTabletRaw(PipeTransferTabletRawReq.fromTPipeTransferReq(req));
+ resp =
handleTransferTabletRaw(PipeTransferTabletRawReq.fromTPipeTransferReq(req));
+ PipeDataNodeReceiverMetrics.getInstance()
+ .recordTransferTabletRawTimer(System.nanoTime() - startTime);
+ return resp;
case TRANSFER_TABLET_BINARY:
- return handleTransferTabletBinary(
- PipeTransferTabletBinaryReq.fromTPipeTransferReq(req));
+ resp =
+
handleTransferTabletBinary(PipeTransferTabletBinaryReq.fromTPipeTransferReq(req));
+ PipeDataNodeReceiverMetrics.getInstance()
+ .recordTransferTabletBinaryTimer(System.nanoTime() -
startTime);
+ return resp;
case TRANSFER_TABLET_BATCH:
- return
handleTransferTabletBatch(PipeTransferTabletBatchReq.fromTPipeTransferReq(req));
+ resp =
handleTransferTabletBatch(PipeTransferTabletBatchReq.fromTPipeTransferReq(req));
+ PipeDataNodeReceiverMetrics.getInstance()
+ .recordTransferTabletBatchTimer(System.nanoTime() - startTime);
+ return resp;
case TRANSFER_TS_FILE_PIECE:
- return handleTransferFilePiece(
- PipeTransferTsFilePieceReq.fromTPipeTransferReq(req),
- req instanceof AirGapPseudoTPipeTransferRequest,
- true);
+ resp =
+ handleTransferFilePiece(
+ PipeTransferTsFilePieceReq.fromTPipeTransferReq(req),
+ req instanceof AirGapPseudoTPipeTransferRequest,
+ true);
+ PipeDataNodeReceiverMetrics.getInstance()
+ .recordTransferTsFilePieceTimer(System.nanoTime() - startTime);
+ return resp;
case TRANSFER_TS_FILE_SEAL:
- return
handleTransferFileSealV1(PipeTransferTsFileSealReq.fromTPipeTransferReq(req));
+ resp =
handleTransferFileSealV1(PipeTransferTsFileSealReq.fromTPipeTransferReq(req));
+ PipeDataNodeReceiverMetrics.getInstance()
+ .recordTransferTsFileSealTimer(System.nanoTime() - startTime);
+ return resp;
case TRANSFER_TS_FILE_PIECE_WITH_MOD:
- return handleTransferFilePiece(
- PipeTransferTsFilePieceWithModReq.fromTPipeTransferReq(req),
- req instanceof AirGapPseudoTPipeTransferRequest,
- false);
+ resp =
+ handleTransferFilePiece(
+
PipeTransferTsFilePieceWithModReq.fromTPipeTransferReq(req),
+ req instanceof AirGapPseudoTPipeTransferRequest,
+ false);
+ PipeDataNodeReceiverMetrics.getInstance()
+ .recordTransferTsFilePieceWithModTimer(System.nanoTime() -
startTime);
+ return resp;
case TRANSFER_TS_FILE_SEAL_WITH_MOD:
- return handleTransferFileSealV2(
- PipeTransferTsFileSealWithModReq.fromTPipeTransferReq(req));
+ resp =
+ handleTransferFileSealV2(
+
PipeTransferTsFileSealWithModReq.fromTPipeTransferReq(req));
+ PipeDataNodeReceiverMetrics.getInstance()
+ .recordTransferTsFileSealWithModTimer(System.nanoTime() -
startTime);
+ return resp;
case TRANSFER_SCHEMA_PLAN:
- return
handleTransferSchemaPlan(PipeTransferPlanNodeReq.fromTPipeTransferReq(req));
+ resp =
handleTransferSchemaPlan(PipeTransferPlanNodeReq.fromTPipeTransferReq(req));
+ PipeDataNodeReceiverMetrics.getInstance()
+ .recordTransferSchemaPlanTimer(System.nanoTime() - startTime);
+ return resp;
case TRANSFER_SCHEMA_SNAPSHOT_PIECE:
- return handleTransferFilePiece(
- PipeTransferSchemaSnapshotPieceReq.fromTPipeTransferReq(req),
- req instanceof AirGapPseudoTPipeTransferRequest,
- false);
+ resp =
+ handleTransferFilePiece(
+
PipeTransferSchemaSnapshotPieceReq.fromTPipeTransferReq(req),
+ req instanceof AirGapPseudoTPipeTransferRequest,
+ false);
+ PipeDataNodeReceiverMetrics.getInstance()
+ .recordTransferSchemaSnapshotPieceTimer(System.nanoTime() -
startTime);
+ return resp;
case TRANSFER_SCHEMA_SNAPSHOT_SEAL:
- return handleTransferFileSealV2(
- PipeTransferSchemaSnapshotSealReq.fromTPipeTransferReq(req));
+ resp =
+ handleTransferFileSealV2(
+
PipeTransferSchemaSnapshotSealReq.fromTPipeTransferReq(req));
+ PipeDataNodeReceiverMetrics.getInstance()
+ .recordTransferSchemaSnapshotSealTimer(System.nanoTime() -
startTime);
+ return resp;
case HANDSHAKE_CONFIGNODE_V1:
case HANDSHAKE_CONFIGNODE_V2:
case TRANSFER_CONFIG_PLAN:
@@ -192,9 +242,11 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
case TRANSFER_CONFIG_SNAPSHOT_SEAL:
// Config requests will first be received by the DataNode receiver,
// then transferred to ConfigNode receiver to execute.
- return handleTransferConfigPlan(req);
+ resp = handleTransferConfigPlan(req);
+ return resp;
case TRANSFER_COMPRESSED:
- return
receive(PipeTransferCompressedReq.fromTPipeTransferReq(req));
+ resp =
receive(PipeTransferCompressedReq.fromTPipeTransferReq(req));
+ return resp;
default:
break;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index fe7033c88c1..cc096016f4b 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -133,6 +133,8 @@ public enum Metric {
UNTRANSFERRED_TABLET_COUNT("untransferred_tablet_count"),
UNTRANSFERRED_TSFILE_COUNT("untransferred_tsfile_count"),
UNTRANSFERRED_HEARTBEAT_COUNT("untransferred_heartbeat_count"),
+ PIPE_DATANODE_RECEIVER("pipe_datanode_receiver"),
+ PIPE_CONFIGNODE_RECEIVER("pipe_confignode_receiver"),
PIPE_EXTRACTOR_TABLET_SUPPLY("pipe_extractor_tablet_supply"),
PIPE_EXTRACTOR_TSFILE_SUPPLY("pipe_extractor_tsfile_supply"),
PIPE_EXTRACTOR_HEARTBEAT_SUPPLY("pipe_extractor_heartbeat_supply"),