This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 08b3772a798 Pipe: check metric service before marking pipe metrics to avoid useless warn logs (#12303)
08b3772a798 is described below
commit 08b3772a798ad1c6aa8ad969f8351e1d7057e6a8
Author: V_Galaxy <[email protected]>
AuthorDate: Mon Apr 8 20:23:29 2024 +0800
Pipe: check metric service before marking pipe metrics to avoid useless warn logs (#12303)
---
.../iotdb/db/pipe/metric/PipeConnectorMetrics.java | 46 ++++++++++----------
.../iotdb/db/pipe/metric/PipeExtractorMetrics.java | 49 ++++++++++++----------
.../iotdb/db/pipe/metric/PipeProcessorMetrics.java | 46 ++++++++++----------
3 files changed, 78 insertions(+), 63 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeConnectorMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeConnectorMetrics.java
index 6ecb7519786..caaeecf5e05 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeConnectorMetrics.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeConnectorMetrics.java
@@ -42,7 +42,7 @@ public class PipeConnectorMetrics implements IMetricSet {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeConnectorMetrics.class);
- private AbstractMetricService metricService;
+ private volatile AbstractMetricService metricService;
private final Map<String, PipeConnectorSubtask> connectorMap = new
HashMap<>();
@@ -57,10 +57,9 @@ public class PipeConnectorMetrics implements IMetricSet {
@Override
public void bindTo(AbstractMetricService metricService) {
this.metricService = metricService;
- synchronized (this) {
- for (String taskID : connectorMap.keySet()) {
- createMetrics(taskID);
- }
+ ImmutableSet<String> taskIDs = ImmutableSet.copyOf(connectorMap.keySet());
+ for (String taskID : taskIDs) {
+ createMetrics(taskID);
}
}
@@ -254,30 +253,29 @@ public class PipeConnectorMetrics implements IMetricSet {
public void register(@NonNull PipeConnectorSubtask pipeConnectorSubtask) {
String taskID = pipeConnectorSubtask.getTaskID();
- synchronized (this) {
- connectorMap.putIfAbsent(taskID, pipeConnectorSubtask);
- if (Objects.nonNull(metricService)) {
- createMetrics(taskID);
- }
+ connectorMap.putIfAbsent(taskID, pipeConnectorSubtask);
+ if (Objects.nonNull(metricService)) {
+ createMetrics(taskID);
}
}
public void deregister(String taskID) {
- synchronized (this) {
- if (!connectorMap.containsKey(taskID)) {
- LOGGER.warn(
- "Failed to deregister pipe connector metrics,
PipeConnectorSubtask({}) does not exist",
- taskID);
- return;
- }
- if (Objects.nonNull(metricService)) {
- removeMetrics(taskID);
- }
- connectorMap.remove(taskID);
+ if (!connectorMap.containsKey(taskID)) {
+ LOGGER.warn(
+ "Failed to deregister pipe connector metrics,
PipeConnectorSubtask({}) does not exist",
+ taskID);
+ return;
+ }
+ if (Objects.nonNull(metricService)) {
+ removeMetrics(taskID);
}
+ connectorMap.remove(taskID);
}
public void markTabletEvent(String taskID) {
+ if (Objects.isNull(metricService)) {
+ return;
+ }
Rate rate = tabletRateMap.get(taskID);
if (rate == null) {
LOGGER.warn(
@@ -289,6 +287,9 @@ public class PipeConnectorMetrics implements IMetricSet {
}
public void markTsFileEvent(String taskID) {
+ if (Objects.isNull(metricService)) {
+ return;
+ }
Rate rate = tsFileRateMap.get(taskID);
if (rate == null) {
LOGGER.warn(
@@ -300,6 +301,9 @@ public class PipeConnectorMetrics implements IMetricSet {
}
public void markPipeHeartbeatEvent(String taskID) {
+ if (Objects.isNull(metricService)) {
+ return;
+ }
Rate rate = pipeHeartbeatRateMap.get(taskID);
if (rate == null) {
LOGGER.warn(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeExtractorMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeExtractorMetrics.java
index e3e9cceb8a9..310300453f7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeExtractorMetrics.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeExtractorMetrics.java
@@ -43,7 +43,7 @@ public class PipeExtractorMetrics implements IMetricSet {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeExtractorMetrics.class);
- private AbstractMetricService metricService;
+ private volatile AbstractMetricService metricService;
private final Map<String, IoTDBDataRegionExtractor> extractorMap = new
ConcurrentHashMap<>();
@@ -64,10 +64,9 @@ public class PipeExtractorMetrics implements IMetricSet {
@Override
public void bindTo(AbstractMetricService metricService) {
this.metricService = metricService;
- synchronized (this) {
- for (String taskID : extractorMap.keySet()) {
- createMetrics(taskID);
- }
+ ImmutableSet<String> taskIDs = ImmutableSet.copyOf(extractorMap.keySet());
+ for (String taskID : taskIDs) {
+ createMetrics(taskID);
}
}
@@ -291,30 +290,29 @@ public class PipeExtractorMetrics implements IMetricSet {
public void register(@NonNull IoTDBDataRegionExtractor extractor) {
String taskID = extractor.getTaskID();
- synchronized (this) {
- extractorMap.putIfAbsent(taskID, extractor);
- if (Objects.nonNull(metricService)) {
- createMetrics(taskID);
- }
+ extractorMap.putIfAbsent(taskID, extractor);
+ if (Objects.nonNull(metricService)) {
+ createMetrics(taskID);
}
}
public void deregister(String taskID) {
- synchronized (this) {
- if (!extractorMap.containsKey(taskID)) {
- LOGGER.warn(
- "Failed to deregister pipe extractor metrics,
IoTDBDataRegionExtractor({}) does not exist",
- taskID);
- return;
- }
- if (Objects.nonNull(metricService)) {
- removeMetrics(taskID);
- }
- extractorMap.remove(taskID);
+ if (!extractorMap.containsKey(taskID)) {
+ LOGGER.warn(
+ "Failed to deregister pipe extractor metrics,
IoTDBDataRegionExtractor({}) does not exist",
+ taskID);
+ return;
+ }
+ if (Objects.nonNull(metricService)) {
+ removeMetrics(taskID);
}
+ extractorMap.remove(taskID);
}
public void markTabletEvent(String taskID) {
+ if (Objects.isNull(metricService)) {
+ return;
+ }
Rate rate = tabletRateMap.get(taskID);
if (rate == null) {
LOGGER.warn(
@@ -326,6 +324,9 @@ public class PipeExtractorMetrics implements IMetricSet {
}
public void markTsFileEvent(String taskID) {
+ if (Objects.isNull(metricService)) {
+ return;
+ }
Rate rate = tsFileRateMap.get(taskID);
if (rate == null) {
LOGGER.warn(
@@ -337,6 +338,9 @@ public class PipeExtractorMetrics implements IMetricSet {
}
public void markPipeHeartbeatEvent(String taskID) {
+ if (Objects.isNull(metricService)) {
+ return;
+ }
Rate rate = pipeHeartbeatRateMap.get(taskID);
if (rate == null) {
LOGGER.warn(
@@ -348,6 +352,9 @@ public class PipeExtractorMetrics implements IMetricSet {
}
public void setRecentProcessedTsFileEpochState(String taskID,
TsFileEpoch.State state) {
+ if (Objects.isNull(metricService)) {
+ return;
+ }
Gauge gauge = recentProcessedTsFileEpochStateMap.get(taskID);
if (gauge == null) {
LOGGER.warn(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java
index 291c3529d8b..711c258fd8b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java
@@ -42,7 +42,7 @@ public class PipeProcessorMetrics implements IMetricSet {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeProcessorMetrics.class);
- private AbstractMetricService metricService;
+ private volatile AbstractMetricService metricService;
private final Map<String, PipeProcessorSubtask> processorMap = new
HashMap<>();
@@ -57,10 +57,9 @@ public class PipeProcessorMetrics implements IMetricSet {
@Override
public void bindTo(AbstractMetricService metricService) {
this.metricService = metricService;
- synchronized (this) {
- for (String taskID : processorMap.keySet()) {
- createMetrics(taskID);
- }
+ ImmutableSet<String> taskIDs = ImmutableSet.copyOf(processorMap.keySet());
+ for (String taskID : taskIDs) {
+ createMetrics(taskID);
}
}
@@ -232,30 +231,29 @@ public class PipeProcessorMetrics implements IMetricSet {
public void register(@NonNull PipeProcessorSubtask pipeProcessorSubtask) {
String taskID = pipeProcessorSubtask.getTaskID();
- synchronized (this) {
- processorMap.putIfAbsent(taskID, pipeProcessorSubtask);
- if (Objects.nonNull(metricService)) {
- createMetrics(taskID);
- }
+ processorMap.putIfAbsent(taskID, pipeProcessorSubtask);
+ if (Objects.nonNull(metricService)) {
+ createMetrics(taskID);
}
}
public void deregister(String taskID) {
- synchronized (this) {
- if (!processorMap.containsKey(taskID)) {
- LOGGER.warn(
- "Failed to deregister pipe processor metrics,
PipeProcessorSubtask({}) does not exist",
- taskID);
- return;
- }
- if (Objects.nonNull(metricService)) {
- removeMetrics(taskID);
- }
- processorMap.remove(taskID);
+ if (!processorMap.containsKey(taskID)) {
+ LOGGER.warn(
+ "Failed to deregister pipe processor metrics,
PipeProcessorSubtask({}) does not exist",
+ taskID);
+ return;
+ }
+ if (Objects.nonNull(metricService)) {
+ removeMetrics(taskID);
}
+ processorMap.remove(taskID);
}
public void markTabletEvent(String taskID) {
+ if (Objects.isNull(metricService)) {
+ return;
+ }
Rate rate = tabletRateMap.get(taskID);
if (rate == null) {
LOGGER.warn(
@@ -267,6 +265,9 @@ public class PipeProcessorMetrics implements IMetricSet {
}
public void markTsFileEvent(String taskID) {
+ if (Objects.isNull(metricService)) {
+ return;
+ }
Rate rate = tsFileRateMap.get(taskID);
if (rate == null) {
LOGGER.warn(
@@ -278,6 +279,9 @@ public class PipeProcessorMetrics implements IMetricSet {
}
public void markPipeHeartbeatEvent(String taskID) {
+ if (Objects.isNull(metricService)) {
+ return;
+ }
Rate rate = pipeHeartbeatRateMap.get(taskID);
if (rate == null) {
LOGGER.warn(