This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 08b3772a798 Pipe: check metric service before marking pipe metrics to avoid useless warn logs (#12303)
08b3772a798 is described below

commit 08b3772a798ad1c6aa8ad969f8351e1d7057e6a8
Author: V_Galaxy <[email protected]>
AuthorDate: Mon Apr 8 20:23:29 2024 +0800

    Pipe: check metric service before marking pipe metrics to avoid useless warn logs (#12303)
---
 .../iotdb/db/pipe/metric/PipeConnectorMetrics.java | 46 ++++++++++----------
 .../iotdb/db/pipe/metric/PipeExtractorMetrics.java | 49 ++++++++++++----------
 .../iotdb/db/pipe/metric/PipeProcessorMetrics.java | 46 ++++++++++----------
 3 files changed, 78 insertions(+), 63 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeConnectorMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeConnectorMetrics.java
index 6ecb7519786..caaeecf5e05 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeConnectorMetrics.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeConnectorMetrics.java
@@ -42,7 +42,7 @@ public class PipeConnectorMetrics implements IMetricSet {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeConnectorMetrics.class);
 
-  private AbstractMetricService metricService;
+  private volatile AbstractMetricService metricService;
 
   private final Map<String, PipeConnectorSubtask> connectorMap = new 
HashMap<>();
 
@@ -57,10 +57,9 @@ public class PipeConnectorMetrics implements IMetricSet {
   @Override
   public void bindTo(AbstractMetricService metricService) {
     this.metricService = metricService;
-    synchronized (this) {
-      for (String taskID : connectorMap.keySet()) {
-        createMetrics(taskID);
-      }
+    ImmutableSet<String> taskIDs = ImmutableSet.copyOf(connectorMap.keySet());
+    for (String taskID : taskIDs) {
+      createMetrics(taskID);
     }
   }
 
@@ -254,30 +253,29 @@ public class PipeConnectorMetrics implements IMetricSet {
 
   public void register(@NonNull PipeConnectorSubtask pipeConnectorSubtask) {
     String taskID = pipeConnectorSubtask.getTaskID();
-    synchronized (this) {
-      connectorMap.putIfAbsent(taskID, pipeConnectorSubtask);
-      if (Objects.nonNull(metricService)) {
-        createMetrics(taskID);
-      }
+    connectorMap.putIfAbsent(taskID, pipeConnectorSubtask);
+    if (Objects.nonNull(metricService)) {
+      createMetrics(taskID);
     }
   }
 
   public void deregister(String taskID) {
-    synchronized (this) {
-      if (!connectorMap.containsKey(taskID)) {
-        LOGGER.warn(
-            "Failed to deregister pipe connector metrics, 
PipeConnectorSubtask({}) does not exist",
-            taskID);
-        return;
-      }
-      if (Objects.nonNull(metricService)) {
-        removeMetrics(taskID);
-      }
-      connectorMap.remove(taskID);
+    if (!connectorMap.containsKey(taskID)) {
+      LOGGER.warn(
+          "Failed to deregister pipe connector metrics, 
PipeConnectorSubtask({}) does not exist",
+          taskID);
+      return;
+    }
+    if (Objects.nonNull(metricService)) {
+      removeMetrics(taskID);
     }
+    connectorMap.remove(taskID);
   }
 
   public void markTabletEvent(String taskID) {
+    if (Objects.isNull(metricService)) {
+      return;
+    }
     Rate rate = tabletRateMap.get(taskID);
     if (rate == null) {
       LOGGER.warn(
@@ -289,6 +287,9 @@ public class PipeConnectorMetrics implements IMetricSet {
   }
 
   public void markTsFileEvent(String taskID) {
+    if (Objects.isNull(metricService)) {
+      return;
+    }
     Rate rate = tsFileRateMap.get(taskID);
     if (rate == null) {
       LOGGER.warn(
@@ -300,6 +301,9 @@ public class PipeConnectorMetrics implements IMetricSet {
   }
 
   public void markPipeHeartbeatEvent(String taskID) {
+    if (Objects.isNull(metricService)) {
+      return;
+    }
     Rate rate = pipeHeartbeatRateMap.get(taskID);
     if (rate == null) {
       LOGGER.warn(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeExtractorMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeExtractorMetrics.java
index e3e9cceb8a9..310300453f7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeExtractorMetrics.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeExtractorMetrics.java
@@ -43,7 +43,7 @@ public class PipeExtractorMetrics implements IMetricSet {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeExtractorMetrics.class);
 
-  private AbstractMetricService metricService;
+  private volatile AbstractMetricService metricService;
 
   private final Map<String, IoTDBDataRegionExtractor> extractorMap = new 
ConcurrentHashMap<>();
 
@@ -64,10 +64,9 @@ public class PipeExtractorMetrics implements IMetricSet {
   @Override
   public void bindTo(AbstractMetricService metricService) {
     this.metricService = metricService;
-    synchronized (this) {
-      for (String taskID : extractorMap.keySet()) {
-        createMetrics(taskID);
-      }
+    ImmutableSet<String> taskIDs = ImmutableSet.copyOf(extractorMap.keySet());
+    for (String taskID : taskIDs) {
+      createMetrics(taskID);
     }
   }
 
@@ -291,30 +290,29 @@ public class PipeExtractorMetrics implements IMetricSet {
 
   public void register(@NonNull IoTDBDataRegionExtractor extractor) {
     String taskID = extractor.getTaskID();
-    synchronized (this) {
-      extractorMap.putIfAbsent(taskID, extractor);
-      if (Objects.nonNull(metricService)) {
-        createMetrics(taskID);
-      }
+    extractorMap.putIfAbsent(taskID, extractor);
+    if (Objects.nonNull(metricService)) {
+      createMetrics(taskID);
     }
   }
 
   public void deregister(String taskID) {
-    synchronized (this) {
-      if (!extractorMap.containsKey(taskID)) {
-        LOGGER.warn(
-            "Failed to deregister pipe extractor metrics, 
IoTDBDataRegionExtractor({}) does not exist",
-            taskID);
-        return;
-      }
-      if (Objects.nonNull(metricService)) {
-        removeMetrics(taskID);
-      }
-      extractorMap.remove(taskID);
+    if (!extractorMap.containsKey(taskID)) {
+      LOGGER.warn(
+          "Failed to deregister pipe extractor metrics, 
IoTDBDataRegionExtractor({}) does not exist",
+          taskID);
+      return;
+    }
+    if (Objects.nonNull(metricService)) {
+      removeMetrics(taskID);
     }
+    extractorMap.remove(taskID);
   }
 
   public void markTabletEvent(String taskID) {
+    if (Objects.isNull(metricService)) {
+      return;
+    }
     Rate rate = tabletRateMap.get(taskID);
     if (rate == null) {
       LOGGER.warn(
@@ -326,6 +324,9 @@ public class PipeExtractorMetrics implements IMetricSet {
   }
 
   public void markTsFileEvent(String taskID) {
+    if (Objects.isNull(metricService)) {
+      return;
+    }
     Rate rate = tsFileRateMap.get(taskID);
     if (rate == null) {
       LOGGER.warn(
@@ -337,6 +338,9 @@ public class PipeExtractorMetrics implements IMetricSet {
   }
 
   public void markPipeHeartbeatEvent(String taskID) {
+    if (Objects.isNull(metricService)) {
+      return;
+    }
     Rate rate = pipeHeartbeatRateMap.get(taskID);
     if (rate == null) {
       LOGGER.warn(
@@ -348,6 +352,9 @@ public class PipeExtractorMetrics implements IMetricSet {
   }
 
   public void setRecentProcessedTsFileEpochState(String taskID, 
TsFileEpoch.State state) {
+    if (Objects.isNull(metricService)) {
+      return;
+    }
     Gauge gauge = recentProcessedTsFileEpochStateMap.get(taskID);
     if (gauge == null) {
       LOGGER.warn(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java
index 291c3529d8b..711c258fd8b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java
@@ -42,7 +42,7 @@ public class PipeProcessorMetrics implements IMetricSet {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeProcessorMetrics.class);
 
-  private AbstractMetricService metricService;
+  private volatile AbstractMetricService metricService;
 
   private final Map<String, PipeProcessorSubtask> processorMap = new 
HashMap<>();
 
@@ -57,10 +57,9 @@ public class PipeProcessorMetrics implements IMetricSet {
   @Override
   public void bindTo(AbstractMetricService metricService) {
     this.metricService = metricService;
-    synchronized (this) {
-      for (String taskID : processorMap.keySet()) {
-        createMetrics(taskID);
-      }
+    ImmutableSet<String> taskIDs = ImmutableSet.copyOf(processorMap.keySet());
+    for (String taskID : taskIDs) {
+      createMetrics(taskID);
     }
   }
 
@@ -232,30 +231,29 @@ public class PipeProcessorMetrics implements IMetricSet {
 
   public void register(@NonNull PipeProcessorSubtask pipeProcessorSubtask) {
     String taskID = pipeProcessorSubtask.getTaskID();
-    synchronized (this) {
-      processorMap.putIfAbsent(taskID, pipeProcessorSubtask);
-      if (Objects.nonNull(metricService)) {
-        createMetrics(taskID);
-      }
+    processorMap.putIfAbsent(taskID, pipeProcessorSubtask);
+    if (Objects.nonNull(metricService)) {
+      createMetrics(taskID);
     }
   }
 
   public void deregister(String taskID) {
-    synchronized (this) {
-      if (!processorMap.containsKey(taskID)) {
-        LOGGER.warn(
-            "Failed to deregister pipe processor metrics, 
PipeProcessorSubtask({}) does not exist",
-            taskID);
-        return;
-      }
-      if (Objects.nonNull(metricService)) {
-        removeMetrics(taskID);
-      }
-      processorMap.remove(taskID);
+    if (!processorMap.containsKey(taskID)) {
+      LOGGER.warn(
+          "Failed to deregister pipe processor metrics, 
PipeProcessorSubtask({}) does not exist",
+          taskID);
+      return;
+    }
+    if (Objects.nonNull(metricService)) {
+      removeMetrics(taskID);
     }
+    processorMap.remove(taskID);
   }
 
   public void markTabletEvent(String taskID) {
+    if (Objects.isNull(metricService)) {
+      return;
+    }
     Rate rate = tabletRateMap.get(taskID);
     if (rate == null) {
       LOGGER.warn(
@@ -267,6 +265,9 @@ public class PipeProcessorMetrics implements IMetricSet {
   }
 
   public void markTsFileEvent(String taskID) {
+    if (Objects.isNull(metricService)) {
+      return;
+    }
     Rate rate = tsFileRateMap.get(taskID);
     if (rate == null) {
       LOGGER.warn(
@@ -278,6 +279,9 @@ public class PipeProcessorMetrics implements IMetricSet {
   }
 
   public void markPipeHeartbeatEvent(String taskID) {
+    if (Objects.isNull(metricService)) {
+      return;
+    }
     Rate rate = pipeHeartbeatRateMap.get(taskID);
     if (rate == null) {
       LOGGER.warn(

Reply via email to