This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ed8595aae03 Pipe: add async connector retry queue / cache hit and 
request count / commit queue metrics & initialize pipe metrics on CN (#11601)
ed8595aae03 is described below

commit ed8595aae03014446a3f56da214ada7dc2045473
Author: V_Galaxy <[email protected]>
AuthorDate: Sat Nov 25 01:47:30 2023 +0800

    Pipe: add async connector retry queue / cache hit and request count / 
commit queue metrics & initialize pipe metrics on CN (#11601)
---
 .../manager/pipe/metric/PipeConfigNodeMetrics.java |  47 +++++++
 .../manager/pipe/metric/PipeProcedureMetrics.java  |  97 ++++++++++++++
 .../manager/pipe/metric/PipeTaskInfoMetrics.java   |  99 +++++++++++++++
 .../manager/pipe/task/PipeTaskCoordinator.java     |  18 +++
 .../confignode/persistence/pipe/PipeTaskInfo.java  |  18 +++
 .../impl/pipe/AbstractOperatePipeProcedureV2.java  |   3 +
 .../procedure/impl/pipe/PipeTaskOperation.java     |  24 +++-
 .../iotdb/confignode/service/ConfigNode.java       |   3 +
 .../db/pipe/commit/PipeEventCommitManager.java     |   6 +-
 .../iotdb/db/pipe/commit/PipeEventCommitter.java   |  23 +++-
 .../thrift/async/IoTDBThriftAsyncConnector.java    |   6 +
 .../iotdb/db/pipe/metric/PipeConnectorMetrics.java |  22 ++++
 .../{PipeMetrics.java => PipeDataNodeMetrics.java} |  16 ++-
 .../db/pipe/metric/PipeEventCommitMetrics.java     | 141 +++++++++++++++++++++
 .../pipe/metric/PipeWALInsertNodeCacheMetrics.java |  24 ++++
 .../subtask/connector/PipeConnectorSubtask.java    |   6 +
 .../db/service/metrics/DataNodeMetricsHelper.java  |   4 +-
 .../dataregion/wal/utils/WALInsertNodeCache.java   |  10 ++
 .../commons/pipe/task/meta/PipeMetaKeeper.java     |  32 +++++
 .../iotdb/commons/service/metric/enums/Metric.java |   6 +
 20 files changed, 586 insertions(+), 19 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeMetrics.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeMetrics.java
new file mode 100644
index 00000000000..e12d3bba9ee
--- /dev/null
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeMetrics.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.pipe.metric;
+
+import org.apache.iotdb.confignode.manager.pipe.PipeManager;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+
+public class PipeConfigNodeMetrics implements IMetricSet {
+
+  private final PipeTaskInfoMetrics pipeTaskInfoMetrics;
+
+  public PipeConfigNodeMetrics(PipeManager pipeManager) {
+    this.pipeTaskInfoMetrics = new PipeTaskInfoMetrics(pipeManager);
+  }
+
+  //////////////////////////// bindTo & unbindFrom (metric framework) 
////////////////////////////
+
+  @Override
+  public void bindTo(AbstractMetricService metricService) {
+    PipeProcedureMetrics.getInstance().bindTo(metricService);
+    pipeTaskInfoMetrics.bindTo(metricService);
+  }
+
+  @Override
+  public void unbindFrom(AbstractMetricService metricService) {
+    PipeProcedureMetrics.getInstance().unbindFrom(metricService);
+    pipeTaskInfoMetrics.unbindFrom(metricService);
+  }
+}
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeProcedureMetrics.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeProcedureMetrics.java
new file mode 100644
index 00000000000..021c261eac2
--- /dev/null
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeProcedureMetrics.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.pipe.metric;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.confignode.procedure.impl.pipe.PipeTaskOperation;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Timer;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+public class PipeProcedureMetrics implements IMetricSet {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeProcedureMetrics.class);
+
+  private final Map<String, Timer> timerMap = new HashMap<>();
+
+  //////////////////////////// bindTo & unbindFrom (metric framework) 
////////////////////////////
+
+  @Override
+  public void bindTo(AbstractMetricService metricService) {
+    Arrays.stream(PipeTaskOperation.values())
+        .forEach(
+            op ->
+                timerMap.put(
+                    op.getName(),
+                    metricService.getOrCreateTimer(
+                        Metric.PIPE_PROCEDURE.toString(),
+                        MetricLevel.IMPORTANT,
+                        Tag.NAME.toString(),
+                        op.getName())));
+  }
+
+  @Override
+  public void unbindFrom(AbstractMetricService metricService) {
+    timerMap.forEach(
+        (name, timer) ->
+            metricService.remove(
+                MetricType.TIMER, Metric.PIPE_PROCEDURE.toString(), 
Tag.NAME.toString(), name));
+  }
+
+  //////////////////////////// pipe integration ////////////////////////////
+
+  public void updateTimer(String name, long durationMillis) {
+    Timer timer = timerMap.get(name);
+    if (timer == null) {
+      LOGGER.warn("Failed to update pipe procedure timer, PipeProcedure({}) 
does not exist", name);
+      return;
+    }
+    timer.updateMillis(durationMillis);
+  }
+
+  //////////////////////////// singleton ////////////////////////////
+
+  private static class PipeProcedureMetricsHolder {
+
+    private static final PipeProcedureMetrics INSTANCE = new 
PipeProcedureMetrics();
+
+    private PipeProcedureMetricsHolder() {
+      // empty constructor
+    }
+  }
+
+  public static PipeProcedureMetrics getInstance() {
+    return PipeProcedureMetrics.PipeProcedureMetricsHolder.INSTANCE;
+  }
+
+  private PipeProcedureMetrics() {
+    // empty constructor
+  }
+}
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeTaskInfoMetrics.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeTaskInfoMetrics.java
new file mode 100644
index 00000000000..e2448e917d0
--- /dev/null
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeTaskInfoMetrics.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.pipe.metric;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.confignode.manager.pipe.PipeManager;
+import org.apache.iotdb.confignode.manager.pipe.task.PipeTaskCoordinator;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+public class PipeTaskInfoMetrics implements IMetricSet {
+
+  private final PipeManager pipeManager;
+
+  private static final String RUNNING = "running";
+
+  private static final String DROPPED = "dropped";
+
+  private static final String USER_STOPPED = "userStopped";
+
+  private static final String EXCEPTION_STOPPED = "exceptionStopped";
+
+  public PipeTaskInfoMetrics(PipeManager pipeManager) {
+    this.pipeManager = pipeManager;
+  }
+
+  //////////////////////////// bindTo & unbindFrom (metric framework) 
////////////////////////////
+
+  @Override
+  public void bindTo(AbstractMetricService metricService) {
+    PipeTaskCoordinator coordinator = pipeManager.getPipeTaskCoordinator();
+    metricService.createAutoGauge(
+        Metric.PIPE_TASK_STATUS.toString(),
+        MetricLevel.IMPORTANT,
+        coordinator,
+        PipeTaskCoordinator::runningPipeCount,
+        Tag.STATUS.toString(),
+        RUNNING);
+    metricService.createAutoGauge(
+        Metric.PIPE_TASK_STATUS.toString(),
+        MetricLevel.IMPORTANT,
+        coordinator,
+        PipeTaskCoordinator::droppedPipeCount,
+        Tag.STATUS.toString(),
+        DROPPED);
+    metricService.createAutoGauge(
+        Metric.PIPE_TASK_STATUS.toString(),
+        MetricLevel.IMPORTANT,
+        coordinator,
+        PipeTaskCoordinator::userStoppedPipeCount,
+        Tag.STATUS.toString(),
+        USER_STOPPED);
+    metricService.createAutoGauge(
+        Metric.PIPE_TASK_STATUS.toString(),
+        MetricLevel.IMPORTANT,
+        coordinator,
+        PipeTaskCoordinator::exceptionStoppedPipeCount,
+        Tag.STATUS.toString(),
+        EXCEPTION_STOPPED);
+  }
+
+  @Override
+  public void unbindFrom(AbstractMetricService metricService) {
+    metricService.remove(
+        MetricType.AUTO_GAUGE, Metric.PIPE_TASK_STATUS.toString(), 
Tag.STATUS.toString(), RUNNING);
+    metricService.remove(
+        MetricType.AUTO_GAUGE, Metric.PIPE_TASK_STATUS.toString(), 
Tag.STATUS.toString(), DROPPED);
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.PIPE_TASK_STATUS.toString(),
+        Tag.STATUS.toString(),
+        USER_STOPPED);
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.PIPE_TASK_STATUS.toString(),
+        Tag.STATUS.toString(),
+        EXCEPTION_STOPPED);
+  }
+}
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
index 062b626be14..6f4d03a5f60 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
@@ -179,4 +179,22 @@ public class PipeTaskCoordinator {
   public boolean canSkipNextSync() {
     return pipeTaskInfo.canSkipNextSync();
   }
+
+  //////////////////////////// APIs provided for metric framework 
////////////////////////////
+
+  public long runningPipeCount() {
+    return pipeTaskInfo.runningPipeCount();
+  }
+
+  public long droppedPipeCount() {
+    return pipeTaskInfo.droppedPipeCount();
+  }
+
+  public long userStoppedPipeCount() {
+    return pipeTaskInfo.userStoppedPipeCount();
+  }
+
+  public long exceptionStoppedPipeCount() {
+    return pipeTaskInfo.exceptionStoppedPipeCount();
+  }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index cd2c663727c..f6e0003695b 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -722,4 +722,22 @@ public class PipeTaskInfo implements SnapshotProcessor {
   public String toString() {
     return pipeMetaKeeper.toString();
   }
+
+  //////////////////////////// APIs provided for metric framework 
////////////////////////////
+
+  public long runningPipeCount() {
+    return pipeMetaKeeper.runningPipeCount();
+  }
+
+  public long droppedPipeCount() {
+    return pipeMetaKeeper.droppedPipeCount();
+  }
+
+  public long userStoppedPipeCount() {
+    return pipeMetaKeeper.userStoppedPipeCount();
+  }
+
+  public long exceptionStoppedPipeCount() {
+    return pipeMetaKeeper.exceptionStoppedPipeCount();
+  }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
index 841aff6c905..6fb364c6466 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.confignode.procedure.impl.pipe;
 
 import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
+import org.apache.iotdb.confignode.manager.pipe.metric.PipeProcedureMetrics;
 import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
@@ -149,6 +150,8 @@ public abstract class AbstractOperatePipeProcedureV2
             .getPipeTaskCoordinator()
             .updateLastSyncedVersion();
       }
+      PipeProcedureMetrics.getInstance()
+          .updateTimer(this.getOperation().getName(), this.elapsedTime());
       
configNodeProcedureEnv.getConfigManager().getPipeManager().getPipeTaskCoordinator().unlock();
       pipeTaskInfo = null;
     }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/PipeTaskOperation.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/PipeTaskOperation.java
