This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch make-sure-no-frame-size-problem
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 757b2ccabf05ce59325fd7337bd9f68a6308cf2c Author: Steve Yurong Su <[email protected]> AuthorDate: Sat Aug 24 03:29:34 2024 +0800 2nd commit --- .../pipe/metric/PipeDataNodeReceiverMetrics.java | 63 ++++++ .../protocol/thrift/IoTDBDataNodeReceiver.java | 211 ++++++++++++++------- .../payload/thrift/request/PipeRequestType.java | 2 +- .../thrift/request/PipeTransferSliceReq.java | 2 +- 4 files changed, 208 insertions(+), 70 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeReceiverMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeReceiverMetrics.java index eddad988041..8405e48085a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeReceiverMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeReceiverMetrics.java @@ -45,6 +45,9 @@ public class PipeDataNodeReceiverMetrics implements IMetricSet { private Timer transferSchemaPlanTimer = DoNothingMetricManager.DO_NOTHING_TIMER; private Timer transferSchemaSnapshotPieceTimer = DoNothingMetricManager.DO_NOTHING_TIMER; private Timer transferSchemaSnapshotSealTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + private Timer transferConfigPlanTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + private Timer transferCompressedTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + private Timer transferSliceTimer = DoNothingMetricManager.DO_NOTHING_TIMER; private static final String RECEIVER = "pipeDataNodeReceiver"; @@ -102,6 +105,18 @@ public class PipeDataNodeReceiverMetrics implements IMetricSet { transferSchemaSnapshotSealTimer.updateNanos(costTimeInNanos); } + public void recordTransferConfigPlanTimer(long costTimeInNanos) { + transferConfigPlanTimer.updateNanos(costTimeInNanos); + } + + public void recordTransferCompressedTimer(long costTimeInNanos) { + transferCompressedTimer.updateNanos(costTimeInNanos); + } + + public void 
recordTransferSliceTimer(long costTimeInNanos) { + transferSliceTimer.updateNanos(costTimeInNanos); + } + @Override public void bindTo(AbstractMetricService metricService) { bindToTimer(metricService); @@ -212,6 +227,30 @@ public class PipeDataNodeReceiverMetrics implements IMetricSet { RECEIVER, Tag.TYPE.toString(), "transferSchemaSnapshotSeal"); + transferConfigPlanTimer = + metricService.getOrCreateTimer( + Metric.PIPE_DATANODE_RECEIVER.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferConfigPlan"); + transferCompressedTimer = + metricService.getOrCreateTimer( + Metric.PIPE_DATANODE_RECEIVER.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferCompressed"); + transferSliceTimer = + metricService.getOrCreateTimer( + Metric.PIPE_DATANODE_RECEIVER.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferSlice"); } @Override @@ -233,6 +272,9 @@ public class PipeDataNodeReceiverMetrics implements IMetricSet { transferSchemaPlanTimer = DoNothingMetricManager.DO_NOTHING_TIMER; transferSchemaSnapshotPieceTimer = DoNothingMetricManager.DO_NOTHING_TIMER; transferSchemaSnapshotSealTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + transferConfigPlanTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + transferCompressedTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + transferSliceTimer = DoNothingMetricManager.DO_NOTHING_TIMER; metricService.remove( MetricType.TIMER, @@ -325,6 +367,27 @@ public class PipeDataNodeReceiverMetrics implements IMetricSet { RECEIVER, Tag.TYPE.toString(), "transferSchemaSnapshotSeal"); + metricService.remove( + MetricType.TIMER, + Metric.PIPE_DATANODE_RECEIVER.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferConfigPlan"); + metricService.remove( + MetricType.TIMER, + Metric.PIPE_DATANODE_RECEIVER.toString(), + Tag.NAME.toString(), + RECEIVER, + 
Tag.TYPE.toString(), + "transferCompressed"); + metricService.remove( + MetricType.TIMER, + Metric.PIPE_DATANODE_RECEIVER.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferSlice"); } public static PipeDataNodeReceiverMetrics getInstance() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java index 40f5065caec..0978dfa17cc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java @@ -28,6 +28,7 @@ import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeReques import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferCompressedReq; import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFileSealReqV1; import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFileSealReqV2; +import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferSliceReq; import org.apache.iotdb.commons.pipe.pattern.IoTDBPipePattern; import org.apache.iotdb.commons.pipe.receiver.IoTDBFileReceiver; import org.apache.iotdb.commons.utils.FileUtils; @@ -151,108 +152,178 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver { long startTime = System.nanoTime(); final short rawRequestType = req.getType(); if (PipeRequestType.isValidatedRequestType(rawRequestType)) { - TPipeTransferResp resp; switch (PipeRequestType.valueOf(rawRequestType)) { case HANDSHAKE_DATANODE_V1: - resp = - handleTransferHandshakeV1( + { + try { + return handleTransferHandshakeV1( PipeTransferDataNodeHandshakeV1Req.fromTPipeTransferReq(req)); - PipeDataNodeReceiverMetrics.getInstance() - 
.recordHandshakeDatanodeV1Timer(System.nanoTime() - startTime); - return resp; + } finally { + PipeDataNodeReceiverMetrics.getInstance() + .recordHandshakeDatanodeV1Timer(System.nanoTime() - startTime); + } + } case HANDSHAKE_DATANODE_V2: - resp = - handleTransferHandshakeV2( + { + try { + return handleTransferHandshakeV2( PipeTransferDataNodeHandshakeV2Req.fromTPipeTransferReq(req)); - PipeDataNodeReceiverMetrics.getInstance() - .recordHandshakeDatanodeV2Timer(System.nanoTime() - startTime); - return resp; + } finally { + PipeDataNodeReceiverMetrics.getInstance() + .recordHandshakeDatanodeV2Timer(System.nanoTime() - startTime); + } + } case TRANSFER_TABLET_INSERT_NODE: - resp = - handleTransferTabletInsertNode( + { + try { + return handleTransferTabletInsertNode( PipeTransferTabletInsertNodeReq.fromTPipeTransferReq(req)); - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferTabletInsertNodeTimer(System.nanoTime() - startTime); - return resp; + + } finally { + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferTabletInsertNodeTimer(System.nanoTime() - startTime); + } + } case TRANSFER_TABLET_RAW: - resp = handleTransferTabletRaw(PipeTransferTabletRawReq.fromTPipeTransferReq(req)); - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferTabletRawTimer(System.nanoTime() - startTime); - return resp; + { + try { + return handleTransferTabletRaw(PipeTransferTabletRawReq.fromTPipeTransferReq(req)); + } finally { + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferTabletRawTimer(System.nanoTime() - startTime); + } + } case TRANSFER_TABLET_BINARY: - resp = - handleTransferTabletBinary(PipeTransferTabletBinaryReq.fromTPipeTransferReq(req)); - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferTabletBinaryTimer(System.nanoTime() - startTime); - return resp; + { + try { + return handleTransferTabletBinary( + PipeTransferTabletBinaryReq.fromTPipeTransferReq(req)); + } finally { + PipeDataNodeReceiverMetrics.getInstance() + 
.recordTransferTabletBinaryTimer(System.nanoTime() - startTime); + } + } case TRANSFER_TABLET_BATCH: - resp = handleTransferTabletBatch(PipeTransferTabletBatchReq.fromTPipeTransferReq(req)); - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferTabletBatchTimer(System.nanoTime() - startTime); - return resp; + { + try { + return handleTransferTabletBatch( + PipeTransferTabletBatchReq.fromTPipeTransferReq(req)); + } finally { + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferTabletBatchTimer(System.nanoTime() - startTime); + } + } case TRANSFER_TS_FILE_PIECE: - resp = - handleTransferFilePiece( + { + try { + return handleTransferFilePiece( PipeTransferTsFilePieceReq.fromTPipeTransferReq(req), req instanceof AirGapPseudoTPipeTransferRequest, true); - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferTsFilePieceTimer(System.nanoTime() - startTime); - return resp; + } finally { + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferTsFilePieceTimer(System.nanoTime() - startTime); + } + } case TRANSFER_TS_FILE_SEAL: - resp = handleTransferFileSealV1(PipeTransferTsFileSealReq.fromTPipeTransferReq(req)); - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferTsFileSealTimer(System.nanoTime() - startTime); - return resp; + { + try { + return handleTransferFileSealV1( + PipeTransferTsFileSealReq.fromTPipeTransferReq(req)); + } finally { + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferTsFileSealTimer(System.nanoTime() - startTime); + } + } case TRANSFER_TS_FILE_PIECE_WITH_MOD: - resp = - handleTransferFilePiece( + { + try { + return handleTransferFilePiece( PipeTransferTsFilePieceWithModReq.fromTPipeTransferReq(req), req instanceof AirGapPseudoTPipeTransferRequest, false); - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferTsFilePieceWithModTimer(System.nanoTime() - startTime); - return resp; + + } finally { + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferTsFilePieceWithModTimer(System.nanoTime() - 
startTime); + } + } case TRANSFER_TS_FILE_SEAL_WITH_MOD: - resp = - handleTransferFileSealV2( + { + try { + return handleTransferFileSealV2( PipeTransferTsFileSealWithModReq.fromTPipeTransferReq(req)); - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferTsFileSealWithModTimer(System.nanoTime() - startTime); - return resp; + } finally { + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferTsFileSealWithModTimer(System.nanoTime() - startTime); + } + } case TRANSFER_SCHEMA_PLAN: - resp = handleTransferSchemaPlan(PipeTransferPlanNodeReq.fromTPipeTransferReq(req)); - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferSchemaPlanTimer(System.nanoTime() - startTime); - return resp; + { + try { + return handleTransferSchemaPlan(PipeTransferPlanNodeReq.fromTPipeTransferReq(req)); + } finally { + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferSchemaPlanTimer(System.nanoTime() - startTime); + } + } case TRANSFER_SCHEMA_SNAPSHOT_PIECE: - resp = - handleTransferFilePiece( + { + try { + return handleTransferFilePiece( PipeTransferSchemaSnapshotPieceReq.fromTPipeTransferReq(req), req instanceof AirGapPseudoTPipeTransferRequest, false); - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferSchemaSnapshotPieceTimer(System.nanoTime() - startTime); - return resp; + + } finally { + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferSchemaSnapshotPieceTimer(System.nanoTime() - startTime); + } + } case TRANSFER_SCHEMA_SNAPSHOT_SEAL: - resp = - handleTransferFileSealV2( + { + try { + return handleTransferFileSealV2( PipeTransferSchemaSnapshotSealReq.fromTPipeTransferReq(req)); - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferSchemaSnapshotSealTimer(System.nanoTime() - startTime); - return resp; + + } finally { + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferSchemaSnapshotSealTimer(System.nanoTime() - startTime); + } + } case HANDSHAKE_CONFIGNODE_V1: case HANDSHAKE_CONFIGNODE_V2: case TRANSFER_CONFIG_PLAN: case 
TRANSFER_CONFIG_SNAPSHOT_PIECE: case TRANSFER_CONFIG_SNAPSHOT_SEAL: - // Config requests will first be received by the DataNode receiver, - // then transferred to ConfigNode receiver to execute. - resp = handleTransferConfigPlan(req); - return resp; + { + try { + // Config requests will first be received by the DataNode receiver, + // then transferred to ConfigNode receiver to execute. + return handleTransferConfigPlan(req); + } finally { + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferConfigPlanTimer(System.nanoTime() - startTime); + } + } + case TRANSFER_SLICE: + { + try { + return handleTransferSlice(PipeTransferSliceReq.fromTPipeTransferReq(req)); + } finally { + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferSliceTimer(System.nanoTime() - startTime); + } + } case TRANSFER_COMPRESSED: - resp = receive(PipeTransferCompressedReq.fromTPipeTransferReq(req)); - return resp; + { + try { + return receive(PipeTransferCompressedReq.fromTPipeTransferReq(req)); + } finally { + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferCompressedTimer(System.nanoTime() - startTime); + } + } default: break; } @@ -442,6 +513,10 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver { return configReceiverId.get(); } + private TPipeTransferResp handleTransferSlice(PipeTransferSliceReq pipeTransferSliceReq) { + return null; + } + /** * For {@link InsertRowsStatement} and {@link InsertMultiTabletsStatement}, the returned {@link * TSStatus} will use sub-status to record the endpoint for redirection. 
Each sub-status records diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeRequestType.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeRequestType.java index bccf6787f4a..0f9b6f4dafb 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeRequestType.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeRequestType.java @@ -55,7 +55,7 @@ public enum PipeRequestType { TRANSFER_COMPRESSED((short) 300), // Fallback Handling - FALLBACK_SLICE((short) 400), + TRANSFER_SLICE((short) 400), ; private final short type; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeTransferSliceReq.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeTransferSliceReq.java index ee5662f6170..f26e2e7337c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeTransferSliceReq.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeTransferSliceReq.java @@ -87,7 +87,7 @@ public class PipeTransferSliceReq extends TPipeTransferReq { sliceReq.sliceCount = sliceCount; sliceReq.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion(); - sliceReq.type = PipeRequestType.FALLBACK_SLICE.getType(); + sliceReq.type = PipeRequestType.TRANSFER_SLICE.getType(); try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { ReadWriteIOUtils.write(sliceReq.originBodySize, outputStream);
