This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/TableModelGrammar
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit fd1528baa1f16e130752fc5a6e3ee053c4cc0729
Author: YC27 <[email protected]>
AuthorDate: Tue Jul 23 14:45:36 2024 +0800

    Pipe: add ops/latency metrics for different types of pipe operations on 
receivers (DN / CN) (#12927)
    
    (cherry picked from commit 623fa5bac5f4be73da4b8fec56499e7beed6d72e)
---
 .../manager/pipe/metric/PipeConfigNodeMetrics.java |   2 +
 .../pipe/metric/PipeConfigNodeReceiverMetrics.java | 169 +++++++++++
 .../receiver/protocol/IoTDBConfigNodeReceiver.java |  44 ++-
 .../iotdb/db/pipe/metric/PipeDataNodeMetrics.java  |   2 +
 .../pipe/metric/PipeDataNodeReceiverMetrics.java   | 333 +++++++++++++++++++++
 .../protocol/thrift/IoTDBDataNodeReceiver.java     | 112 +++++--
 .../iotdb/commons/service/metric/enums/Metric.java |   2 +
 7 files changed, 623 insertions(+), 41 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeMetrics.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeMetrics.java
index b02679f147e..a7a41f4f359 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeMetrics.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeMetrics.java
@@ -42,6 +42,7 @@ public class PipeConfigNodeMetrics implements IMetricSet {
     PipeConfigRegionConnectorMetrics.getInstance().bindTo(metricService);
     PipeConfigNodeRemainingTimeMetrics.getInstance().bindTo(metricService);
     PipeTemporaryMetaMetrics.getInstance().bindTo(metricService);
+    PipeConfigNodeReceiverMetrics.getInstance().bindTo(metricService);
   }
 
   @Override
@@ -53,5 +54,6 @@ public class PipeConfigNodeMetrics implements IMetricSet {
     PipeConfigRegionConnectorMetrics.getInstance().unbindFrom(metricService);
     PipeConfigNodeRemainingTimeMetrics.getInstance().unbindFrom(metricService);
     PipeTemporaryMetaMetrics.getInstance().unbindFrom(metricService);
+    PipeConfigNodeReceiverMetrics.getInstance().unbindFrom(metricService);
   }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeReceiverMetrics.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeReceiverMetrics.java
new file mode 100644
index 00000000000..e0a30f42c2e
--- /dev/null
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeReceiverMetrics.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.pipe.metric;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.impl.DoNothingMetricManager;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Timer;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+public class PipeConfigNodeReceiverMetrics implements IMetricSet {
+
+  private static final PipeConfigNodeReceiverMetrics INSTANCE = new 
PipeConfigNodeReceiverMetrics();
+
+  private Timer handshakeConfigNodeV1Timer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer handshakeConfigNodeV2Timer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer transferConfigPlanTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer transferConfigSnapshotPieceTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer transferConfigSnapshotSealTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
+
+  private static final String RECEIVER = "pipeConfigNodeReceiver";
+
+  private PipeConfigNodeReceiverMetrics() {}
+
+  public void recordHandshakeConfigNodeV1Timer(long costTimeInNanos) {
+    handshakeConfigNodeV1Timer.updateNanos(costTimeInNanos);
+  }
+
+  public void recordHandshakeConfigNodeV2Timer(long costTimeInNanos) {
+    handshakeConfigNodeV2Timer.updateNanos(costTimeInNanos);
+  }
+
+  public void recordTransferConfigPlanTimer(long costTimeInNanos) {
+    transferConfigPlanTimer.updateNanos(costTimeInNanos);
+  }
+
+  public void recordTransferConfigSnapshotPieceTimer(long costTimeInNanos) {
+    transferConfigSnapshotPieceTimer.updateNanos(costTimeInNanos);
+  }
+
+  public void recordTransferConfigSnapshotSealTimer(long costTimeInNanos) {
+    transferConfigSnapshotSealTimer.updateNanos(costTimeInNanos);
+  }
+
+  @Override
+  public void bindTo(AbstractMetricService metricService) {
+    bindToTimer(metricService);
+  }
+
+  private void bindToTimer(AbstractMetricService metricService) {
+    handshakeConfigNodeV1Timer =
+        metricService.getOrCreateTimer(
+            Metric.PIPE_CONFIGNODE_RECEIVER.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            RECEIVER,
+            Tag.TYPE.toString(),
+            "handshakeConfigNodeV1");
+
+    handshakeConfigNodeV2Timer =
+        metricService.getOrCreateTimer(
+            Metric.PIPE_CONFIGNODE_RECEIVER.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            RECEIVER,
+            Tag.TYPE.toString(),
+            "handshakeConfigNodeV2");
+
+    transferConfigPlanTimer =
+        metricService.getOrCreateTimer(
+            Metric.PIPE_CONFIGNODE_RECEIVER.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            RECEIVER,
+            Tag.TYPE.toString(),
+            "transferConfigPlan");
+
+    transferConfigSnapshotPieceTimer =
+        metricService.getOrCreateTimer(
+            Metric.PIPE_CONFIGNODE_RECEIVER.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            RECEIVER,
+            Tag.TYPE.toString(),
+            "transferConfigSnapshotPiece");
+
+    transferConfigSnapshotSealTimer =
+        metricService.getOrCreateTimer(
+            Metric.PIPE_CONFIGNODE_RECEIVER.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            RECEIVER,
+            Tag.TYPE.toString(),
+            "transferConfigSnapshotSeal");
+  }
+
+  @Override
+  public void unbindFrom(AbstractMetricService metricService) {
+    unbind(metricService);
+  }
+
+  private void unbind(AbstractMetricService metricService) {
+    handshakeConfigNodeV1Timer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    handshakeConfigNodeV2Timer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    transferConfigPlanTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    transferConfigSnapshotPieceTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    transferConfigSnapshotSealTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+
+    metricService.remove(
+        MetricType.TIMER,
+        Metric.PIPE_CONFIGNODE_RECEIVER.toString(),
+        Tag.NAME.toString(),
+        RECEIVER,
+        Tag.TYPE.toString(),
+        "handshakeConfigNodeV1");
+    metricService.remove(
+        MetricType.TIMER,
+        Metric.PIPE_CONFIGNODE_RECEIVER.toString(),
+        Tag.NAME.toString(),
+        RECEIVER,
+        Tag.TYPE.toString(),
+        "handshakeConfigNodeV2");
+    metricService.remove(
+        MetricType.TIMER,
+        Metric.PIPE_CONFIGNODE_RECEIVER.toString(),
+        Tag.NAME.toString(),
+        RECEIVER,
+        Tag.TYPE.toString(),
+        "transferConfigPlan");
+    metricService.remove(
+        MetricType.TIMER,
+        Metric.PIPE_CONFIGNODE_RECEIVER.toString(),
+        Tag.NAME.toString(),
+        RECEIVER,
+        Tag.TYPE.toString(),
+        "transferConfigSnapshotPiece");
+    metricService.remove(
+        MetricType.TIMER,
+        Metric.PIPE_CONFIGNODE_RECEIVER.toString(),
+        Tag.NAME.toString(),
+        RECEIVER,
+        Tag.TYPE.toString(),
+        "transferConfigSnapshotSeal");
+  }
+
+  public static PipeConfigNodeReceiverMetrics getInstance() {
+    return INSTANCE;
+  }
+}
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
index 7815adde258..711d7bdf26c 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
@@ -53,6 +53,7 @@ import 
org.apache.iotdb.confignode.manager.pipe.connector.payload.PipeTransferCo
 import 
org.apache.iotdb.confignode.manager.pipe.connector.payload.PipeTransferConfigSnapshotSealReq;
 import 
org.apache.iotdb.confignode.manager.pipe.event.PipeConfigRegionSnapshotEvent;
 import 
org.apache.iotdb.confignode.manager.pipe.extractor.IoTDBConfigRegionExtractor;
+import 
org.apache.iotdb.confignode.manager.pipe.metric.PipeConfigNodeReceiverMetrics;
 import 
org.apache.iotdb.confignode.manager.pipe.receiver.visitor.PipeConfigPhysicalPlanExceptionVisitor;
 import 
org.apache.iotdb.confignode.manager.pipe.receiver.visitor.PipeConfigPhysicalPlanTSStatusVisitor;
 import org.apache.iotdb.confignode.persistence.schema.CNPhysicalPlanGenerator;
@@ -111,23 +112,44 @@ public class IoTDBConfigNodeReceiver extends 
IoTDBFileReceiver {
                   .setMessage(
                       "The receiver ConfigNode has set up a new receiver and 
the sender must re-send its handshake request."));
         }
+        TPipeTransferResp resp;
+        long startTime = System.nanoTime();
         switch (type) {
           case HANDSHAKE_CONFIGNODE_V1:
-            return handleTransferHandshakeV1(
-                
PipeTransferConfigNodeHandshakeV1Req.fromTPipeTransferReq(req));
+            resp =
+                handleTransferHandshakeV1(
+                    
PipeTransferConfigNodeHandshakeV1Req.fromTPipeTransferReq(req));
+            PipeConfigNodeReceiverMetrics.getInstance()
+                .recordHandshakeConfigNodeV1Timer(System.nanoTime() - 
startTime);
+            return resp;
           case HANDSHAKE_CONFIGNODE_V2:
-            return handleTransferHandshakeV2(
-                
PipeTransferConfigNodeHandshakeV2Req.fromTPipeTransferReq(req));
+            resp =
+                handleTransferHandshakeV2(
+                    
PipeTransferConfigNodeHandshakeV2Req.fromTPipeTransferReq(req));
+            PipeConfigNodeReceiverMetrics.getInstance()
+                .recordHandshakeConfigNodeV2Timer(System.nanoTime() - 
startTime);
+            return resp;
           case TRANSFER_CONFIG_PLAN:
-            return 
handleTransferConfigPlan(PipeTransferConfigPlanReq.fromTPipeTransferReq(req));
+            resp = 
handleTransferConfigPlan(PipeTransferConfigPlanReq.fromTPipeTransferReq(req));
+            PipeConfigNodeReceiverMetrics.getInstance()
+                .recordTransferConfigPlanTimer(System.nanoTime() - startTime);
+            return resp;
           case TRANSFER_CONFIG_SNAPSHOT_PIECE:
-            return handleTransferFilePiece(
-                PipeTransferConfigSnapshotPieceReq.fromTPipeTransferReq(req),
-                req instanceof AirGapPseudoTPipeTransferRequest,
-                false);
+            resp =
+                handleTransferFilePiece(
+                    
PipeTransferConfigSnapshotPieceReq.fromTPipeTransferReq(req),
+                    req instanceof AirGapPseudoTPipeTransferRequest,
+                    false);
+            PipeConfigNodeReceiverMetrics.getInstance()
+                .recordTransferConfigSnapshotPieceTimer(System.nanoTime() - 
startTime);
+            return resp;
           case TRANSFER_CONFIG_SNAPSHOT_SEAL:
-            return handleTransferFileSealV2(
-                PipeTransferConfigSnapshotSealReq.fromTPipeTransferReq(req));
+            resp =
+                handleTransferFileSealV2(
+                    
PipeTransferConfigSnapshotSealReq.fromTPipeTransferReq(req));
+            PipeConfigNodeReceiverMetrics.getInstance()
+                .recordTransferConfigSnapshotSealTimer(System.nanoTime() - 
startTime);
+            return resp;
           case TRANSFER_COMPRESSED:
             return 
receive(PipeTransferCompressedReq.fromTPipeTransferReq(req));
           default:
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeMetrics.java
index 2f39cc75c25..172653f3b01 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeMetrics.java
@@ -41,6 +41,7 @@ public class PipeDataNodeMetrics implements IMetricSet {
     PipeSchemaRegionExtractorMetrics.getInstance().bindTo(metricService);
     PipeSchemaRegionConnectorMetrics.getInstance().bindTo(metricService);
     
PipeDataNodeRemainingEventAndTimeMetrics.getInstance().bindTo(metricService);
+    PipeDataNodeReceiverMetrics.getInstance().bindTo(metricService);
   }
 
   @Override
@@ -57,6 +58,7 @@ public class PipeDataNodeMetrics implements IMetricSet {
     PipeSchemaRegionExtractorMetrics.getInstance().unbindFrom(metricService);
     PipeSchemaRegionConnectorMetrics.getInstance().unbindFrom(metricService);
     
PipeDataNodeRemainingEventAndTimeMetrics.getInstance().unbindFrom(metricService);
+    PipeDataNodeReceiverMetrics.getInstance().unbindFrom(metricService);
   }
 
   //////////////////////////// singleton ////////////////////////////
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeReceiverMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeReceiverMetrics.java
new file mode 100644
index 00000000000..eddad988041
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeReceiverMetrics.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.metric;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.impl.DoNothingMetricManager;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Timer;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+public class PipeDataNodeReceiverMetrics implements IMetricSet {
+
+  private static final PipeDataNodeReceiverMetrics INSTANCE = new 
PipeDataNodeReceiverMetrics();
+
+  private Timer handshakeDatanodeV1Timer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer handshakeDatanodeV2Timer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer transferTabletInsertNodeTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer transferTabletRawTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer transferTabletBinaryTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer transferTabletBatchTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer transferTsFilePieceTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer transferTsFileSealTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer transferTsFilePieceWithModTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer transferTsFileSealWithModTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer transferSchemaPlanTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer transferSchemaSnapshotPieceTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer transferSchemaSnapshotSealTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
+
+  private static final String RECEIVER = "pipeDataNodeReceiver";
+
+  private PipeDataNodeReceiverMetrics() {}
+
+  public void recordHandshakeDatanodeV1Timer(long costTimeInNanos) {
+    handshakeDatanodeV1Timer.updateNanos(costTimeInNanos);
+  }
+
+  public void recordHandshakeDatanodeV2Timer(long costTimeInNanos) {
+    handshakeDatanodeV2Timer.updateNanos(costTimeInNanos);
+  }
+
+  public void recordTransferTabletInsertNodeTimer(long costTimeInNanos) {
+    transferTabletInsertNodeTimer.updateNanos(costTimeInNanos);
+  }
+
+  public void recordTransferTabletRawTimer(long costTimeInNanos) {
+    transferTabletRawTimer.updateNanos(costTimeInNanos);
+  }
+
+  public void recordTransferTabletBinaryTimer(long costTimeInNanos) {
+    transferTabletBinaryTimer.updateNanos(costTimeInNanos);
+  }
+
+  public void recordTransferTabletBatchTimer(long costTimeInNanos) {
+    transferTabletBatchTimer.updateNanos(costTimeInNanos);
+  }
+
+  public void recordTransferTsFilePieceTimer(long costTimeInNanos) {
+    transferTsFilePieceTimer.updateNanos(costTimeInNanos);
+  }
+
+  public void recordTransferTsFileSealTimer(long costTimeInNanos) {
+    transferTsFileSealTimer.updateNanos(costTimeInNanos);
+  }
+
+  public void recordTransferTsFilePieceWithModTimer(long costTimeInNanos) {
+    transferTsFilePieceWithModTimer.updateNanos(costTimeInNanos);
+  }
+
+  public void recordTransferTsFileSealWithModTimer(long costTimeInNanos) {
+    transferTsFileSealWithModTimer.updateNanos(costTimeInNanos);
+  }
+
+  public void recordTransferSchemaPlanTimer(long costTimeInNanos) {
+    transferSchemaPlanTimer.updateNanos(costTimeInNanos);
+  }
+
+  public void recordTransferSchemaSnapshotPieceTimer(long costTimeInNanos) {
+    transferSchemaSnapshotPieceTimer.updateNanos(costTimeInNanos);
+  }
+
+  public void recordTransferSchemaSnapshotSealTimer(long costTimeInNanos) {
+    transferSchemaSnapshotSealTimer.updateNanos(costTimeInNanos);
+  }
+
+  @Override
+  public void bindTo(AbstractMetricService metricService) {
+    bindToTimer(metricService);
+  }
+
+  private void bindToTimer(AbstractMetricService metricService) {
+    handshakeDatanodeV1Timer =
+        metricService.getOrCreateTimer(
+            Metric.PIPE_DATANODE_RECEIVER.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            RECEIVER,
+            Tag.TYPE.toString(),
+            "handshakeDataNodeV1");
+    handshakeDatanodeV2Timer =
+        metricService.getOrCreateTimer(
+            Metric.PIPE_DATANODE_RECEIVER.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            RECEIVER,
+            Tag.TYPE.toString(),
+            "handshakeDataNodeV2");
+    transferTabletInsertNodeTimer =
+        metricService.getOrCreateTimer(
+            Metric.PIPE_DATANODE_RECEIVER.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            RECEIVER,
+            Tag.TYPE.toString(),
+            "transferTabletInsertNode");
+    transferTabletRawTimer =
+        metricService.getOrCreateTimer(
+            Metric.PIPE_DATANODE_RECEIVER.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            RECEIVER,
+            Tag.TYPE.toString(),
+            "transferTabletRaw");
+    transferTabletBinaryTimer =
+        metricService.getOrCreateTimer(
+            Metric.PIPE_DATANODE_RECEIVER.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            RECEIVER,
+            Tag.TYPE.toString(),
+            "transferTabletBinary");
+    transferTabletBatchTimer =
+        metricService.getOrCreateTimer(
+            Metric.PIPE_DATANODE_RECEIVER.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            RECEIVER,
+            Tag.TYPE.toString(),
+            "transferTabletBatch");
+    transferTsFilePieceTimer =
+        metricService.getOrCreateTimer(
+            Metric.PIPE_DATANODE_RECEIVER.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            RECEIVER,
+            Tag.TYPE.toString(),
+            "transferTsFilePiece");
+    transferTsFileSealTimer =
+        metricService.getOrCreateTimer(
+            Metric.PIPE_DATANODE_RECEIVER.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            RECEIVER,
+            Tag.TYPE.toString(),
+            "transferTsFileSeal");
+    transferTsFilePieceWithModTimer =
+        metricService.getOrCreateTimer(
+            Metric.PIPE_DATANODE_RECEIVER.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            RECEIVER,
+            Tag.TYPE.toString(),
+            "transferTsFilePieceWithMod");
+    transferTsFileSealWithModTimer =
+        metricService.getOrCreateTimer(
+            Metric.PIPE_DATANODE_RECEIVER.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            RECEIVER,
+            Tag.TYPE.toString(),
+            "transferTsFileSealWithMod");
+    transferSchemaPlanTimer =
+        metricService.getOrCreateTimer(
+            Metric.PIPE_DATANODE_RECEIVER.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            RECEIVER,
+            Tag.TYPE.toString(),
+            "transferSchemaPlan");
+    transferSchemaSnapshotPieceTimer =
+        metricService.getOrCreateTimer(
+            Metric.PIPE_DATANODE_RECEIVER.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            RECEIVER,
+            Tag.TYPE.toString(),
+            "transferSchemaSnapshotPiece");
+    transferSchemaSnapshotSealTimer =
+        metricService.getOrCreateTimer(
+            Metric.PIPE_DATANODE_RECEIVER.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            RECEIVER,
+            Tag.TYPE.toString(),
+            "transferSchemaSnapshotSeal");
+  }
+
+  @Override
+  public void unbindFrom(AbstractMetricService metricService) {
+    unbind(metricService);
+  }
+
+  /**
+   * Resets every timer field of this metric set to the no-op timer and removes the timers that
+   * {@code bindToTimer} registered from the given metric service.
+   *
+   * <p>Each {@code remove} call passes the same metric name and NAME/TYPE tag values that {@code
+   * bindToTimer} uses when creating the corresponding timer. In particular the handshake timers
+   * are registered under the TYPE tags {@code handshakeDataNodeV1} and {@code
+   * handshakeDataNodeV2}, so exactly that spelling is used here.
+   *
+   * @param metricService the metric service the timers were registered with
+   */
+  private void unbind(AbstractMetricService metricService) {
+    // Stop recording first: after this point every record*Timer call is a no-op.
+    handshakeDatanodeV1Timer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    handshakeDatanodeV2Timer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    transferTabletInsertNodeTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    transferTabletRawTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    transferTabletBinaryTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    transferTabletBatchTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    transferTsFilePieceTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    transferTsFileSealTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    transferTsFilePieceWithModTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    transferTsFileSealWithModTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    transferSchemaPlanTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    transferSchemaSnapshotPieceTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    transferSchemaSnapshotSealTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+
+    // The TYPE tag must match the value used at registration ("DataNode", capital N).
+    metricService.remove(
+        MetricType.TIMER,
+        Metric.PIPE_DATANODE_RECEIVER.toString(),
+        Tag.NAME.toString(),
+        RECEIVER,
+        Tag.TYPE.toString(),
+        "handshakeDataNodeV1");
+    metricService.remove(
+        MetricType.TIMER,
+        Metric.PIPE_DATANODE_RECEIVER.toString(),
+        Tag.NAME.toString(),
+        RECEIVER,
+        Tag.TYPE.toString(),
+        "handshakeDataNodeV2");
+    metricService.remove(
+        MetricType.TIMER,
+        Metric.PIPE_DATANODE_RECEIVER.toString(),
+        Tag.NAME.toString(),
+        RECEIVER,
+        Tag.TYPE.toString(),
+        "transferTabletInsertNode");
+    metricService.remove(
+        MetricType.TIMER,
+        Metric.PIPE_DATANODE_RECEIVER.toString(),
+        Tag.NAME.toString(),
+        RECEIVER,
+        Tag.TYPE.toString(),
+        "transferTabletRaw");
+    metricService.remove(
+        MetricType.TIMER,
+        Metric.PIPE_DATANODE_RECEIVER.toString(),
+        Tag.NAME.toString(),
+        RECEIVER,
+        Tag.TYPE.toString(),
+        "transferTabletBinary");
+    metricService.remove(
+        MetricType.TIMER,
+        Metric.PIPE_DATANODE_RECEIVER.toString(),
+        Tag.NAME.toString(),
+        RECEIVER,
+        Tag.TYPE.toString(),
+        "transferTabletBatch");
+    metricService.remove(
+        MetricType.TIMER,
+        Metric.PIPE_DATANODE_RECEIVER.toString(),
+        Tag.NAME.toString(),
+        RECEIVER,
+        Tag.TYPE.toString(),
+        "transferTsFilePiece");
+    metricService.remove(
+        MetricType.TIMER,
+        Metric.PIPE_DATANODE_RECEIVER.toString(),
+        Tag.NAME.toString(),
+        RECEIVER,
+        Tag.TYPE.toString(),
+        "transferTsFileSeal");
+    metricService.remove(
+        MetricType.TIMER,
+        Metric.PIPE_DATANODE_RECEIVER.toString(),
+        Tag.NAME.toString(),
+        RECEIVER,
+        Tag.TYPE.toString(),
+        "transferTsFilePieceWithMod");
+    metricService.remove(
+        MetricType.TIMER,
+        Metric.PIPE_DATANODE_RECEIVER.toString(),
+        Tag.NAME.toString(),
+        RECEIVER,
+        Tag.TYPE.toString(),
+        "transferTsFileSealWithMod");
+    metricService.remove(
+        MetricType.TIMER,
+        Metric.PIPE_DATANODE_RECEIVER.toString(),
+        Tag.NAME.toString(),
+        RECEIVER,
+        Tag.TYPE.toString(),
+        "transferSchemaPlan");
+    metricService.remove(
+        MetricType.TIMER,
+        Metric.PIPE_DATANODE_RECEIVER.toString(),
+        Tag.NAME.toString(),
+        RECEIVER,
+        Tag.TYPE.toString(),
+        "transferSchemaSnapshotPiece");
+    metricService.remove(
+        MetricType.TIMER,
+        Metric.PIPE_DATANODE_RECEIVER.toString(),
+        Tag.NAME.toString(),
+        RECEIVER,
+        Tag.TYPE.toString(),
+        "transferSchemaSnapshotSeal");
+  }
+
+  public static PipeDataNodeReceiverMetrics getInstance() {
+    return INSTANCE;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index da706539278..12916ba4506 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -49,6 +49,7 @@ import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransfer
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealWithModReq;
 import 
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionSnapshotEvent;
+import org.apache.iotdb.db.pipe.metric.PipeDataNodeReceiverMetrics;
 import org.apache.iotdb.db.pipe.receiver.visitor.PipePlanToStatementVisitor;
 import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementExceptionVisitor;
 import 
org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementPatternParseVisitor;
@@ -141,50 +142,99 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver {
   @Override
   public synchronized TPipeTransferResp receive(final TPipeTransferReq req) {
     try {
+      long startTime = System.nanoTime();
       final short rawRequestType = req.getType();
       if (PipeRequestType.isValidatedRequestType(rawRequestType)) {
+        TPipeTransferResp resp;
         switch (PipeRequestType.valueOf(rawRequestType)) {
           case HANDSHAKE_DATANODE_V1:
-            return handleTransferHandshakeV1(
-                PipeTransferDataNodeHandshakeV1Req.fromTPipeTransferReq(req));
+            resp =
+                handleTransferHandshakeV1(
+                    
PipeTransferDataNodeHandshakeV1Req.fromTPipeTransferReq(req));
+            PipeDataNodeReceiverMetrics.getInstance()
+                .recordHandshakeDatanodeV1Timer(System.nanoTime() - startTime);
+            return resp;
           case HANDSHAKE_DATANODE_V2:
-            return handleTransferHandshakeV2(
-                PipeTransferDataNodeHandshakeV2Req.fromTPipeTransferReq(req));
+            resp =
+                handleTransferHandshakeV2(
+                    
PipeTransferDataNodeHandshakeV2Req.fromTPipeTransferReq(req));
+            PipeDataNodeReceiverMetrics.getInstance()
+                .recordHandshakeDatanodeV2Timer(System.nanoTime() - startTime);
+            return resp;
           case TRANSFER_TABLET_INSERT_NODE:
-            return handleTransferTabletInsertNode(
-                PipeTransferTabletInsertNodeReq.fromTPipeTransferReq(req));
+            resp =
+                handleTransferTabletInsertNode(
+                    PipeTransferTabletInsertNodeReq.fromTPipeTransferReq(req));
+            PipeDataNodeReceiverMetrics.getInstance()
+                .recordTransferTabletInsertNodeTimer(System.nanoTime() - 
startTime);
+            return resp;
           case TRANSFER_TABLET_RAW:
-            return 
handleTransferTabletRaw(PipeTransferTabletRawReq.fromTPipeTransferReq(req));
+            resp = 
handleTransferTabletRaw(PipeTransferTabletRawReq.fromTPipeTransferReq(req));
+            PipeDataNodeReceiverMetrics.getInstance()
+                .recordTransferTabletRawTimer(System.nanoTime() - startTime);
+            return resp;
           case TRANSFER_TABLET_BINARY:
-            return handleTransferTabletBinary(
-                PipeTransferTabletBinaryReq.fromTPipeTransferReq(req));
+            resp =
+                
handleTransferTabletBinary(PipeTransferTabletBinaryReq.fromTPipeTransferReq(req));
+            PipeDataNodeReceiverMetrics.getInstance()
+                .recordTransferTabletBinaryTimer(System.nanoTime() - 
startTime);
+            return resp;
           case TRANSFER_TABLET_BATCH:
-            return 
handleTransferTabletBatch(PipeTransferTabletBatchReq.fromTPipeTransferReq(req));
+            resp = 
handleTransferTabletBatch(PipeTransferTabletBatchReq.fromTPipeTransferReq(req));
+            PipeDataNodeReceiverMetrics.getInstance()
+                .recordTransferTabletBatchTimer(System.nanoTime() - startTime);
+            return resp;
           case TRANSFER_TS_FILE_PIECE:
-            return handleTransferFilePiece(
-                PipeTransferTsFilePieceReq.fromTPipeTransferReq(req),
-                req instanceof AirGapPseudoTPipeTransferRequest,
-                true);
+            resp =
+                handleTransferFilePiece(
+                    PipeTransferTsFilePieceReq.fromTPipeTransferReq(req),
+                    req instanceof AirGapPseudoTPipeTransferRequest,
+                    true);
+            PipeDataNodeReceiverMetrics.getInstance()
+                .recordTransferTsFilePieceTimer(System.nanoTime() - startTime);
+            return resp;
           case TRANSFER_TS_FILE_SEAL:
-            return 
handleTransferFileSealV1(PipeTransferTsFileSealReq.fromTPipeTransferReq(req));
+            resp = 
handleTransferFileSealV1(PipeTransferTsFileSealReq.fromTPipeTransferReq(req));
+            PipeDataNodeReceiverMetrics.getInstance()
+                .recordTransferTsFileSealTimer(System.nanoTime() - startTime);
+            return resp;
           case TRANSFER_TS_FILE_PIECE_WITH_MOD:
-            return handleTransferFilePiece(
-                PipeTransferTsFilePieceWithModReq.fromTPipeTransferReq(req),
-                req instanceof AirGapPseudoTPipeTransferRequest,
-                false);
+            resp =
+                handleTransferFilePiece(
+                    
PipeTransferTsFilePieceWithModReq.fromTPipeTransferReq(req),
+                    req instanceof AirGapPseudoTPipeTransferRequest,
+                    false);
+            PipeDataNodeReceiverMetrics.getInstance()
+                .recordTransferTsFilePieceWithModTimer(System.nanoTime() - 
startTime);
+            return resp;
           case TRANSFER_TS_FILE_SEAL_WITH_MOD:
-            return handleTransferFileSealV2(
-                PipeTransferTsFileSealWithModReq.fromTPipeTransferReq(req));
+            resp =
+                handleTransferFileSealV2(
+                    
PipeTransferTsFileSealWithModReq.fromTPipeTransferReq(req));
+            PipeDataNodeReceiverMetrics.getInstance()
+                .recordTransferTsFileSealWithModTimer(System.nanoTime() - 
startTime);
+            return resp;
           case TRANSFER_SCHEMA_PLAN:
-            return 
handleTransferSchemaPlan(PipeTransferPlanNodeReq.fromTPipeTransferReq(req));
+            resp = 
handleTransferSchemaPlan(PipeTransferPlanNodeReq.fromTPipeTransferReq(req));
+            PipeDataNodeReceiverMetrics.getInstance()
+                .recordTransferSchemaPlanTimer(System.nanoTime() - startTime);
+            return resp;
           case TRANSFER_SCHEMA_SNAPSHOT_PIECE:
-            return handleTransferFilePiece(
-                PipeTransferSchemaSnapshotPieceReq.fromTPipeTransferReq(req),
-                req instanceof AirGapPseudoTPipeTransferRequest,
-                false);
+            resp =
+                handleTransferFilePiece(
+                    
PipeTransferSchemaSnapshotPieceReq.fromTPipeTransferReq(req),
+                    req instanceof AirGapPseudoTPipeTransferRequest,
+                    false);
+            PipeDataNodeReceiverMetrics.getInstance()
+                .recordTransferSchemaSnapshotPieceTimer(System.nanoTime() - 
startTime);
+            return resp;
           case TRANSFER_SCHEMA_SNAPSHOT_SEAL:
-            return handleTransferFileSealV2(
-                PipeTransferSchemaSnapshotSealReq.fromTPipeTransferReq(req));
+            resp =
+                handleTransferFileSealV2(
+                    
PipeTransferSchemaSnapshotSealReq.fromTPipeTransferReq(req));
+            PipeDataNodeReceiverMetrics.getInstance()
+                .recordTransferSchemaSnapshotSealTimer(System.nanoTime() - 
startTime);
+            return resp;
           case HANDSHAKE_CONFIGNODE_V1:
           case HANDSHAKE_CONFIGNODE_V2:
           case TRANSFER_CONFIG_PLAN:
@@ -192,9 +242,11 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver {
           case TRANSFER_CONFIG_SNAPSHOT_SEAL:
             // Config requests will first be received by the DataNode receiver,
             // then transferred to ConfigNode receiver to execute.
-            return handleTransferConfigPlan(req);
+            resp = handleTransferConfigPlan(req);
+            return resp;
           case TRANSFER_COMPRESSED:
-            return 
receive(PipeTransferCompressedReq.fromTPipeTransferReq(req));
+            resp = 
receive(PipeTransferCompressedReq.fromTPipeTransferReq(req));
+            return resp;
           default:
             break;
         }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index fe7033c88c1..cc096016f4b 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -133,6 +133,8 @@ public enum Metric {
   UNTRANSFERRED_TABLET_COUNT("untransferred_tablet_count"),
   UNTRANSFERRED_TSFILE_COUNT("untransferred_tsfile_count"),
   UNTRANSFERRED_HEARTBEAT_COUNT("untransferred_heartbeat_count"),
+  PIPE_DATANODE_RECEIVER("pipe_datanode_receiver"),
+  PIPE_CONFIGNODE_RECEIVER("pipe_confignode_receiver"),
   PIPE_EXTRACTOR_TABLET_SUPPLY("pipe_extractor_tablet_supply"),
   PIPE_EXTRACTOR_TSFILE_SUPPLY("pipe_extractor_tsfile_supply"),
   PIPE_EXTRACTOR_HEARTBEAT_SUPPLY("pipe_extractor_heartbeat_supply"),


Reply via email to