index 74a31ac223f..4ce012f3bb1 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/PipeTaskOperation.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/PipeTaskOperation.java
@@ -20,11 +20,21 @@
 package org.apache.iotdb.confignode.procedure.impl.pipe;
 
 public enum PipeTaskOperation {
-  CREATE_PIPE,
-  START_PIPE,
-  STOP_PIPE,
-  DROP_PIPE,
-  HANDLE_LEADER_CHANGE,
-  SYNC_PIPE_META,
-  HANDLE_PIPE_META_CHANGE
+  CREATE_PIPE("createPipe"),
+  START_PIPE("startPipe"),
+  STOP_PIPE("stopPipe"),
+  DROP_PIPE("dropPipe"),
+  HANDLE_LEADER_CHANGE("handleLeaderChange"),
+  SYNC_PIPE_META("syncPipeMeta"),
+  HANDLE_PIPE_META_CHANGE("handlePipeMetaChange");
+
+  private final String name;
+
+  PipeTaskOperation(String name) {
+    this.name = name;
+  }
+
+  public String getName() {
+    return name;
+  }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
index d086e53fdd9..41d83abf92c 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.confignode.conf.ConfigNodeConstant;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.conf.SystemPropertiesUtils;
 import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.manager.pipe.metric.PipeConfigNodeMetrics;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
 import org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo;
@@ -267,6 +268,8 @@ public class ConfigNode implements ConfigNodeMBean {
     MetricService.getInstance().addMetricSet(ThreadPoolMetrics.getInstance());
     initCpuMetrics();
     initSystemMetrics();
+    MetricService.getInstance()
+        .addMetricSet(new 
PipeConfigNodeMetrics(configManager.getPipeManager()));
   }
 
   private void initSystemMetrics() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/commit/PipeEventCommitManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/commit/PipeEventCommitManager.java
index 8305578ba29..8fbdcc702a7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/commit/PipeEventCommitManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/commit/PipeEventCommitManager.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.commit;
 
 import org.apache.iotdb.db.pipe.event.EnrichedEvent;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
+import org.apache.iotdb.db.pipe.metric.PipeEventCommitMetrics;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,13 +47,16 @@ public class PipeEventCommitManager {
           "Pipe with same name is already registered on this data region, 
overwriting: {}",
           committerKey);
     }
