This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 3168bf68870 PipeConsensus: add metrics and fix some bugs for
pipeConsensus (#12723)
3168bf68870 is described below
commit 3168bf688701f8aa753032b551749f8da17a6088
Author: Peng Junzhi <[email protected]>
AuthorDate: Fri Jun 14 00:43:16 2024 -0500
PipeConsensus: add metrics and fix some bugs for pipeConsensus (#12723)
---
.../apache/iotdb/consensus/ConsensusFactory.java | 3 +
.../consensus/pipe/PipeConsensusServerImpl.java | 67 ++-
.../pipe/consensuspipe/ConsensusPipeConnector.java | 25 ++
.../pipe/consensuspipe/ConsensusPipeManager.java | 2 +
.../pipe/metric/PipeConsensusServerMetrics.java | 190 +++++++++
.../pipe/metric/PipeConsensusSyncLagManager.java | 131 ++++++
.../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 5 +
.../pipeconsensus/PipeConsensusAsyncConnector.java | 151 +++++--
.../pipeconsensus/PipeConsensusSyncConnector.java | 25 +-
.../PipeConsensusTabletBatchEventHandler.java | 7 +-
.../PipeConsensusTabletInsertNodeEventHandler.java | 6 +-
.../PipeConsensusTabletInsertionEventHandler.java | 14 +-
.../PipeConsensusTabletRawEventHandler.java | 45 --
.../PipeConsensusTsFileInsertionEventHandler.java | 22 +-
.../consensus/PipeConsensusConnectorMetrics.java | 290 +++++++++++++
.../consensus/PipeConsensusReceiverMetrics.java | 384 +++++++++++++++++
.../pipeconsensus/PipeConsensusReceiver.java | 455 ++++++++++++++-------
.../subtask/processor/PipeProcessorSubtask.java | 7 +-
.../container/PipeConsensusClientMgrContainer.java | 2 +-
.../config/constant/PipeConnectorConstant.java | 1 +
.../iotdb/commons/pipe/event/EnrichedEvent.java | 4 +-
.../pipe/progress/PipeEventCommitManager.java | 8 +
.../commons/pipe/progress/PipeEventCommitter.java | 4 +
.../iotdb/commons/service/metric/enums/Metric.java | 5 +
24 files changed, 1582 insertions(+), 271 deletions(-)
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java
index ef9032516d7..8cab79e315e 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.consensus;
import
org.apache.iotdb.commons.client.container.PipeConsensusClientMgrContainer;
import org.apache.iotdb.consensus.config.ConsensusConfig;
import org.apache.iotdb.consensus.config.PipeConsensusConfig.ReplicateMode;
+import org.apache.iotdb.consensus.pipe.metric.PipeConsensusSyncLagManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,6 +67,8 @@ public class ConsensusFactory {
className = REAL_PIPE_CONSENSUS;
// initialize pipeConsensus' thrift component
PipeConsensusClientMgrContainer.build();
+ // initialize pipeConsensus's metric component
+ PipeConsensusSyncLagManager.build();
}
Class<?> executor = Class.forName(className);
Constructor<?> executorConstructor =
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
index f4a64fc691b..95d5a23cdc2 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
@@ -28,15 +28,19 @@ import
org.apache.iotdb.commons.consensus.index.ComparableConsensusRequest;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
+import org.apache.iotdb.commons.service.metric.MetricService;
+import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.config.PipeConsensusConfig;
+import org.apache.iotdb.consensus.config.PipeConsensusConfig.ReplicateMode;
import org.apache.iotdb.consensus.exception.ConsensusGroupModifyPeerException;
import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeManager;
import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeName;
import org.apache.iotdb.consensus.pipe.consensuspipe.ProgressIndexManager;
+import org.apache.iotdb.consensus.pipe.metric.PipeConsensusServerMetrics;
import org.apache.iotdb.consensus.pipe.thrift.TCheckConsensusPipeCompletedReq;
import org.apache.iotdb.consensus.pipe.thrift.TCheckConsensusPipeCompletedResp;
import
org.apache.iotdb.consensus.pipe.thrift.TNotifyPeerToCreateConsensusPipeReq;
@@ -66,7 +70,8 @@ import java.util.stream.Collectors;
public class PipeConsensusServerImpl {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeConsensusServerImpl.class);
private static final long
CHECK_TRANSMISSION_COMPLETION_INTERVAL_IN_MILLISECONDS = 2_000L;
-
+ private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS
=
+ PerformanceOverviewMetrics.getInstance();
private final Peer thisNode;
private final IStateMachine stateMachine;
private final Lock stateMachineLock = new ReentrantLock();
@@ -77,6 +82,8 @@ public class PipeConsensusServerImpl {
private final ConsensusPipeManager consensusPipeManager;
private final ProgressIndexManager progressIndexManager;
private final IClientManager<TEndPoint, SyncPipeConsensusServiceClient>
syncClientManager;
+ private final PipeConsensusServerMetrics pipeConsensusServerMetrics;
+ private final ReplicateMode replicateMode;
private ProgressIndex cachedProgressIndex = MinimumProgressIndex.INSTANCE;
@@ -98,6 +105,8 @@ public class PipeConsensusServerImpl {
this.consensusPipeManager = consensusPipeManager;
this.progressIndexManager = config.getPipe().getProgressIndexManager();
this.syncClientManager = syncClientManager;
+ this.pipeConsensusServerMetrics = new PipeConsensusServerMetrics(this);
+ this.replicateMode = config.getReplicateMode();
if (configuration.isEmpty()) {
peerManager.recover();
@@ -129,6 +138,7 @@ public class PipeConsensusServerImpl {
public synchronized void start(boolean startConsensusPipes) throws
IOException {
stateMachine.start();
+ MetricService.getInstance().addMetricSet(this.pipeConsensusServerMetrics);
if (startConsensusPipes) {
// start all consensus pipes
@@ -153,7 +163,7 @@ public class PipeConsensusServerImpl {
// do not roll back, because it will stop anyway
LOGGER.warn("{} cannot stop all consensus pipes", thisNode);
}
-
+
MetricService.getInstance().removeMetricSet(this.pipeConsensusServerMetrics);
stateMachine.stop();
isStarted.set(false);
}
@@ -167,6 +177,7 @@ public class PipeConsensusServerImpl {
LOGGER.warn("{} cannot drop all consensus pipes", thisNode);
}
+
MetricService.getInstance().removeMetricSet(this.pipeConsensusServerMetrics);
peerManager.clear();
stateMachine.stop();
isStarted.set(false);
@@ -183,7 +194,11 @@ public class PipeConsensusServerImpl {
}
return true;
} catch (Exception e) {
- LOGGER.warn("{} cannot create consensus pipe between {} and
{}", thisNode, peer, e);
+ LOGGER.warn(
+ "{}: cannot create consensus pipe between {} and {}",
+ e.getMessage(),
+ thisNode,
+ peer);
return false;
}
})
@@ -202,11 +217,11 @@ public class PipeConsensusServerImpl {
return true;
} catch (Exception e) {
LOGGER.warn(
- "{} cannot update consensus pipe between {} and {} to
status {}",
+ "{}: cannot update consensus pipe between {} and {} to
status {}",
+ e.getMessage(),
thisNode,
peer,
- status,
- e);
+ status);
return false;
}
})
@@ -274,12 +289,25 @@ public class PipeConsensusServerImpl {
public TSStatus write(IConsensusRequest request) {
try {
+ long consensusWriteStartTime = System.nanoTime();
stateMachineLock.lock();
+ long getStateMachineLockTime = System.nanoTime();
+ // statistic the time of acquiring stateMachine lock
+ pipeConsensusServerMetrics.recordGetStateMachineLockTime(
+ getStateMachineLockTime - consensusWriteStartTime);
+ long writeToStateMachineStartTime = System.nanoTime();
if (request instanceof ComparableConsensusRequest) {
((ComparableConsensusRequest) request)
.setProgressIndex(progressIndexManager.assignProgressIndex(thisNode.getGroupId()));
}
- return stateMachine.write(request);
+ TSStatus result = stateMachine.write(request);
+ long writeToStateMachineEndTime = System.nanoTime();
+ PERFORMANCE_OVERVIEW_METRICS.recordEngineCost(
+ writeToStateMachineEndTime - writeToStateMachineStartTime);
+ // statistic the time of writing request into stateMachine
+ pipeConsensusServerMetrics.recordUserWriteStateMachineTime(
+ writeToStateMachineEndTime - writeToStateMachineStartTime);
+ return result;
} finally {
stateMachineLock.unlock();
}
@@ -287,8 +315,23 @@ public class PipeConsensusServerImpl {
public TSStatus writeOnFollowerReplica(IConsensusRequest request) {
try {
+ long consensusWriteStartTime = System.nanoTime();
stateMachineLock.lock();
- return stateMachine.write(request);
+ long getStateMachineLockTime = System.nanoTime();
+ // statistic the time of acquiring stateMachine lock
+ pipeConsensusServerMetrics.recordGetStateMachineLockTime(
+ getStateMachineLockTime - consensusWriteStartTime);
+
+ long writeToStateMachineStartTime = System.nanoTime();
+ TSStatus result = stateMachine.write(request);
+ long writeToStateMachineEndTime = System.nanoTime();
+
+ PERFORMANCE_OVERVIEW_METRICS.recordEngineCost(
+ writeToStateMachineEndTime - writeToStateMachineStartTime);
+ // statistic the time of writing request into stateMachine
+ pipeConsensusServerMetrics.recordReplicaWriteStateMachineTime(
+ writeToStateMachineEndTime - writeToStateMachineStartTime);
+ return result;
} finally {
stateMachineLock.unlock();
}
@@ -540,4 +583,12 @@ public class PipeConsensusServerImpl {
public List<Peer> getPeers() {
return peerManager.getPeers();
}
+
+ public String getConsensusGroupId() {
+ return consensusGroupId;
+ }
+
+ public long getReplicateMode() {
+ return (replicateMode == ReplicateMode.BATCH) ? 2 : 1;
+ }
}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeConnector.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeConnector.java
new file mode 100644
index 00000000000..6f1396db972
--- /dev/null
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeConnector.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.pipe.consensuspipe;
+
+public interface ConsensusPipeConnector {
+ long getConsensusPipeCommitProgress();
+
+ long getConsensusPipeReplicateProgress();
+}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeManager.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeManager.java
index bb1bf2a1769..a6402a8610d 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeManager.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeManager.java
@@ -29,6 +29,7 @@ import com.google.common.collect.ImmutableMap;
import java.util.Map;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_CONSENSUS_GROUP_ID_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_CONSENSUS_PIPE_NAME;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY;
@@ -78,6 +79,7 @@ public class ConsensusPipeManager {
.put(
CONNECTOR_CONSENSUS_GROUP_ID_KEY,
String.valueOf(consensusPipeName.getConsensusGroupId().getId()))
+ .put(CONNECTOR_CONSENSUS_PIPE_NAME, consensusPipeName.toString())
.put(CONNECTOR_IOTDB_IP_KEY, receiverPeer.getEndpoint().ip)
.put(CONNECTOR_IOTDB_PORT_KEY,
String.valueOf(receiverPeer.getEndpoint().port))
.put(CONNECTOR_IOTDB_PARALLEL_TASKS_KEY, String.valueOf(1))
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/metric/PipeConsensusServerMetrics.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/metric/PipeConsensusServerMetrics.java
new file mode 100644
index 00000000000..39bcb1e526f
--- /dev/null
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/metric/PipeConsensusServerMetrics.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.pipe.metric;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.consensus.pipe.PipeConsensusServerImpl;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.impl.DoNothingMetricManager;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Timer;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+public class PipeConsensusServerMetrics implements IMetricSet {
+ private final PipeConsensusServerImpl impl;
+ private final PipeConsensusSyncLagManager syncLagManager;
+
+ private Timer getStateMachineLockTimer =
DoNothingMetricManager.DO_NOTHING_TIMER;
+ private Timer userWriteStateMachineTimer =
DoNothingMetricManager.DO_NOTHING_TIMER;
+ private Timer replicaWriteStateMachineTimer =
DoNothingMetricManager.DO_NOTHING_TIMER;
+
+ public PipeConsensusServerMetrics(PipeConsensusServerImpl impl) {
+ this.impl = impl;
+ this.syncLagManager =
PipeConsensusSyncLagManager.getInstance(impl.getConsensusGroupId());
+ }
+
+ private static final String IMPL = "PipeConsensusServerImpl";
+
+ public void recordGetStateMachineLockTime(long costTimeInNanos) {
+ getStateMachineLockTimer.updateNanos(costTimeInNanos);
+ }
+
+ public void recordUserWriteStateMachineTime(long costTimeInNanos) {
+ userWriteStateMachineTimer.updateNanos(costTimeInNanos);
+ }
+
+ public void recordReplicaWriteStateMachineTime(long costTimeInNanos) {
+ replicaWriteStateMachineTimer.updateNanos(costTimeInNanos);
+ }
+
+ @Override
+ public void bindTo(AbstractMetricService metricService) {
+ bindAutoGauge(metricService);
+ bindGauge(metricService);
+ bindStageTimer(metricService);
+ }
+
+ @Override
+ public void unbindFrom(AbstractMetricService metricService) {
+ unbindAutoGauge(metricService);
+ unbindGauge(metricService);
+ unbindStageTimer(metricService);
+
+ // release corresponding resource
+ PipeConsensusSyncLagManager.release(impl.getConsensusGroupId());
+ }
+
+ public void bindGauge(AbstractMetricService metricService) {
+ metricService
+ .getOrCreateGauge(
+ Metric.PIPE_CONSENSUS_MODE.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ IMPL,
+ Tag.TYPE.toString(),
+ "replicateMode")
+ .set(impl.getReplicateMode());
+ }
+
+ public void unbindGauge(AbstractMetricService metricService) {
+ metricService.remove(
+ MetricType.GAUGE,
+ Metric.PIPE_CONSENSUS_MODE.toString(),
+ Tag.NAME.toString(),
+ IMPL,
+ Tag.TYPE.toString(),
+ "replicateMode");
+ }
+
+ public void bindAutoGauge(AbstractMetricService metricService) {
+ metricService.createAutoGauge(
+ Metric.PIPE_CONSENSUS.toString(),
+ MetricLevel.IMPORTANT,
+ syncLagManager,
+ PipeConsensusSyncLagManager::calculateSyncLag,
+ Tag.NAME.toString(),
+ IMPL,
+ Tag.REGION.toString(),
+ impl.getConsensusGroupId(),
+ Tag.TYPE.toString(),
+ "syncLag");
+ }
+
+ public void unbindAutoGauge(AbstractMetricService metricService) {
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.PIPE_CONSENSUS.toString(),
+ Tag.NAME.toString(),
+ IMPL,
+ Tag.REGION.toString(),
+ impl.getConsensusGroupId(),
+ Tag.TYPE.toString(),
+ "syncLag");
+ }
+
+ public void bindStageTimer(AbstractMetricService metricService) {
+ getStateMachineLockTimer =
+ metricService.getOrCreateTimer(
+ Metric.STAGE.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ Metric.PIPE_CONSENSUS.toString(),
+ Tag.TYPE.toString(),
+ "getStateMachineLock",
+ Tag.REGION.toString(),
+ impl.getConsensusGroupId());
+ userWriteStateMachineTimer =
+ metricService.getOrCreateTimer(
+ Metric.STAGE.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ Metric.PIPE_CONSENSUS.toString(),
+ Tag.TYPE.toString(),
+ "userWriteStateMachine",
+ Tag.REGION.toString(),
+ impl.getConsensusGroupId());
+ replicaWriteStateMachineTimer =
+ metricService.getOrCreateTimer(
+ Metric.PIPE_RECEIVE_EVENT.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ Metric.PIPE_CONSENSUS.toString(),
+ Tag.TYPE.toString(),
+ "replicaWriteStateMachine",
+ Tag.REGION.toString(),
+ impl.getConsensusGroupId());
+ }
+
+ public void unbindStageTimer(AbstractMetricService metricService) {
+ getStateMachineLockTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+ userWriteStateMachineTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+ replicaWriteStateMachineTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+
+ metricService.remove(
+ MetricType.TIMER,
+ Metric.STAGE.toString(),
+ Tag.NAME.toString(),
+ Metric.PIPE_CONSENSUS.toString(),
+ Tag.TYPE.toString(),
+ "getStateMachineLock",
+ Tag.REGION.toString(),
+ impl.getConsensusGroupId());
+ metricService.remove(
+ MetricType.TIMER,
+ Metric.STAGE.toString(),
+ Tag.NAME.toString(),
+ Metric.PIPE_CONSENSUS.toString(),
+ Tag.TYPE.toString(),
+ "writeStateMachine",
+ Tag.REGION.toString(),
+ impl.getConsensusGroupId());
+ metricService.remove(
+ MetricType.TIMER,
+ Metric.PIPE_RECEIVE_EVENT.toString(),
+ Tag.NAME.toString(),
+ Metric.PIPE_CONSENSUS.toString(),
+ Tag.TYPE.toString(),
+ "replicaWriteStateMachine",
+ Tag.REGION.toString(),
+ impl.getConsensusGroupId());
+ }
+}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/metric/PipeConsensusSyncLagManager.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/metric/PipeConsensusSyncLagManager.java
new file mode 100644
index 00000000000..6ff754e02ef
--- /dev/null
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/metric/PipeConsensusSyncLagManager.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.pipe.metric;
+
+import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeConnector;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/**
+ * This class is used to aggregate the write progress of all Connectors to
calculate the minimum
+ * synchronization progress of all follower copies, thereby calculating
syncLag.
+ *
+ * <p>Note: every consensusGroup/dataRegion has and only has 1 instance of
this class.
+ */
+public class PipeConsensusSyncLagManager {
+ long userWriteProgress = 0;
+ long minReplicateProgress = Long.MAX_VALUE;
+ List<ConsensusPipeConnector> consensusPipeConnectorList = new
CopyOnWriteArrayList<>();
+
+ private void updateReplicateProgress() {
+ minReplicateProgress = Long.MAX_VALUE;
+ // if there isn't a consensus pipe task, replicate progress is
Long.MAX_VALUE.
+ if (consensusPipeConnectorList.isEmpty()) {
+ return;
+ }
+ // else we find the minimum progress in all consensus pipe task.
+ consensusPipeConnectorList.forEach(
+ consensusPipeConnector ->
+ minReplicateProgress =
+ Math.min(
+ minReplicateProgress,
+
consensusPipeConnector.getConsensusPipeReplicateProgress()));
+ }
+
+ private void updateUserWriteProgress() {
+ // if there isn't a consensus pipe task, user write progress is 0.
+ if (consensusPipeConnectorList.isEmpty()) {
+ userWriteProgress = 0;
+ return;
+ }
+ // since the user write progress of different consensus pipes on the same
DataRegion is the
+ // same, we only need to take out one Connector to calculate
+ try {
+ ConsensusPipeConnector connector = consensusPipeConnectorList.get(0);
+ userWriteProgress = connector.getConsensusPipeCommitProgress();
+ } catch (Exception e) {
+ // if removing the last connector happens after empty check, we may
encounter
+ // OutOfBoundsException, in this case, we set userWriteProgress to 0.
+ userWriteProgress = 0;
+ }
+ }
+
+ public void addConsensusPipeConnector(ConsensusPipeConnector
consensusPipeConnector) {
+ consensusPipeConnectorList.add(consensusPipeConnector);
+ }
+
+ public void removeConsensusPipeConnector(ConsensusPipeConnector connector) {
+ consensusPipeConnectorList.remove(connector);
+ }
+
+ /**
+ * SyncLag represents the difference between the current replica users'
write progress and the
+ * minimum synchronization progress of all other replicas. The semantics is
how much data the
+ * leader has left to synchronize.
+ */
+ public long calculateSyncLag() {
+ updateUserWriteProgress();
+ updateReplicateProgress();
+ // if there isn't a consensus pipe task, the syncLag is userWriteProgress
- 0
+ if (minReplicateProgress == Long.MAX_VALUE) {
+ return userWriteProgress;
+ } else {
+ // since we first update userWriteProgress then update
replicateProgress, there may have some
+ // cases that userWriteProgress is less than replicateProgress. In these
cases, we return 0.
+ return Math.max(userWriteProgress - minReplicateProgress, 0);
+ }
+ }
+
+ private PipeConsensusSyncLagManager() {
+ // do nothing
+ }
+
+ private static class PipeConsensusSyncLagManagerHolder {
+ private static Map<String, PipeConsensusSyncLagManager>
CONSENSU_GROUP_ID_2_INSTANCE_MAP;
+
+ private PipeConsensusSyncLagManagerHolder() {
+ // empty constructor
+ }
+
+ private static void build() {
+ if (CONSENSU_GROUP_ID_2_INSTANCE_MAP == null) {
+ CONSENSU_GROUP_ID_2_INSTANCE_MAP = new ConcurrentHashMap<>();
+ }
+ }
+ }
+
+ public static PipeConsensusSyncLagManager getInstance(String groupId) {
+ return
PipeConsensusSyncLagManagerHolder.CONSENSU_GROUP_ID_2_INSTANCE_MAP.computeIfAbsent(
+ groupId, key -> new PipeConsensusSyncLagManager());
+ }
+
+ public static void release(String groupId) {
+
PipeConsensusSyncLagManagerHolder.CONSENSU_GROUP_ID_2_INSTANCE_MAP.remove(groupId);
+ }
+
+ // Only when consensus protocol is PipeConsensus, this method will be called
once when construct
+ // consensus class.
+ public static void build() {
+ PipeConsensusSyncLagManagerHolder.build();
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index e50f1edcc69..a6abcbb9e9a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -620,6 +620,11 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
///////////////////////// Pipe Consensus /////////////////////////
+ public long getPipeCreationTime(final String pipeName) {
+ final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
+ return pipeMeta == null ? 0 : pipeMeta.getStaticMeta().getCreationTime();
+ }
+
public ProgressIndex getPipeTaskProgressIndex(final String pipeName, final
int consensusGroupId) {
if (!tryReadLockWithTimeOut(10)) {
throw new PipeException(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
index 50da4d56839..e3673c0330d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
@@ -25,26 +25,33 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.async.AsyncPipeConsensusServiceClient;
import
org.apache.iotdb.commons.client.container.PipeConsensusClientMgrContainer;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
-import org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant;
import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBConnector;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.progress.PipeEventCommitManager;
+import org.apache.iotdb.commons.service.metric.MetricService;
+import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeConnector;
+import org.apache.iotdb.consensus.pipe.metric.PipeConsensusSyncLagManager;
import org.apache.iotdb.consensus.pipe.thrift.TCommitId;
import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferReq;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
import
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.handler.PipeConsensusTabletBatchEventHandler;
import
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.handler.PipeConsensusTabletInsertNodeEventHandler;
import
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.handler.PipeConsensusTsFileInsertionEventHandler;
import
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.builder.PipeConsensusAsyncBatchReqBuilder;
import
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTabletBinaryReq;
import
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTabletInsertNodeReq;
+import org.apache.iotdb.db.pipe.consensus.PipeConsensusConnectorMetrics;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -64,48 +71,51 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_CONSENSUS_GROUP_ID_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_CONSENSUS_PIPE_NAME;
+
// TODO: Optimize the network and disk io for TsFile onComplete
// TODO: support Tablet Batch
-public class PipeConsensusAsyncConnector extends IoTDBConnector {
-
+public class PipeConsensusAsyncConnector extends IoTDBConnector implements
ConsensusPipeConnector {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeConsensusAsyncConnector.class);
-
- private static final String CUSTOMIZE_EXCEPTION_MSG =
- "Failed to customize pipeConsensusAsyncConnector because there isn't
consensusGroupId passed in. Please check your construct parameters.";
-
private static final String ENQUEUE_EXCEPTION_MSG =
"Timeout: PipeConsensusConnector offers an event into transferBuffer
failed, because transferBuffer is full.";
-
private static final String THRIFT_ERROR_FORMATTER_WITHOUT_ENDPOINT =
"Failed to borrow client from client pool or exception occurred "
+ "when sending to receiver.";
-
private static final String THRIFT_ERROR_FORMATTER_WITH_ENDPOINT =
"Failed to borrow client from client pool or exception occurred "
+ "when sending to receiver %s:%s.";
-
private static final IoTDBConfig IOTDB_CONFIG =
IoTDBDescriptor.getInstance().getConfig();
-
private static final long PIPE_CONSENSUS_EVENT_ENQUEUE_TIMEOUT_IN_MS =
IOTDB_CONFIG.getConnectionTimeoutInMS() / 6;
-
private final BlockingQueue<Event> retryEventQueue = new
LinkedBlockingQueue<>();
-
// We use enrichedEvent here to make use of
EnrichedEvent.equalsInPipeConsensus
private final BlockingQueue<EnrichedEvent> transferBuffer =
new LinkedBlockingDeque<>(IOTDB_CONFIG.getPipeConsensusPipelineSize());
-
private final AtomicBoolean isClosed = new AtomicBoolean(false);
-
private final int thisDataNodeId =
IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
-
+ private PipeConsensusConnectorMetrics pipeConsensusConnectorMetrics;
+ private String consensusPipeName;
private int consensusGroupId;
-
private PipeConsensusSyncConnector retryConnector;
-
private IClientManager<TEndPoint, AsyncPipeConsensusServiceClient>
asyncTransferClientManager;
-
private PipeConsensusAsyncBatchReqBuilder tabletBatchBuilder;
+ private volatile long currentReplicateProgress = 0;
+
+ @Override
+ public void validate(final PipeParameterValidator validator) throws
Exception {
+ super.validate(validator);
+ // validate consensus pipe's parameters
+ final PipeParameters parameters = validator.getParameters();
+ validator.validate(
+ args -> (boolean) args[0] || (boolean) args[1],
+ String.format(
+ "One of %s, %s must be specified in consensus pipe",
+ CONNECTOR_CONSENSUS_GROUP_ID_KEY, CONNECTOR_CONSENSUS_PIPE_NAME),
+ parameters.hasAttribute(CONNECTOR_CONSENSUS_GROUP_ID_KEY),
+ parameters.hasAttribute(CONNECTOR_CONSENSUS_PIPE_NAME));
+ }
@Override
public void customize(PipeParameters parameters,
PipeConnectorRuntimeConfiguration configuration)
@@ -113,15 +123,16 @@ public class PipeConsensusAsyncConnector extends
IoTDBConnector {
super.customize(parameters, configuration);
// Get consensusGroupId from parameters passed by PipeConsensusImpl
- if
(!parameters.hasAttribute(PipeConnectorConstant.CONNECTOR_CONSENSUS_GROUP_ID_KEY))
{
- throw new PipeException(CUSTOMIZE_EXCEPTION_MSG);
- }
- consensusGroupId =
parameters.getInt(PipeConnectorConstant.CONNECTOR_CONSENSUS_GROUP_ID_KEY);
+ consensusGroupId = parameters.getInt(CONNECTOR_CONSENSUS_GROUP_ID_KEY);
+ // Get consensusPipeName from parameters passed by PipeConsensusImpl
+ consensusPipeName = parameters.getString(CONNECTOR_CONSENSUS_PIPE_NAME);
// In PipeConsensus, one pipeConsensusTask corresponds to a
pipeConsensusConnector. Thus,
// `nodeUrls` here actually is a singletonList that contains one peer's
TEndPoint. But here we
// retain the implementation of list to cope with possible future expansion
- retryConnector = new PipeConsensusSyncConnector(nodeUrls,
consensusGroupId, thisDataNodeId);
+ retryConnector =
+ new PipeConsensusSyncConnector(
+ nodeUrls, consensusGroupId, thisDataNodeId,
pipeConsensusConnectorMetrics);
retryConnector.customize(parameters, configuration);
asyncTransferClientManager =
PipeConsensusClientMgrContainer.getInstance().getAsyncClientManager();
@@ -136,9 +147,17 @@ public class PipeConsensusAsyncConnector extends
IoTDBConnector {
// currently, tablet batch is false by default in PipeConsensus;
isTabletBatchModeEnabled = false;
+
+ // initialize metric components
+ pipeConsensusConnectorMetrics = new PipeConsensusConnectorMetrics(this);
+ PipeConsensusSyncLagManager.getInstance(getConsensusGroupIdStr())
+ .addConsensusPipeConnector(this);
+
MetricService.getInstance().addMetricSet(this.pipeConsensusConnectorMetrics);
}
- /** Add an event to transferBuffer, whose events will be asynchronizedly
transfer to receiver. */
+ /**
+ * Add an event to transferBuffer, whose events will be asynchronously
transferred to receiver.
+ */
private boolean addEvent2Buffer(EnrichedEvent event) {
try {
if (LOGGER.isDebugEnabled()) {
@@ -148,13 +167,21 @@ public class PipeConsensusAsyncConnector extends
IoTDBConnector {
event.getCommitId(),
event);
}
+ long currentTime = System.nanoTime();
boolean result =
transferBuffer.offer(
event, PIPE_CONSENSUS_EVENT_ENQUEUE_TIMEOUT_IN_MS,
TimeUnit.MILLISECONDS);
+ long duration = System.nanoTime() - currentTime;
+ pipeConsensusConnectorMetrics.recordConnectorEnqueueTimer(duration);
// add reference
if (result) {
event.increaseReferenceCount(PipeConsensusAsyncConnector.class.getName());
}
+ // if connector is closed when executing this method, need to clear this
event's reference
+ // count to avoid unnecessarily pinning some resource such as WAL.
+ if (isClosed.get()) {
+ event.clearReferenceCount(PipeConsensusAsyncConnector.class.getName());
+ }
return result;
} catch (InterruptedException e) {
LOGGER.info("PipeConsensusConnector transferBuffer queue offer is
interrupted.", e);
@@ -182,6 +209,8 @@ public class PipeConsensusAsyncConnector extends
IoTDBConnector {
current = iterator.next();
}
iterator.remove();
+ // update replicate progress
+ currentReplicateProgress = Math.max(currentReplicateProgress,
event.getCommitId());
// decrease reference count
event.decreaseReferenceCount(PipeConsensusAsyncConnector.class.getName(),
true);
}
@@ -200,18 +229,18 @@ public class PipeConsensusAsyncConnector extends
IoTDBConnector {
@Override
public void transfer(TabletInsertionEvent tabletInsertionEvent) throws
Exception {
+ syncTransferQueuedEventsIfNecessary();
+
boolean enqueueResult = addEvent2Buffer((EnrichedEvent)
tabletInsertionEvent);
if (!enqueueResult) {
throw new PipeException(ENQUEUE_EXCEPTION_MSG);
}
-
- syncTransferQueuedEventsIfNecessary();
-
// batch transfer tablets.
if (isTabletBatchModeEnabled) {
if (tabletBatchBuilder.onEvent(tabletInsertionEvent)) {
final PipeConsensusTabletBatchEventHandler
pipeConsensusTabletBatchEventHandler =
- new PipeConsensusTabletBatchEventHandler(tabletBatchBuilder, this);
+ new PipeConsensusTabletBatchEventHandler(
+ tabletBatchBuilder, this, pipeConsensusConnectorMetrics);
transfer(pipeConsensusTabletBatchEventHandler);
@@ -252,7 +281,10 @@ public class PipeConsensusAsyncConnector extends
IoTDBConnector {
insertNode, tCommitId, tConsensusGroupId, progressIndex,
thisDataNodeId);
final PipeConsensusTabletInsertNodeEventHandler
pipeConsensusInsertNodeReqHandler =
new PipeConsensusTabletInsertNodeEventHandler(
- pipeInsertNodeTabletInsertionEvent, pipeConsensusTransferReq,
this);
+ pipeInsertNodeTabletInsertionEvent,
+ pipeConsensusTransferReq,
+ this,
+ pipeConsensusConnectorMetrics);
transfer(pipeConsensusInsertNodeReqHandler);
}
@@ -284,11 +316,6 @@ public class PipeConsensusAsyncConnector extends
IoTDBConnector {
@Override
public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws
Exception {
- boolean enqueueResult = addEvent2Buffer((EnrichedEvent)
tsFileInsertionEvent);
- if (!enqueueResult) {
- throw new PipeException(ENQUEUE_EXCEPTION_MSG);
- }
-
syncTransferQueuedEventsIfNecessary();
transferBatchedEventsIfNecessary();
@@ -299,6 +326,10 @@ public class PipeConsensusAsyncConnector extends
IoTDBConnector {
return;
}
+ boolean enqueueResult = addEvent2Buffer((EnrichedEvent)
tsFileInsertionEvent);
+ if (!enqueueResult) {
+ throw new PipeException(ENQUEUE_EXCEPTION_MSG);
+ }
final PipeTsFileInsertionEvent pipeTsFileInsertionEvent =
(PipeTsFileInsertionEvent) tsFileInsertionEvent;
TCommitId tCommitId =
@@ -322,7 +353,12 @@ public class PipeConsensusAsyncConnector extends
IoTDBConnector {
final PipeConsensusTsFileInsertionEventHandler
pipeConsensusTsFileInsertionEventHandler =
new PipeConsensusTsFileInsertionEventHandler(
- pipeTsFileInsertionEvent, this, tCommitId, tConsensusGroupId,
thisDataNodeId);
+ pipeTsFileInsertionEvent,
+ this,
+ tCommitId,
+ tConsensusGroupId,
+ thisDataNodeId,
+ pipeConsensusConnectorMetrics);
transfer(pipeConsensusTsFileInsertionEventHandler);
} catch (Exception e) {
@@ -369,8 +405,9 @@ public class PipeConsensusAsyncConnector extends
IoTDBConnector {
return;
}
- transfer(new PipeConsensusTabletBatchEventHandler(tabletBatchBuilder,
this));
-
+ transfer(
+ new PipeConsensusTabletBatchEventHandler(
+ tabletBatchBuilder, this, pipeConsensusConnectorMetrics));
tabletBatchBuilder.onSuccess();
}
@@ -381,7 +418,7 @@ public class PipeConsensusAsyncConnector extends
IoTDBConnector {
* retry the {@link Event} and mark the {@link Event} as failure and
stop the pipe if the
* retry times exceeds the threshold.
*/
- private synchronized void syncTransferQueuedEventsIfNecessary() throws
Exception {
+ private void syncTransferQueuedEventsIfNecessary() throws Exception {
while (!retryEventQueue.isEmpty()) {
synchronized (this) {
if (isClosed.get() || retryEventQueue.isEmpty()) {
@@ -443,7 +480,7 @@ public class PipeConsensusAsyncConnector extends
IoTDBConnector {
return;
}
- retryEventQueue.offer(event);
+ boolean ignore = retryEventQueue.offer(event);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"PipeConsensus-ConsensusGroup-{}: Event {} transfer failed, will be
added to retry queue.",
@@ -478,6 +515,13 @@ public class PipeConsensusAsyncConnector extends
IoTDBConnector {
}
}
+ public synchronized void clearTransferBufferReferenceCount() {
+ while (!transferBuffer.isEmpty()) {
+ final EnrichedEvent event = transferBuffer.poll();
+ event.clearReferenceCount(PipeConsensusAsyncConnector.class.getName());
+ }
+ }
+
private void logOnClientException(
final AsyncPipeConsensusServiceClient client, final Exception e) {
if (client == null) {
@@ -506,13 +550,18 @@ public class PipeConsensusAsyncConnector extends
IoTDBConnector {
retryConnector.close();
clearRetryEventsReferenceCount();
+ clearTransferBufferReferenceCount();
if (tabletBatchBuilder != null) {
tabletBatchBuilder.close();
}
+
+ PipeConsensusSyncLagManager.getInstance(getConsensusGroupIdStr())
+ .removeConsensusPipeConnector(this);
+
MetricService.getInstance().removeMetricSet(this.pipeConsensusConnectorMetrics);
}
- //////////////////////////// TODO: APIs provided for metric framework
////////////////////////////
+ //////////////////////////// APIs provided for metric framework
////////////////////////////
public int getTransferBufferSize() {
return transferBuffer.size();
@@ -521,4 +570,24 @@ public class PipeConsensusAsyncConnector extends
IoTDBConnector {
public int getRetryBufferSize() {
return retryEventQueue.size();
}
+
+ @Override
+ public long getConsensusPipeCommitProgress() {
+ long creationTime =
PipeAgent.task().getPipeCreationTime(consensusPipeName);
+ String committerKey =
+ String.format("%s_%s_%s", consensusPipeName, consensusGroupId,
creationTime);
+ return
PipeEventCommitManager.getInstance().getGivenConsensusPipeCommitId(committerKey);
+ }
+
+ @Override
+ public long getConsensusPipeReplicateProgress() {
+ return currentReplicateProgress;
+ }
+
+ public String getConsensusGroupIdStr() {
+ ConsensusGroupId groupId =
+ ConsensusGroupId.Factory.create(
+ TConsensusGroupType.DataRegion.getValue(), consensusGroupId);
+ return groupId.toString();
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
index e29d5f49398..c10c5bf93bf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
@@ -40,6 +40,7 @@ import
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request
import
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTsFilePieceWithModReq;
import
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTsFileSealReq;
import
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTsFileSealWithModReq;
+import org.apache.iotdb.db.pipe.consensus.PipeConsensusConnectorMetrics;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
@@ -65,30 +66,24 @@ import java.util.stream.Collectors;
/** This connector is used for PipeConsensus to transfer queued event. */
public class PipeConsensusSyncConnector extends IoTDBConnector {
-
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeConsensusSyncConnector.class);
-
private static final String PIPE_CONSENSUS_SYNC_CONNECTION_FAILED_FORMAT =
"PipeConsensus: syncClient connection to %s:%s failed when %s, because:
%s";
-
private static final String TABLET_INSERTION_NODE_SCENARIO = "transfer
insertionNode tablet";
-
private static final String TSFILE_SCENARIO = "transfer tsfile";
-
private static final String TABLET_BATCH_SCENARIO = "transfer tablet batch";
-
private final IClientManager<TEndPoint, SyncPipeConsensusServiceClient>
syncRetryClientManager;
-
private final List<TEndPoint> peers;
-
private final int thisDataNodeId;
-
private final int consensusGroupId;
-
+ private final PipeConsensusConnectorMetrics pipeConsensusConnectorMetrics;
private PipeConsensusSyncBatchReqBuilder tabletBatchBuilder;
public PipeConsensusSyncConnector(
- List<TEndPoint> peers, int consensusGroupId, int thisDataNodeId) {
+ List<TEndPoint> peers,
+ int consensusGroupId,
+ int thisDataNodeId,
+ PipeConsensusConnectorMetrics pipeConsensusConnectorMetrics) {
// In PipeConsensus, one pipeConsensusTask corresponds to a
pipeConsensusConnector. Thus,
// `peers` here actually is a singletonList that contains one peer's
TEndPoint. But here we
// retain the implementation of list to cope with possible future expansion
@@ -97,6 +92,7 @@ public class PipeConsensusSyncConnector extends
IoTDBConnector {
this.thisDataNodeId = thisDataNodeId;
this.syncRetryClientManager =
PipeConsensusClientMgrContainer.getInstance().getSyncClientManager();
+ this.pipeConsensusConnectorMetrics = pipeConsensusConnectorMetrics;
}
@Override
@@ -137,7 +133,10 @@ public class PipeConsensusSyncConnector extends
IoTDBConnector {
doTransfer();
}
} else {
+ long startTime = System.nanoTime();
doTransferWrapper((PipeInsertNodeTabletInsertionEvent)
tabletInsertionEvent);
+ long duration = System.nanoTime() - startTime;
+ pipeConsensusConnectorMetrics.recordRetryWALTransferTimer(duration);
}
} catch (Exception e) {
throw new PipeConnectionException(
@@ -154,12 +153,14 @@ public class PipeConsensusSyncConnector extends
IoTDBConnector {
// processor and will not change the event type like
//
org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBDataRegionSyncConnector
try {
+ long startTime = System.nanoTime();
// In order to commit in order
if (isTabletBatchModeEnabled && !tabletBatchBuilder.isEmpty()) {
doTransfer();
}
-
doTransfer((PipeTsFileInsertionEvent) tsFileInsertionEvent);
+ long duration = System.nanoTime() - startTime;
+ pipeConsensusConnectorMetrics.recordRetryTsFileTransferTimer(duration);
} catch (Exception e) {
throw new PipeConnectionException(
String.format(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletBatchEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletBatchEventHandler.java
index 25816e68fa6..de9c7bec37d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletBatchEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletBatchEventHandler.java
@@ -27,6 +27,7 @@ import
org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusBatchTransferResp;
import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferResp;
import
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.PipeConsensusAsyncConnector;
import
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.builder.PipeConsensusAsyncBatchReqBuilder;
+import org.apache.iotdb.db.pipe.consensus.PipeConsensusConnectorMetrics;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -48,16 +49,19 @@ public class PipeConsensusTabletBatchEventHandler
private final List<Event> events;
private final TPipeConsensusBatchTransferReq req;
private final PipeConsensusAsyncConnector connector;
+ private final PipeConsensusConnectorMetrics pipeConsensusConnectorMetrics;
public PipeConsensusTabletBatchEventHandler(
final PipeConsensusAsyncBatchReqBuilder batchBuilder,
- final PipeConsensusAsyncConnector connector)
+ final PipeConsensusAsyncConnector connector,
+ final PipeConsensusConnectorMetrics pipeConsensusConnectorMetrics)
throws IOException {
// Deep copy to keep Ids' and events' reference
requestCommitIds = batchBuilder.deepCopyRequestCommitIds();
events = batchBuilder.deepCopyEvents();
req = batchBuilder.toTPipeConsensusBatchTransferReq();
+ this.pipeConsensusConnectorMetrics = pipeConsensusConnectorMetrics;
this.connector = connector;
}
@@ -86,6 +90,7 @@ public class PipeConsensusTabletBatchEventHandler
.filter(tsStatus -> tsStatus.getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode())
.forEach(
tsStatus -> {
+ pipeConsensusConnectorMetrics.recordRetryCounter();
connector
.statusHandler()
.handle(tsStatus, tsStatus.getMessage(),
events.toString());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletInsertNodeEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletInsertNodeEventHandler.java
index 7b05fe297f2..cdd56d72cce 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletInsertNodeEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletInsertNodeEventHandler.java
@@ -23,6 +23,7 @@ import
org.apache.iotdb.commons.client.async.AsyncPipeConsensusServiceClient;
import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferReq;
import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferResp;
import
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.PipeConsensusAsyncConnector;
+import org.apache.iotdb.db.pipe.consensus.PipeConsensusConnectorMetrics;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.thrift.TException;
@@ -33,8 +34,9 @@ public class PipeConsensusTabletInsertNodeEventHandler
public PipeConsensusTabletInsertNodeEventHandler(
PipeInsertNodeTabletInsertionEvent event,
TPipeConsensusTransferReq req,
- PipeConsensusAsyncConnector connector) {
- super(event, req, connector);
+ PipeConsensusAsyncConnector connector,
+ PipeConsensusConnectorMetrics metric) {
+ super(event, req, connector, metric);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletInsertionEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletInsertionEventHandler.java
index 3e12b23f463..ae6a1e5334d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletInsertionEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletInsertionEventHandler.java
@@ -26,6 +26,7 @@ import
org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferReq;
import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferResp;
import
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.PipeConsensusAsyncConnector;
import
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTabletInsertionEventHandler;
+import org.apache.iotdb.db.pipe.consensus.PipeConsensusConnectorMetrics;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -46,13 +47,20 @@ public abstract class
PipeConsensusTabletInsertionEventHandler<E extends TPipeCo
protected final PipeConsensusAsyncConnector connector;
+ protected final PipeConsensusConnectorMetrics metric;
+
+ private final long createTime;
+
protected PipeConsensusTabletInsertionEventHandler(
TabletInsertionEvent event,
TPipeConsensusTransferReq req,
- PipeConsensusAsyncConnector connector) {
+ PipeConsensusAsyncConnector connector,
+ PipeConsensusConnectorMetrics metric) {
this.event = event;
this.req = req;
this.connector = connector;
+ this.metric = metric;
+ this.createTime = System.nanoTime();
}
public void transfer(AsyncPipeConsensusServiceClient client) throws
TException {
@@ -88,6 +96,9 @@ public abstract class
PipeConsensusTabletInsertionEventHandler<E extends TPipeCo
((EnrichedEvent) event).getCommitId());
connector.removeEventFromBuffer((EnrichedEvent) event);
}
+
+ long duration = System.nanoTime() - createTime;
+ metric.recordConnectorWalTransferTimer(duration);
} catch (Exception e) {
onError(e);
}
@@ -105,5 +116,6 @@ public abstract class
PipeConsensusTabletInsertionEventHandler<E extends TPipeCo
exception);
connector.addFailureEventToRetryQueue(event);
+ metric.recordRetryCounter();
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletRawEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletRawEventHandler.java
deleted file mode 100644
index 35dcae79efe..00000000000
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletRawEventHandler.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.handler;
-
-import org.apache.iotdb.commons.client.async.AsyncPipeConsensusServiceClient;
-import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferReq;
-import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferResp;
-import
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.PipeConsensusAsyncConnector;
-import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
-
-import org.apache.thrift.TException;
-
-public class PipeConsensusTabletRawEventHandler
- extends
PipeConsensusTabletInsertionEventHandler<TPipeConsensusTransferResp> {
-
- public PipeConsensusTabletRawEventHandler(
- PipeRawTabletInsertionEvent event,
- TPipeConsensusTransferReq req,
- PipeConsensusAsyncConnector connector) {
- super(event, req, connector);
- }
-
- @Override
- protected void doTransfer(AsyncPipeConsensusServiceClient client,
TPipeConsensusTransferReq req)
- throws TException {
- client.pipeConsensusTransfer(req, this);
- }
-}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java
index 4ed42afc63c..bc8879946aa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java
@@ -31,6 +31,7 @@ import
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request
import
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTsFilePieceWithModReq;
import
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTsFileSealReq;
import
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTsFileSealWithModReq;
+import org.apache.iotdb.db.pipe.consensus.PipeConsensusConnectorMetrics;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -74,12 +75,19 @@ public class PipeConsensusTsFileInsertionEventHandler
private AsyncPipeConsensusServiceClient client;
+ private final PipeConsensusConnectorMetrics metric;
+
+ private final long createTime;
+
+ private long startTransferPieceTime;
+
public PipeConsensusTsFileInsertionEventHandler(
final PipeTsFileInsertionEvent event,
final PipeConsensusAsyncConnector connector,
final TCommitId commitId,
final TConsensusGroupId consensusGroupId,
- final int thisDataNodeId)
+ final int thisDataNodeId,
+ final PipeConsensusConnectorMetrics metric)
throws FileNotFoundException {
this.event = event;
this.connector = connector;
@@ -102,15 +110,19 @@ public class PipeConsensusTsFileInsertionEventHandler
: new RandomAccessFile(tsFile, "r");
isSealSignalSent = new AtomicBoolean(false);
+
+ this.metric = metric;
+ this.createTime = System.nanoTime();
}
public void transfer(final AsyncPipeConsensusServiceClient client)
throws TException, IOException {
+ startTransferPieceTime = System.nanoTime();
+
this.client = client;
client.setShouldReturnSelf(false);
final int readLength = reader.read(readBuffer);
-
if (readLength == -1) {
if (currentFile == modFile) {
currentFile = tsFile;
@@ -217,6 +229,9 @@ public class PipeConsensusTsFileInsertionEventHandler
client.setShouldReturnSelf(true);
client.returnSelf();
}
+
+ long duration = System.nanoTime() - createTime;
+ metric.recordConnectorTsFileTransferTimer(duration);
}
return;
}
@@ -245,6 +260,8 @@ public class PipeConsensusTsFileInsertionEventHandler
.handle(status, response.getStatus().getMessage(),
tsFile.getName());
}
}
+ long duration = System.nanoTime() - startTransferPieceTime;
+ metric.recordConnectorTsFilePieceTransferTimer(duration);
transfer(client);
} catch (final Exception e) {
@@ -269,6 +286,7 @@ public class PipeConsensusTsFileInsertionEventHandler
LOGGER.warn("Failed to close file reader when failed to transfer file.",
e);
} finally {
connector.addFailureEventToRetryQueue(event);
+ metric.recordRetryCounter();
if (client != null) {
client.setShouldReturnSelf(true);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/PipeConsensusConnectorMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/PipeConsensusConnectorMetrics.java
new file mode 100644
index 00000000000..7950fd76744
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/PipeConsensusConnectorMetrics.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.consensus;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.PipeConsensusAsyncConnector;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.impl.DoNothingMetricManager;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Counter;
+import org.apache.iotdb.metrics.type.Timer;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+public class PipeConsensusConnectorMetrics implements IMetricSet {
+ private final PipeConsensusAsyncConnector pipeConsensusAsyncConnector;
+
+ private Timer connectorEnqueueTimer =
DoNothingMetricManager.DO_NOTHING_TIMER;
+ private Timer connectorWALTransferTimer =
DoNothingMetricManager.DO_NOTHING_TIMER;
+ private Timer connectorTsFileTransferTimer =
DoNothingMetricManager.DO_NOTHING_TIMER;
+ private Timer connectorTsFilePieceTransferTimer =
DoNothingMetricManager.DO_NOTHING_TIMER;
+ private Timer retryWALTransferTimer =
DoNothingMetricManager.DO_NOTHING_TIMER;
+ private Timer retryTsFileTransferTimer =
DoNothingMetricManager.DO_NOTHING_TIMER;
+ private Counter retryCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
+
+ private static final String CONNECTOR = "pipeConsensusAsyncConnector";
+
+ public PipeConsensusConnectorMetrics(PipeConsensusAsyncConnector
pipeConsensusAsyncConnector) {
+ this.pipeConsensusAsyncConnector = pipeConsensusAsyncConnector;
+ }
+
+ public void recordConnectorEnqueueTimer(long costTimeInNanos) {
+ connectorEnqueueTimer.updateNanos(costTimeInNanos);
+ }
+
+ public void recordConnectorWalTransferTimer(long costTimeInNanos) {
+ connectorWALTransferTimer.updateNanos(costTimeInNanos);
+ }
+
+ public void recordConnectorTsFileTransferTimer(long costTimeInNanos) {
+ connectorTsFileTransferTimer.updateNanos(costTimeInNanos);
+ }
+
+ public void recordConnectorTsFilePieceTransferTimer(long costTimeInNanos) {
+ connectorTsFilePieceTransferTimer.updateNanos(costTimeInNanos);
+ }
+
+ public void recordRetryWALTransferTimer(long costTimeInNanos) {
+ retryWALTransferTimer.updateNanos(costTimeInNanos);
+ }
+
+ public void recordRetryTsFileTransferTimer(long costTimeInNanos) {
+ retryTsFileTransferTimer.updateNanos(costTimeInNanos);
+ }
+
+ public void recordRetryCounter() {
+ retryCounter.inc();
+ }
+
+ @Override
+ public void bindTo(AbstractMetricService metricService) {
+ bindCounter(metricService);
+ bindAutoGauge(metricService);
+ bindTimer(metricService);
+ }
+
+ @Override
+ public void unbindFrom(AbstractMetricService metricService) {
+ unbindCounter(metricService);
+ unbindAutoGauge(metricService);
+ unbindTimer(metricService);
+ }
+
+ private void bindCounter(AbstractMetricService metricService) {
+ metricService.getOrCreateCounter(
+ Metric.PIPE_RETRY_SEND_EVENT.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ CONNECTOR,
+ Tag.REGION.toString(),
+ pipeConsensusAsyncConnector.getConsensusGroupIdStr(),
+ Tag.TYPE.toString(),
+ "pipeConsensusRetryCount");
+ }
+
+ private void bindAutoGauge(AbstractMetricService metricService) {
+ metricService.createAutoGauge(
+ Metric.PIPE_SEND_EVENT.toString(),
+ MetricLevel.IMPORTANT,
+ pipeConsensusAsyncConnector,
+ PipeConsensusAsyncConnector::getTransferBufferSize,
+ Tag.NAME.toString(),
+ CONNECTOR,
+ Tag.REGION.toString(),
+ pipeConsensusAsyncConnector.getConsensusGroupIdStr(),
+ Tag.TYPE.toString(),
+ "transferBufferSize");
+ metricService.createAutoGauge(
+ Metric.PIPE_SEND_EVENT.toString(),
+ MetricLevel.IMPORTANT,
+ pipeConsensusAsyncConnector,
+ PipeConsensusAsyncConnector::getRetryBufferSize,
+ Tag.NAME.toString(),
+ CONNECTOR,
+ Tag.REGION.toString(),
+ pipeConsensusAsyncConnector.getConsensusGroupIdStr(),
+ Tag.TYPE.toString(),
+ "retryBufferSize");
+ }
+
+ private void bindTimer(AbstractMetricService metricService) {
+ connectorEnqueueTimer =
+ metricService.getOrCreateTimer(
+ Metric.PIPE_SEND_EVENT.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ CONNECTOR,
+ Tag.TYPE.toString(),
+ "connectorEnqueue",
+ Tag.REGION.toString(),
+ pipeConsensusAsyncConnector.getConsensusGroupIdStr());
+ connectorTsFilePieceTransferTimer =
+ metricService.getOrCreateTimer(
+ Metric.PIPE_SEND_EVENT.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ CONNECTOR,
+ Tag.TYPE.toString(),
+ "connectorTsFilePieceTransfer",
+ Tag.REGION.toString(),
+ pipeConsensusAsyncConnector.getConsensusGroupIdStr());
+ connectorTsFileTransferTimer =
+ metricService.getOrCreateTimer(
+ Metric.PIPE_SEND_EVENT.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ CONNECTOR,
+ Tag.TYPE.toString(),
+ "connectorTsFileTransfer",
+ Tag.REGION.toString(),
+ pipeConsensusAsyncConnector.getConsensusGroupIdStr());
+ connectorWALTransferTimer =
+ metricService.getOrCreateTimer(
+ Metric.PIPE_SEND_EVENT.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ CONNECTOR,
+ Tag.TYPE.toString(),
+ "connectorWALTransfer",
+ Tag.REGION.toString(),
+ pipeConsensusAsyncConnector.getConsensusGroupIdStr());
+ retryWALTransferTimer =
+ metricService.getOrCreateTimer(
+ Metric.PIPE_RETRY_SEND_EVENT.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ CONNECTOR,
+ Tag.TYPE.toString(),
+ "retryWALTransfer",
+ Tag.REGION.toString(),
+ pipeConsensusAsyncConnector.getConsensusGroupIdStr());
+ retryTsFileTransferTimer =
+ metricService.getOrCreateTimer(
+ Metric.PIPE_RETRY_SEND_EVENT.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ CONNECTOR,
+ Tag.TYPE.toString(),
+ "retryTsFileTransfer",
+ Tag.REGION.toString(),
+ pipeConsensusAsyncConnector.getConsensusGroupIdStr());
+ }
+
+ private void unbindCounter(AbstractMetricService metricService) {
+ retryCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
+
+ metricService.remove(
+ MetricType.COUNTER,
+ Metric.PIPE_RETRY_SEND_EVENT.toString(),
+ Tag.NAME.toString(),
+ CONNECTOR,
+ Tag.REGION.toString(),
+ pipeConsensusAsyncConnector.getConsensusGroupIdStr(),
+ Tag.TYPE.toString(),
+ "pipeConsensusRetryCount");
+ }
+
+ private void unbindAutoGauge(AbstractMetricService metricService) {
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.PIPE_SEND_EVENT.toString(),
+ Tag.NAME.toString(),
+ CONNECTOR,
+ Tag.REGION.toString(),
+ pipeConsensusAsyncConnector.getConsensusGroupIdStr(),
+ Tag.TYPE.toString(),
+ "transferBufferSize");
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.PIPE_SEND_EVENT.toString(),
+ Tag.NAME.toString(),
+ CONNECTOR,
+ Tag.REGION.toString(),
+ pipeConsensusAsyncConnector.getConsensusGroupIdStr(),
+ Tag.TYPE.toString(),
+ "retryBufferSize");
+ }
+
+ private void unbindTimer(AbstractMetricService metricService) {
+ connectorEnqueueTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+ connectorWALTransferTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+ connectorTsFileTransferTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+ connectorTsFilePieceTransferTimer =
DoNothingMetricManager.DO_NOTHING_TIMER;
+ retryWALTransferTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+ retryTsFileTransferTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+
+ metricService.remove(
+ MetricType.TIMER,
+ Metric.PIPE_SEND_EVENT.toString(),
+ Tag.NAME.toString(),
+ CONNECTOR,
+ Tag.TYPE.toString(),
+ "connectorTsFileTransfer",
+ Tag.REGION.toString(),
+ pipeConsensusAsyncConnector.getConsensusGroupIdStr());
+ metricService.remove(
+ MetricType.TIMER,
+ Metric.PIPE_SEND_EVENT.toString(),
+ Tag.NAME.toString(),
+ CONNECTOR,
+ Tag.TYPE.toString(),
+ "connectorTsFilePieceTransfer",
+ Tag.REGION.toString(),
+ pipeConsensusAsyncConnector.getConsensusGroupIdStr());
+ metricService.remove(
+ MetricType.TIMER,
+ Metric.PIPE_SEND_EVENT.toString(),
+ Tag.NAME.toString(),
+ CONNECTOR,
+ Tag.TYPE.toString(),
+ "connectorTsFileTransfer",
+ Tag.REGION.toString(),
+ pipeConsensusAsyncConnector.getConsensusGroupIdStr());
+ metricService.remove(
+ MetricType.TIMER,
+ Metric.PIPE_SEND_EVENT.toString(),
+ Tag.NAME.toString(),
+ CONNECTOR,
+ Tag.TYPE.toString(),
+ "connectorWALTransfer",
+ Tag.REGION.toString(),
+ pipeConsensusAsyncConnector.getConsensusGroupIdStr());
+ metricService.remove(
+ MetricType.TIMER,
+ Metric.PIPE_RETRY_SEND_EVENT.toString(),
+ Tag.NAME.toString(),
+ CONNECTOR,
+ Tag.TYPE.toString(),
+ "retryWALTransfer",
+ Tag.REGION.toString(),
+ pipeConsensusAsyncConnector.getConsensusGroupIdStr());
+ metricService.remove(
+ MetricType.TIMER,
+ Metric.PIPE_RETRY_SEND_EVENT.toString(),
+ Tag.NAME.toString(),
+ CONNECTOR,
+ Tag.TYPE.toString(),
+ "retryTsFileTransfer",
+ Tag.REGION.toString(),
+ pipeConsensusAsyncConnector.getConsensusGroupIdStr());
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/PipeConsensusReceiverMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/PipeConsensusReceiverMetrics.java
new file mode 100644
index 00000000000..58d587d8b8d
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/PipeConsensusReceiverMetrics.java
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.consensus;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import
org.apache.iotdb.db.pipe.receiver.protocol.pipeconsensus.PipeConsensusReceiver;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.impl.DoNothingMetricManager;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Timer;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+public class PipeConsensusReceiverMetrics implements IMetricSet {
+ private final PipeConsensusReceiver pipeConsensusReceiver;
+
+ private Timer tsFilePieceWriteTimer =
DoNothingMetricManager.DO_NOTHING_TIMER;
+ private Timer tsFilePiecePreCheckTimer =
DoNothingMetricManager.DO_NOTHING_TIMER;
+ private Timer tsFileSealLoadTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+ private Timer tsFileSealPreCheckTimer =
DoNothingMetricManager.DO_NOTHING_TIMER;
+ private Timer borrowTsFileWriterTimer =
DoNothingMetricManager.DO_NOTHING_TIMER;
+ private Timer acquireExecutorLockTimer =
DoNothingMetricManager.DO_NOTHING_TIMER;
+ private Timer dispatchWaitingTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+ private Timer receiveWALTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+ private Timer receiveTsFileTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+ private Timer receiveEventTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+
+ private static final String RECEIVER = "pipeConsensusReceiver";
+
+ public PipeConsensusReceiverMetrics(PipeConsensusReceiver
pipeConsensusReceiver) {
+ this.pipeConsensusReceiver = pipeConsensusReceiver;
+ }
+
+ public void recordTsFilePieceWriteTime(long costTimeInNanos) {
+ tsFilePieceWriteTimer.updateNanos(costTimeInNanos);
+ }
+
+ public void recordTsFilePiecePreCheckTime(long costTimeInNanos) {
+ tsFilePiecePreCheckTimer.updateNanos(costTimeInNanos);
+ }
+
+ public void recordTsFileSealLoadTimer(long costTimeInNanos) {
+ tsFileSealLoadTimer.updateNanos(costTimeInNanos);
+ }
+
+ public void recordTsFileSealPreCheckTimer(long costTimeInNanos) {
+ tsFileSealPreCheckTimer.updateNanos(costTimeInNanos);
+ }
+
+ public void recordBorrowTsFileWriterTimer(long costTimeInNanos) {
+ borrowTsFileWriterTimer.updateNanos(costTimeInNanos);
+ }
+
+ public void recordAcquireExecutorLockTimer(long costTimeInNanos) {
+ acquireExecutorLockTimer.updateNanos(costTimeInNanos);
+ }
+
+ public void recordDispatchWaitingTimer(long costTimeInNanos) {
+ dispatchWaitingTimer.updateNanos(costTimeInNanos);
+ }
+
+ public void recordReceiveWALTimer(long costTimeInNanos) {
+ receiveWALTimer.updateNanos(costTimeInNanos);
+ }
+
+ public void recordReceiveTsFileTimer(long costTimeInNanos) {
+ receiveTsFileTimer.updateNanos(costTimeInNanos);
+ }
+
+ public void recordReceiveEventTimer(long costTimeInNanos) {
+ receiveEventTimer.updateNanos(costTimeInNanos);
+ }
+
+ @Override
+ public void bindTo(AbstractMetricService metricService) {
+ bindAutoGauge(metricService);
+ bindStageTimer(metricService);
+ bindReceiveTimer(metricService);
+ }
+
+ @Override
+ public void unbindFrom(AbstractMetricService metricService) {
+ unbindAutoGauge(metricService);
+ unbindStageTimer(metricService);
+ unbindReceiveTimer(metricService);
+ }
+
+ public void bindAutoGauge(AbstractMetricService metricService) {
+ metricService.createAutoGauge(
+ Metric.PIPE_RECEIVE_EVENT.toString(),
+ MetricLevel.IMPORTANT,
+ pipeConsensusReceiver,
+ PipeConsensusReceiver::getReceiveBufferSize,
+ Tag.NAME.toString(),
+ RECEIVER,
+ Tag.REGION.toString(),
+ pipeConsensusReceiver.getConsensusGroupIdStr(),
+ Tag.TYPE.toString(),
+ "receiveBufferSize");
+ metricService.createAutoGauge(
+ Metric.PIPE_RECEIVE_EVENT.toString(),
+ MetricLevel.IMPORTANT,
+ pipeConsensusReceiver,
+ PipeConsensusReceiver::getWALEventCount,
+ Tag.NAME.toString(),
+ RECEIVER,
+ Tag.REGION.toString(),
+ pipeConsensusReceiver.getConsensusGroupIdStr(),
+ Tag.TYPE.toString(),
+ "WALEventCount");
+ metricService.createAutoGauge(
+ Metric.PIPE_RECEIVE_EVENT.toString(),
+ MetricLevel.IMPORTANT,
+ pipeConsensusReceiver,
+ PipeConsensusReceiver::getTsFileEventCount,
+ Tag.NAME.toString(),
+ RECEIVER,
+ Tag.REGION.toString(),
+ pipeConsensusReceiver.getConsensusGroupIdStr(),
+ Tag.TYPE.toString(),
+ "tsFileEventCount");
+ }
+
+ public void bindStageTimer(AbstractMetricService metricService) {
+ tsFilePieceWriteTimer =
+ metricService.getOrCreateTimer(
+ Metric.STAGE.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ RECEIVER,
+ Tag.REGION.toString(),
+ pipeConsensusReceiver.getConsensusGroupIdStr(),
+ Tag.TYPE.toString(),
+ "tsFilePieceWrite");
+ tsFilePiecePreCheckTimer =
+ metricService.getOrCreateTimer(
+ Metric.STAGE.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ RECEIVER,
+ Tag.REGION.toString(),
+ pipeConsensusReceiver.getConsensusGroupIdStr(),
+ Tag.TYPE.toString(),
+ "tsFilePiecePreCheck");
+ tsFileSealLoadTimer =
+ metricService.getOrCreateTimer(
+ Metric.STAGE.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ RECEIVER,
+ Tag.REGION.toString(),
+ pipeConsensusReceiver.getConsensusGroupIdStr(),
+ Tag.TYPE.toString(),
+ "tsFileSealLoad");
+ tsFileSealPreCheckTimer =
+ metricService.getOrCreateTimer(
+ Metric.STAGE.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ RECEIVER,
+ Tag.REGION.toString(),
+ pipeConsensusReceiver.getConsensusGroupIdStr(),
+ Tag.TYPE.toString(),
+ "tsFileSealPreCheck");
+ borrowTsFileWriterTimer =
+ metricService.getOrCreateTimer(
+ Metric.STAGE.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ RECEIVER,
+ Tag.REGION.toString(),
+ pipeConsensusReceiver.getConsensusGroupIdStr(),
+ Tag.TYPE.toString(),
+ "borrowTsFileWriter");
+ acquireExecutorLockTimer =
+ metricService.getOrCreateTimer(
+ Metric.STAGE.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ RECEIVER,
+ Tag.REGION.toString(),
+ pipeConsensusReceiver.getConsensusGroupIdStr(),
+ Tag.TYPE.toString(),
+ "acquireExecutorLock");
+ dispatchWaitingTimer =
+ metricService.getOrCreateTimer(
+ Metric.STAGE.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ RECEIVER,
+ Tag.REGION.toString(),
+ pipeConsensusReceiver.getConsensusGroupIdStr(),
+ Tag.TYPE.toString(),
+ "dispatchWaiting");
+ }
+
+ public void bindReceiveTimer(AbstractMetricService metricService) {
+ receiveEventTimer =
+ metricService.getOrCreateTimer(
+ Metric.PIPE_RECEIVE_EVENT.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ RECEIVER,
+ Tag.REGION.toString(),
+ pipeConsensusReceiver.getConsensusGroupIdStr(),
+ Tag.TYPE.toString(),
+ "receiveEvent");
+ receiveWALTimer =
+ metricService.getOrCreateTimer(
+ Metric.PIPE_RECEIVE_EVENT.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ RECEIVER,
+ Tag.REGION.toString(),
+ pipeConsensusReceiver.getConsensusGroupIdStr(),
+ Tag.TYPE.toString(),
+ "receiveWALEvent");
+ receiveTsFileTimer =
+ metricService.getOrCreateTimer(
+ Metric.PIPE_RECEIVE_EVENT.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ RECEIVER,
+ Tag.REGION.toString(),
+ pipeConsensusReceiver.getConsensusGroupIdStr(),
+ Tag.TYPE.toString(),
+ "receiveTsFileEvent");
+ }
+
+ public void unbindAutoGauge(AbstractMetricService metricService) {
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.PIPE_RECEIVE_EVENT.toString(),
+ Tag.NAME.toString(),
+ RECEIVER,
+ Tag.REGION.toString(),
+ pipeConsensusReceiver.getConsensusGroupIdStr(),
+ Tag.TYPE.toString(),
+ "receiveBufferSize");
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.PIPE_RECEIVE_EVENT.toString(),
+ Tag.NAME.toString(),
+ RECEIVER,
+ Tag.REGION.toString(),
+ pipeConsensusReceiver.getConsensusGroupIdStr(),
+ Tag.TYPE.toString(),
+ "WALEventCount");
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.PIPE_RECEIVE_EVENT.toString(),
+ Tag.NAME.toString(),
+ RECEIVER,
+ Tag.REGION.toString(),
+ pipeConsensusReceiver.getConsensusGroupIdStr(),
+ Tag.TYPE.toString(),
+ "tsFileEventCount");
+ }
+
+ public void unbindStageTimer(AbstractMetricService metricService) {
+ tsFilePieceWriteTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+ tsFilePiecePreCheckTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+ tsFileSealLoadTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+ tsFileSealPreCheckTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+ borrowTsFileWriterTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+ acquireExecutorLockTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+ dispatchWaitingTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+ metricService.remove(
+ MetricType.TIMER,
+ Metric.STAGE.toString(),
+ Tag.NAME.toString(),
+ RECEIVER,
+ Tag.REGION.toString(),
+ pipeConsensusReceiver.getConsensusGroupIdStr(),
+ Tag.TYPE.toString(),
+ "tsFilePieceWrite");
+ metricService.remove(
+ MetricType.TIMER,
+ Metric.STAGE.toString(),
+ Tag.NAME.toString(),
+ RECEIVER,
+ Tag.REGION.toString(),
+ pipeConsensusReceiver.getConsensusGroupIdStr(),
+ Tag.TYPE.toString(),
+ "tsFilePiecePreCheck");
+ metricService.remove(
+ MetricType.TIMER,
+ Metric.STAGE.toString(),
+ Tag.NAME.toString(),
+ RECEIVER,
+ Tag.REGION.toString(),
+ pipeConsensusReceiver.getConsensusGroupIdStr(),
+ Tag.TYPE.toString(),
+ "tsFileSealLoad");
+ metricService.remove(
+ MetricType.TIMER,
+ Metric.STAGE.toString(),
+ Tag.NAME.toString(),
+ RECEIVER,
+ Tag.REGION.toString(),
+ pipeConsensusReceiver.getConsensusGroupIdStr(),
+ Tag.TYPE.toString(),
+ "tsFileSealPreCheck");
+ metricService.remove(
+ MetricType.TIMER,
+ Metric.STAGE.toString(),
+ Tag.NAME.toString(),
+ RECEIVER,
+ Tag.REGION.toString(),
+ pipeConsensusReceiver.getConsensusGroupIdStr(),
+ Tag.TYPE.toString(),
+ "borrowTsFileWriter");
+ metricService.remove(
+ MetricType.TIMER,
+ Metric.STAGE.toString(),
+ Tag.NAME.toString(),
+ RECEIVER,
+ Tag.REGION.toString(),
+ pipeConsensusReceiver.getConsensusGroupIdStr(),
+ Tag.TYPE.toString(),
+ "acquireExecutorLock");
+ metricService.remove(
+ MetricType.TIMER,
+ Metric.STAGE.toString(),
+ Tag.NAME.toString(),
+ RECEIVER,
+ Tag.REGION.toString(),
+ pipeConsensusReceiver.getConsensusGroupIdStr(),
+ Tag.TYPE.toString(),
+ "dispatchWaiting");
+ }
+
+ public void unbindReceiveTimer(AbstractMetricService metricService) {
+ receiveWALTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+ receiveTsFileTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+ receiveEventTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+
+ metricService.remove(
+ MetricType.TIMER,
+ Metric.PIPE_RECEIVE_EVENT.toString(),
+ Tag.NAME.toString(),
+ RECEIVER,
+ Tag.REGION.toString(),
+ pipeConsensusReceiver.getConsensusGroupIdStr(),
+ Tag.TYPE.toString(),
+ "receiveWALEvent");
+ metricService.remove(
+ MetricType.TIMER,
+ Metric.PIPE_RECEIVE_EVENT.toString(),
+ Tag.NAME.toString(),
+ RECEIVER,
+ Tag.REGION.toString(),
+ pipeConsensusReceiver.getConsensusGroupIdStr(),
+ Tag.TYPE.toString(),
+ "receiveTsFileEvent");
+ metricService.remove(
+ MetricType.TIMER,
+ Metric.PIPE_RECEIVE_EVENT.toString(),
+ Tag.NAME.toString(),
+ RECEIVER,
+ Tag.REGION.toString(),
+ pipeConsensusReceiver.getConsensusGroupIdStr(),
+ Tag.TYPE.toString(),
+ "receiveEvent");
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
index 5d9eb0947c1..d6b106705a1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
@@ -30,6 +30,7 @@ import
org.apache.iotdb.commons.pipe.connector.payload.pipeconsensus.request.Pip
import
org.apache.iotdb.commons.pipe.connector.payload.pipeconsensus.response.PipeConsensusTransferFilePieceResp;
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.response.PipeTransferFilePieceResp;
import org.apache.iotdb.commons.pipe.receiver.IoTDBReceiverAgent;
+import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
import org.apache.iotdb.consensus.pipe.PipeConsensus;
import org.apache.iotdb.consensus.pipe.PipeConsensusServerImpl;
@@ -46,6 +47,7 @@ import
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request
import
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTsFilePieceReq;
import
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTsFileSealReq;
import
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTsFileSealWithModReq;
+import org.apache.iotdb.db.pipe.consensus.PipeConsensusReceiverMetrics;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
@@ -89,16 +91,17 @@ public class PipeConsensusReceiver {
* IOTDB_CONFIG.getPipeConsensusPipelineSize();
private static final long CLOSE_TSFILE_WRITER_MAX_WAIT_TIME_IN_MS = 5000;
private static final long RETRY_WAIT_TIME = 500;
- private final RequestExecutor requestExecutor = new RequestExecutor();
+ private final RequestExecutor requestExecutor;
private final PipeConsensus pipeConsensus;
private final ConsensusGroupId consensusGroupId;
- // Used to buffer TsFile when transfer TsFile asynchronously.
private final ConsensusPipeName consensusPipeName;
private final List<String> receiverBaseDirsName;
+ // Used to buffer TsFile when transfer TsFile asynchronously.
private final PipeConsensusTsFileWriterPool pipeConsensusTsFileWriterPool =
new PipeConsensusTsFileWriterPool();
private final AtomicReference<File> receiverFileDirWithIdSuffix = new
AtomicReference<>();
- private FolderManager folderManager;
+ private final PipeConsensusReceiverMetrics pipeConsensusReceiverMetrics;
+ private final FolderManager folderManager;
public PipeConsensusReceiver(
PipeConsensus pipeConsensus,
@@ -106,7 +109,10 @@ public class PipeConsensusReceiver {
ConsensusPipeName consensusPipeName) {
this.pipeConsensus = pipeConsensus;
this.consensusGroupId = consensusGroupId;
+ this.pipeConsensusReceiverMetrics = new PipeConsensusReceiverMetrics(this);
+ this.requestExecutor = new RequestExecutor(pipeConsensusReceiverMetrics);
this.consensusPipeName = consensusPipeName;
+ MetricService.getInstance().addMetricSet(pipeConsensusReceiverMetrics);
// Each pipeConsensusReceiver has its own base directories. for example, a
default dir path is
//
data/datanode/system/pipe/consensus/receiver/__consensus.{consensusGroupId}_{leaderDataNodeId}_{followerDataNodeId}
@@ -116,13 +122,19 @@ public class PipeConsensusReceiver {
try {
this.folderManager =
new FolderManager(receiverBaseDirsName,
DirectoryStrategyType.SEQUENCE_STRATEGY);
- initiateTsFileBufferFolder();
} catch (Exception e) {
LOGGER.error(
"Fail to create pipeConsensus receiver file folders allocation
strategy because all disks of folders are full.",
e);
throw new RuntimeException(e);
}
+
+ try {
+ initiateTsFileBufferFolder();
+ } catch (Exception e) {
+ LOGGER.error("Fail to initiate file buffer folder, Error msg: {}",
e.getMessage());
+ throw new RuntimeException(e);
+ }
}
/**
@@ -130,6 +142,7 @@ public class PipeConsensusReceiver {
* load event must be synchronized.
*/
public TPipeConsensusTransferResp receive(final TPipeConsensusTransferReq
req) {
+ long startNanos = System.nanoTime();
// PreCheck: if there are these cases: read-only; null impl; inactive
impl, etc. The receiver
// will reject synchronization.
TPipeConsensusTransferResp resp = preCheckForReceiver(req);
@@ -144,18 +157,26 @@ public class PipeConsensusReceiver {
case TRANSFER_TS_FILE_PIECE_WITH_MOD:
// Just take a place in requestExecutor's buffer, the further seal
request will remove
// its place from buffer.
- requestExecutor.onRequest(req, true);
- return loadEvent(req);
+ requestExecutor.onRequest(req, true, false);
+ resp = loadEvent(req);
+ break;
case TRANSFER_TS_FILE_SEAL:
case TRANSFER_TS_FILE_SEAL_WITH_MOD:
- // TODO: check memory when logging wal(in further version)
+ // TODO: check memory when logging WAL(in further version)
+ resp = requestExecutor.onRequest(req, false, true);
+ break;
case TRANSFER_TABLET_BINARY:
case TRANSFER_TABLET_INSERT_NODE:
// TODO: support batch transfer(in further version)
case TRANSFER_TABLET_BATCH:
default:
- return requestExecutor.onRequest(req, false);
+ resp = requestExecutor.onRequest(req, false, false);
+ break;
}
+ // update receive an event's duration
+ long durationNanos = System.nanoTime() - startNanos;
+ pipeConsensusReceiverMetrics.recordReceiveEventTimer(durationNanos);
+ return resp;
}
// Unknown request type, which means the request can not be handled by
this receiver,
// maybe the version of the receiver is not compatible with the sender
@@ -185,8 +206,7 @@ public class PipeConsensusReceiver {
if (impl.isReadOnly()) {
String message =
String.format(
- "PipeConsensus-ConsensusGroupId-%s: fail to receive because
system is read-only.",
- groupId);
+ "PipeConsensus-PipeName-%s: fail to receive because system is
read-only.", groupId);
if (LOGGER.isErrorEnabled()) {
LOGGER.error(message);
}
@@ -196,7 +216,7 @@ public class PipeConsensusReceiver {
if (!impl.isActive()) {
String message =
String.format(
- "PipeConsensus-ConsensusGroupId-%s: fail to receive because peer
is inactive and not ready.",
+ "PipeConsensus-PipeName-%s: fail to receive because peer is
inactive and not ready.",
groupId);
if (LOGGER.isWarnEnabled()) {
LOGGER.warn(message);
@@ -245,13 +265,13 @@ public class PipeConsensusReceiver {
TSStatusCode.PIPE_CONSENSUS_TYPE_ERROR,
String.format("Unknown PipeConsensusRequestType %s.",
rawRequestType));
LOGGER.warn(
- "PipeConsensus-ConsensusGroupId-{}: Unknown PipeRequestType,
response status = {}.",
- consensusGroupId,
+ "PipeConsensus-PipeName-{}: Unknown PipeRequestType, response status
= {}.",
+ consensusPipeName,
status);
return new TPipeConsensusTransferResp(status);
} catch (Exception e) {
final String error = String.format("Serialization error during pipe
receiving, %s", e);
- LOGGER.warn("PipeConsensus-ConsensusGroupId-{}: {}", consensusGroupId,
error, e);
+ LOGGER.warn("PipeConsensus-PipeName-{}: {}", consensusPipeName, error,
e);
return new
TPipeConsensusTransferResp(RpcUtils.getStatus(TSStatusCode.PIPE_ERROR, error));
}
}
@@ -259,8 +279,7 @@ public class PipeConsensusReceiver {
private TPipeConsensusTransferResp handleTransferTabletInsertNode(
final PipeConsensusTabletInsertNodeReq req) throws
ConsensusGroupNotExistException {
LOGGER.info(
- "PipeConsensus-ConsensusGroupId-{}: starting to receive tablet
insertNode",
- consensusGroupId);
+ "PipeConsensus-PipeName-{}: starting to receive tablet insertNode",
consensusPipeName);
PipeConsensusServerImpl impl =
Optional.ofNullable(pipeConsensus.getImpl(consensusGroupId))
.orElseThrow(() -> new
ConsensusGroupNotExistException(consensusGroupId));
@@ -272,8 +291,7 @@ public class PipeConsensusReceiver {
private TPipeConsensusTransferResp handleTransferTabletBinary(
final PipeConsensusTabletBinaryReq req) throws
ConsensusGroupNotExistException {
- LOGGER.info(
- "PipeConsensus-ConsensusGroupId-{}: starting to receive tablet
binary", consensusGroupId);
+ LOGGER.info("PipeConsensus-PipeName-{}: starting to receive tablet
binary", consensusPipeName);
PipeConsensusServerImpl impl =
Optional.ofNullable(pipeConsensus.getImpl(consensusGroupId))
.orElseThrow(() -> new
ConsensusGroupNotExistException(consensusGroupId));
@@ -285,17 +303,20 @@ public class PipeConsensusReceiver {
private TPipeConsensusTransferResp handleTransferFilePiece(
final PipeConsensusTransferFilePieceReq req, final boolean isSingleFile)
{
- LOGGER.info(
- "PipeConsensus-ConsensusGroupId-{}: starting to receive tsFile
pieces", consensusGroupId);
- PipeConsensusTsFileWriter diskBuffer =
+ LOGGER.info("PipeConsensus-PipeName-{}: starting to receive tsFile
pieces", consensusPipeName);
+ long startBorrowTsFileWriterNanos = System.nanoTime();
+ PipeConsensusTsFileWriter tsFileWriter =
pipeConsensusTsFileWriterPool.borrowCorrespondingWriter(req.getCommitId());
+ long startPreCheckNanos = System.nanoTime();
+ pipeConsensusReceiverMetrics.recordBorrowTsFileWriterTimer(
+ startPreCheckNanos - startBorrowTsFileWriterNanos);
try {
- updateWritingFileIfNeeded(diskBuffer, req.getFileName(), isSingleFile);
- final File writingFile = diskBuffer.getWritingFile();
- final RandomAccessFile writingFileWriter =
diskBuffer.getWritingFileWriter();
+ updateWritingFileIfNeeded(tsFileWriter, req.getFileName(), isSingleFile);
+ final File writingFile = tsFileWriter.getWritingFile();
+ final RandomAccessFile writingFileWriter =
tsFileWriter.getWritingFileWriter();
- if (isWritingFileOffsetNonCorrect(diskBuffer,
req.getStartWritingOffset())) {
+ if (isWritingFileOffsetNonCorrect(tsFileWriter,
req.getStartWritingOffset())) {
if (!writingFile.getName().endsWith(TsFileConstant.TSFILE_SUFFIX)) {
// If the file is a tsFile, then the content will not be changed for
a specific
// filename. However, for other files (mod, snapshot, etc.) the
content varies for the
@@ -311,20 +332,24 @@ public class PipeConsensusReceiver {
"Request sender to reset file reader's offset from %s to
%s.",
req.getStartWritingOffset(), writingFileWriter.length()));
LOGGER.warn(
- "PipeConsensus-ConsensusGroupId-{}: File offset reset requested by
receiver, response status = {}.",
- consensusGroupId,
+ "PipeConsensus-PipeName-{}: File offset reset requested by
receiver, response status = {}.",
+ consensusPipeName,
status);
return PipeConsensusTransferFilePieceResp.toTPipeConsensusTransferResp(
status, writingFileWriter.length());
}
+ long endPreCheckNanos = System.nanoTime();
+ pipeConsensusReceiverMetrics.recordTsFilePiecePreCheckTime(
+ endPreCheckNanos - startPreCheckNanos);
writingFileWriter.write(req.getFilePiece());
+
pipeConsensusReceiverMetrics.recordTsFilePieceWriteTime(System.nanoTime() -
endPreCheckNanos);
return PipeConsensusTransferFilePieceResp.toTPipeConsensusTransferResp(
RpcUtils.SUCCESS_STATUS, writingFileWriter.length());
} catch (Exception e) {
LOGGER.warn(
- "PipeConsensus-ConsensusGroupId-{}: Failed to write file piece from
req {}.",
- consensusGroupId,
+ "PipeConsensus-PipeName-{}: Failed to write file piece from req {}.",
+ consensusPipeName,
req,
e);
final TSStatus status =
@@ -341,10 +366,13 @@ public class PipeConsensusReceiver {
}
private TPipeConsensusTransferResp handleTransferFileSeal(final
PipeConsensusTsFileSealReq req) {
- LOGGER.info(
- "PipeConsensus-ConsensusGroupId-{}: starting to receive tsFile seal",
consensusGroupId);
+ LOGGER.info("PipeConsensus-PipeName-{}: starting to receive tsFile seal",
consensusPipeName);
+ long startBorrowTsFileWriterNanos = System.nanoTime();
PipeConsensusTsFileWriter tsFileWriter =
pipeConsensusTsFileWriterPool.borrowCorrespondingWriter(req.getCommitId());
+ long startPreCheckNanos = System.nanoTime();
+ pipeConsensusReceiverMetrics.recordBorrowTsFileWriterTimer(
+ startPreCheckNanos - startBorrowTsFileWriterNanos);
File writingFile = tsFileWriter.getWritingFile();
RandomAccessFile writingFileWriter = tsFileWriter.getWritingFileWriter();
@@ -381,29 +409,33 @@ public class PipeConsensusReceiver {
// writingFile will be deleted after load if no exception occurs
tsFileWriter.setWritingFile(null);
+ long endPreCheckNanos = System.nanoTime();
+ pipeConsensusReceiverMetrics.recordTsFileSealPreCheckTimer(
+ endPreCheckNanos - startPreCheckNanos);
final TSStatus status =
loadFileToDataRegion(
fileAbsolutePath,
ProgressIndexType.deserializeFrom(ByteBuffer.wrap(req.getProgressIndex())));
+ pipeConsensusReceiverMetrics.recordTsFileSealLoadTimer(System.nanoTime()
- endPreCheckNanos);
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
// if transfer success, disk buffer will be released.
tsFileWriter.returnSelf();
LOGGER.info(
- "PipeConsensus-ConsensusGroupId-{}: Seal file {} successfully.",
- consensusGroupId,
+ "PipeConsensus-PipeName-{}: Seal file {} successfully.",
+ consensusPipeName,
fileAbsolutePath);
} else {
LOGGER.warn(
- "PipeConsensus-ConsensusGroupId-{}: Failed to seal file {},
because {}.",
- consensusGroupId,
+ "PipeConsensus-PipeName-{}: Failed to seal file {}, because {}.",
+ consensusPipeName,
fileAbsolutePath,
status.getMessage());
}
return new TPipeConsensusTransferResp(status);
} catch (IOException e) {
LOGGER.warn(
- "PipeConsensus-ConsensusGroupId-{}: Failed to seal file {} from req
{}.",
- consensusGroupId,
+ "PipeConsensus-PipeName-{}: Failed to seal file {} from req {}.",
+ consensusPipeName,
writingFile,
req,
e);
@@ -413,8 +445,8 @@ public class PipeConsensusReceiver {
String.format("Failed to seal file %s because %s", writingFile,
e.getMessage())));
} catch (LoadFileException e) {
LOGGER.warn(
- "PipeConsensus-ConsensusGroupId-{}: Failed to load file {} from req
{}.",
- consensusGroupId,
+ "PipeConsensus-PipeName-{}: Failed to load file {} from req {}.",
+ consensusPipeName,
writingFile,
req,
e);
@@ -434,10 +466,13 @@ public class PipeConsensusReceiver {
private TPipeConsensusTransferResp handleTransferFileSealWithMods(
final PipeConsensusTsFileSealWithModReq req) {
LOGGER.info(
- "PipeConsensus-ConsensusGroupId-{}: starting to receive tsFile seal
with mods",
- consensusGroupId);
+ "PipeConsensus-PipeName-{}: starting to receive tsFile seal with
mods", consensusPipeName);
+ long startBorrowTsFileWriterNanos = System.nanoTime();
PipeConsensusTsFileWriter tsFileWriter =
pipeConsensusTsFileWriterPool.borrowCorrespondingWriter(req.getCommitId());
+ long startPreCheckNanos = System.nanoTime();
+ pipeConsensusReceiverMetrics.recordBorrowTsFileWriterTimer(
+ startPreCheckNanos - startBorrowTsFileWriterNanos);
File writingFile = tsFileWriter.getWritingFile();
RandomAccessFile writingFileWriter = tsFileWriter.getWritingFileWriter();
@@ -491,29 +526,33 @@ public class PipeConsensusReceiver {
final List<String> fileAbsolutePaths =
files.stream().map(File::getAbsolutePath).collect(Collectors.toList());
+ long endPreCheckNanos = System.nanoTime();
+ pipeConsensusReceiverMetrics.recordTsFileSealPreCheckTimer(
+ endPreCheckNanos - startPreCheckNanos);
final TSStatus status =
loadFileToDataRegion(
fileAbsolutePaths.get(1),
ProgressIndexType.deserializeFrom(ByteBuffer.wrap(req.getProgressIndex())));
+ pipeConsensusReceiverMetrics.recordTsFileSealLoadTimer(System.nanoTime()
- endPreCheckNanos);
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
// if transfer success, disk buffer will be released.
tsFileWriter.returnSelf();
LOGGER.info(
- "PipeConsensus-ConsensusGroupId-{}: Seal file with mods {}
successfully.",
- consensusGroupId,
+ "PipeConsensus-PipeName-{}: Seal file with mods {} successfully.",
+ consensusPipeName,
fileAbsolutePaths);
} else {
LOGGER.warn(
- "PipeConsensus-ConsensusGroupId-{}: Failed to seal file {}, status
is {}.",
- consensusGroupId,
+ "PipeConsensus-PipeName-{}: Failed to seal file {}, status is {}.",
+ consensusPipeName,
fileAbsolutePaths,
status);
}
return new TPipeConsensusTransferResp(status);
} catch (Exception e) {
LOGGER.warn(
- "PipeConsensus-ConsensusGroupId-{}: Failed to seal file {} from req
{}.",
- consensusGroupId,
+ "PipeConsensus-PipeName-{}: Failed to seal file {} from req {}.",
+ consensusPipeName,
files,
req,
e);
@@ -546,8 +585,8 @@ public class PipeConsensusReceiver {
TSStatusCode.PIPE_CONSENSUS_TRANSFER_FILE_ERROR,
String.format("Failed to seal file %s, the file does not
exist.", fileName));
LOGGER.warn(
- "PipeConsensus-ConsensusGroupId-{}: Failed to seal file {}, because
the file does not exist.",
- consensusGroupId,
+ "PipeConsensus-PipeName-{}: Failed to seal file {}, because the file
does not exist.",
+ consensusPipeName,
fileName);
return new TPipeConsensusTransferResp(status);
}
@@ -561,9 +600,9 @@ public class PipeConsensusReceiver {
+ "The original file has length %s, but receiver file
has length %s.",
fileName, fileLength, writingFileWriter.length()));
LOGGER.warn(
- "PipeConsensus-ConsensusGroupId-{}: Failed to seal file {} when
check non final seal, because the length of file is not correct. "
+ "PipeConsensus-PipeName-{}: Failed to seal file {} when check non
final seal, because the length of file is not correct. "
+ "The original file has length {}, but receiver file has length
{}.",
- consensusGroupId,
+ consensusPipeName,
fileName,
fileLength,
writingFileWriter.length());
@@ -604,9 +643,9 @@ public class PipeConsensusReceiver {
writingFile != null && writingFile.exists() && writingFileWriter !=
null;
if (!isWritingFileAvailable) {
LOGGER.info(
- "PipeConsensus-ConsensusGroupId-{}: Writing file {} is not
available. "
+ "PipeConsensus-PipeName-{}: Writing file {} is not available. "
+ "Writing file is null: {}, writing file exists: {}, writing
file writer is null: {}.",
- consensusGroupId,
+ consensusPipeName,
writingFile,
writingFile == null,
writingFile != null && writingFile.exists(),
@@ -628,8 +667,8 @@ public class PipeConsensusReceiver {
String.format(
"Failed to seal file %s, because writing file is %s.",
fileName, writingFile));
LOGGER.warn(
- "PipeConsensus-ConsensusGroupId-{}: Failed to seal file {}, because
writing file is {}.",
- consensusGroupId,
+ "PipeConsensus-PipeName-{}: Failed to seal file {}, because writing
file is {}.",
+ consensusPipeName,
fileName,
writingFile);
return new TPipeConsensusTransferResp(status);
@@ -644,9 +683,9 @@ public class PipeConsensusReceiver {
+ "The original file has length %s, but receiver file
has length %s.",
fileName, fileLength, writingFileWriter.length()));
LOGGER.warn(
- "PipeConsensus-ConsensusGroupId-{}: Failed to seal file {} when
check final seal file, because the length of file is not correct. "
+ "PipeConsensus-PipeName-{}: Failed to seal file {} when check final
seal file, because the length of file is not correct. "
+ "The original file has length {}, but receiver file has length
{}.",
- consensusGroupId,
+ consensusPipeName,
fileName,
fileLength,
writingFileWriter.length());
@@ -670,8 +709,8 @@ public class PipeConsensusReceiver {
final boolean offsetCorrect = writingFileWriter.length() == offset;
if (!offsetCorrect) {
LOGGER.warn(
- "PipeConsensus-ConsensusGroupId-{}: Writing file {}'s offset is {},
but request sender's offset is {}.",
- consensusGroupId,
+ "PipeConsensus-PipeName-{}: Writing file {}'s offset is {}, but
request sender's offset is {}.",
+ consensusPipeName,
writingFile.getPath(),
writingFileWriter.length(),
offset);
@@ -684,13 +723,13 @@ public class PipeConsensusReceiver {
try {
diskBuffer.getWritingFileWriter().close();
LOGGER.info(
- "PipeConsensus-ConsensusGroupId-{}: Current writing file writer {}
was closed.",
- consensusGroupId,
+ "PipeConsensus-PipeName-{}: Current writing file writer {} was
closed.",
+ consensusPipeName,
diskBuffer.getWritingFile() == null ? "null" :
diskBuffer.getWritingFile().getPath());
} catch (IOException e) {
LOGGER.warn(
- "PipeConsensus-ConsensusGroupId-{}: Failed to close current
writing file writer {}, because {}.",
- consensusGroupId,
+ "PipeConsensus-PipeName-{}: Failed to close current writing file
writer {}, because {}.",
+ consensusPipeName,
diskBuffer.getWritingFile() == null ? "null" :
diskBuffer.getWritingFile().getPath(),
e.getMessage(),
e);
@@ -699,8 +738,8 @@ public class PipeConsensusReceiver {
} else {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
- "PipeConsensus-ConsensusGroupId-{}: Current writing file writer is
null. No need to close.",
- consensusGroupId.getId());
+ "PipeConsensus-PipeName-{}: Current writing file writer is null.
No need to close.",
+ consensusPipeName.toString());
}
}
}
@@ -710,13 +749,13 @@ public class PipeConsensusReceiver {
try {
FileUtils.delete(file);
LOGGER.info(
- "PipeConsensus-ConsensusGroupId-{}: Original writing file {} was
deleted.",
- consensusGroupId,
+ "PipeConsensus-PipeName-{}: Original writing file {} was deleted.",
+ consensusPipeName,
file.getPath());
} catch (IOException e) {
LOGGER.warn(
- "PipeConsensus-ConsensusGroupId-{}: Failed to delete original
writing file {}, because {}.",
- consensusGroupId,
+ "PipeConsensus-PipeName-{}: Failed to delete original writing file
{}, because {}.",
+ consensusPipeName,
file.getPath(),
e.getMessage(),
e);
@@ -724,8 +763,8 @@ public class PipeConsensusReceiver {
} else {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
- "PipeConsensus-ConsensusGroupId-{}: Original file {} is not
existed. No need to delete.",
- consensusGroupId,
+ "PipeConsensus-PipeName-{}: Original file {} is not existed. No
need to delete.",
+ consensusPipeName,
file.getPath());
}
}
@@ -737,8 +776,8 @@ public class PipeConsensusReceiver {
} else {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
- "PipeConsensus-ConsensusGroupId-{}: Current writing file is null.
No need to delete.",
- consensusGroupId.getId());
+ "PipeConsensus-PipeName-{}: Current writing file is null. No need
to delete.",
+ consensusPipeName.toString());
}
}
}
@@ -751,9 +790,9 @@ public class PipeConsensusReceiver {
}
LOGGER.info(
- "PipeConsensus-ConsensusGroupId-{}: Writing file {} is not existed or
name is not correct, try to create it. "
+ "PipeConsensus-PipeName-{}: Writing file {} is not existed or name is
not correct, try to create it. "
+ "Current writing file is {}.",
- consensusGroupId,
+ consensusPipeName,
fileName,
diskBuffer.getWritingFile() == null ? "null" :
diskBuffer.getWritingFile().getPath());
@@ -770,13 +809,13 @@ public class PipeConsensusReceiver {
if (!receiverFileDirWithIdSuffix.get().exists()) {
if (receiverFileDirWithIdSuffix.get().mkdirs()) {
LOGGER.info(
- "PipeConsensus-ConsensusGroupId-{}: Receiver file dir {} was
created.",
- consensusGroupId,
+ "PipeConsensus-PipeName-{}: Receiver file dir {} was created.",
+ consensusPipeName,
receiverFileDirWithIdSuffix.get().getPath());
} else {
LOGGER.error(
- "PipeConsensus-ConsensusGroupId-{}: Failed to create receiver file
dir {}.",
- consensusGroupId,
+ "PipeConsensus-PipeName-{}: Failed to create receiver file dir
{}.",
+ consensusPipeName,
receiverFileDirWithIdSuffix.get().getPath());
}
}
@@ -784,8 +823,8 @@ public class PipeConsensusReceiver {
diskBuffer.setWritingFile(new File(receiverFileDirWithIdSuffix.get(),
fileName));
diskBuffer.setWritingFileWriter(new
RandomAccessFile(diskBuffer.getWritingFile(), "rw"));
LOGGER.info(
- "PipeConsensus-ConsensusGroupId-{}: Writing file {} was created. Ready
to write file pieces.",
- consensusGroupId,
+ "PipeConsensus-PipeName-{}: Writing file {} was created. Ready to
write file pieces.",
+ consensusPipeName,
diskBuffer.getWritingFile().getPath());
}
@@ -801,13 +840,13 @@ public class PipeConsensusReceiver {
try {
FileUtils.deleteDirectory(receiverFileDirWithIdSuffix.get());
LOGGER.info(
- "PipeConsensus-ConsensusGroupId-{}: Original receiver file dir
{} was deleted successfully.",
- consensusGroupId,
+ "PipeConsensus-PipeName-{}: Original receiver file dir {} was
deleted successfully.",
+ consensusPipeName,
receiverFileDirWithIdSuffix.get().getPath());
} catch (IOException e) {
LOGGER.warn(
- "PipeConsensus-ConsensusGroupId-{}: Failed to delete original
receiver file dir {}, because {}.",
- consensusGroupId,
+ "PipeConsensus-PipeName-{}: Failed to delete original receiver
file dir {}, because {}.",
+ consensusPipeName,
receiverFileDirWithIdSuffix.get().getPath(),
e.getMessage(),
e);
@@ -815,16 +854,16 @@ public class PipeConsensusReceiver {
} else {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
- "PipeConsensus-ConsensusGroupId-{}: Original receiver file dir
{} is not existed. No need to delete.",
- consensusGroupId,
+ "PipeConsensus-PipeName-{}: Original receiver file dir {} is not
existed. No need to delete.",
+ consensusPipeName,
receiverFileDirWithIdSuffix.get().getPath());
}
}
receiverFileDirWithIdSuffix.set(null);
} else {
LOGGER.debug(
- "PipeConsensus-ConsensusGroupId-{}: Current receiver file dir is
null. No need to delete.",
- consensusGroupId.getId());
+ "PipeConsensus-PipeName-{}: Current receiver file dir is null. No
need to delete.",
+ consensusPipeName.toString());
}
// initiate receiverFileDirWithIdSuffix
@@ -832,8 +871,8 @@ public class PipeConsensusReceiver {
final String receiverFileBaseDir = getReceiverFileBaseDir();
if (Objects.isNull(receiverFileBaseDir)) {
LOGGER.warn(
- "PipeConsensus-ConsensusGroupId-{}: Failed to init pipeConsensus
receiver file folder manager because all disks of folders are full.",
- consensusGroupId.getId());
+ "PipeConsensus-PipeName-{}: Failed to get pipeConsensus receiver
file base directory, because your folderManager is null. May because the disk
is full.",
+ consensusPipeName.toString());
throw new DiskSpaceInsufficientException(receiverBaseDirsName);
}
// Create a new receiver file dir
@@ -841,19 +880,19 @@ public class PipeConsensusReceiver {
if (newReceiverDir.exists()) {
FileUtils.deleteDirectory(newReceiverDir);
LOGGER.info(
- "PipeConsensus-ConsensusGroupId-{}: Origin receiver file dir {}
was deleted.",
- consensusGroupId,
+ "PipeConsensus-PipeName-{}: Origin receiver file dir {} was
deleted.",
+ consensusPipeName,
newReceiverDir.getPath());
}
if (!newReceiverDir.mkdirs()) {
LOGGER.warn(
- "PipeConsensus-ConsensusGroupId-{}: Failed to create receiver file
dir {}.",
- newReceiverDir.getPath(),
- consensusGroupId.getId());
+ "PipeConsensus-PipeName-{}: Failed to create receiver file dir {}.
May because authority or dir already exists etc.",
+ consensusPipeName,
+ newReceiverDir.getPath());
throw new IOException(
String.format(
- "PipeConsensus-ConsensusGroupId-%s: Failed to create receiver
file dir %s.",
- newReceiverDir.getPath(), consensusGroupId.getId()));
+ "PipeConsensus-PipeName-%s: Failed to create receiver file dir
%s. May because authority or dir already exists etc.",
+ consensusPipeName, newReceiverDir.getPath()));
}
receiverFileDirWithIdSuffix.set(newReceiverDir);
@@ -871,7 +910,7 @@ public class PipeConsensusReceiver {
public synchronized void handleExit() {
// Clear the diskBuffers
- pipeConsensusTsFileWriterPool.handleExit(consensusGroupId);
+ pipeConsensusTsFileWriterPool.handleExit(consensusPipeName);
// Clear the original receiver file dir if exists
if (receiverFileDirWithIdSuffix.get() != null) {
@@ -879,21 +918,21 @@ public class PipeConsensusReceiver {
try {
FileUtils.deleteDirectory(receiverFileDirWithIdSuffix.get());
LOGGER.info(
- "PipeConsensus-ConsensusGroupId-{}: Receiver exit: Original
receiver file dir {} was deleted.",
- consensusGroupId,
+ "PipeConsensus-PipeName-{}: Receiver exit: Original receiver
file dir {} was deleted.",
+ consensusPipeName,
receiverFileDirWithIdSuffix.get().getPath());
} catch (IOException e) {
LOGGER.warn(
- "PipeConsensus-ConsensusGroupId-{}: Receiver exit: Delete
original receiver file dir {} error.",
- consensusGroupId,
+ "PipeConsensus-PipeName-{}: Receiver exit: Delete original
receiver file dir {} error.",
+ consensusPipeName,
receiverFileDirWithIdSuffix.get().getPath(),
e);
}
} else {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
- "PipeConsensus-ConsensusGroupId-{}: Receiver exit: Original
receiver file dir {} does not exist. No need to delete.",
- consensusGroupId,
+ "PipeConsensus-PipeName-{}: Receiver exit: Original receiver
file dir {} does not exist. No need to delete.",
+ consensusPipeName,
receiverFileDirWithIdSuffix.get().getPath());
}
}
@@ -901,14 +940,16 @@ public class PipeConsensusReceiver {
} else {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
- "PipeConsensus-ConsensusGroupId-{}: Receiver exit: Original
receiver file dir is null. No need to delete.",
- consensusGroupId.getId());
+ "PipeConsensus-PipeName-{}: Receiver exit: Original receiver file
dir is null. No need to delete.",
+ consensusPipeName.toString());
}
}
+ // remove metric
+ MetricService.getInstance().removeMetricSet(pipeConsensusReceiverMetrics);
+
LOGGER.info(
- "PipeConsensus-ConsensusGroupId-{}: Receiver exit: Receiver exited.",
- consensusGroupId.getId());
+ "PipeConsensus-PipeName-{}: Receiver exit: Receiver exited.",
consensusPipeName.toString());
}
private static class PipeConsensusTsFileWriterPool {
@@ -948,7 +989,7 @@ public class PipeConsensusReceiver {
return diskBuffer.get();
}
- public void handleExit(ConsensusGroupId consensusGroupId) {
+ public void handleExit(ConsensusPipeName consensusPipeName) {
pipeConsensusTsFileWriterPool.forEach(
diskBuffer -> {
// Wait until diskBuffer is not used by TsFileInsertionEvent or
timeout.
@@ -960,13 +1001,13 @@ public class PipeConsensusReceiver {
Thread.sleep(RETRY_WAIT_TIME);
} catch (InterruptedException e) {
LOGGER.warn(
- "PipeConsensus-ConsensusGroupId-{}: receiver thread get
interrupted when exiting.",
- consensusGroupId.getId());
+ "PipeConsensus-PipeName-{}: receiver thread get
interrupted when exiting.",
+ consensusPipeName.toString());
// avoid infinite loop
break;
}
}
- diskBuffer.closeSelf(consensusGroupId);
+ diskBuffer.closeSelf(consensusPipeName);
});
}
}
@@ -1023,19 +1064,19 @@ public class PipeConsensusReceiver {
this.commitIdOfCorrespondingHolderEvent = null;
}
- public void closeSelf(ConsensusGroupId consensusGroupId) {
+ public void closeSelf(ConsensusPipeName consensusPipeName) {
// close file writer
if (writingFileWriter != null) {
try {
writingFileWriter.close();
LOGGER.info(
- "PipeConsensus-ConsensusGroupId-{}: TsFileWriter-{} exit:
Writing file writer was closed.",
- consensusGroupId.getId(),
+ "PipeConsensus-PipeName-{}: TsFileWriter-{} exit: Writing file
writer was closed.",
+ consensusPipeName.toString(),
index);
} catch (Exception e) {
LOGGER.warn(
- "PipeConsensus-ConsensusGroupId-{}: TsFileWriter-{} exit: Close
Writing file writer error.",
- consensusGroupId,
+ "PipeConsensus-PipeName-{}: TsFileWriter-{} exit: Close Writing
file writer error.",
+ consensusPipeName,
index,
e);
}
@@ -1043,8 +1084,8 @@ public class PipeConsensusReceiver {
} else {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
- "PipeConsensus-ConsensusGroupId-{}: TsFileWriter-{} exit:
Writing file writer is null. No need to close.",
- consensusGroupId.getId(),
+ "PipeConsensus-PipeName-{}: TsFileWriter-{} exit: Writing file
writer is null. No need to close.",
+ consensusPipeName.toString(),
index);
}
}
@@ -1054,13 +1095,13 @@ public class PipeConsensusReceiver {
try {
FileUtils.delete(writingFile);
LOGGER.info(
- "PipeConsensus-ConsensusGroupId-{}: TsFileWriter exit: Writing
file {} was deleted.",
- consensusGroupId,
+ "PipeConsensus-PipeName-{}: TsFileWriter exit: Writing file {}
was deleted.",
+ consensusPipeName,
writingFile.getPath());
} catch (Exception e) {
LOGGER.warn(
- "PipeConsensus-ConsensusGroupId-{}: TsFileWriter exit: Delete
writing file {} error.",
- consensusGroupId,
+ "PipeConsensus-PipeName-{}: TsFileWriter exit: Delete writing
file {} error.",
+ consensusPipeName,
writingFile.getPath(),
e);
}
@@ -1068,8 +1109,8 @@ public class PipeConsensusReceiver {
} else {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
- "PipeConsensus-ConsensusGroupId-{}: TsFileWriter exit: Writing
file is null. No need to delete.",
- consensusGroupId.getId());
+ "PipeConsensus-PipeName-{}: TsFileWriter exit: Writing file is
null. No need to delete.",
+ consensusPipeName.toString());
}
}
}
@@ -1083,38 +1124,57 @@ public class PipeConsensusReceiver {
// An ordered set that buffers transfer requests' TCommitId, whose length
is not larger than
// PIPE_CONSENSUS_PIPELINE_SIZE.
// Here we use set is to avoid duplicate events being received in some
special cases
- private final TreeSet<TCommitId> reqExecutionOrderBuffer;
+ private final TreeSet<RequestMeta> reqExecutionOrderBuffer;
private final Lock lock;
private final Condition condition;
- private long onSyncedCommitIndex = -1;
+ private final PipeConsensusReceiverMetrics metric;
+ private long onSyncedCommitIndex = 0;
private int connectorRebootTimes = 0;
+ private volatile int WALEventCount = 0;
+ private volatile int tsFileEventCount = 0;
- public RequestExecutor() {
- reqExecutionOrderBuffer =
+ public RequestExecutor(PipeConsensusReceiverMetrics metric) {
+ this.reqExecutionOrderBuffer =
new TreeSet<>(
- Comparator.comparingInt(TCommitId::getRebootTimes)
- .thenComparingLong(TCommitId::getCommitIndex));
- lock = new ReentrantLock();
- condition = lock.newCondition();
+ Comparator.comparingInt(RequestMeta::getRebootTimes)
+ .thenComparingLong(RequestMeta::getCommitIndex));
+ this.lock = new ReentrantLock();
+ this.condition = lock.newCondition();
+ this.metric = metric;
}
- private void onSuccess(long nextSyncedCommitIndex) {
+ private void onSuccess(long nextSyncedCommitIndex, boolean
isTransferTsFileSeal) {
LOGGER.info(
- "PipeConsensus-ConsensusGroupId-{}: process no.{} event
successfully!",
- consensusGroupId,
+ "PipeConsensus-PipeName-{}: process no.{} event successfully!",
+ consensusPipeName,
nextSyncedCommitIndex);
- reqExecutionOrderBuffer.pollFirst();
+ RequestMeta curMeta = reqExecutionOrderBuffer.pollFirst();
onSyncedCommitIndex = nextSyncedCommitIndex;
+ // update metric, notice that curMeta is never null.
+ if (isTransferTsFileSeal) {
+ tsFileEventCount--;
+ metric.recordReceiveTsFileTimer(System.nanoTime() -
curMeta.getStartApplyNanos());
+ } else {
+ WALEventCount--;
+ metric.recordReceiveWALTimer(System.nanoTime() -
curMeta.getStartApplyNanos());
+ }
}
private TPipeConsensusTransferResp onRequest(
- final TPipeConsensusTransferReq req, final boolean
isTransferTsFilePiece) {
+ final TPipeConsensusTransferReq req,
+ final boolean isTransferTsFilePiece,
+ final boolean isTransferTsFileSeal) {
+ long startAcquireLockNanos = System.nanoTime();
lock.lock();
try {
+ long startDispatchNanos = System.nanoTime();
+ metric.recordAcquireExecutorLockTimer(startDispatchNanos -
startAcquireLockNanos);
+
TCommitId tCommitId = req.getCommitId();
+ RequestMeta requestMeta = new RequestMeta(tCommitId);
LOGGER.info(
- "PipeConsensus-ConsensusGroup-{}: start to receive no.{} event",
- consensusGroupId,
+ "PipeConsensus-PipeName-{}: start to receive no.{} event",
+ consensusPipeName,
tCommitId.getCommitIndex());
// if a req is deprecated, we will discard it
// This case may happen in this scenario: leader has transferred {1,2}
and is intending to
@@ -1132,8 +1192,8 @@ public class PipeConsensusReceiver {
TSStatusCode.PIPE_CONSENSUS_DEPRECATED_REQUEST,
"PipeConsensus receiver received a deprecated request,
which may be sent before the connector restart. Consider to discard it"));
LOGGER.info(
- "PipeConsensus-ConsensusGroup-{}: received a deprecated request,
which may be sent before the connector restart. Consider to discard it",
- consensusGroupId);
+ "PipeConsensus-PipeName-{}: received a deprecated request, which
may be sent before the connector restart. Consider to discard it",
+ consensusPipeName);
return new TPipeConsensusTransferResp(status);
}
// Judge whether connector has rebooted or not, if the rebootTimes
increases compared to
@@ -1141,16 +1201,28 @@ public class PipeConsensusReceiver {
if (tCommitId.getRebootTimes() > connectorRebootTimes) {
resetWithNewestRebootTime(tCommitId.getRebootTimes());
}
- reqExecutionOrderBuffer.add(tCommitId);
+ // update metric
+ if (isTransferTsFilePiece &&
!reqExecutionOrderBuffer.contains(requestMeta)) {
+ // only update tsFileEventCount when tsFileEvent is first enqueue.
+ tsFileEventCount++;
+ }
+ if (!isTransferTsFileSeal && !isTransferTsFilePiece) {
+ WALEventCount++;
+ }
+ reqExecutionOrderBuffer.add(requestMeta);
+
// TsFilePieceTransferEvent will not enter further procedure, it just
holds a place in
// buffer. Only after the corresponding sealing event is processed,
this event can be
// dequeued.
if (isTransferTsFilePiece) {
+ long startApplyNanos = System.nanoTime();
+ metric.recordDispatchWaitingTimer(startApplyNanos -
startDispatchNanos);
+ requestMeta.setStartApplyNanos(startApplyNanos);
return null;
}
if (reqExecutionOrderBuffer.size() >=
IOTDB_CONFIG.getPipeConsensusPipelineSize()
- && !reqExecutionOrderBuffer.first().equals(tCommitId)) {
+ && !reqExecutionOrderBuffer.first().equals(requestMeta)) {
// If reqBuffer is full and current thread do not hold the
reqBuffer's peek, this req
// is not supposed to be processed. So current thread should notify
the corresponding
// threads to process the peek.
@@ -1159,8 +1231,11 @@ public class PipeConsensusReceiver {
// Polling to process
while (true) {
- if (reqExecutionOrderBuffer.first().equals(tCommitId)
+ if (reqExecutionOrderBuffer.first().equals(requestMeta)
&& tCommitId.getCommitIndex() == onSyncedCommitIndex + 1) {
+ long startApplyNanos = System.nanoTime();
+ metric.recordDispatchWaitingTimer(startApplyNanos -
startDispatchNanos);
+ requestMeta.setStartApplyNanos(startApplyNanos);
// If current req is supposed to be process, load this event
through
// DataRegionStateMachine.
TPipeConsensusTransferResp resp = loadEvent(req);
@@ -1171,19 +1246,22 @@ public class PipeConsensusReceiver {
// when the last seal req is applied, we can discard this event.
if (resp != null
&& resp.getStatus().getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- onSuccess(onSyncedCommitIndex + 1);
+ onSuccess(onSyncedCommitIndex + 1, isTransferTsFileSeal);
}
return resp;
}
if (reqExecutionOrderBuffer.size() >=
IOTDB_CONFIG.getPipeConsensusPipelineSize()
- && reqExecutionOrderBuffer.first().equals(tCommitId)) {
+ && reqExecutionOrderBuffer.first().equals(requestMeta)) {
+ long startApplyNanos = System.nanoTime();
+ metric.recordDispatchWaitingTimer(startApplyNanos -
startDispatchNanos);
+ requestMeta.setStartApplyNanos(startApplyNanos);
// If the reqBuffer is full and its peek is hold by current
thread, load this event.
TPipeConsensusTransferResp resp = loadEvent(req);
if (resp != null
&& resp.getStatus().getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- onSuccess(tCommitId.getCommitIndex());
+ onSuccess(tCommitId.getCommitIndex(), isTransferTsFileSeal);
// signal all other reqs that may wait for this event
condition.signalAll();
}
@@ -1203,12 +1281,15 @@ public class PipeConsensusReceiver {
if (timeout
&& reqExecutionOrderBuffer.size() <
IOTDB_CONFIG.getPipeConsensusPipelineSize()
&& reqExecutionOrderBuffer.first() != null
- && reqExecutionOrderBuffer.first().equals(tCommitId)) {
+ && reqExecutionOrderBuffer.first().equals(requestMeta)) {
+ long startApplyNanos = System.nanoTime();
+ metric.recordDispatchWaitingTimer(startApplyNanos -
startDispatchNanos);
+ requestMeta.setStartApplyNanos(startApplyNanos);
TPipeConsensusTransferResp resp = loadEvent(req);
if (resp != null
&& resp.getStatus().getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- onSuccess(tCommitId.getCommitIndex());
+ onSuccess(tCommitId.getCommitIndex(), isTransferTsFileSeal);
// signal all other reqs that may wait for this event
condition.signalAll();
}
@@ -1216,8 +1297,8 @@ public class PipeConsensusReceiver {
}
} catch (InterruptedException e) {
LOGGER.warn(
- "PipeConsensus-ConsensusGroup-{}: current waiting is
interrupted. onSyncedCommitIndex: {}. Exception: ",
- consensusGroupId,
+ "PipeConsensus-PipeName-{}: current waiting is interrupted.
onSyncedCommitIndex: {}. Exception: ",
+ consensusPipeName,
tCommitId.getCommitIndex(),
e);
Thread.currentThread().interrupt();
@@ -1240,12 +1321,76 @@ public class PipeConsensusReceiver {
*/
private void resetWithNewestRebootTime(int connectorRebootTimes) {
LOGGER.info(
- "PipeConsensus-ConsensusGroup-{}: receiver detected an newer
rebootTimes, which indicates the leader has rebooted. receiver will reset all
its data.",
- consensusGroupId);
+ "PipeConsensus-PipeName-{}: receiver detected an newer rebootTimes,
which indicates the leader has rebooted. receiver will reset all its data.",
+ consensusPipeName);
this.reqExecutionOrderBuffer.clear();
this.onSyncedCommitIndex = -1;
// sync the follower's connectorRebootTimes with connector's actual
rebootTimes
this.connectorRebootTimes = connectorRebootTimes;
}
}
+
+ private static class RequestMeta {
+ private final TCommitId commitId;
+ private long startApplyNanos = 0;
+
+ public RequestMeta(TCommitId commitId) {
+ this.commitId = commitId;
+ }
+
+ public int getRebootTimes() {
+ return commitId.getRebootTimes();
+ }
+
+ public long getCommitIndex() {
+ return commitId.getCommitIndex();
+ }
+
+ public void setStartApplyNanos(long startApplyNanos) {
+ // Notice that a tsFileInsertionEvent will enter RequestExecutor
multiple times, we only need
+ // to record the time of the first apply
+ if (startApplyNanos == 0) {
+ this.startApplyNanos = startApplyNanos;
+ }
+ }
+
+ public long getStartApplyNanos() {
+ return startApplyNanos;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ RequestMeta that = (RequestMeta) o;
+ return commitId.equals(that.commitId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(commitId);
+ }
+ }
+
+ //////////////////////////// APIs provided for metric framework
////////////////////////////
+
+ public int getReceiveBufferSize() {
+ return this.requestExecutor.reqExecutionOrderBuffer.size();
+ }
+
+ public int getWALEventCount() {
+ return this.requestExecutor.WALEventCount;
+ }
+
+ public int getTsFileEventCount() {
+ return this.requestExecutor.tsFileEventCount;
+ }
+
+ public String getConsensusGroupIdStr() {
+ return consensusGroupId.toString();
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
index 2b48becaed2..32f37e20818 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.metric.PipeProcessorMetrics;
+import org.apache.iotdb.db.pipe.processor.pipeconsensus.PipeConsensusProcessor;
import org.apache.iotdb.db.pipe.task.connection.PipeEventCollector;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.utils.ErrorHandlingUtils;
@@ -148,8 +149,12 @@ public class PipeProcessorSubtask extends
PipeReportableSubtask {
outputEventCollector);
}
}
+
final boolean shouldReport =
- !isClosed.get() &&
outputEventCollector.hasNoCollectInvocationAfterReset();
+ !isClosed.get()
+ && outputEventCollector.hasNoCollectInvocationAfterReset()
+ // Events generated from consensusPipe's transferred data should
never be reported.
+ && !(pipeProcessor instanceof PipeConsensusProcessor);
if (shouldReport && event instanceof EnrichedEvent) {
PipeEventCommitManager.getInstance()
.enrichWithCommitterKeyAndCommitId((EnrichedEvent) event,
creationTime, regionId);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/container/PipeConsensusClientMgrContainer.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/container/PipeConsensusClientMgrContainer.java
index 5a52eb34409..6fc04163433 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/container/PipeConsensusClientMgrContainer.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/container/PipeConsensusClientMgrContainer.java
@@ -42,7 +42,7 @@ public class PipeConsensusClientMgrContainer {
private final IClientManager<TEndPoint, AsyncPipeConsensusServiceClient>
asyncClientManager;
private final IClientManager<TEndPoint, SyncPipeConsensusServiceClient>
syncClientManager;
- public PipeConsensusClientMgrContainer() {
+ private PipeConsensusClientMgrContainer() {
// load rpc client config
PipeConsensusClientProperty config =
PipeConsensusClientProperty.newBuilder()
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
index 8d6879ae8c2..dfcf35c03ca 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
@@ -195,6 +195,7 @@ public class PipeConnectorConstant {
public static final String SINK_CONSUMER_GROUP_KEY = "sink.consumer-group";
public static final String CONNECTOR_CONSENSUS_GROUP_ID_KEY =
"connector.consensus.group-id";
+ public static final String CONNECTOR_CONSENSUS_PIPE_NAME =
"connector.consensus.pipe-name";
private PipeConnectorConstant() {
throw new IllegalStateException("Utility class");
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
index 3742c3c6028..b43e37788cd 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
@@ -342,8 +342,8 @@ public abstract class EnrichedEvent implements Event {
}
/**
- * Used for pipeConsensus. In PipeConsensus, we only need commiterKey,
commitId and rebootTimes to
- * uniquely identify an event
+ * Used for pipeConsensus. In PipeConsensus, we only need committerKey,
commitId and rebootTimes
+ * to uniquely identify an event
*/
public boolean equalsInPipeConsensus(Object o) {
if (this == o) {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitManager.java
index 9f9369ae56b..0b740f7e12a 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitManager.java
@@ -137,6 +137,14 @@ public class PipeEventCommitManager {
// Do nothing but make it private.
}
+ public long getGivenConsensusPipeCommitId(String committerKey) {
+ final PipeEventCommitter committer = eventCommitterMap.get(committerKey);
+ if (committer == null) {
+ return 0;
+ }
+ return committer.getCurrentCommitId();
+ }
+
private static class PipeEventCommitManagerHolder {
private static final PipeEventCommitManager INSTANCE = new
PipeEventCommitManager();
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitter.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitter.java
index 04502441daf..559e53ee15b 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitter.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitter.java
@@ -136,4 +136,8 @@ public class PipeEventCommitter {
public long commitQueueSize() {
return commitQueue.size();
}
+
+ public long getCurrentCommitId() {
+ return commitIdGenerator.get();
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index c207435d3f1..76a40ce2e3d 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -55,6 +55,11 @@ public enum Metric {
IOT_CONSENSUS("iot_consensus"),
IOT_SEND_LOG("iot_send_log"),
IOT_RECEIVE_LOG("iot_receive_log"),
+ PIPE_CONSENSUS("pipe_consensus"),
+ PIPE_CONSENSUS_MODE("pipe_consensus_mode"),
+ PIPE_SEND_EVENT("pipe_send_event"),
+ PIPE_RETRY_SEND_EVENT("pipe_retry_send_event"),
+ PIPE_RECEIVE_EVENT("pipe_receive_event"),
RATIS_CONSENSUS_WRITE("ratis_consensus_write"),
RATIS_CONSENSUS_READ("ratis_consensus_read"),
// storage engine related