This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new ed8595aae03 Pipe: add async connector retry queue / cache hit and
request count / commit queue metrics & initialize pipe metrics on CN (#11601)
ed8595aae03 is described below
commit ed8595aae03014446a3f56da214ada7dc2045473
Author: V_Galaxy <[email protected]>
AuthorDate: Sat Nov 25 01:47:30 2023 +0800
Pipe: add async connector retry queue / cache hit and request count /
commit queue metrics & initialize pipe metrics on CN (#11601)
---
.../manager/pipe/metric/PipeConfigNodeMetrics.java | 47 +++++++
.../manager/pipe/metric/PipeProcedureMetrics.java | 97 ++++++++++++++
.../manager/pipe/metric/PipeTaskInfoMetrics.java | 99 +++++++++++++++
.../manager/pipe/task/PipeTaskCoordinator.java | 18 +++
.../confignode/persistence/pipe/PipeTaskInfo.java | 18 +++
.../impl/pipe/AbstractOperatePipeProcedureV2.java | 3 +
.../procedure/impl/pipe/PipeTaskOperation.java | 24 +++-
.../iotdb/confignode/service/ConfigNode.java | 3 +
.../db/pipe/commit/PipeEventCommitManager.java | 6 +-
.../iotdb/db/pipe/commit/PipeEventCommitter.java | 23 +++-
.../thrift/async/IoTDBThriftAsyncConnector.java | 6 +
.../iotdb/db/pipe/metric/PipeConnectorMetrics.java | 22 ++++
.../{PipeMetrics.java => PipeDataNodeMetrics.java} | 16 ++-
.../db/pipe/metric/PipeEventCommitMetrics.java | 141 +++++++++++++++++++++
.../pipe/metric/PipeWALInsertNodeCacheMetrics.java | 24 ++++
.../subtask/connector/PipeConnectorSubtask.java | 6 +
.../db/service/metrics/DataNodeMetricsHelper.java | 4 +-
.../dataregion/wal/utils/WALInsertNodeCache.java | 10 ++
.../commons/pipe/task/meta/PipeMetaKeeper.java | 32 +++++
.../iotdb/commons/service/metric/enums/Metric.java | 6 +
20 files changed, 586 insertions(+), 19 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeMetrics.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeMetrics.java
new file mode 100644
index 00000000000..e12d3bba9ee
--- /dev/null
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeMetrics.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.pipe.metric;
+
+import org.apache.iotdb.confignode.manager.pipe.PipeManager;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+
+/**
+ * Metric set that groups the pipe metrics of the ConfigNode: it forwards bind/unbind calls to the
+ * {@link PipeProcedureMetrics} singleton and to a {@link PipeTaskInfoMetrics} instance built from
+ * the given {@code PipeManager}.
+ */
+public class PipeConfigNodeMetrics implements IMetricSet {
+
+ // Created once in the constructor; bound and unbound together with the procedure metrics.
+ private final PipeTaskInfoMetrics pipeTaskInfoMetrics;
+
+ /**
+ * @param pipeManager handed to the {@link PipeTaskInfoMetrics} created here
+ */
+ public PipeConfigNodeMetrics(PipeManager pipeManager) {
+ this.pipeTaskInfoMetrics = new PipeTaskInfoMetrics(pipeManager);
+ }
+
+ //////////////////////////// bindTo & unbindFrom (metric framework)
////////////////////////////
+
+ /** Binds the procedure metrics first, then the pipe task info metrics, to {@code metricService}. */
+ @Override
+ public void bindTo(AbstractMetricService metricService) {
+ PipeProcedureMetrics.getInstance().bindTo(metricService);
+ pipeTaskInfoMetrics.bindTo(metricService);
+ }
+
+ /** Unbinds both metric sets from {@code metricService}, in the same order as {@code bindTo}. */
+ @Override
+ public void unbindFrom(AbstractMetricService metricService) {
+ PipeProcedureMetrics.getInstance().unbindFrom(metricService);
+ pipeTaskInfoMetrics.unbindFrom(metricService);
+ }
+}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeProcedureMetrics.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeProcedureMetrics.java
new file mode 100644
index 00000000000..021c261eac2
--- /dev/null
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeProcedureMetrics.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.pipe.metric;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.confignode.procedure.impl.pipe.PipeTaskOperation;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Timer;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+public class PipeProcedureMetrics implements IMetricSet {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PipeProcedureMetrics.class);
+
+ private final Map<String, Timer> timerMap = new HashMap<>();
+
+ //////////////////////////// bindTo & unbindFrom (metric framework)
////////////////////////////
+
+ /**
+ * For every {@code PipeTaskOperation} value, obtains a {@code PIPE_PROCEDURE} timer tagged with
+ * the operation name from {@code metricService} and stores it in {@code timerMap} under that
+ * name.
+ */
+ @Override
+ public void bindTo(AbstractMetricService metricService) {
+ Arrays.stream(PipeTaskOperation.values())
+ .forEach(
+ op ->
+ timerMap.put(
+ op.getName(),
+ metricService.getOrCreateTimer(
+ Metric.PIPE_PROCEDURE.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ op.getName())));
+ }
+
+ /**
+ * Removes from {@code metricService} the {@code PIPE_PROCEDURE} timer of every name currently
+ * held in {@code timerMap}.
+ *
+ * <p>NOTE(review): {@code timerMap} itself is not cleared here, so {@code updateTimer} still
+ * finds the removed timers afterwards - confirm this is intended.
+ */
+ @Override
+ public void unbindFrom(AbstractMetricService metricService) {
+ timerMap.forEach(
+ (name, timer) ->
+ metricService.remove(
+ MetricType.TIMER, Metric.PIPE_PROCEDURE.toString(),
Tag.NAME.toString(), name));
+ }
+
+ //////////////////////////// pipe integration ////////////////////////////
+
+ /**
+ * Records one duration sample on the timer stored under {@code name}.
+ *
+ * <p>Timers are stored by {@code bindTo}, keyed by {@code PipeTaskOperation#getName()}. If no
+ * timer is stored under {@code name}, a warning is logged and the sample is dropped.
+ *
+ * @param name key of the timer to update
+ * @param durationMillis value passed to {@code Timer#updateMillis}
+ */
+ public void updateTimer(String name, long durationMillis) {
+ Timer timer = timerMap.get(name);
+ if (timer == null) {
+ LOGGER.warn("Failed to update pipe procedure timer, PipeProcedure({})
does not exist", name);
+ return;
+ }
+ timer.updateMillis(durationMillis);
+ }
+
+ //////////////////////////// singleton ////////////////////////////
+
+ private static class PipeProcedureMetricsHolder {
+
+ private static final PipeProcedureMetrics INSTANCE = new
PipeProcedureMetrics();
+
+ private PipeProcedureMetricsHolder() {
+ // empty constructor
+ }
+ }
+
+ public static PipeProcedureMetrics getInstance() {
+ return PipeProcedureMetrics.PipeProcedureMetricsHolder.INSTANCE;
+ }
+
+ private PipeProcedureMetrics() {
+ // empty constructor
+ }
+}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeTaskInfoMetrics.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeTaskInfoMetrics.java
new file mode 100644
index 00000000000..e2448e917d0
--- /dev/null
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeTaskInfoMetrics.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.pipe.metric;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.confignode.manager.pipe.PipeManager;
+import org.apache.iotdb.confignode.manager.pipe.task.PipeTaskCoordinator;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+public class PipeTaskInfoMetrics implements IMetricSet {
+
+ private final PipeManager pipeManager;
+
+ private static final String RUNNING = "running";
+
+ private static final String DROPPED = "dropped";
+
+ private static final String USER_STOPPED = "userStopped";
+
+ private static final String EXCEPTION_STOPPED = "exceptionStopped";
+
+ public PipeTaskInfoMetrics(PipeManager pipeManager) {
+ this.pipeManager = pipeManager;
+ }
+
+ //////////////////////////// bindTo & unbindFrom (metric framework)
////////////////////////////
+
+ /**
+ * Creates four {@code PIPE_TASK_STATUS} auto gauges on {@code metricService}, one per status tag
+ * ({@code running}, {@code dropped}, {@code userStopped}, {@code exceptionStopped}). Each gauge
+ * reads the matching count method of the {@code PipeTaskCoordinator} obtained from
+ * {@code pipeManager}.
+ */
+ @Override
+ public void bindTo(AbstractMetricService metricService) {
+ PipeTaskCoordinator coordinator = pipeManager.getPipeTaskCoordinator();
+ metricService.createAutoGauge(
+ Metric.PIPE_TASK_STATUS.toString(),
+ MetricLevel.IMPORTANT,
+ coordinator,
+ PipeTaskCoordinator::runningPipeCount,
+ Tag.STATUS.toString(),
+ RUNNING);
+ metricService.createAutoGauge(
+ Metric.PIPE_TASK_STATUS.toString(),
+ MetricLevel.IMPORTANT,
+ coordinator,
+ PipeTaskCoordinator::droppedPipeCount,
+ Tag.STATUS.toString(),
+ DROPPED);
+ metricService.createAutoGauge(
+ Metric.PIPE_TASK_STATUS.toString(),
+ MetricLevel.IMPORTANT,
+ coordinator,
+ PipeTaskCoordinator::userStoppedPipeCount,
+ Tag.STATUS.toString(),
+ USER_STOPPED);
+ metricService.createAutoGauge(
+ Metric.PIPE_TASK_STATUS.toString(),
+ MetricLevel.IMPORTANT,
+ coordinator,
+ PipeTaskCoordinator::exceptionStoppedPipeCount,
+ Tag.STATUS.toString(),
+ EXCEPTION_STOPPED);
+ }
+
+ /**
+ * Removes the four {@code PIPE_TASK_STATUS} auto gauges (status tags {@code running},
+ * {@code dropped}, {@code userStopped}, {@code exceptionStopped}) from {@code metricService}.
+ */
+ @Override
+ public void unbindFrom(AbstractMetricService metricService) {
+ metricService.remove(
+ MetricType.AUTO_GAUGE, Metric.PIPE_TASK_STATUS.toString(),
Tag.STATUS.toString(), RUNNING);
+ metricService.remove(
+ MetricType.AUTO_GAUGE, Metric.PIPE_TASK_STATUS.toString(),
Tag.STATUS.toString(), DROPPED);
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.PIPE_TASK_STATUS.toString(),
+ Tag.STATUS.toString(),
+ USER_STOPPED);
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.PIPE_TASK_STATUS.toString(),
+ Tag.STATUS.toString(),
+ EXCEPTION_STOPPED);
+ }
+}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
index 062b626be14..6f4d03a5f60 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
@@ -179,4 +179,22 @@ public class PipeTaskCoordinator {
public boolean canSkipNextSync() {
return pipeTaskInfo.canSkipNextSync();
}
+
+ //////////////////////////// APIs provided for metric framework
////////////////////////////
+
+ /** Returns {@code pipeTaskInfo.runningPipeCount()}. */
+ public long runningPipeCount() {
+ return pipeTaskInfo.runningPipeCount();
+ }
+
+ /** Returns {@code pipeTaskInfo.droppedPipeCount()}. */
+ public long droppedPipeCount() {
+ return pipeTaskInfo.droppedPipeCount();
+ }
+
+ /** Returns {@code pipeTaskInfo.userStoppedPipeCount()}. */
+ public long userStoppedPipeCount() {
+ return pipeTaskInfo.userStoppedPipeCount();
+ }
+
+ /** Returns {@code pipeTaskInfo.exceptionStoppedPipeCount()}. */
+ public long exceptionStoppedPipeCount() {
+ return pipeTaskInfo.exceptionStoppedPipeCount();
+ }
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index cd2c663727c..f6e0003695b 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -722,4 +722,22 @@ public class PipeTaskInfo implements SnapshotProcessor {
public String toString() {
return pipeMetaKeeper.toString();
}
+
+ //////////////////////////// APIs provided for metric framework
////////////////////////////
+
+ /** Returns {@code pipeMetaKeeper.runningPipeCount()}. */
+ public long runningPipeCount() {
+ return pipeMetaKeeper.runningPipeCount();
+ }
+
+ /** Returns {@code pipeMetaKeeper.droppedPipeCount()}. */
+ public long droppedPipeCount() {
+ return pipeMetaKeeper.droppedPipeCount();
+ }
+
+ /** Returns {@code pipeMetaKeeper.userStoppedPipeCount()}. */
+ public long userStoppedPipeCount() {
+ return pipeMetaKeeper.userStoppedPipeCount();
+ }
+
+ /** Returns {@code pipeMetaKeeper.exceptionStoppedPipeCount()}. */
+ public long exceptionStoppedPipeCount() {
+ return pipeMetaKeeper.exceptionStoppedPipeCount();
+ }
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
index 841aff6c905..6fb364c6466 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.confignode.procedure.impl.pipe;
import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
+import org.apache.iotdb.confignode.manager.pipe.metric.PipeProcedureMetrics;
import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
@@ -149,6 +150,8 @@ public abstract class AbstractOperatePipeProcedureV2
.getPipeTaskCoordinator()
.updateLastSyncedVersion();
}
+ PipeProcedureMetrics.getInstance()
+ .updateTimer(this.getOperation().getName(), this.elapsedTime());
configNodeProcedureEnv.getConfigManager().getPipeManager().getPipeTaskCoordinator().unlock();
pipeTaskInfo = null;
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/PipeTaskOperation.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/PipeTaskOperation.java
index 74a31ac223f..4ce012f3bb1 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/PipeTaskOperation.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/PipeTaskOperation.java
@@ -20,11 +20,21 @@
package org.apache.iotdb.confignode.procedure.impl.pipe;
public enum PipeTaskOperation {
- CREATE_PIPE,
- START_PIPE,
- STOP_PIPE,
- DROP_PIPE,
- HANDLE_LEADER_CHANGE,
- SYNC_PIPE_META,
- HANDLE_PIPE_META_CHANGE
+ CREATE_PIPE("createPipe"),
+ START_PIPE("startPipe"),
+ STOP_PIPE("stopPipe"),
+ DROP_PIPE("dropPipe"),
+ HANDLE_LEADER_CHANGE("handleLeaderChange"),
+ SYNC_PIPE_META("syncPipeMeta"),
+ HANDLE_PIPE_META_CHANGE("handlePipeMetaChange");
+
+ private final String name;
+
+ PipeTaskOperation(String name) {
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
index d086e53fdd9..41d83abf92c 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.confignode.conf.ConfigNodeConstant;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.conf.SystemPropertiesUtils;
import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.manager.pipe.metric.PipeConfigNodeMetrics;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo;
@@ -267,6 +268,8 @@ public class ConfigNode implements ConfigNodeMBean {
MetricService.getInstance().addMetricSet(ThreadPoolMetrics.getInstance());
initCpuMetrics();
initSystemMetrics();
+ MetricService.getInstance()
+ .addMetricSet(new
PipeConfigNodeMetrics(configManager.getPipeManager()));
}
private void initSystemMetrics() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/commit/PipeEventCommitManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/commit/PipeEventCommitManager.java
index 8305578ba29..8fbdcc702a7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/commit/PipeEventCommitManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/commit/PipeEventCommitManager.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.commit;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
+import org.apache.iotdb.db.pipe.metric.PipeEventCommitMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,13 +47,16 @@ public class PipeEventCommitManager {
"Pipe with same name is already registered on this data region,
overwriting: {}",
committerKey);
}
- eventCommitterMap.put(committerKey, new PipeEventCommitter());
+ PipeEventCommitter eventCommitter = new PipeEventCommitter(pipeName,
dataRegionId);
+ eventCommitterMap.put(committerKey, eventCommitter);
+ PipeEventCommitMetrics.getInstance().register(eventCommitter,
committerKey);
LOGGER.info("Pipe committer registered for pipe on data region: {}",
committerKey);
}
public void deregister(String pipeName, int dataRegionId) {
final String committerKey = generateCommitterKey(pipeName, dataRegionId);
eventCommitterMap.remove(committerKey);
+ PipeEventCommitMetrics.getInstance().deregister(committerKey);
LOGGER.info("Pipe committer deregistered for pipe on data region: {}",
committerKey);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/commit/PipeEventCommitter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/commit/PipeEventCommitter.java
index fff2599f099..b41274139a1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/commit/PipeEventCommitter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/commit/PipeEventCommitter.java
@@ -34,6 +34,9 @@ public class PipeEventCommitter {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeEventCommitter.class);
+ private final String pipeName;
+ private final int dataRegionId;
+
private final AtomicLong commitIdGenerator = new AtomicLong(0);
private final AtomicLong lastCommitId = new AtomicLong(0);
@@ -44,8 +47,10 @@ public class PipeEventCommitter {
event ->
Objects.requireNonNull(event, "committable event cannot be
null").getCommitId()));
- PipeEventCommitter() {
- // Do nothing but make it package-private.
+ PipeEventCommitter(String pipeName, int dataRegionId) {
+ // make it package-private
+ this.pipeName = pipeName;
+ this.dataRegionId = dataRegionId;
}
public synchronized long generateCommitId() {
@@ -76,4 +81,18 @@ public class PipeEventCommitter {
commitQueue.poll();
}
}
+
+ //////////////////////////// APIs provided for metric framework
////////////////////////////
+
+ public String getPipeName() {
+ return pipeName;
+ }
+
+ public int getDataRegionId() {
+ return dataRegionId;
+ }
+
+ /** Returns the current {@code commitQueue.size()}, widened to {@code long}. */
+ public long commitQueueSize() {
+ return commitQueue.size();
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
index b51b4284c8c..490bd4de764 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
@@ -501,4 +501,10 @@ public class IoTDBThriftAsyncConnector extends
IoTDBConnector {
tabletBatchBuilder.close();
}
}
+
+ //////////////////////////// APIs provided for metric framework
////////////////////////////
+
+ /** Returns the current {@code retryEventQueue.size()}. */
+ public int getRetryEventQueueSize() {
+ return retryEventQueue.size();
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeConnectorMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeConnectorMetrics.java
index fff2bcae55a..6ecb7519786 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeConnectorMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeConnectorMetrics.java
@@ -105,6 +105,18 @@ public class PipeConnectorMetrics implements IMetricSet {
String.valueOf(connector.getConnectorIndex()),
Tag.CREATION_TIME.toString(),
String.valueOf(connector.getCreationTime()));
+ // metrics related to IoTDBThriftAsyncConnector
+ metricService.createAutoGauge(
+ Metric.PIPE_ASYNC_CONNECTOR_RETRY_EVENT_QUEUE_SIZE.toString(),
+ MetricLevel.IMPORTANT,
+ connector,
+ PipeConnectorSubtask::getAsyncConnectorRetryEventQueueSize,
+ Tag.NAME.toString(),
+ connector.getAttributeSortedString(),
+ Tag.INDEX.toString(),
+ String.valueOf(connector.getConnectorIndex()),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(connector.getCreationTime()));
}
private void createRate(String taskID) {
@@ -191,6 +203,16 @@ public class PipeConnectorMetrics implements IMetricSet {
String.valueOf(connector.getConnectorIndex()),
Tag.CREATION_TIME.toString(),
String.valueOf(connector.getCreationTime()));
+ // metrics related to IoTDBThriftAsyncConnector
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.PIPE_ASYNC_CONNECTOR_RETRY_EVENT_QUEUE_SIZE.toString(),
+ Tag.NAME.toString(),
+ connector.getAttributeSortedString(),
+ Tag.INDEX.toString(),
+ String.valueOf(connector.getConnectorIndex()),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(connector.getCreationTime()));
}
private void removeRate(String taskID) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeMetrics.java
similarity index 82%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeMetrics.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeMetrics.java
index e437f1bd457..1f4ff103a7c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeMetrics.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.db.pipe.metric;
import org.apache.iotdb.metrics.AbstractMetricService;
import org.apache.iotdb.metrics.metricsets.IMetricSet;
-public class PipeMetrics implements IMetricSet {
+public class PipeDataNodeMetrics implements IMetricSet {
//////////////////////////// bindTo & unbindFrom (metric framework)
////////////////////////////
@@ -35,6 +35,7 @@ public class PipeMetrics implements IMetricSet {
PipeHeartbeatEventMetrics.getInstance().bindTo(metricService);
PipeWALInsertNodeCacheMetrics.getInstance().bindTo(metricService);
PipeResourceMetrics.getInstance().bindTo(metricService);
+ PipeEventCommitMetrics.getInstance().bindTo(metricService);
}
@Override
@@ -46,24 +47,25 @@ public class PipeMetrics implements IMetricSet {
PipeHeartbeatEventMetrics.getInstance().unbindFrom(metricService);
PipeWALInsertNodeCacheMetrics.getInstance().unbindFrom(metricService);
PipeResourceMetrics.getInstance().unbindFrom(metricService);
+ PipeEventCommitMetrics.getInstance().unbindFrom(metricService);
}
//////////////////////////// singleton ////////////////////////////
- private static class PipeMetricsHolder {
+ private static class PipeDataNodeMetricsHolder {
- private static final PipeMetrics INSTANCE = new PipeMetrics();
+ private static final PipeDataNodeMetrics INSTANCE = new
PipeDataNodeMetrics();
- private PipeMetricsHolder() {
+ private PipeDataNodeMetricsHolder() {
// empty constructor
}
}
- public static PipeMetrics getInstance() {
- return PipeMetrics.PipeMetricsHolder.INSTANCE;
+ public static PipeDataNodeMetrics getInstance() {
+ return PipeDataNodeMetricsHolder.INSTANCE;
}
- private PipeMetrics() {
+ private PipeDataNodeMetrics() {
// empty constructor
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeEventCommitMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeEventCommitMetrics.java
new file mode 100644
index 00000000000..ffecc65deb2
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeEventCommitMetrics.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.metric;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.db.pipe.commit.PipeEventCommitter;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+import com.google.common.collect.ImmutableSet;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class PipeEventCommitMetrics implements IMetricSet {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PipeEventCommitMetrics.class);
+
+ private volatile AbstractMetricService metricService;
+
+ private final Map<String, PipeEventCommitter> eventCommitterMap = new
ConcurrentHashMap<>();
+
+ //////////////////////////// bindTo & unbindFrom (metric framework)
////////////////////////////
+
+ /**
+ * Stores {@code metricService} in the field of the same name, then creates metrics for every
+ * committer key already present in {@code eventCommitterMap}. The keys are copied into an
+ * immutable snapshot before iterating.
+ */
+ @Override
+ public void bindTo(AbstractMetricService metricService) {
+ this.metricService = metricService;
+ ImmutableSet<String> committerKeys =
ImmutableSet.copyOf(eventCommitterMap.keySet());
+ for (String committerKey : committerKeys) {
+ createMetrics(committerKey);
+ }
+ }
+
+ private void createMetrics(String committerKey) {
+ createAutoGauge(committerKey);
+ }
+
+ /**
+ * Creates the {@code PIPE_EVENT_COMMIT_QUEUE_SIZE} auto gauge for the committer stored under
+ * {@code committerKey}. The gauge reads {@code PipeEventCommitter::commitQueueSize} and is
+ * tagged with the committer's pipe name and data region id.
+ *
+ * <p>NOTE(review): the map lookup result is dereferenced without a null check, so this assumes
+ * the key is still present in {@code eventCommitterMap} - confirm against concurrent
+ * deregistration.
+ */
+ private void createAutoGauge(String committerKey) {
+ PipeEventCommitter eventCommitter = eventCommitterMap.get(committerKey);
+ metricService.createAutoGauge(
+ Metric.PIPE_EVENT_COMMIT_QUEUE_SIZE.toString(),
+ MetricLevel.IMPORTANT,
+ eventCommitter,
+ PipeEventCommitter::commitQueueSize,
+ Tag.NAME.toString(),
+ String.valueOf(eventCommitter.getPipeName()),
+ Tag.REGION.toString(),
+ String.valueOf(eventCommitter.getDataRegionId()));
+ }
+
+ /**
+ * Deregisters every committer key found in a snapshot of {@code eventCommitterMap}, then logs a
+ * warning if the map is still not empty.
+ *
+ * <p>The {@code metricService} parameter shadows the field and is not used in this method;
+ * the metric removal reached through {@code deregister} uses the field.
+ */
+ @Override
+ public void unbindFrom(AbstractMetricService metricService) {
+ ImmutableSet<String> committerKeys =
ImmutableSet.copyOf(eventCommitterMap.keySet());
+ for (String committerKey : committerKeys) {
+ deregister(committerKey);
+ }
+ if (!eventCommitterMap.isEmpty()) {
+ LOGGER.warn("Failed to unbind from pipe event commit metrics, event
committer map not empty");
+ }
+ }
+
+ private void removeMetrics(String committerKey) {
+ removeAutoGauge(committerKey);
+ }
+
+ private void removeAutoGauge(String committerKey) {
+ PipeEventCommitter eventCommitter = eventCommitterMap.get(committerKey);
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.PIPE_EVENT_COMMIT_QUEUE_SIZE.toString(),
+ Tag.NAME.toString(),
+ String.valueOf(eventCommitter.getPipeName()),
+ Tag.REGION.toString(),
+ String.valueOf(eventCommitter.getDataRegionId()));
+ }
+
+ //////////////////////////// register & deregister (pipe integration)
////////////////////////////
+
+ /**
+ * Starts tracking {@code eventCommitter} under {@code committerKey} and, once a metric service
+ * has been supplied by {@code bindTo}, creates its metrics immediately. Before that, only the
+ * map entry is stored and {@code bindTo} creates the metrics later.
+ *
+ * <p>NOTE(review): {@code putIfAbsent} keeps an already-registered committer, whereas
+ * {@code PipeEventCommitManager#register} overwrites its own entry with the new committer when
+ * the key already exists. After such an overwrite the metrics created here presumably still read
+ * the previous committer - confirm whether this should replace the entry instead.
+ *
+ * @param eventCommitter committer whose commit queue size is exposed
+ * @param committerKey key under which the committer is tracked
+ */
+ public void register(@NonNull PipeEventCommitter eventCommitter, String
committerKey) {
+ eventCommitterMap.putIfAbsent(committerKey, eventCommitter);
+ if (Objects.nonNull(metricService)) {
+ createMetrics(committerKey);
+ }
+ }
+
+ /**
+ * Stops tracking the committer registered under {@code committerKey} and removes its metrics.
+ *
+ * <p>If the key is unknown, a warning is logged and nothing else happens. Metrics are removed
+ * from the metric service only once {@code bindTo} has supplied one; before that no metric was
+ * created for the committer, so only the map entry is dropped.
+ *
+ * @param committerKey key the committer was registered under
+ */
+ public void deregister(String committerKey) {
+ if (!eventCommitterMap.containsKey(committerKey)) {
+ LOGGER.warn(
+ "Failed to deregister pipe event commit metrics, PipeEventCommitter({}) does not exist",
+ committerKey);
+ return;
+ }
+ // Guard on the metric service, not on the key: removeMetrics dereferences metricService,
+ // which stays null until bindTo runs. The key was already validated by containsKey above.
+ if (Objects.nonNull(metricService)) {
+ removeMetrics(committerKey);
+ }
+ eventCommitterMap.remove(committerKey);
+ }
+
+ //////////////////////////// singleton ////////////////////////////
+
+ private static class PipeEventCommitMetricsHolder {
+
+ private static final PipeEventCommitMetrics INSTANCE = new
PipeEventCommitMetrics();
+
+ private PipeEventCommitMetricsHolder() {
+ // empty constructor
+ }
+ }
+
+ public static PipeEventCommitMetrics getInstance() {
+ return PipeEventCommitMetrics.PipeEventCommitMetricsHolder.INSTANCE;
+ }
+
+ private PipeEventCommitMetrics() {
+ // empty constructor
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeWALInsertNodeCacheMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeWALInsertNodeCacheMetrics.java
index 2f246281f82..64c81512cc1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeWALInsertNodeCacheMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeWALInsertNodeCacheMetrics.java
@@ -67,6 +67,20 @@ public class PipeWALInsertNodeCacheMetrics implements
IMetricSet {
WALInsertNodeCache::getCacheHitRate,
Tag.REGION.toString(),
String.valueOf(dataRegionId));
+ metricService.createAutoGauge(
+ Metric.PIPE_WAL_INSERT_NODE_CACHE_HIT_COUNT.toString(),
+ MetricLevel.IMPORTANT,
+ cacheMap.get(dataRegionId),
+ WALInsertNodeCache::getCacheHitCount,
+ Tag.REGION.toString(),
+ String.valueOf(dataRegionId));
+ metricService.createAutoGauge(
+ Metric.PIPE_WAL_INSERT_NODE_CACHE_REQUEST_COUNT.toString(),
+ MetricLevel.IMPORTANT,
+ cacheMap.get(dataRegionId),
+ WALInsertNodeCache::getCacheRequestCount,
+ Tag.REGION.toString(),
+ String.valueOf(dataRegionId));
}
@Override
@@ -90,6 +104,16 @@ public class PipeWALInsertNodeCacheMetrics implements
IMetricSet {
Metric.PIPE_WAL_INSERT_NODE_CACHE_HIT_RATE.toString(),
Tag.REGION.toString(),
String.valueOf(dataRegionId));
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.PIPE_WAL_INSERT_NODE_CACHE_HIT_COUNT.toString(),
+ Tag.REGION.toString(),
+ String.valueOf(dataRegionId));
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.PIPE_WAL_INSERT_NODE_CACHE_REQUEST_COUNT.toString(),
+ Tag.REGION.toString(),
+ String.valueOf(dataRegionId));
}
//////////////////////////// register & deregister (pipe integration)
////////////////////////////
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
index b8acefbada3..51f7aa26b9e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
@@ -364,4 +364,10 @@ public class PipeConnectorSubtask extends PipeSubtask {
// Delegates to inputPendingQueue and returns the pipe heartbeat event count it reports.
public Integer getPipeHeartbeatEventCount() {
return inputPendingQueue.getPipeHeartbeatEventCount();
}
+
+ /**
+  * Reports the retry event queue size of the output connector.
+  *
+  * @return the size reported by the connector when it is an {@link IoTDBThriftAsyncConnector},
+  *     otherwise 0
+  */
+ public Integer getAsyncConnectorRetryEventQueueSize() {
+   if (!(outputPipeConnector instanceof IoTDBThriftAsyncConnector)) {
+     return 0;
+   }
+   // Held as a primitive int so the value is unboxed/boxed exactly as in a conditional expression.
+   final int retryQueueSize =
+       ((IoTDBThriftAsyncConnector) outputPipeConnector).getRetryEventQueueSize();
+   return retryQueueSize;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
index f673cdc4a4e..bc7f28c184e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
@@ -29,7 +29,7 @@ import
org.apache.iotdb.commons.service.metric.JvmGcMonitorMetrics;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.pipe.metric.PipeMetrics;
+import org.apache.iotdb.db.pipe.metric.PipeDataNodeMetrics;
import org.apache.iotdb.db.queryengine.metric.DataExchangeCostMetricSet;
import org.apache.iotdb.db.queryengine.metric.DataExchangeCountMetricSet;
import org.apache.iotdb.db.queryengine.metric.DriverSchedulerMetricSet;
@@ -84,7 +84,7 @@ public class DataNodeMetricsHelper {
MetricService.getInstance().addMetricSet(JvmGcMonitorMetrics.getInstance());
// bind pipe related metrics
- MetricService.getInstance().addMetricSet(PipeMetrics.getInstance());
+
MetricService.getInstance().addMetricSet(PipeDataNodeMetrics.getInstance());
}
private static void initSystemMetrics() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
index b65bd1bb4e3..7cc10aadf25 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
@@ -209,10 +209,20 @@ public class WALInsertNodeCache {
}
}
+ //////////////////////////// APIs provided for metric framework ////////////////////////////
+
// Hit rate taken from lruCache.stats(); reports 0 while lruCache is null.
public double getCacheHitRate() {
return Objects.nonNull(lruCache) ? lruCache.stats().hitRate() : 0;
}
+ /** Hit count taken from {@code lruCache.stats()}, as a double; 0 while {@code lruCache} is null. */
+ public double getCacheHitCount() {
+   if (Objects.isNull(lruCache)) {
+     return 0;
+   }
+   return lruCache.stats().hitCount();
+ }
+
+ /**
+  * Request count taken from {@code lruCache.stats()}, as a double; 0 while {@code lruCache} is
+  * null.
+  */
+ public double getCacheRequestCount() {
+   if (Objects.nonNull(lruCache)) {
+     return lruCache.stats().requestCount();
+   } else {
+     return 0;
+   }
+ }
+
/////////////////////////// MemTable ///////////////////////////
public void addMemTable(long memTableId) {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java
index 347ebe78199..86b930b3b04 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java
@@ -140,4 +140,36 @@ public class PipeMetaKeeper {
// Renders as "PipeMetaKeeper{pipeNameToPipeMetaMap=<map>}", using the map's own string form.
public String toString() {
return "PipeMetaKeeper{" + "pipeNameToPipeMetaMap=" +
pipeNameToPipeMetaMap + '}';
}
+
+ //////////////////////////// APIs provided for metric framework ////////////////////////////
+
+ /** Counts the entries of {@code pipeNameToPipeMetaMap} whose runtime status is RUNNING. */
+ public long runningPipeCount() {
+   return pipeNameToPipeMetaMap.values().stream()
+       .map(pipeMeta -> pipeMeta.getRuntimeMeta().getStatus().get())
+       .filter(PipeStatus.RUNNING::equals)
+       .count();
+ }
+
+ /** Counts the entries of {@code pipeNameToPipeMetaMap} whose runtime status is DROPPED. */
+ public long droppedPipeCount() {
+   return pipeNameToPipeMetaMap.values().stream()
+       .map(pipeMeta -> pipeMeta.getRuntimeMeta().getStatus().get())
+       .filter(PipeStatus.DROPPED::equals)
+       .count();
+ }
+
+ /**
+  * Counts the entries of {@code pipeNameToPipeMetaMap} whose runtime status is STOPPED and whose
+  * runtime meta is not flagged as stopped by a runtime exception.
+  */
+ public long userStoppedPipeCount() {
+   return pipeNameToPipeMetaMap.values().stream()
+       .filter(
+           pipeMeta ->
+               PipeStatus.STOPPED.equals(pipeMeta.getRuntimeMeta().getStatus().get()))
+       // Evaluated only for STOPPED pipes, same as the right-hand side of a short-circuit &&.
+       .filter(pipeMeta -> !pipeMeta.getRuntimeMeta().getIsStoppedByRuntimeException())
+       .count();
+ }
+
+ /**
+  * Counts the entries of {@code pipeNameToPipeMetaMap} whose runtime status is STOPPED and whose
+  * runtime meta is flagged as stopped by a runtime exception.
+  */
+ public long exceptionStoppedPipeCount() {
+   return pipeNameToPipeMetaMap.values().stream()
+       .filter(
+           pipeMeta ->
+               PipeStatus.STOPPED.equals(pipeMeta.getRuntimeMeta().getStatus().get()))
+       // Evaluated only for STOPPED pipes, same as the right-hand side of a short-circuit &&.
+       .filter(pipeMeta -> pipeMeta.getRuntimeMeta().getIsStoppedByRuntimeException())
+       .count();
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index a841532f753..f31634dc36b 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -125,10 +125,16 @@ public enum Metric {
PIPE_CONNECTOR_HEARTBEAT_TRANSFER("pipe_connector_heartbeat_transfer"),
PIPE_HEARTBEAT_EVENT("pipe_heartbeat_event"),
PIPE_WAL_INSERT_NODE_CACHE_HIT_RATE("pipe_wal_insert_node_cache_hit_rate"),
+ PIPE_WAL_INSERT_NODE_CACHE_HIT_COUNT("pipe_wal_insert_node_cache_hit_count"),
+
PIPE_WAL_INSERT_NODE_CACHE_REQUEST_COUNT("pipe_wal_insert_node_cache_request_count"),
PIPE_EXTRACTOR_TSFILE_EPOCH_STATE("pipe_extractor_tsfile_epoch_state"),
PIPE_MEM("pipe_mem"),
PIPE_PINNED_MEMTABLE_COUNT("pipe_pinned_memtable_count"),
PIPE_LINKED_TSFILE_COUNT("pipe_linked_tsfile_count"),
+
PIPE_ASYNC_CONNECTOR_RETRY_EVENT_QUEUE_SIZE("pipe_async_connector_retry_event_queue_size"),
+ PIPE_EVENT_COMMIT_QUEUE_SIZE("pipe_event_commit_queue_size"),
+ PIPE_PROCEDURE("pipe_procedure"),
+ PIPE_TASK_STATUS("pipe_task_status"),
;
final String value;