-    eventCommitterMap.put(committerKey, new PipeEventCommitter());
+    PipeEventCommitter eventCommitter = new PipeEventCommitter(pipeName, 
dataRegionId);
+    eventCommitterMap.put(committerKey, eventCommitter);
+    PipeEventCommitMetrics.getInstance().register(eventCommitter, 
committerKey);
     LOGGER.info("Pipe committer registered for pipe on data region: {}", 
committerKey);
   }
 
   public void deregister(String pipeName, int dataRegionId) {
     final String committerKey = generateCommitterKey(pipeName, dataRegionId);
     eventCommitterMap.remove(committerKey);
+    PipeEventCommitMetrics.getInstance().deregister(committerKey);
     LOGGER.info("Pipe committer deregistered for pipe on data region: {}", 
committerKey);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/commit/PipeEventCommitter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/commit/PipeEventCommitter.java
index fff2599f099..b41274139a1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/commit/PipeEventCommitter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/commit/PipeEventCommitter.java
@@ -34,6 +34,9 @@ public class PipeEventCommitter {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeEventCommitter.class);
 
+  private final String pipeName;
+  private final int dataRegionId;
+
   private final AtomicLong commitIdGenerator = new AtomicLong(0);
   private final AtomicLong lastCommitId = new AtomicLong(0);
 
@@ -44,8 +47,10 @@ public class PipeEventCommitter {
               event ->
                   Objects.requireNonNull(event, "committable event cannot be 
null").getCommitId()));
 
-  PipeEventCommitter() {
-    // Do nothing but make it package-private.
+  PipeEventCommitter(String pipeName, int dataRegionId) {
+    // make it package-private
+    this.pipeName = pipeName;
+    this.dataRegionId = dataRegionId;
   }
 
   public synchronized long generateCommitId() {
@@ -76,4 +81,18 @@ public class PipeEventCommitter {
       commitQueue.poll();
     }
   }
+
+  //////////////////////////// APIs provided for metric framework 
////////////////////////////
+
+  public String getPipeName() {
+    return pipeName;
+  }
+
+  public int getDataRegionId() {
+    return dataRegionId;
+  }
+
+  public long commitQueueSize() {
+    return commitQueue.size();
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
index b51b4284c8c..490bd4de764 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
@@ -501,4 +501,10 @@ public class IoTDBThriftAsyncConnector extends 
IoTDBConnector {
       tabletBatchBuilder.close();
     }
   }
+
+  //////////////////////////// APIs provided for metric framework 
////////////////////////////
+
+  public int getRetryEventQueueSize() {
+    return retryEventQueue.size();
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeConnectorMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeConnectorMetrics.java
index fff2bcae55a..6ecb7519786 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeConnectorMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeConnectorMetrics.java
@@ -105,6 +105,18 @@ public class PipeConnectorMetrics implements IMetricSet {
         String.valueOf(connector.getConnectorIndex()),
         Tag.CREATION_TIME.toString(),
         String.valueOf(connector.getCreationTime()));
+    // metrics related to IoTDBThriftAsyncConnector
+    metricService.createAutoGauge(
+        Metric.PIPE_ASYNC_CONNECTOR_RETRY_EVENT_QUEUE_SIZE.toString(),
+        MetricLevel.IMPORTANT,
+        connector,
+        PipeConnectorSubtask::getAsyncConnectorRetryEventQueueSize,
+        Tag.NAME.toString(),
+        connector.getAttributeSortedString(),
+        Tag.INDEX.toString(),
+        String.valueOf(connector.getConnectorIndex()),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(connector.getCreationTime()));
   }
 
   private void createRate(String taskID) {
@@ -191,6 +203,16 @@ public class PipeConnectorMetrics implements IMetricSet {
         String.valueOf(connector.getConnectorIndex()),
         Tag.CREATION_TIME.toString(),
         String.valueOf(connector.getCreationTime()));
+    // metrics related to IoTDBThriftAsyncConnector
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.PIPE_ASYNC_CONNECTOR_RETRY_EVENT_QUEUE_SIZE.toString(),
+        Tag.NAME.toString(),
+        connector.getAttributeSortedString(),
+        Tag.INDEX.toString(),
+        String.valueOf(connector.getConnectorIndex()),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(connector.getCreationTime()));
   }
 
   private void removeRate(String taskID) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeMetrics.java
similarity index 82%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeMetrics.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeMetrics.java
index e437f1bd457..1f4ff103a7c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeMetrics.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.db.pipe.metric;
 import org.apache.iotdb.metrics.AbstractMetricService;
 import org.apache.iotdb.metrics.metricsets.IMetricSet;
 
-public class PipeMetrics implements IMetricSet {
+public class PipeDataNodeMetrics implements IMetricSet {
 
   //////////////////////////// bindTo & unbindFrom (metric framework) 
////////////////////////////
 
@@ -35,6 +35,7 @@ public class PipeMetrics implements IMetricSet {
     PipeHeartbeatEventMetrics.getInstance().bindTo(metricService);
     PipeWALInsertNodeCacheMetrics.getInstance().bindTo(metricService);
     PipeResourceMetrics.getInstance().bindTo(metricService);
+    PipeEventCommitMetrics.getInstance().bindTo(metricService);
   }
 
   @Override
@@ -46,24 +47,25 @@ public class PipeMetrics implements IMetricSet {
     PipeHeartbeatEventMetrics.getInstance().unbindFrom(metricService);
     PipeWALInsertNodeCacheMetrics.getInstance().unbindFrom(metricService);
     PipeResourceMetrics.getInstance().unbindFrom(metricService);
+    PipeEventCommitMetrics.getInstance().unbindFrom(metricService);
   }
 
   //////////////////////////// singleton ////////////////////////////
 
-  private static class PipeMetricsHolder {
+  private static class PipeDataNodeMetricsHolder {
 
-    private static final PipeMetrics INSTANCE = new PipeMetrics();
+    private static final PipeDataNodeMetrics INSTANCE = new 
PipeDataNodeMetrics();
 
-    private PipeMetricsHolder() {
+    private PipeDataNodeMetricsHolder() {
       // empty constructor
     }
   }
 
-  public static PipeMetrics getInstance() {
-    return PipeMetrics.PipeMetricsHolder.INSTANCE;
+  public static PipeDataNodeMetrics getInstance() {
+    return PipeDataNodeMetricsHolder.INSTANCE;
   }
 
-  private PipeMetrics() {
+  private PipeDataNodeMetrics() {
     // empty constructor
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeEventCommitMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeEventCommitMetrics.java
new file mode 100644
index 00000000000..ffecc65deb2
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeEventCommitMetrics.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.metric;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.db.pipe.commit.PipeEventCommitter;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+import com.google.common.collect.ImmutableSet;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class PipeEventCommitMetrics implements IMetricSet {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeEventCommitMetrics.class);
+
+  private volatile AbstractMetricService metricService;
+
+  private final Map<String, PipeEventCommitter> eventCommitterMap = new 
ConcurrentHashMap<>();
+
+  //////////////////////////// bindTo & unbindFrom (metric framework) 
////////////////////////////
+
+  @Override
+  public void bindTo(AbstractMetricService metricService) {
+    this.metricService = metricService;
+    ImmutableSet<String> committerKeys = 
ImmutableSet.copyOf(eventCommitterMap.keySet());
+    for (String committerKey : committerKeys) {
+      createMetrics(committerKey);
+    }
+  }
+
+  private void createMetrics(String committerKey) {
+    createAutoGauge(committerKey);
+  }
+
+  private void createAutoGauge(String committerKey) {
+    PipeEventCommitter eventCommitter = eventCommitterMap.get(committerKey);
+    metricService.createAutoGauge(
+        Metric.PIPE_EVENT_COMMIT_QUEUE_SIZE.toString(),
+        MetricLevel.IMPORTANT,
+        eventCommitter,
+        PipeEventCommitter::commitQueueSize,
+        Tag.NAME.toString(),
+        String.valueOf(eventCommitter.getPipeName()),
+        Tag.REGION.toString(),
+        String.valueOf(eventCommitter.getDataRegionId()));
+  }
+
+  @Override
+  public void unbindFrom(AbstractMetricService metricService) {
+    ImmutableSet<String> committerKeys = 
ImmutableSet.copyOf(eventCommitterMap.keySet());
+    for (String committerKey : committerKeys) {
+      deregister(committerKey);
+    }
+    if (!eventCommitterMap.isEmpty()) {
+      LOGGER.warn("Failed to unbind from pipe event commit metrics, event 
committer map not empty");
+    }
+  }
+
+  private void removeMetrics(String committerKey) {
+    removeAutoGauge(committerKey);
+  }
+
+  private void removeAutoGauge(String committerKey) {
+    PipeEventCommitter eventCommitter = eventCommitterMap.get(committerKey);
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.PIPE_EVENT_COMMIT_QUEUE_SIZE.toString(),
+        Tag.NAME.toString(),
+        String.valueOf(eventCommitter.getPipeName()),
+        Tag.REGION.toString(),
+        String.valueOf(eventCommitter.getDataRegionId()));
+  }
+
+  //////////////////////////// register & deregister (pipe integration) 
////////////////////////////
+
+  public void register(@NonNull PipeEventCommitter eventCommitter, String 
committerKey) {
+    eventCommitterMap.putIfAbsent(committerKey, eventCommitter);
+    if (Objects.nonNull(metricService)) {
+      createMetrics(committerKey);
+    }
+  }
+
+  public void deregister(String committerKey) {
+    if (!eventCommitterMap.containsKey(committerKey)) {
+      LOGGER.warn(
+          "Failed to deregister pipe event commit metrics, 
PipeEventCommitter({}) does not exist",
+          committerKey);
+      return;
+    }
+    if (Objects.nonNull(committerKey)) {
+      removeMetrics(committerKey);
+    }
+    eventCommitterMap.remove(committerKey);
+  }
+
+  //////////////////////////// singleton ////////////////////////////
+
+  private static class PipeEventCommitMetricsHolder {
+
+    private static final PipeEventCommitMetrics INSTANCE = new 
PipeEventCommitMetrics();
+
+    private PipeEventCommitMetricsHolder() {
+      // empty constructor
+    }
+  }
+
+  public static PipeEventCommitMetrics getInstance() {
+    return PipeEventCommitMetrics.PipeEventCommitMetricsHolder.INSTANCE;
+  }
+
+  private PipeEventCommitMetrics() {
+    // empty constructor
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeWALInsertNodeCacheMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeWALInsertNodeCacheMetrics.java
index 2f246281f82..64c81512cc1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeWALInsertNodeCacheMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeWALInsertNodeCacheMetrics.java
@@ -67,6 +67,20 @@ public class PipeWALInsertNodeCacheMetrics implements 
IMetricSet {
         WALInsertNodeCache::getCacheHitRate,
         Tag.REGION.toString(),
         String.valueOf(dataRegionId));
+    metricService.createAutoGauge(
+        Metric.PIPE_WAL_INSERT_NODE_CACHE_HIT_COUNT.toString(),
+        MetricLevel.IMPORTANT,
+        cacheMap.get(dataRegionId),
+        WALInsertNodeCache::getCacheHitCount,
+        Tag.REGION.toString(),
+        String.valueOf(dataRegionId));
+    metricService.createAutoGauge(
+        Metric.PIPE_WAL_INSERT_NODE_CACHE_REQUEST_COUNT.toString(),
+        MetricLevel.IMPORTANT,
+        cacheMap.get(dataRegionId),
+        WALInsertNodeCache::getCacheRequestCount,
+        Tag.REGION.toString(),
+        String.valueOf(dataRegionId));
   }
 
   @Override
@@ -90,6 +104,16 @@ public class PipeWALInsertNodeCacheMetrics implements 
IMetricSet {
         Metric.PIPE_WAL_INSERT_NODE_CACHE_HIT_RATE.toString(),
         Tag.REGION.toString(),
         String.valueOf(dataRegionId));
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.PIPE_WAL_INSERT_NODE_CACHE_HIT_COUNT.toString(),
+        Tag.REGION.toString(),
+        String.valueOf(dataRegionId));
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.PIPE_WAL_INSERT_NODE_CACHE_REQUEST_COUNT.toString(),
+        Tag.REGION.toString(),
+        String.valueOf(dataRegionId));
   }
 
   //////////////////////////// register & deregister (pipe integration) 
////////////////////////////
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
index b8acefbada3..51f7aa26b9e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
@@ -364,4 +364,10 @@ public class PipeConnectorSubtask extends PipeSubtask {
   public Integer getPipeHeartbeatEventCount() {
     return inputPendingQueue.getPipeHeartbeatEventCount();
   }
+
+  public Integer getAsyncConnectorRetryEventQueueSize() {
+    return outputPipeConnector instanceof IoTDBThriftAsyncConnector
+        ? ((IoTDBThriftAsyncConnector) 
outputPipeConnector).getRetryEventQueueSize()
+        : 0;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
index f673cdc4a4e..bc7f28c184e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
@@ -29,7 +29,7 @@ import 
org.apache.iotdb.commons.service.metric.JvmGcMonitorMetrics;
 import org.apache.iotdb.commons.service.metric.MetricService;
 import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.pipe.metric.PipeMetrics;
+import org.apache.iotdb.db.pipe.metric.PipeDataNodeMetrics;
 import org.apache.iotdb.db.queryengine.metric.DataExchangeCostMetricSet;
 import org.apache.iotdb.db.queryengine.metric.DataExchangeCountMetricSet;
 import org.apache.iotdb.db.queryengine.metric.DriverSchedulerMetricSet;
@@ -84,7 +84,7 @@ public class DataNodeMetricsHelper {
     
MetricService.getInstance().addMetricSet(JvmGcMonitorMetrics.getInstance());
 
     // bind pipe related metrics
-    MetricService.getInstance().addMetricSet(PipeMetrics.getInstance());
+    
MetricService.getInstance().addMetricSet(PipeDataNodeMetrics.getInstance());
   }
 
   private static void initSystemMetrics() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
index b65bd1bb4e3..7cc10aadf25 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
@@ -209,10 +209,20 @@ public class WALInsertNodeCache {
     }
   }
 
+  //////////////////////////// APIs provided for metric framework 
////////////////////////////
+
   public double getCacheHitRate() {
     return Objects.nonNull(lruCache) ? lruCache.stats().hitRate() : 0;
   }
 
+  public double getCacheHitCount() {
+    return Objects.nonNull(lruCache) ? lruCache.stats().hitCount() : 0;
+  }
+
+  public double getCacheRequestCount() {
+    return Objects.nonNull(lruCache) ? lruCache.stats().requestCount() : 0;
+  }
+
   /////////////////////////// MemTable ///////////////////////////
 
   public void addMemTable(long memTableId) {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java
index 347ebe78199..86b930b3b04 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java
@@ -140,4 +140,36 @@ public class PipeMetaKeeper {
   public String toString() {
     return "PipeMetaKeeper{" + "pipeNameToPipeMetaMap=" + 
pipeNameToPipeMetaMap + '}';
   }
+
+  //////////////////////////// APIs provided for metric framework 
////////////////////////////
+
+  public long runningPipeCount() {
+    return pipeNameToPipeMetaMap.values().stream()
+        .filter(pipeMeta -> 
PipeStatus.RUNNING.equals(pipeMeta.getRuntimeMeta().getStatus().get()))
+        .count();
+  }
+
+  public long droppedPipeCount() {
+    return pipeNameToPipeMetaMap.values().stream()
+        .filter(pipeMeta -> 
PipeStatus.DROPPED.equals(pipeMeta.getRuntimeMeta().getStatus().get()))
+        .count();
+  }
+
+  public long userStoppedPipeCount() {
+    return pipeNameToPipeMetaMap.values().stream()
+        .filter(
+            pipeMeta ->
+                
PipeStatus.STOPPED.equals(pipeMeta.getRuntimeMeta().getStatus().get())
+                    && 
!pipeMeta.getRuntimeMeta().getIsStoppedByRuntimeException())
+        .count();
+  }
+
+  public long exceptionStoppedPipeCount() {
+    return pipeNameToPipeMetaMap.values().stream()
+        .filter(
+            pipeMeta ->
+                
PipeStatus.STOPPED.equals(pipeMeta.getRuntimeMeta().getStatus().get())
+                    && 
pipeMeta.getRuntimeMeta().getIsStoppedByRuntimeException())
+        .count();
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index a841532f753..f31634dc36b 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -125,10 +125,16 @@ public enum Metric {
   PIPE_CONNECTOR_HEARTBEAT_TRANSFER("pipe_connector_heartbeat_transfer"),
   PIPE_HEARTBEAT_EVENT("pipe_heartbeat_event"),
   PIPE_WAL_INSERT_NODE_CACHE_HIT_RATE("pipe_wal_insert_node_cache_hit_rate"),
+  PIPE_WAL_INSERT_NODE_CACHE_HIT_COUNT("pipe_wal_insert_node_cache_hit_count"),
+  
PIPE_WAL_INSERT_NODE_CACHE_REQUEST_COUNT("pipe_wal_insert_node_cache_request_count"),
   PIPE_EXTRACTOR_TSFILE_EPOCH_STATE("pipe_extractor_tsfile_epoch_state"),
   PIPE_MEM("pipe_mem"),
   PIPE_PINNED_MEMTABLE_COUNT("pipe_pinned_memtable_count"),
   PIPE_LINKED_TSFILE_COUNT("pipe_linked_tsfile_count"),
+  
PIPE_ASYNC_CONNECTOR_RETRY_EVENT_QUEUE_SIZE("pipe_async_connector_retry_event_queue_size"),
+  PIPE_EVENT_COMMIT_QUEUE_SIZE("pipe_event_commit_queue_size"),
+  PIPE_PROCEDURE("pipe_procedure"),
+  PIPE_TASK_STATUS("pipe_task_status"),
   ;
 
   final String value;


Reply via email to