This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ty/TableModelGrammar in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit fd1528baa1f16e130752fc5a6e3ee053c4cc0729 Author: YC27 <[email protected]> AuthorDate: Tue Jul 23 14:45:36 2024 +0800 Pipe: add ops/latency metrics for different types of pipe operations on receivers (DN / CN) (#12927) (cherry picked from commit 623fa5bac5f4be73da4b8fec56499e7beed6d72e) --- .../manager/pipe/metric/PipeConfigNodeMetrics.java | 2 + .../pipe/metric/PipeConfigNodeReceiverMetrics.java | 169 +++++++++++ .../receiver/protocol/IoTDBConfigNodeReceiver.java | 44 ++- .../iotdb/db/pipe/metric/PipeDataNodeMetrics.java | 2 + .../pipe/metric/PipeDataNodeReceiverMetrics.java | 333 +++++++++++++++++++++ .../protocol/thrift/IoTDBDataNodeReceiver.java | 112 +++++-- .../iotdb/commons/service/metric/enums/Metric.java | 2 + 7 files changed, 623 insertions(+), 41 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeMetrics.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeMetrics.java index b02679f147e..a7a41f4f359 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeMetrics.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeMetrics.java @@ -42,6 +42,7 @@ public class PipeConfigNodeMetrics implements IMetricSet { PipeConfigRegionConnectorMetrics.getInstance().bindTo(metricService); PipeConfigNodeRemainingTimeMetrics.getInstance().bindTo(metricService); PipeTemporaryMetaMetrics.getInstance().bindTo(metricService); + PipeConfigNodeReceiverMetrics.getInstance().bindTo(metricService); } @Override @@ -53,5 +54,6 @@ public class PipeConfigNodeMetrics implements IMetricSet { PipeConfigRegionConnectorMetrics.getInstance().unbindFrom(metricService); PipeConfigNodeRemainingTimeMetrics.getInstance().unbindFrom(metricService); PipeTemporaryMetaMetrics.getInstance().unbindFrom(metricService); + 
PipeConfigNodeReceiverMetrics.getInstance().unbindFrom(metricService); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeReceiverMetrics.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeReceiverMetrics.java new file mode 100644 index 00000000000..e0a30f42c2e --- /dev/null +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeReceiverMetrics.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.confignode.manager.pipe.metric; + +import org.apache.iotdb.commons.service.metric.enums.Metric; +import org.apache.iotdb.commons.service.metric.enums.Tag; +import org.apache.iotdb.metrics.AbstractMetricService; +import org.apache.iotdb.metrics.impl.DoNothingMetricManager; +import org.apache.iotdb.metrics.metricsets.IMetricSet; +import org.apache.iotdb.metrics.type.Timer; +import org.apache.iotdb.metrics.utils.MetricLevel; +import org.apache.iotdb.metrics.utils.MetricType; + +public class PipeConfigNodeReceiverMetrics implements IMetricSet { + + private static final PipeConfigNodeReceiverMetrics INSTANCE = new PipeConfigNodeReceiverMetrics(); + + private Timer handshakeConfigNodeV1Timer = DoNothingMetricManager.DO_NOTHING_TIMER; + private Timer handshakeConfigNodeV2Timer = DoNothingMetricManager.DO_NOTHING_TIMER; + private Timer transferConfigPlanTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + private Timer transferConfigSnapshotPieceTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + private Timer transferConfigSnapshotSealTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + + private static final String RECEIVER = "pipeConfigNodeReceiver"; + + private PipeConfigNodeReceiverMetrics() {} + + public void recordHandshakeConfigNodeV1Timer(long costTimeInNanos) { + handshakeConfigNodeV1Timer.updateNanos(costTimeInNanos); + } + + public void recordHandshakeConfigNodeV2Timer(long costTimeInNanos) { + handshakeConfigNodeV2Timer.updateNanos(costTimeInNanos); + } + + public void recordTransferConfigPlanTimer(long costTimeInNanos) { + transferConfigPlanTimer.updateNanos(costTimeInNanos); + } + + public void recordTransferConfigSnapshotPieceTimer(long costTimeInNanos) { + transferConfigSnapshotPieceTimer.updateNanos(costTimeInNanos); + } + + public void recordTransferConfigSnapshotSealTimer(long costTimeInNanos) { + transferConfigSnapshotSealTimer.updateNanos(costTimeInNanos); + } + + @Override + public void bindTo(AbstractMetricService 
metricService) { + bindToTimer(metricService); + } + + private void bindToTimer(AbstractMetricService metricService) { + handshakeConfigNodeV1Timer = + metricService.getOrCreateTimer( + Metric.PIPE_CONFIGNODE_RECEIVER.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "handshakeConfigNodeV1"); + + handshakeConfigNodeV2Timer = + metricService.getOrCreateTimer( + Metric.PIPE_CONFIGNODE_RECEIVER.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "handshakeConfigNodeV2"); + + transferConfigPlanTimer = + metricService.getOrCreateTimer( + Metric.PIPE_CONFIGNODE_RECEIVER.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferConfigPlan"); + + transferConfigSnapshotPieceTimer = + metricService.getOrCreateTimer( + Metric.PIPE_CONFIGNODE_RECEIVER.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferConfigSnapshotPiece"); + + transferConfigSnapshotSealTimer = + metricService.getOrCreateTimer( + Metric.PIPE_CONFIGNODE_RECEIVER.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferConfigSnapshotSeal"); + } + + @Override + public void unbindFrom(AbstractMetricService metricService) { + unbind(metricService); + } + + private void unbind(AbstractMetricService metricService) { + handshakeConfigNodeV1Timer = DoNothingMetricManager.DO_NOTHING_TIMER; + handshakeConfigNodeV2Timer = DoNothingMetricManager.DO_NOTHING_TIMER; + transferConfigPlanTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + transferConfigSnapshotPieceTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + transferConfigSnapshotSealTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + + metricService.remove( + MetricType.TIMER, + Metric.PIPE_CONFIGNODE_RECEIVER.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "handshakeConfigNodeV1"); + metricService.remove( + 
MetricType.TIMER, + Metric.PIPE_CONFIGNODE_RECEIVER.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "handshakeConfigNodeV2"); + metricService.remove( + MetricType.TIMER, + Metric.PIPE_CONFIGNODE_RECEIVER.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferConfigPlan"); + metricService.remove( + MetricType.TIMER, + Metric.PIPE_CONFIGNODE_RECEIVER.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferConfigSnapshotPiece"); + metricService.remove( + MetricType.TIMER, + Metric.PIPE_CONFIGNODE_RECEIVER.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferConfigSnapshotSeal"); + } + + public static PipeConfigNodeReceiverMetrics getInstance() { + return INSTANCE; + } +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java index 7815adde258..711d7bdf26c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java @@ -53,6 +53,7 @@ import org.apache.iotdb.confignode.manager.pipe.connector.payload.PipeTransferCo import org.apache.iotdb.confignode.manager.pipe.connector.payload.PipeTransferConfigSnapshotSealReq; import org.apache.iotdb.confignode.manager.pipe.event.PipeConfigRegionSnapshotEvent; import org.apache.iotdb.confignode.manager.pipe.extractor.IoTDBConfigRegionExtractor; +import org.apache.iotdb.confignode.manager.pipe.metric.PipeConfigNodeReceiverMetrics; import org.apache.iotdb.confignode.manager.pipe.receiver.visitor.PipeConfigPhysicalPlanExceptionVisitor; import 
org.apache.iotdb.confignode.manager.pipe.receiver.visitor.PipeConfigPhysicalPlanTSStatusVisitor; import org.apache.iotdb.confignode.persistence.schema.CNPhysicalPlanGenerator; @@ -111,23 +112,44 @@ public class IoTDBConfigNodeReceiver extends IoTDBFileReceiver { .setMessage( "The receiver ConfigNode has set up a new receiver and the sender must re-send its handshake request.")); } + TPipeTransferResp resp; + long startTime = System.nanoTime(); switch (type) { case HANDSHAKE_CONFIGNODE_V1: - return handleTransferHandshakeV1( - PipeTransferConfigNodeHandshakeV1Req.fromTPipeTransferReq(req)); + resp = + handleTransferHandshakeV1( + PipeTransferConfigNodeHandshakeV1Req.fromTPipeTransferReq(req)); + PipeConfigNodeReceiverMetrics.getInstance() + .recordHandshakeConfigNodeV1Timer(System.nanoTime() - startTime); + return resp; case HANDSHAKE_CONFIGNODE_V2: - return handleTransferHandshakeV2( - PipeTransferConfigNodeHandshakeV2Req.fromTPipeTransferReq(req)); + resp = + handleTransferHandshakeV2( + PipeTransferConfigNodeHandshakeV2Req.fromTPipeTransferReq(req)); + PipeConfigNodeReceiverMetrics.getInstance() + .recordHandshakeConfigNodeV2Timer(System.nanoTime() - startTime); + return resp; case TRANSFER_CONFIG_PLAN: - return handleTransferConfigPlan(PipeTransferConfigPlanReq.fromTPipeTransferReq(req)); + resp = handleTransferConfigPlan(PipeTransferConfigPlanReq.fromTPipeTransferReq(req)); + PipeConfigNodeReceiverMetrics.getInstance() + .recordTransferConfigPlanTimer(System.nanoTime() - startTime); + return resp; case TRANSFER_CONFIG_SNAPSHOT_PIECE: - return handleTransferFilePiece( - PipeTransferConfigSnapshotPieceReq.fromTPipeTransferReq(req), - req instanceof AirGapPseudoTPipeTransferRequest, - false); + resp = + handleTransferFilePiece( + PipeTransferConfigSnapshotPieceReq.fromTPipeTransferReq(req), + req instanceof AirGapPseudoTPipeTransferRequest, + false); + PipeConfigNodeReceiverMetrics.getInstance() + .recordTransferConfigSnapshotPieceTimer(System.nanoTime() - 
startTime); + return resp; case TRANSFER_CONFIG_SNAPSHOT_SEAL: - return handleTransferFileSealV2( - PipeTransferConfigSnapshotSealReq.fromTPipeTransferReq(req)); + resp = + handleTransferFileSealV2( + PipeTransferConfigSnapshotSealReq.fromTPipeTransferReq(req)); + PipeConfigNodeReceiverMetrics.getInstance() + .recordTransferConfigSnapshotSealTimer(System.nanoTime() - startTime); + return resp; case TRANSFER_COMPRESSED: return receive(PipeTransferCompressedReq.fromTPipeTransferReq(req)); default: diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeMetrics.java index 2f39cc75c25..172653f3b01 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeMetrics.java @@ -41,6 +41,7 @@ public class PipeDataNodeMetrics implements IMetricSet { PipeSchemaRegionExtractorMetrics.getInstance().bindTo(metricService); PipeSchemaRegionConnectorMetrics.getInstance().bindTo(metricService); PipeDataNodeRemainingEventAndTimeMetrics.getInstance().bindTo(metricService); + PipeDataNodeReceiverMetrics.getInstance().bindTo(metricService); } @Override @@ -57,6 +58,7 @@ public class PipeDataNodeMetrics implements IMetricSet { PipeSchemaRegionExtractorMetrics.getInstance().unbindFrom(metricService); PipeSchemaRegionConnectorMetrics.getInstance().unbindFrom(metricService); PipeDataNodeRemainingEventAndTimeMetrics.getInstance().unbindFrom(metricService); + PipeDataNodeReceiverMetrics.getInstance().unbindFrom(metricService); } //////////////////////////// singleton //////////////////////////// diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeReceiverMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeReceiverMetrics.java new file mode 100644 index 
00000000000..eddad988041 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeReceiverMetrics.java @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.metric; + +import org.apache.iotdb.commons.service.metric.enums.Metric; +import org.apache.iotdb.commons.service.metric.enums.Tag; +import org.apache.iotdb.metrics.AbstractMetricService; +import org.apache.iotdb.metrics.impl.DoNothingMetricManager; +import org.apache.iotdb.metrics.metricsets.IMetricSet; +import org.apache.iotdb.metrics.type.Timer; +import org.apache.iotdb.metrics.utils.MetricLevel; +import org.apache.iotdb.metrics.utils.MetricType; + +public class PipeDataNodeReceiverMetrics implements IMetricSet { + + private static final PipeDataNodeReceiverMetrics INSTANCE = new PipeDataNodeReceiverMetrics(); + + private Timer handshakeDatanodeV1Timer = DoNothingMetricManager.DO_NOTHING_TIMER; + private Timer handshakeDatanodeV2Timer = DoNothingMetricManager.DO_NOTHING_TIMER; + private Timer transferTabletInsertNodeTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + private Timer transferTabletRawTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + private Timer 
transferTabletBinaryTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + private Timer transferTabletBatchTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + private Timer transferTsFilePieceTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + private Timer transferTsFileSealTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + private Timer transferTsFilePieceWithModTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + private Timer transferTsFileSealWithModTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + private Timer transferSchemaPlanTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + private Timer transferSchemaSnapshotPieceTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + private Timer transferSchemaSnapshotSealTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + + private static final String RECEIVER = "pipeDataNodeReceiver"; + + private PipeDataNodeReceiverMetrics() {} + + public void recordHandshakeDatanodeV1Timer(long costTimeInNanos) { + handshakeDatanodeV1Timer.updateNanos(costTimeInNanos); + } + + public void recordHandshakeDatanodeV2Timer(long costTimeInNanos) { + handshakeDatanodeV2Timer.updateNanos(costTimeInNanos); + } + + public void recordTransferTabletInsertNodeTimer(long costTimeInNanos) { + transferTabletInsertNodeTimer.updateNanos(costTimeInNanos); + } + + public void recordTransferTabletRawTimer(long costTimeInNanos) { + transferTabletRawTimer.updateNanos(costTimeInNanos); + } + + public void recordTransferTabletBinaryTimer(long costTimeInNanos) { + transferTabletBinaryTimer.updateNanos(costTimeInNanos); + } + + public void recordTransferTabletBatchTimer(long costTimeInNanos) { + transferTabletBatchTimer.updateNanos(costTimeInNanos); + } + + public void recordTransferTsFilePieceTimer(long costTimeInNanos) { + transferTsFilePieceTimer.updateNanos(costTimeInNanos); + } + + public void recordTransferTsFileSealTimer(long costTimeInNanos) { + transferTsFileSealTimer.updateNanos(costTimeInNanos); + } + + public void recordTransferTsFilePieceWithModTimer(long 
costTimeInNanos) { + transferTsFilePieceWithModTimer.updateNanos(costTimeInNanos); + } + + public void recordTransferTsFileSealWithModTimer(long costTimeInNanos) { + transferTsFileSealWithModTimer.updateNanos(costTimeInNanos); + } + + public void recordTransferSchemaPlanTimer(long costTimeInNanos) { + transferSchemaPlanTimer.updateNanos(costTimeInNanos); + } + + public void recordTransferSchemaSnapshotPieceTimer(long costTimeInNanos) { + transferSchemaSnapshotPieceTimer.updateNanos(costTimeInNanos); + } + + public void recordTransferSchemaSnapshotSealTimer(long costTimeInNanos) { + transferSchemaSnapshotSealTimer.updateNanos(costTimeInNanos); + } + + @Override + public void bindTo(AbstractMetricService metricService) { + bindToTimer(metricService); + } + + private void bindToTimer(AbstractMetricService metricService) { + handshakeDatanodeV1Timer = + metricService.getOrCreateTimer( + Metric.PIPE_DATANODE_RECEIVER.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "handshakeDataNodeV1"); + handshakeDatanodeV2Timer = + metricService.getOrCreateTimer( + Metric.PIPE_DATANODE_RECEIVER.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "handshakeDataNodeV2"); + transferTabletInsertNodeTimer = + metricService.getOrCreateTimer( + Metric.PIPE_DATANODE_RECEIVER.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTabletInsertNode"); + transferTabletRawTimer = + metricService.getOrCreateTimer( + Metric.PIPE_DATANODE_RECEIVER.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTabletRaw"); + transferTabletBinaryTimer = + metricService.getOrCreateTimer( + Metric.PIPE_DATANODE_RECEIVER.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTabletBinary"); + transferTabletBatchTimer = + metricService.getOrCreateTimer( + 
Metric.PIPE_DATANODE_RECEIVER.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTabletBatch"); + transferTsFilePieceTimer = + metricService.getOrCreateTimer( + Metric.PIPE_DATANODE_RECEIVER.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTsFilePiece"); + transferTsFileSealTimer = + metricService.getOrCreateTimer( + Metric.PIPE_DATANODE_RECEIVER.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTsFileSeal"); + transferTsFilePieceWithModTimer = + metricService.getOrCreateTimer( + Metric.PIPE_DATANODE_RECEIVER.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTsFilePieceWithMod"); + transferTsFileSealWithModTimer = + metricService.getOrCreateTimer( + Metric.PIPE_DATANODE_RECEIVER.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTsFileSealWithMod"); + transferSchemaPlanTimer = + metricService.getOrCreateTimer( + Metric.PIPE_DATANODE_RECEIVER.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferSchemaPlan"); + transferSchemaSnapshotPieceTimer = + metricService.getOrCreateTimer( + Metric.PIPE_DATANODE_RECEIVER.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferSchemaSnapshotPiece"); + transferSchemaSnapshotSealTimer = + metricService.getOrCreateTimer( + Metric.PIPE_DATANODE_RECEIVER.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferSchemaSnapshotSeal"); + } + + @Override + public void unbindFrom(AbstractMetricService metricService) { + unbind(metricService); + } + + private void unbind(AbstractMetricService metricService) { + handshakeDatanodeV1Timer = DoNothingMetricManager.DO_NOTHING_TIMER; + handshakeDatanodeV2Timer = 
DoNothingMetricManager.DO_NOTHING_TIMER; + transferTabletInsertNodeTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + transferTabletRawTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + transferTabletBinaryTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + transferTabletBatchTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + transferTsFilePieceTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + transferTsFileSealTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + transferTsFilePieceWithModTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + transferTsFileSealWithModTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + transferSchemaPlanTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + transferSchemaSnapshotPieceTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + transferSchemaSnapshotSealTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + + metricService.remove( + MetricType.TIMER, + Metric.PIPE_DATANODE_RECEIVER.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "handshakeDataNodeV1"); + metricService.remove( + MetricType.TIMER, + Metric.PIPE_DATANODE_RECEIVER.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "handshakeDataNodeV2"); + metricService.remove( + MetricType.TIMER, + Metric.PIPE_DATANODE_RECEIVER.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTabletInsertNode"); + metricService.remove( + MetricType.TIMER, + Metric.PIPE_DATANODE_RECEIVER.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTabletRaw"); + metricService.remove( + MetricType.TIMER, + Metric.PIPE_DATANODE_RECEIVER.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTabletBinary"); + metricService.remove( + MetricType.TIMER, + Metric.PIPE_DATANODE_RECEIVER.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTabletBatch"); + metricService.remove( + MetricType.TIMER, + Metric.PIPE_DATANODE_RECEIVER.toString(), + Tag.NAME.toString(), + 
RECEIVER, + Tag.TYPE.toString(), + "transferTsFilePiece"); + metricService.remove( + MetricType.TIMER, + Metric.PIPE_DATANODE_RECEIVER.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTsFileSeal"); + metricService.remove( + MetricType.TIMER, + Metric.PIPE_DATANODE_RECEIVER.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTsFilePieceWithMod"); + metricService.remove( + MetricType.TIMER, + Metric.PIPE_DATANODE_RECEIVER.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTsFileSealWithMod"); + metricService.remove( + MetricType.TIMER, + Metric.PIPE_DATANODE_RECEIVER.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferSchemaPlan"); + metricService.remove( + MetricType.TIMER, + Metric.PIPE_DATANODE_RECEIVER.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferSchemaSnapshotPiece"); + metricService.remove( + MetricType.TIMER, + Metric.PIPE_DATANODE_RECEIVER.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferSchemaSnapshotSeal"); + } + + public static PipeDataNodeReceiverMetrics getInstance() { + return INSTANCE; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java index da706539278..12916ba4506 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java @@ -49,6 +49,7 @@ import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransfer import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealReq; import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealWithModReq; import org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionSnapshotEvent; +import org.apache.iotdb.db.pipe.metric.PipeDataNodeReceiverMetrics; import org.apache.iotdb.db.pipe.receiver.visitor.PipePlanToStatementVisitor; import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementExceptionVisitor; import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementPatternParseVisitor; @@ -141,50 +142,99 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver { @Override public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { try { + long startTime = System.nanoTime(); final short rawRequestType = req.getType(); if (PipeRequestType.isValidatedRequestType(rawRequestType)) { + TPipeTransferResp resp; switch (PipeRequestType.valueOf(rawRequestType)) { case HANDSHAKE_DATANODE_V1: - return handleTransferHandshakeV1( - PipeTransferDataNodeHandshakeV1Req.fromTPipeTransferReq(req)); + resp = + handleTransferHandshakeV1( + PipeTransferDataNodeHandshakeV1Req.fromTPipeTransferReq(req)); + PipeDataNodeReceiverMetrics.getInstance() + .recordHandshakeDatanodeV1Timer(System.nanoTime() - startTime); + return resp; case HANDSHAKE_DATANODE_V2: - return handleTransferHandshakeV2( - PipeTransferDataNodeHandshakeV2Req.fromTPipeTransferReq(req)); + resp = + handleTransferHandshakeV2( + PipeTransferDataNodeHandshakeV2Req.fromTPipeTransferReq(req)); + PipeDataNodeReceiverMetrics.getInstance() + .recordHandshakeDatanodeV2Timer(System.nanoTime() - startTime); + return resp; case TRANSFER_TABLET_INSERT_NODE: - return handleTransferTabletInsertNode( - PipeTransferTabletInsertNodeReq.fromTPipeTransferReq(req)); + resp = + handleTransferTabletInsertNode( + PipeTransferTabletInsertNodeReq.fromTPipeTransferReq(req)); + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferTabletInsertNodeTimer(System.nanoTime() - startTime); + return resp; case TRANSFER_TABLET_RAW: - 
return handleTransferTabletRaw(PipeTransferTabletRawReq.fromTPipeTransferReq(req)); + resp = handleTransferTabletRaw(PipeTransferTabletRawReq.fromTPipeTransferReq(req)); + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferTabletRawTimer(System.nanoTime() - startTime); + return resp; case TRANSFER_TABLET_BINARY: - return handleTransferTabletBinary( - PipeTransferTabletBinaryReq.fromTPipeTransferReq(req)); + resp = + handleTransferTabletBinary(PipeTransferTabletBinaryReq.fromTPipeTransferReq(req)); + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferTabletBinaryTimer(System.nanoTime() - startTime); + return resp; case TRANSFER_TABLET_BATCH: - return handleTransferTabletBatch(PipeTransferTabletBatchReq.fromTPipeTransferReq(req)); + resp = handleTransferTabletBatch(PipeTransferTabletBatchReq.fromTPipeTransferReq(req)); + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferTabletBatchTimer(System.nanoTime() - startTime); + return resp; case TRANSFER_TS_FILE_PIECE: - return handleTransferFilePiece( - PipeTransferTsFilePieceReq.fromTPipeTransferReq(req), - req instanceof AirGapPseudoTPipeTransferRequest, - true); + resp = + handleTransferFilePiece( + PipeTransferTsFilePieceReq.fromTPipeTransferReq(req), + req instanceof AirGapPseudoTPipeTransferRequest, + true); + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferTsFilePieceTimer(System.nanoTime() - startTime); + return resp; case TRANSFER_TS_FILE_SEAL: - return handleTransferFileSealV1(PipeTransferTsFileSealReq.fromTPipeTransferReq(req)); + resp = handleTransferFileSealV1(PipeTransferTsFileSealReq.fromTPipeTransferReq(req)); + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferTsFileSealTimer(System.nanoTime() - startTime); + return resp; case TRANSFER_TS_FILE_PIECE_WITH_MOD: - return handleTransferFilePiece( - PipeTransferTsFilePieceWithModReq.fromTPipeTransferReq(req), - req instanceof AirGapPseudoTPipeTransferRequest, - false); + resp = + handleTransferFilePiece( + 
PipeTransferTsFilePieceWithModReq.fromTPipeTransferReq(req), + req instanceof AirGapPseudoTPipeTransferRequest, + false); + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferTsFilePieceWithModTimer(System.nanoTime() - startTime); + return resp; case TRANSFER_TS_FILE_SEAL_WITH_MOD: - return handleTransferFileSealV2( - PipeTransferTsFileSealWithModReq.fromTPipeTransferReq(req)); + resp = + handleTransferFileSealV2( + PipeTransferTsFileSealWithModReq.fromTPipeTransferReq(req)); + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferTsFileSealWithModTimer(System.nanoTime() - startTime); + return resp; case TRANSFER_SCHEMA_PLAN: - return handleTransferSchemaPlan(PipeTransferPlanNodeReq.fromTPipeTransferReq(req)); + resp = handleTransferSchemaPlan(PipeTransferPlanNodeReq.fromTPipeTransferReq(req)); + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferSchemaPlanTimer(System.nanoTime() - startTime); + return resp; case TRANSFER_SCHEMA_SNAPSHOT_PIECE: - return handleTransferFilePiece( - PipeTransferSchemaSnapshotPieceReq.fromTPipeTransferReq(req), - req instanceof AirGapPseudoTPipeTransferRequest, - false); + resp = + handleTransferFilePiece( + PipeTransferSchemaSnapshotPieceReq.fromTPipeTransferReq(req), + req instanceof AirGapPseudoTPipeTransferRequest, + false); + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferSchemaSnapshotPieceTimer(System.nanoTime() - startTime); + return resp; case TRANSFER_SCHEMA_SNAPSHOT_SEAL: - return handleTransferFileSealV2( - PipeTransferSchemaSnapshotSealReq.fromTPipeTransferReq(req)); + resp = + handleTransferFileSealV2( + PipeTransferSchemaSnapshotSealReq.fromTPipeTransferReq(req)); + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferSchemaSnapshotSealTimer(System.nanoTime() - startTime); + return resp; case HANDSHAKE_CONFIGNODE_V1: case HANDSHAKE_CONFIGNODE_V2: case TRANSFER_CONFIG_PLAN: @@ -192,9 +242,11 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver { case 
TRANSFER_CONFIG_SNAPSHOT_SEAL: // Config requests will first be received by the DataNode receiver, // then transferred to ConfigNode receiver to execute. - return handleTransferConfigPlan(req); + resp = handleTransferConfigPlan(req); + return resp; case TRANSFER_COMPRESSED: - return receive(PipeTransferCompressedReq.fromTPipeTransferReq(req)); + resp = receive(PipeTransferCompressedReq.fromTPipeTransferReq(req)); + return resp; default: break; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java index fe7033c88c1..cc096016f4b 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java @@ -133,6 +133,8 @@ public enum Metric { UNTRANSFERRED_TABLET_COUNT("untransferred_tablet_count"), UNTRANSFERRED_TSFILE_COUNT("untransferred_tsfile_count"), UNTRANSFERRED_HEARTBEAT_COUNT("untransferred_heartbeat_count"), + PIPE_DATANODE_RECEIVER("pipe_datanode_receiver"), + PIPE_CONFIGNODE_RECEIVER("pipe_confignode_receiver"), PIPE_EXTRACTOR_TABLET_SUPPLY("pipe_extractor_tablet_supply"), PIPE_EXTRACTOR_TSFILE_SUPPLY("pipe_extractor_tsfile_supply"), PIPE_EXTRACTOR_HEARTBEAT_SUPPLY("pipe_extractor_heartbeat_supply"),
