This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch pipe_metric
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 1646cf1e964df8be9ea1666753162092a236345b
Author: OneSizeFitQuorum <[email protected]>
AuthorDate: Mon Aug 7 20:57:13 2023 +0800

    init
    
    Signed-off-by: OneSizeFitQuorum <[email protected]>
---
 .../iotdb/db/pipe/task/subtask/PipeSubtask.java    |  5 ++
 .../db/service/metrics/DataNodeMetricsHelper.java  |  1 +
 .../iotdb/db/service/metrics/PipeMetrics.java      | 65 ++++++++++++++++++++++
 .../iotdb/commons/service/metric/enums/Metric.java |  2 +
 4 files changed, 73 insertions(+)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
index bdc494e7622..0e580b9e03c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
@@ -23,6 +23,7 @@ import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
 import org.apache.iotdb.db.pipe.event.EnrichedEvent;
 import org.apache.iotdb.db.pipe.execution.scheduler.PipeSubtaskScheduler;
+import org.apache.iotdb.db.service.metrics.PipeMetrics;
 import org.apache.iotdb.db.utils.ErrorHandlingUtils;
 import org.apache.iotdb.pipe.api.event.Event;
 
@@ -43,6 +44,8 @@ public abstract class PipeSubtask
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeSubtask.class);
 
+  private static final PipeMetrics PIPE_METRICS = PipeMetrics.getInstance();
+
   // Used for identifying the subtask
   protected final String taskID;
 
@@ -197,11 +200,13 @@ public abstract class PipeSubtask
   }
 
   protected void releaseLastEvent() {
+    long start = System.nanoTime();
     if (lastEvent != null) {
       if (lastEvent instanceof EnrichedEvent) {
         ((EnrichedEvent) 
lastEvent).decreaseReferenceCount(this.getClass().getName());
       }
       lastEvent = null;
+      PIPE_METRICS.recordEventCost(System.nanoTime() - start);
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
index ea188be3cf8..087349cf9df 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
@@ -62,6 +62,7 @@ public class DataNodeMetricsHelper {
     MetricService.getInstance().addMetricSet(new 
DiskMetrics(IoTDBConstant.DN_ROLE));
     MetricService.getInstance().addMetricSet(new 
NetMetrics(IoTDBConstant.DN_ROLE));
     
MetricService.getInstance().addMetricSet(ClientManagerMetrics.getInstance());
+    MetricService.getInstance().addMetricSet(PipeMetrics.getInstance());
     initCpuMetrics();
     initSystemMetrics();
     MetricService.getInstance().addMetricSet(WritingMetrics.getInstance());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/PipeMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/PipeMetrics.java
new file mode 100644
index 00000000000..56108703001
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/PipeMetrics.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.service.metrics;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.impl.DoNothingMetricManager;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Timer;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+import java.util.concurrent.TimeUnit;
+
+public class PipeMetrics implements IMetricSet {
+
+  private static final String EVENT = "event";
+  private Timer eventTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+
+  private PipeMetrics() {}
+
+  public static PipeMetrics getInstance() {
+    return PipeMetricsInstanceHolder.INSTANCE;
+  }
+
+  @Override
+  public void bindTo(AbstractMetricService metricService) {
+    eventTimer =
+        metricService.getOrCreateTimer(
+            Metric.PIPE.toString(), MetricLevel.IMPORTANT, 
Tag.TYPE.toString(), EVENT);
+  }
+
+  @Override
+  public void unbindFrom(AbstractMetricService metricService) {
+    metricService.remove(MetricType.TIMER, Metric.PIPE.toString(), 
Tag.TYPE.toString(), EVENT);
+  }
+
+  public void recordEventCost(long costTimeInNanos) {
+    eventTimer.update(costTimeInNanos, TimeUnit.NANOSECONDS);
+  }
+
+  private static class PipeMetricsInstanceHolder {
+    private static final PipeMetrics INSTANCE = new PipeMetrics();
+
+    private PipeMetricsInstanceHolder() {}
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index c377ffbbbcd..2e997efcb78 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -88,6 +88,8 @@ public enum Metric {
   FRAGMENT_INSTANCE_MANAGER("fragment_instance_manager"),
   MEMORY_POOL("memory_pool"),
   LOCAL_EXECUTION_PLANNER("local_execution_planner"),
+  // pipe related
+  PIPE("pipe"),
   // file related
   FILE_SIZE("file_size"),
   FILE_COUNT("file_count");

Reply via email to