This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch make-sure-no-frame-size-problem
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 757b2ccabf05ce59325fd7337bd9f68a6308cf2c
Author: Steve Yurong Su <[email protected]>
AuthorDate: Sat Aug 24 03:29:34 2024 +0800

    2nd commit
---
 .../pipe/metric/PipeDataNodeReceiverMetrics.java   |  63 ++++++
 .../protocol/thrift/IoTDBDataNodeReceiver.java     | 211 ++++++++++++++-------
 .../payload/thrift/request/PipeRequestType.java    |   2 +-
 .../thrift/request/PipeTransferSliceReq.java       |   2 +-
 4 files changed, 208 insertions(+), 70 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeReceiverMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeReceiverMetrics.java
index eddad988041..8405e48085a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeReceiverMetrics.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeReceiverMetrics.java
@@ -45,6 +45,9 @@ public class PipeDataNodeReceiverMetrics implements 
IMetricSet {
   private Timer transferSchemaPlanTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
   private Timer transferSchemaSnapshotPieceTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
   private Timer transferSchemaSnapshotSealTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer transferConfigPlanTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer transferCompressedTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer transferSliceTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
 
   private static final String RECEIVER = "pipeDataNodeReceiver";
 
@@ -102,6 +105,18 @@ public class PipeDataNodeReceiverMetrics implements 
IMetricSet {
     transferSchemaSnapshotSealTimer.updateNanos(costTimeInNanos);
   }
 
+  public void recordTransferConfigPlanTimer(long costTimeInNanos) {
+    transferConfigPlanTimer.updateNanos(costTimeInNanos);
+  }
+
+  public void recordTransferCompressedTimer(long costTimeInNanos) {
+    transferCompressedTimer.updateNanos(costTimeInNanos);
+  }
+
+  public void recordTransferSliceTimer(long costTimeInNanos) {
+    transferSliceTimer.updateNanos(costTimeInNanos);
+  }
+
   @Override
   public void bindTo(AbstractMetricService metricService) {
     bindToTimer(metricService);
@@ -212,6 +227,30 @@ public class PipeDataNodeReceiverMetrics implements 
IMetricSet {
             RECEIVER,
             Tag.TYPE.toString(),
             "transferSchemaSnapshotSeal");
+    transferConfigPlanTimer =
+        metricService.getOrCreateTimer(
+            Metric.PIPE_DATANODE_RECEIVER.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            RECEIVER,
+            Tag.TYPE.toString(),
+            "transferConfigPlan");
+    transferCompressedTimer =
+        metricService.getOrCreateTimer(
+            Metric.PIPE_DATANODE_RECEIVER.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            RECEIVER,
+            Tag.TYPE.toString(),
+            "transferCompressed");
+    transferSliceTimer =
+        metricService.getOrCreateTimer(
+            Metric.PIPE_DATANODE_RECEIVER.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            RECEIVER,
+            Tag.TYPE.toString(),
+            "transferSlice");
   }
 
   @Override
@@ -233,6 +272,9 @@ public class PipeDataNodeReceiverMetrics implements 
IMetricSet {
     transferSchemaPlanTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
     transferSchemaSnapshotPieceTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
     transferSchemaSnapshotSealTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    transferConfigPlanTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    transferCompressedTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    transferSliceTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
 
     metricService.remove(
         MetricType.TIMER,
@@ -325,6 +367,27 @@ public class PipeDataNodeReceiverMetrics implements 
IMetricSet {
         RECEIVER,
         Tag.TYPE.toString(),
         "transferSchemaSnapshotSeal");
+    metricService.remove(
+        MetricType.TIMER,
+        Metric.PIPE_DATANODE_RECEIVER.toString(),
+        Tag.NAME.toString(),
+        RECEIVER,
+        Tag.TYPE.toString(),
+        "transferConfigPlan");
+    metricService.remove(
+        MetricType.TIMER,
+        Metric.PIPE_DATANODE_RECEIVER.toString(),
+        Tag.NAME.toString(),
+        RECEIVER,
+        Tag.TYPE.toString(),
+        "transferCompressed");
+    metricService.remove(
+        MetricType.TIMER,
+        Metric.PIPE_DATANODE_RECEIVER.toString(),
+        Tag.NAME.toString(),
+        RECEIVER,
+        Tag.TYPE.toString(),
+        "transferSlice");
   }
 
   public static PipeDataNodeReceiverMetrics getInstance() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 40f5065caec..0978dfa17cc 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -28,6 +28,7 @@ import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeReques
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferCompressedReq;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFileSealReqV1;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFileSealReqV2;
+import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferSliceReq;
 import org.apache.iotdb.commons.pipe.pattern.IoTDBPipePattern;
 import org.apache.iotdb.commons.pipe.receiver.IoTDBFileReceiver;
 import org.apache.iotdb.commons.utils.FileUtils;
@@ -151,108 +152,178 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
       long startTime = System.nanoTime();
       final short rawRequestType = req.getType();
       if (PipeRequestType.isValidatedRequestType(rawRequestType)) {
-        TPipeTransferResp resp;
         switch (PipeRequestType.valueOf(rawRequestType)) {
           case HANDSHAKE_DATANODE_V1:
-            resp =
-                handleTransferHandshakeV1(
+            {
+              try {
+                return handleTransferHandshakeV1(
                     
PipeTransferDataNodeHandshakeV1Req.fromTPipeTransferReq(req));
-            PipeDataNodeReceiverMetrics.getInstance()
-                .recordHandshakeDatanodeV1Timer(System.nanoTime() - startTime);
-            return resp;
+              } finally {
+                PipeDataNodeReceiverMetrics.getInstance()
+                    .recordHandshakeDatanodeV1Timer(System.nanoTime() - 
startTime);
+              }
+            }
           case HANDSHAKE_DATANODE_V2:
-            resp =
-                handleTransferHandshakeV2(
+            {
+              try {
+                return handleTransferHandshakeV2(
                     
PipeTransferDataNodeHandshakeV2Req.fromTPipeTransferReq(req));
-            PipeDataNodeReceiverMetrics.getInstance()
-                .recordHandshakeDatanodeV2Timer(System.nanoTime() - startTime);
-            return resp;
+              } finally {
+                PipeDataNodeReceiverMetrics.getInstance()
+                    .recordHandshakeDatanodeV2Timer(System.nanoTime() - 
startTime);
+              }
+            }
           case TRANSFER_TABLET_INSERT_NODE:
-            resp =
-                handleTransferTabletInsertNode(
+            {
+              try {
+                return handleTransferTabletInsertNode(
                     PipeTransferTabletInsertNodeReq.fromTPipeTransferReq(req));
-            PipeDataNodeReceiverMetrics.getInstance()
-                .recordTransferTabletInsertNodeTimer(System.nanoTime() - 
startTime);
-            return resp;
+
+              } finally {
+                PipeDataNodeReceiverMetrics.getInstance()
+                    .recordTransferTabletInsertNodeTimer(System.nanoTime() - 
startTime);
+              }
+            }
           case TRANSFER_TABLET_RAW:
-            resp = 
handleTransferTabletRaw(PipeTransferTabletRawReq.fromTPipeTransferReq(req));
-            PipeDataNodeReceiverMetrics.getInstance()
-                .recordTransferTabletRawTimer(System.nanoTime() - startTime);
-            return resp;
+            {
+              try {
+                return 
handleTransferTabletRaw(PipeTransferTabletRawReq.fromTPipeTransferReq(req));
+              } finally {
+                PipeDataNodeReceiverMetrics.getInstance()
+                    .recordTransferTabletRawTimer(System.nanoTime() - 
startTime);
+              }
+            }
           case TRANSFER_TABLET_BINARY:
-            resp =
-                
handleTransferTabletBinary(PipeTransferTabletBinaryReq.fromTPipeTransferReq(req));
-            PipeDataNodeReceiverMetrics.getInstance()
-                .recordTransferTabletBinaryTimer(System.nanoTime() - 
startTime);
-            return resp;
+            {
+              try {
+                return handleTransferTabletBinary(
+                    PipeTransferTabletBinaryReq.fromTPipeTransferReq(req));
+              } finally {
+                PipeDataNodeReceiverMetrics.getInstance()
+                    .recordTransferTabletBinaryTimer(System.nanoTime() - 
startTime);
+              }
+            }
           case TRANSFER_TABLET_BATCH:
-            resp = 
handleTransferTabletBatch(PipeTransferTabletBatchReq.fromTPipeTransferReq(req));
-            PipeDataNodeReceiverMetrics.getInstance()
-                .recordTransferTabletBatchTimer(System.nanoTime() - startTime);
-            return resp;
+            {
+              try {
+                return handleTransferTabletBatch(
+                    PipeTransferTabletBatchReq.fromTPipeTransferReq(req));
+              } finally {
+                PipeDataNodeReceiverMetrics.getInstance()
+                    .recordTransferTabletBatchTimer(System.nanoTime() - 
startTime);
+              }
+            }
           case TRANSFER_TS_FILE_PIECE:
-            resp =
-                handleTransferFilePiece(
+            {
+              try {
+                return handleTransferFilePiece(
                     PipeTransferTsFilePieceReq.fromTPipeTransferReq(req),
                     req instanceof AirGapPseudoTPipeTransferRequest,
                     true);
-            PipeDataNodeReceiverMetrics.getInstance()
-                .recordTransferTsFilePieceTimer(System.nanoTime() - startTime);
-            return resp;
+              } finally {
+                PipeDataNodeReceiverMetrics.getInstance()
+                    .recordTransferTsFilePieceTimer(System.nanoTime() - 
startTime);
+              }
+            }
           case TRANSFER_TS_FILE_SEAL:
-            resp = 
handleTransferFileSealV1(PipeTransferTsFileSealReq.fromTPipeTransferReq(req));
-            PipeDataNodeReceiverMetrics.getInstance()
-                .recordTransferTsFileSealTimer(System.nanoTime() - startTime);
-            return resp;
+            {
+              try {
+                return handleTransferFileSealV1(
+                    PipeTransferTsFileSealReq.fromTPipeTransferReq(req));
+              } finally {
+                PipeDataNodeReceiverMetrics.getInstance()
+                    .recordTransferTsFileSealTimer(System.nanoTime() - 
startTime);
+              }
+            }
           case TRANSFER_TS_FILE_PIECE_WITH_MOD:
-            resp =
-                handleTransferFilePiece(
+            {
+              try {
+                return handleTransferFilePiece(
                     
PipeTransferTsFilePieceWithModReq.fromTPipeTransferReq(req),
                     req instanceof AirGapPseudoTPipeTransferRequest,
                     false);
-            PipeDataNodeReceiverMetrics.getInstance()
-                .recordTransferTsFilePieceWithModTimer(System.nanoTime() - 
startTime);
-            return resp;
+
+              } finally {
+                PipeDataNodeReceiverMetrics.getInstance()
+                    .recordTransferTsFilePieceWithModTimer(System.nanoTime() - 
startTime);
+              }
+            }
           case TRANSFER_TS_FILE_SEAL_WITH_MOD:
-            resp =
-                handleTransferFileSealV2(
+            {
+              try {
+                return handleTransferFileSealV2(
                     
PipeTransferTsFileSealWithModReq.fromTPipeTransferReq(req));
-            PipeDataNodeReceiverMetrics.getInstance()
-                .recordTransferTsFileSealWithModTimer(System.nanoTime() - 
startTime);
-            return resp;
+              } finally {
+                PipeDataNodeReceiverMetrics.getInstance()
+                    .recordTransferTsFileSealWithModTimer(System.nanoTime() - 
startTime);
+              }
+            }
           case TRANSFER_SCHEMA_PLAN:
-            resp = 
handleTransferSchemaPlan(PipeTransferPlanNodeReq.fromTPipeTransferReq(req));
-            PipeDataNodeReceiverMetrics.getInstance()
-                .recordTransferSchemaPlanTimer(System.nanoTime() - startTime);
-            return resp;
+            {
+              try {
+                return 
handleTransferSchemaPlan(PipeTransferPlanNodeReq.fromTPipeTransferReq(req));
+              } finally {
+                PipeDataNodeReceiverMetrics.getInstance()
+                    .recordTransferSchemaPlanTimer(System.nanoTime() - 
startTime);
+              }
+            }
           case TRANSFER_SCHEMA_SNAPSHOT_PIECE:
-            resp =
-                handleTransferFilePiece(
+            {
+              try {
+                return handleTransferFilePiece(
                     
PipeTransferSchemaSnapshotPieceReq.fromTPipeTransferReq(req),
                     req instanceof AirGapPseudoTPipeTransferRequest,
                     false);
-            PipeDataNodeReceiverMetrics.getInstance()
-                .recordTransferSchemaSnapshotPieceTimer(System.nanoTime() - 
startTime);
-            return resp;
+
+              } finally {
+                PipeDataNodeReceiverMetrics.getInstance()
+                    .recordTransferSchemaSnapshotPieceTimer(System.nanoTime() 
- startTime);
+              }
+            }
           case TRANSFER_SCHEMA_SNAPSHOT_SEAL:
-            resp =
-                handleTransferFileSealV2(
+            {
+              try {
+                return handleTransferFileSealV2(
                     
PipeTransferSchemaSnapshotSealReq.fromTPipeTransferReq(req));
-            PipeDataNodeReceiverMetrics.getInstance()
-                .recordTransferSchemaSnapshotSealTimer(System.nanoTime() - 
startTime);
-            return resp;
+
+              } finally {
+                PipeDataNodeReceiverMetrics.getInstance()
+                    .recordTransferSchemaSnapshotSealTimer(System.nanoTime() - 
startTime);
+              }
+            }
           case HANDSHAKE_CONFIGNODE_V1:
           case HANDSHAKE_CONFIGNODE_V2:
           case TRANSFER_CONFIG_PLAN:
           case TRANSFER_CONFIG_SNAPSHOT_PIECE:
           case TRANSFER_CONFIG_SNAPSHOT_SEAL:
-            // Config requests will first be received by the DataNode receiver,
-            // then transferred to ConfigNode receiver to execute.
-            resp = handleTransferConfigPlan(req);
-            return resp;
+            {
+              try {
+                // Config requests will first be received by the DataNode receiver,
+                // then transferred to ConfigNode receiver to execute.
+                return handleTransferConfigPlan(req);
+              } finally {
+                PipeDataNodeReceiverMetrics.getInstance()
+                    .recordTransferConfigPlanTimer(System.nanoTime() - 
startTime);
+              }
+            }
+          case TRANSFER_SLICE:
+            {
+              try {
+                return 
handleTransferSlice(PipeTransferSliceReq.fromTPipeTransferReq(req));
+              } finally {
+                PipeDataNodeReceiverMetrics.getInstance()
+                    .recordTransferSliceTimer(System.nanoTime() - startTime);
+              }
+            }
           case TRANSFER_COMPRESSED:
-            resp = 
receive(PipeTransferCompressedReq.fromTPipeTransferReq(req));
-            return resp;
+            {
+              try {
+                return 
receive(PipeTransferCompressedReq.fromTPipeTransferReq(req));
+              } finally {
+                PipeDataNodeReceiverMetrics.getInstance()
+                    .recordTransferCompressedTimer(System.nanoTime() - 
startTime);
+              }
+            }
           default:
             break;
         }
@@ -442,6 +513,10 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
     return configReceiverId.get();
   }
 
+  private TPipeTransferResp handleTransferSlice(PipeTransferSliceReq 
pipeTransferSliceReq) {
+    return null;
+  }
+
   /**
    * For {@link InsertRowsStatement} and {@link InsertMultiTabletsStatement}, the returned {@link
    * TSStatus} will use sub-status to record the endpoint for redirection. Each sub-status records
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeRequestType.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeRequestType.java
index bccf6787f4a..0f9b6f4dafb 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeRequestType.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeRequestType.java
@@ -55,7 +55,7 @@ public enum PipeRequestType {
   TRANSFER_COMPRESSED((short) 300),
 
   // Fallback Handling
-  FALLBACK_SLICE((short) 400),
+  TRANSFER_SLICE((short) 400),
   ;
 
   private final short type;
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeTransferSliceReq.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeTransferSliceReq.java
index ee5662f6170..f26e2e7337c 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeTransferSliceReq.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeTransferSliceReq.java
@@ -87,7 +87,7 @@ public class PipeTransferSliceReq extends TPipeTransferReq {
     sliceReq.sliceCount = sliceCount;
 
     sliceReq.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion();
-    sliceReq.type = PipeRequestType.FALLBACK_SLICE.getType();
+    sliceReq.type = PipeRequestType.TRANSFER_SLICE.getType();
     try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
         final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
       ReadWriteIOUtils.write(sliceReq.originBodySize, outputStream);

Reply via email to