This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new ee459bdc81c Pipe: add memory / cache hit rate / resource reference
count / tsfile epoch metrics (#11388)
ee459bdc81c is described below
commit ee459bdc81c69330433737a8b8cd1e366b96be02
Author: V_Galaxy <[email protected]>
AuthorDate: Fri Oct 27 01:26:31 2023 +0800
Pipe: add memory / cache hit rate / resource reference count / tsfile epoch
metrics (#11388)
As title, also fix potential NPE when marking...
**NOTE:** Since there is no observed logic for the end of the
`WALInsertNodeCache` lifecycle, the `PipeWALInsertNodeCacheMetrics::deregister`
is currently not being called.
---
.../pipe/extractor/IoTDBDataRegionExtractor.java | 6 +-
.../realtime/PipeRealtimeDataRegionExtractor.java | 13 ++
.../pipe/extractor/realtime/epoch/TsFileEpoch.java | 26 +++-
.../realtime/epoch/TsFileEpochManager.java | 4 +
.../iotdb/db/pipe/metric/PipeConnectorMetrics.java | 35 ++++--
.../iotdb/db/pipe/metric/PipeExtractorMetrics.java | 74 +++++++++--
.../apache/iotdb/db/pipe/metric/PipeMetrics.java | 4 +
.../iotdb/db/pipe/metric/PipeProcessorMetrics.java | 35 ++++--
.../iotdb/db/pipe/metric/PipeResourceMetrics.java | 90 ++++++++++++++
.../pipe/metric/PipeWALInsertNodeCacheMetrics.java | 137 +++++++++++++++++++++
.../db/pipe/resource/memory/PipeMemoryManager.java | 4 +
.../resource/tsfile/PipeTsFileResourceManager.java | 4 +
.../pipe/resource/wal/PipeWALResourceManager.java | 7 ++
.../subtask/connector/PipeConnectorSubtask.java | 6 +-
.../subtask/processor/PipeProcessorSubtask.java | 6 +-
.../dataregion/wal/utils/WALInsertNodeCache.java | 11 +-
.../iotdb/commons/service/metric/enums/Metric.java | 6 +
17 files changed, 431 insertions(+), 37 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
index c0f120a0d40..ee7adb96601 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
@@ -234,11 +234,11 @@ public class IoTDBDataRegionExtractor implements
PipeExtractor {
: historicalExtractor.supply();
if (Objects.nonNull(event)) {
if (event instanceof TabletInsertionEvent) {
- PipeExtractorMetrics.getInstance().getTabletRate(taskID).mark();
+ PipeExtractorMetrics.getInstance().markTabletEvent(taskID);
} else if (event instanceof TsFileInsertionEvent) {
- PipeExtractorMetrics.getInstance().getTsFileRate(taskID).mark();
+ PipeExtractorMetrics.getInstance().markTsFileEvent(taskID);
} else if (event instanceof PipeHeartbeatEvent) {
- PipeExtractorMetrics.getInstance().getPipeHeartbeatRate(taskID).mark();
+ PipeExtractorMetrics.getInstance().markPipeHeartbeatEvent(taskID);
}
}
return event;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
index 2634424ed0d..3087a6ef41f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
@@ -57,6 +57,8 @@ public abstract class PipeRealtimeDataRegionExtractor
implements PipeExtractor {
protected final AtomicBoolean isClosed = new AtomicBoolean(false);
+ private String taskID;
+
protected PipeRealtimeDataRegionExtractor() {
// Do nothing
}
@@ -95,6 +97,13 @@ public abstract class PipeRealtimeDataRegionExtractor
implements PipeExtractor {
parameters.getBooleanOrDefault(
PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY,
PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE);
+
+ // Metrics related to TsFileEpoch are managed in PipeExtractorMetrics.
These metrics are
+ // indexed by the taskID of IoTDBDataRegionExtractor. To avoid
PipeRealtimeDataRegionExtractor
+ // holding a reference to IoTDBDataRegionExtractor, the taskID should be
constructed to
+ // match that of IoTDBDataRegionExtractor.
+ long creationTime =
configuration.getRuntimeEnvironment().getCreationTime();
+ taskID = pipeName + "_" + dataRegionId + "_" + creationTime;
}
@Override
@@ -190,4 +199,8 @@ public abstract class PipeRealtimeDataRegionExtractor
implements PipeExtractor {
public int getPipeHeartbeatEventCount() {
return pendingQueue.getPipeHeartbeatEventCount();
}
+
+ public String getTaskID() {
+ return taskID;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpoch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpoch.java
index f57408a9058..b0ec157f016 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpoch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpoch.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.extractor.realtime.epoch;
import
org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionExtractor;
+import org.apache.iotdb.db.pipe.metric.PipeExtractorMetrics;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -49,6 +50,13 @@ public class TsFileEpoch {
.getAndUpdate(visitor::migrate);
}
+ public void setExtractorsRecentProcessedTsFileEpochState() {
+ dataRegionExtractor2State.forEach(
+ (extractor, state) ->
+ PipeExtractorMetrics.getInstance()
+ .setRecentProcessedTsFileEpochState(extractor.getTaskID(),
state.get()));
+ }
+
@Override
public String toString() {
return "TsFileEpoch{"
@@ -61,9 +69,19 @@ public class TsFileEpoch {
}
public enum State {
- EMPTY,
- USING_TABLET,
- USING_TSFILE,
- USING_BOTH
+ EMPTY(0),
+ USING_TABLET(1),
+ USING_BOTH(2),
+ USING_TSFILE(3);
+
+ private final int id;
+
+ State(int id) {
+ this.id = id;
+ }
+
+ public int getId() {
+ return id;
+ }
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpochManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpochManager.java
index 5b92828c00c..fdd2ad85c62 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpochManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpochManager.java
@@ -54,6 +54,10 @@ public class TsFileEpochManager {
});
final TsFileEpoch epoch = filePath2Epoch.remove(filePath);
+ // When all data corresponding to this TsFileEpoch have been extracted,
update the state
+ // of the extractors processing this TsFileEpoch.
+ epoch.setExtractorsRecentProcessedTsFileEpochState();
+
LOGGER.info("All data in TsFileEpoch {} was extracted", epoch);
return new PipeRealtimeEvent(
event,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeConnectorMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeConnectorMetrics.java
index b37aa2f056e..f2dde61092c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeConnectorMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeConnectorMetrics.java
@@ -187,7 +187,7 @@ public class PipeConnectorMetrics implements IMetricSet {
public void deregister(String taskID) {
synchronized (this) {
if (!connectorMap.containsKey(taskID)) {
- LOGGER.info(
+ LOGGER.warn(
"Failed to deregister pipe connector metrics,
PipeConnectorSubtask({}) does not exist",
taskID);
return;
@@ -199,16 +199,37 @@ public class PipeConnectorMetrics implements IMetricSet {
}
}
- public Rate getTabletRate(String taskID) {
- return tabletRateMap.get(taskID);
+ public void markTabletEvent(String taskID) {
+ Rate rate = tabletRateMap.get(taskID);
+ if (rate == null) {
+ LOGGER.warn(
+ "Failed to mark pipe connector tablet event,
PipeConnectorSubtask({}) does not exist",
+ taskID);
+ return;
+ }
+ rate.mark();
}
- public Rate getTsFileRate(String taskID) {
- return tsFileRateMap.get(taskID);
+ public void markTsFileEvent(String taskID) {
+ Rate rate = tsFileRateMap.get(taskID);
+ if (rate == null) {
+ LOGGER.warn(
+ "Failed to mark pipe connector tsfile event,
PipeConnectorSubtask({}) does not exist",
+ taskID);
+ return;
+ }
+ rate.mark();
}
- public Rate getPipeHeartbeatRate(String taskID) {
- return pipeHeartbeatRateMap.get(taskID);
+ public void markPipeHeartbeatEvent(String taskID) {
+ Rate rate = pipeHeartbeatRateMap.get(taskID);
+ if (rate == null) {
+ LOGGER.warn(
+ "Failed to mark pipe connector heartbeat event,
PipeConnectorSubtask({}) does not exist",
+ taskID);
+ return;
+ }
+ rate.mark();
}
//////////////////////////// singleton ////////////////////////////
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeExtractorMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeExtractorMetrics.java
index eb30a045e00..b7f5738b152 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeExtractorMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeExtractorMetrics.java
@@ -22,8 +22,10 @@ package org.apache.iotdb.db.pipe.metric;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
import org.apache.iotdb.db.pipe.extractor.IoTDBDataRegionExtractor;
+import org.apache.iotdb.db.pipe.extractor.realtime.epoch.TsFileEpoch;
import org.apache.iotdb.metrics.AbstractMetricService;
import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Gauge;
import org.apache.iotdb.metrics.type.Rate;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.metrics.utils.MetricType;
@@ -42,6 +44,8 @@ public class PipeExtractorMetrics implements IMetricSet {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeExtractorMetrics.class);
+ private AbstractMetricService metricService;
+
private final Map<String, IoTDBDataRegionExtractor> extractorMap = new
HashMap<>();
private final Map<String, Rate> tabletRateMap = new ConcurrentHashMap<>();
@@ -50,7 +54,7 @@ public class PipeExtractorMetrics implements IMetricSet {
private final Map<String, Rate> pipeHeartbeatRateMap = new
ConcurrentHashMap<>();
- private AbstractMetricService metricService;
+ private final Map<String, Gauge> recentProcessedTsFileEpochStateMap = new
ConcurrentHashMap<>();
//////////////////////////// bindTo & unbindFrom (metric framework)
////////////////////////////
@@ -67,9 +71,11 @@ public class PipeExtractorMetrics implements IMetricSet {
private void createMetrics(String taskID) {
createAutoGauge(taskID);
createRate(taskID);
+ createGauge(taskID);
}
private void createAutoGauge(String taskID) {
+ // pending event count
metricService.createAutoGauge(
Metric.UNPROCESSED_HISTORICAL_TSFILE_COUNT.toString(),
MetricLevel.IMPORTANT,
@@ -124,6 +130,16 @@ public class PipeExtractorMetrics implements IMetricSet {
taskID));
}
+ private void createGauge(String taskID) {
+ recentProcessedTsFileEpochStateMap.put(
+ taskID,
+ metricService.getOrCreateGauge(
+ Metric.PIPE_EXTRACTOR_TSFILE_EPOCH_STATE.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ taskID));
+ }
+
@Override
public void unbindFrom(AbstractMetricService metricService) {
ImmutableSet<String> taskIDs = ImmutableSet.copyOf(extractorMap.keySet());
@@ -138,9 +154,11 @@ public class PipeExtractorMetrics implements IMetricSet {
private void removeMetrics(String taskID) {
removeAutoGauge(taskID);
removeRate(taskID);
+ removeGauge(taskID);
}
private void removeAutoGauge(String taskID) {
+ // pending event count
metricService.remove(
MetricType.AUTO_GAUGE,
Metric.UNPROCESSED_HISTORICAL_TSFILE_COUNT.toString(),
@@ -184,6 +202,14 @@ public class PipeExtractorMetrics implements IMetricSet {
pipeHeartbeatRateMap.remove(taskID);
}
+ private void removeGauge(String taskID) {
+ metricService.remove(
+ MetricType.GAUGE,
+ Metric.PIPE_EXTRACTOR_TSFILE_EPOCH_STATE.toString(),
+ Tag.NAME.toString(),
+ taskID);
+ }
+
//////////////////////////// register & deregister (pipe integration)
////////////////////////////
public void register(@NonNull IoTDBDataRegionExtractor extractor) {
@@ -199,7 +225,7 @@ public class PipeExtractorMetrics implements IMetricSet {
public void deregister(String taskID) {
synchronized (this) {
if (!extractorMap.containsKey(taskID)) {
- LOGGER.info(
+ LOGGER.warn(
"Failed to deregister pipe extractor metrics,
IoTDBDataRegionExtractor({}) does not exist",
taskID);
return;
@@ -211,16 +237,48 @@ public class PipeExtractorMetrics implements IMetricSet {
}
}
- public Rate getTabletRate(String taskID) {
- return tabletRateMap.get(taskID);
+ public void markTabletEvent(String taskID) {
+ Rate rate = tabletRateMap.get(taskID);
+ if (rate == null) {
+ LOGGER.warn(
+ "Failed to mark pipe extractor tablet event,
IoTDBDataRegionExtractor({}) does not exist",
+ taskID);
+ return;
+ }
+ rate.mark();
+ }
+
+ public void markTsFileEvent(String taskID) {
+ Rate rate = tsFileRateMap.get(taskID);
+ if (rate == null) {
+ LOGGER.warn(
+ "Failed to mark pipe extractor tsfile event,
IoTDBDataRegionExtractor({}) does not exist",
+ taskID);
+ return;
+ }
+ rate.mark();
}
- public Rate getTsFileRate(String taskID) {
- return tsFileRateMap.get(taskID);
+ public void markPipeHeartbeatEvent(String taskID) {
+ Rate rate = pipeHeartbeatRateMap.get(taskID);
+ if (rate == null) {
+ LOGGER.warn(
+ "Failed to mark pipe extractor heartbeat event,
IoTDBDataRegionExtractor({}) does not exist",
+ taskID);
+ return;
+ }
+ rate.mark();
}
- public Rate getPipeHeartbeatRate(String taskID) {
- return pipeHeartbeatRateMap.get(taskID);
+ public void setRecentProcessedTsFileEpochState(String taskID,
TsFileEpoch.State state) {
+ Gauge gauge = recentProcessedTsFileEpochStateMap.get(taskID);
+ if (gauge == null) {
+ LOGGER.warn(
+ "Failed to set recent processed tsfile epoch state,
PipeRealtimeDataRegionExtractor({}) does not exist",
+ taskID);
+ return;
+ }
+ gauge.set(state.getId());
}
//////////////////////////// singleton ////////////////////////////
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeMetrics.java
index 8fa6e0c1999..e437f1bd457 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeMetrics.java
@@ -33,6 +33,8 @@ public class PipeMetrics implements IMetricSet {
PipeProcessorMetrics.getInstance().bindTo(metricService);
PipeConnectorMetrics.getInstance().bindTo(metricService);
PipeHeartbeatEventMetrics.getInstance().bindTo(metricService);
+ PipeWALInsertNodeCacheMetrics.getInstance().bindTo(metricService);
+ PipeResourceMetrics.getInstance().bindTo(metricService);
}
@Override
@@ -42,6 +44,8 @@ public class PipeMetrics implements IMetricSet {
PipeProcessorMetrics.getInstance().unbindFrom(metricService);
PipeConnectorMetrics.getInstance().unbindFrom(metricService);
PipeHeartbeatEventMetrics.getInstance().unbindFrom(metricService);
+ PipeWALInsertNodeCacheMetrics.getInstance().unbindFrom(metricService);
+ PipeResourceMetrics.getInstance().unbindFrom(metricService);
}
//////////////////////////// singleton ////////////////////////////
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java
index 550b5bd56cd..7ca6be329a7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java
@@ -187,7 +187,7 @@ public class PipeProcessorMetrics implements IMetricSet {
public void deregister(String taskID) {
synchronized (this) {
if (!processorMap.containsKey(taskID)) {
- LOGGER.info(
+ LOGGER.warn(
"Failed to deregister pipe processor metrics,
PipeProcessorSubtask({}) does not exist",
taskID);
return;
@@ -199,16 +199,37 @@ public class PipeProcessorMetrics implements IMetricSet {
}
}
- public Rate getTabletRate(String taskID) {
- return tabletRateMap.get(taskID);
+ public void markTabletEvent(String taskID) {
+ Rate rate = tabletRateMap.get(taskID);
+ if (rate == null) {
+ LOGGER.warn(
+ "Failed to mark pipe processor tablet event,
PipeProcessorSubtask({}) does not exist",
+ taskID);
+ return;
+ }
+ rate.mark();
}
- public Rate getTsFileRate(String taskID) {
- return tsFileRateMap.get(taskID);
+ public void markTsFileEvent(String taskID) {
+ Rate rate = tsFileRateMap.get(taskID);
+ if (rate == null) {
+ LOGGER.warn(
+ "Failed to mark pipe processor tsfile event,
PipeProcessorSubtask({}) does not exist",
+ taskID);
+ return;
+ }
+ rate.mark();
}
- public Rate getPipeHeartbeatRate(String taskID) {
- return pipeHeartbeatRateMap.get(taskID);
+ public void markPipeHeartbeatEvent(String taskID) {
+ Rate rate = pipeHeartbeatRateMap.get(taskID);
+ if (rate == null) {
+ LOGGER.warn(
+ "Failed to mark pipe processor heartbeat event,
PipeProcessorSubtask({}) does not exist",
+ taskID);
+ return;
+ }
+ rate.mark();
}
//////////////////////////// singleton ////////////////////////////
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeResourceMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeResourceMetrics.java
new file mode 100644
index 00000000000..eb777dfbdb1
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeResourceMetrics.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.metric;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
+import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
+import org.apache.iotdb.db.pipe.resource.wal.PipeWALResourceManager;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+public class PipeResourceMetrics implements IMetricSet {
+
+ //////////////////////////// bindTo & unbindFrom (metric framework)
////////////////////////////
+
+ @Override
+ public void bindTo(AbstractMetricService metricService) {
+ // pipe memory related
+ metricService.createAutoGauge(
+ Metric.PIPE_MEM_COST.toString(),
+ MetricLevel.IMPORTANT,
+ PipeResourceManager.memory(),
+ PipeMemoryManager::getUsedMemorySizeInBytes);
+ metricService.createAutoGauge(
+ Metric.PIPE_MEM_USAGE.toString(),
+ MetricLevel.IMPORTANT,
+ PipeResourceManager.memory(),
+ PipeMemoryManager::getMemoryUsage);
+ // resource reference count
+ metricService.createAutoGauge(
+ Metric.PIPE_PINNED_MEMTABLE_COUNT.toString(),
+ MetricLevel.IMPORTANT,
+ PipeResourceManager.wal(),
+ PipeWALResourceManager::getPinnedWalCount);
+ metricService.createAutoGauge(
+ Metric.PIPE_LINKED_TSFILE_COUNT.toString(),
+ MetricLevel.IMPORTANT,
+ PipeResourceManager.tsfile(),
+ PipeTsFileResourceManager::getLinkedTsfileCount);
+ }
+
+ @Override
+ public void unbindFrom(AbstractMetricService metricService) {
+ // pipe memory related
+ metricService.remove(MetricType.AUTO_GAUGE,
Metric.PIPE_MEM_COST.toString());
+ metricService.remove(MetricType.AUTO_GAUGE,
Metric.PIPE_MEM_USAGE.toString());
+ // resource reference count
+ metricService.remove(MetricType.AUTO_GAUGE,
Metric.PIPE_PINNED_MEMTABLE_COUNT.toString());
+ metricService.remove(MetricType.AUTO_GAUGE,
Metric.PIPE_LINKED_TSFILE_COUNT.toString());
+ }
+
+ //////////////////////////// singleton ////////////////////////////
+
+ private static class PipeResourceMetricsHolder {
+
+ private static final PipeResourceMetrics INSTANCE = new
PipeResourceMetrics();
+
+ private PipeResourceMetricsHolder() {
+ // empty constructor
+ }
+ }
+
+ public static PipeResourceMetrics getInstance() {
+ return PipeResourceMetrics.PipeResourceMetricsHolder.INSTANCE;
+ }
+
+ private PipeResourceMetrics() {
+ // empty constructor
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeWALInsertNodeCacheMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeWALInsertNodeCacheMetrics.java
new file mode 100644
index 00000000000..2f246281f82
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeWALInsertNodeCacheMetrics.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.metric;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import
org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALInsertNodeCache;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+import com.google.common.collect.ImmutableSet;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class PipeWALInsertNodeCacheMetrics implements IMetricSet {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PipeWALInsertNodeCacheMetrics.class);
+
+ private volatile AbstractMetricService metricService;
+
+ private final Map<Integer, WALInsertNodeCache> cacheMap = new
ConcurrentHashMap<>();
+
+ //////////////////////////// bindTo & unbindFrom (metric framework)
////////////////////////////
+
+ @Override
+ public void bindTo(AbstractMetricService metricService) {
+ this.metricService = metricService;
+ ImmutableSet<Integer> dataRegionIds =
ImmutableSet.copyOf(cacheMap.keySet());
+ for (Integer dataRegionId : dataRegionIds) {
+ createMetrics(dataRegionId);
+ }
+ }
+
+ private void createMetrics(Integer dataRegionId) {
+ createAutoGauge(dataRegionId);
+ }
+
+ private void createAutoGauge(Integer dataRegionId) {
+ metricService.createAutoGauge(
+ Metric.PIPE_WAL_INSERT_NODE_CACHE_HIT_RATE.toString(),
+ MetricLevel.IMPORTANT,
+ cacheMap.get(dataRegionId),
+ WALInsertNodeCache::getCacheHitRate,
+ Tag.REGION.toString(),
+ String.valueOf(dataRegionId));
+ }
+
+ @Override
+ public void unbindFrom(AbstractMetricService metricService) {
+ ImmutableSet<Integer> dataRegionIds =
ImmutableSet.copyOf(cacheMap.keySet());
+ for (Integer dataRegionId : dataRegionIds) {
+ deregister(dataRegionId);
+ }
+ if (!cacheMap.isEmpty()) {
+ LOGGER.warn("Failed to unbind from wal insert node cache metrics, cache
map not empty");
+ }
+ }
+
+ private void removeMetrics(Integer dataRegionId) {
+ removeAutoGauge(dataRegionId);
+ }
+
+ private void removeAutoGauge(Integer dataRegionId) {
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.PIPE_WAL_INSERT_NODE_CACHE_HIT_RATE.toString(),
+ Tag.REGION.toString(),
+ String.valueOf(dataRegionId));
+ }
+
+ //////////////////////////// register & deregister (pipe integration)
////////////////////////////
+
+ public void register(@NonNull WALInsertNodeCache walInsertNodeCache, Integer
dataRegionId) {
+ cacheMap.putIfAbsent(dataRegionId, walInsertNodeCache);
+ if (Objects.nonNull(metricService)) {
+ createMetrics(dataRegionId);
+ }
+ }
+
+ public void deregister(Integer dataRegionId) {
+ // TODO: waiting called by WALInsertNodeCache
+ if (!cacheMap.containsKey(dataRegionId)) {
+ LOGGER.warn(
+ "Failed to deregister wal insert node cache metrics,
WALInsertNodeCache({}) does not exist",
+ dataRegionId);
+ return;
+ }
+ if (Objects.nonNull(metricService)) {
+ removeMetrics(dataRegionId);
+ }
+ cacheMap.remove(dataRegionId);
+ }
+
+ //////////////////////////// singleton ////////////////////////////
+
+ private static class PipeWALInsertNodeCacheMetricsHolder {
+
+ private static final PipeWALInsertNodeCacheMetrics INSTANCE =
+ new PipeWALInsertNodeCacheMetrics();
+
+ private PipeWALInsertNodeCacheMetricsHolder() {
+ // empty constructor
+ }
+ }
+
+ public static PipeWALInsertNodeCacheMetrics getInstance() {
+ return PipeWALInsertNodeCacheMetricsHolder.INSTANCE;
+ }
+
+ private PipeWALInsertNodeCacheMetrics() {
+ // empty constructor
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
index 19a6f80246c..86a8020b7f3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
@@ -85,4 +85,8 @@ public class PipeMemoryManager {
public long getTotalMemorySizeInBytes() {
return TOTAL_MEMORY_SIZE_IN_BYTES;
}
+
+ public double getMemoryUsage() {
+ return (double) usedMemorySizeInBytes / TOTAL_MEMORY_SIZE_IN_BYTES;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
index 9839d793a25..73c0240dd76 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
@@ -187,4 +187,8 @@ public class PipeTsFileResourceManager {
public synchronized void unpinTsFileResource(TsFileResource resource) throws
IOException {
decreaseFileReference(getHardlinkOrCopiedFileInPipeDir(resource.getTsFile()));
}
+
+ public int getLinkedTsfileCount() {
+ return hardlinkOrCopiedFileToReferenceMap.size();
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
index f8202372554..1b3dff90fd8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -120,4 +121,10 @@ public abstract class PipeWALResourceManager {
protected abstract void unpinInternal(long memtableId, WALEntryHandler
walEntryHandler)
throws IOException;
+
+ public int getPinnedWalCount() {
+ return Objects.nonNull(memtableIdToPipeWALResourceMap)
+ ? memtableIdToPipeWALResourceMap.size()
+ : 0;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
index 26149d791a2..05119db0ca2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
@@ -118,10 +118,10 @@ public class PipeConnectorSubtask extends PipeSubtask {
try {
if (event instanceof TabletInsertionEvent) {
outputPipeConnector.transfer((TabletInsertionEvent) event);
- PipeConnectorMetrics.getInstance().getTabletRate(taskID).mark();
+ PipeConnectorMetrics.getInstance().markTabletEvent(taskID);
} else if (event instanceof TsFileInsertionEvent) {
outputPipeConnector.transfer((TsFileInsertionEvent) event);
- PipeConnectorMetrics.getInstance().getTsFileRate(taskID).mark();
+ PipeConnectorMetrics.getInstance().markTsFileEvent(taskID);
} else if (event instanceof PipeHeartbeatEvent) {
try {
outputPipeConnector.heartbeat();
@@ -132,7 +132,7 @@ public class PipeConnectorSubtask extends PipeSubtask {
e);
}
((PipeHeartbeatEvent) event).onTransferred();
- PipeConnectorMetrics.getInstance().getPipeHeartbeatRate(taskID).mark();
+ PipeConnectorMetrics.getInstance().markPipeHeartbeatEvent(taskID);
} else {
outputPipeConnector.transfer(event);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
index 0cc38fda9d8..d20d0731b7b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
@@ -106,14 +106,14 @@ public class PipeProcessorSubtask extends PipeSubtask {
if (!isClosed.get()) {
if (event instanceof TabletInsertionEvent) {
pipeProcessor.process((TabletInsertionEvent) event,
outputEventCollector);
- PipeProcessorMetrics.getInstance().getTabletRate(taskID).mark();
+ PipeProcessorMetrics.getInstance().markTabletEvent(taskID);
} else if (event instanceof TsFileInsertionEvent) {
pipeProcessor.process((TsFileInsertionEvent) event,
outputEventCollector);
- PipeProcessorMetrics.getInstance().getTsFileRate(taskID).mark();
+ PipeProcessorMetrics.getInstance().markTsFileEvent(taskID);
} else if (event instanceof PipeHeartbeatEvent) {
pipeProcessor.process(event, outputEventCollector);
((PipeHeartbeatEvent) event).onProcessed();
-
PipeProcessorMetrics.getInstance().getPipeHeartbeatRate(taskID).mark();
+ PipeProcessorMetrics.getInstance().markPipeHeartbeatEvent(taskID);
} else {
pipeProcessor.process(event, outputEventCollector);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
index 31f5694d0e4..d60b964c9ac 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
@@ -22,6 +22,7 @@ package
org.apache.iotdb.db.storageengine.dataregion.wal.utils;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.pipe.metric.PipeWALInsertNodeCacheMetrics;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
@@ -44,6 +45,7 @@ import java.nio.channels.FileChannel;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -61,7 +63,7 @@ public class WALInsertNodeCache {
private volatile boolean hasPipeRunning = false;
- private WALInsertNodeCache() {
+ private WALInsertNodeCache(Integer dataRegionId) {
// TODO: try allocate memory 2 * config.getWalFileSizeThresholdInByte()
for the cache
// If allocate memory failed, disable batch load
isBatchLoadEnabled = true;
@@ -72,6 +74,7 @@ public class WALInsertNodeCache {
(Weigher<WALEntryPosition, Pair<ByteBuffer, InsertNode>>)
(position, pair) -> position.getSize())
.build(new WALInsertNodeCacheLoader());
+ PipeWALInsertNodeCacheMetrics.getInstance().register(this, dataRegionId);
}
/////////////////////////// Getter & Setter ///////////////////////////
@@ -157,6 +160,10 @@ public class WALInsertNodeCache {
}
}
+ public double getCacheHitRate() {
+ return Objects.nonNull(lruCache) ? lruCache.stats().hitRate() : 0;
+ }
+
/////////////////////////// MemTable ///////////////////////////
public void addMemTable(long memTableId) {
@@ -252,7 +259,7 @@ public class WALInsertNodeCache {
private static final Map<Integer, WALInsertNodeCache> INSTANCE_MAP = new
ConcurrentHashMap<>();
public static WALInsertNodeCache getOrCreateInstance(Integer key) {
- return INSTANCE_MAP.computeIfAbsent(key, k -> new WALInsertNodeCache());
+ return INSTANCE_MAP.computeIfAbsent(key, k -> new
WALInsertNodeCache(key));
}
private InstanceHolder() {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index 77455bffa8b..18cc8955a64 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -118,6 +118,12 @@ public enum Metric {
PIPE_CONNECTOR_TSFILE_TRANSFER("pipe_connector_tsfile_transfer"),
PIPE_CONNECTOR_HEARTBEAT_TRANSFER("pipe_connector_heartbeat_transfer"),
PIPE_HEARTBEAT_EVENT("pipe_heartbeat_event"),
+ PIPE_WAL_INSERT_NODE_CACHE_HIT_RATE("pipe_wal_insert_node_cache_hit_rate"),
+ PIPE_EXTRACTOR_TSFILE_EPOCH_STATE("pipe_extractor_tsfile_epoch_state"),
+ PIPE_MEM_COST("pipe_mem_cost"),
+ PIPE_MEM_USAGE("pipe_mem_usage"),
+ PIPE_PINNED_MEMTABLE_COUNT("pipe_pinned_memtable_count"),
+ PIPE_LINKED_TSFILE_COUNT("pipe_linked_tsfile_count"),
;
final String value;